When reading data from a file source, Spark can run into trouble if the file contains bad or corrupted records. Say the schema expects a column of IntegerType but the source delivers StringType or DoubleType values. As data engineers we need to handle these scenarios explicitly; otherwise Spark will not be able to parse those records, will store null for them, and we will have no way to find or identify the bad/corrupted rows.
Solution:
To deal with these cases, we have the following options:
1. PERMISSIVE: This is the default mode. Spark loads both the complete and the corrupted records, but for values it cannot parse it stores null.
2. DROPMALFORMED: This mode drops the corrupted records and shows only the correct ones.
3. FAILFAST: In this mode, Spark throws an exception and halts the data loading process as soon as it finds a bad or corrupted record.
4. columnNameOfCorruptRecord option: This stores the raw text of every corrupted record in an extra column. The extra column must be defined in the schema (a short sketch of this approach follows the list).
5. badRecordsPath: Spark processes only the correct records; corrupted or bad records are excluded and written to files under the badRecordsPath location. Note that this option is supported on Databricks, while open-source Spark ignores it.
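A minimal sketch of option 4 in practice (not part of the original example; the DDL schema string, column name, and file path simply mirror the example further below): reading in the default PERMISSIVE mode with a declared corrupt-record column lets us split the data into clean and rejected rows with ordinary filters.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
object splitCorruptRecords extends App {
  val spark = SparkSession.builder()
    .master("local[1]")
    .appName("splitCorruptRecords")
    .getOrCreate()
  val df = spark.read
    .format("csv")
    .schema("id INT, Name STRING, Salary INT, City STRING, CorruptRecord STRING")
    .option("columnNameOfCorruptRecord", "CorruptRecord")
    .load("/tmp/corruptrecord.csv")
    .cache() // Spark restricts queries that touch only the corrupt column on raw files; caching avoids that
  val goodRecords = df.filter(col("CorruptRecord").isNull).drop("CorruptRecord")
  val badRecords  = df.filter(col("CorruptRecord").isNotNull)
  goodRecords.show(false)
  badRecords.show(false)
}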
Example:
Input file: /tmp/corruptrecord.csv
1,rahul,10000,bangalore
2,umesh,20000,indore
3,pawan,30000,bhopal
4,123,40000,pune
rohit,rohan,50000,delhi
5,ronak,noida,kolkatta
6,ajay,mumbai,chennai
Note: As per the schema below, only the first four rows are correct records; the remaining rows are bad or corrupted and we need to handle them.
Scala Code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
object corruptedRecord extends App {

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("corruptedRecord")
    .getOrCreate()

  spark.sparkContext.setLogLevel("ERROR")
  // Schema with an extra column to capture the raw text of corrupted rows
  // (used together with the columnNameOfCorruptRecord option below).
  val dataschema = new StructType()
    .add(StructField("id", IntegerType, true))
    .add(StructField("Name", StringType, true))
    .add(StructField("Salary", IntegerType, true))
    .add(StructField("City", StringType, true))
    .add(StructField("CorruptRecord", StringType, true))

  // Plain schema matching the four columns of the input file.
  val dataschema1 = new StructType()
    .add(StructField("id", IntegerType, true))
    .add(StructField("Name", StringType, true))
    .add(StructField("Salary", IntegerType, true))
    .add(StructField("City", StringType, true))
  // FAILFAST: Spark throws an exception on the first malformed record, so show()
  // is wrapped in try/catch here to let the remaining examples run.
  val failFast = spark.read
    .format("csv")
    .schema(dataschema1)
    .option("mode", "FAILFAST")
    .load("/tmp/corruptrecord.csv")
  try failFast.show(false)
  catch { case e: Exception => println(s"FAILFAST raised: ${e.getMessage}") }
  // columnNameOfCorruptRecord: the raw text of each corrupted row is captured in the
  // CorruptRecord column, which must be declared in the schema (hence dataschema).
  val columnNameOfCorruptRecord = spark.read
    .format("csv")
    .schema(dataschema)
    .option("columnNameOfCorruptRecord", "CorruptRecord")
    .load("/tmp/corruptrecord.csv")
  columnNameOfCorruptRecord.show(false)
  // PERMISSIVE (the default): every row is kept; values that cannot be parsed become null.
  val permissive = spark.read
    .format("csv")
    .schema(dataschema1)
    .option("mode", "PERMISSIVE")
    .load("/tmp/corruptrecord.csv")
  permissive.show(false)
  // DROPMALFORMED: malformed rows are dropped; only correctly parsed rows remain.
  val dropMalformed = spark.read
    .format("csv")
    .schema(dataschema1)
    .option("mode", "DROPMALFORMED")
    .load("/tmp/corruptrecord.csv")
  dropMalformed.show(false)
  // badRecordsPath: good rows are loaded, rejected rows are written to files under /tmp/.
  // Note: this option is honoured on Databricks; open-source Spark silently ignores it.
  val badRecords = spark.read
    .format("csv")
    .schema(dataschema1)
    .option("badRecordsPath", "/tmp/")
    .load("/tmp/corruptrecord.csv")
  badRecords.show(false)
}