Apache Spark: Handle Corrupt/Bad Records


Handling corrupt records is often one of the most expensive parts of writing ETL jobs, so ETL pipelines need a reliable way to deal with them. The larger the pipeline, the harder it becomes to handle bad records partway through. Corrupt data includes:
  • Missing information
  • Incomplete information
  • Schema mismatch
  • Differing formats or data types
Since ETL pipelines are built to be automated, production-oriented solutions must ensure pipelines behave as expected. This means that data engineers must both expect and systematically handle corrupt records.

Before getting to the main topic, let's look at where handling corrupted records fits into an ETL pipeline.
It is good practice to handle corrupted/bad records just before loading the final result. The main question, then, is: how do we handle corrupted/bad records?

Handle Corrupt/Bad Records

To answer this question, let's walk through a complete example that shows how to handle a bad record in JSON data.

Let's say this is the JSON data:
       
{"a": 1, "b":2, "c":3}|{"a": 1, "b":2, "c":3}|{"a": 1, "b, "c":10} 
  
In the above JSON data, {"a": 1, "b, "c":10} is the bad record: the key "b" is missing its closing quote and its value. The goal is to decide how to handle this record.

We have three ways to handle this type of data:

  • Include the bad data in a separate column
  • Ignore all bad records
  • Throw an exception when a corrupted record is encountered
Let's look at each of these three ways in detail:

A) To include this data in a separate column

If the use case requires bad records to be kept, set the mode option to "PERMISSIVE" (Spark's default mode). Each bad record is then stored in a separate column.
Example:
       
val data = """{"a": 1, "b":2, "c":3}|{"a": 1, "b":2, "c":3}|{"a": 1, "b, "c":10}""".split('|')

// PERMISSIVE keeps malformed rows and stores their raw text in the named column
val corruptDf = spark.read.option("mode", "PERMISSIVE")
                          .option("columnNameOfCorruptRecord", "_corrupt_record")
                          .json(sc.parallelize(data))
For the above query, the two valid records populate columns a, b and c, while the bad record is stored as raw text in a separate column, _corrupt_record, with a, b and c left null.
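
A common follow-up to the PERMISSIVE read is to split the result into clean rows and quarantined rows. The snippet below is a minimal sketch of that idea and not part of the original example: the names parsed, goodRecords and badRecords are illustrative, the local SparkSession is created only so the snippet runs on its own (in spark-shell, getOrCreate() should simply return the existing session), and the cache() call is a precaution, since some Spark versions reject queries on file-based sources that reference only the corrupt-record column.

```scala
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.functions.col

// Assumption: a local session for a standalone run; spark-shell already provides `spark`
val spark = SparkSession.builder().master("local[*]").appName("permissive-demo").getOrCreate()

val data = """{"a": 1, "b":2, "c":3}|{"a": 1, "b":2, "c":3}|{"a": 1, "b, "c":10}""".split('|')

// Parse in PERMISSIVE mode and cache the result before filtering on the corrupt column
val parsed = spark.read
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json(spark.createDataset(data.toSeq)(Encoders.STRING))
  .cache()

// Rows that parsed cleanly have a null corrupt-record column
val goodRecords = parsed.filter(col("_corrupt_record").isNull).drop("_corrupt_record")

// Rows that failed to parse keep their raw text for later inspection
val badRecords = parsed.filter(col("_corrupt_record").isNotNull).select("_corrupt_record")

goodRecords.show()
badRecords.show(truncate = false)
```

With this split, goodRecords can continue down the pipeline while badRecords is written somewhere for review.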

B) To ignore all bad records 

If the bad records should not be included at all and only the correct records should be stored, use the "DROPMALFORMED" mode.
Example:
       
// DROPMALFORMED silently discards rows that cannot be parsed
val corruptDf = spark.read.option("mode", "DROPMALFORMED")
                          .json(sc.parallelize(data))
             
For the above query, the result contains only the two valid records; the bad record is removed.
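
Because DROPMALFORMED discards bad records silently, it can help to check how many were dropped. The following is a minimal sketch, assuming the same sample data; the names input, kept, keptCount and droppedCount are illustrative and not part of the original example, and the local SparkSession is created only so the snippet runs on its own.

```scala
import org.apache.spark.sql.{Encoders, SparkSession}

// Assumption: a local session for a standalone run; spark-shell already provides `spark`
val spark = SparkSession.builder().master("local[*]").appName("dropmalformed-demo").getOrCreate()

val data = """{"a": 1, "b":2, "c":3}|{"a": 1, "b":2, "c":3}|{"a": 1, "b, "c":10}""".split('|')
val input = spark.createDataset(data.toSeq)(Encoders.STRING)

// Malformed rows are dropped during parsing
val kept = spark.read
  .option("mode", "DROPMALFORMED")
  .json(input)

// collect() is acceptable for a tiny demo and forces every record to be parsed
val keptCount = kept.collect().length
val droppedCount = data.length - keptCount

println(s"kept=$keptCount dropped=$droppedCount")
```

On real data, a large droppedCount is usually a sign that the input or the expected schema needs a closer look before the drop is accepted.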

C) To throw an exception when it meets corrupted records

In this use case, any bad record causes Spark to throw an exception. The mode to use is "FAILFAST", and it is good practice to wrap the read in a try-catch block.
Example:
       
try {

  val corruptDf = spark.read
    .option("mode", "FAILFAST")
    .json(sc.parallelize(data))
  
} catch {
  case e:Exception => print(e)
}
             

For the above query, Spark throws an exception when it meets the bad record, and the catch block prints it.
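
Spark evaluates DataFrames lazily, so when a schema is supplied up front the exception may only surface once an action runs. The sketch below illustrates this under stated assumptions: the explicit schema (three LongType fields) and the name result are illustrative and not part of the original example, scala.util.Try is used in place of try-catch, and the local SparkSession is created only so the snippet runs on its own.

```scala
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import scala.util.{Failure, Success, Try}

// Assumption: a local session for a standalone run; spark-shell already provides `spark`
val spark = SparkSession.builder().master("local[*]").appName("failfast-demo").getOrCreate()

val data = """{"a": 1, "b":2, "c":3}|{"a": 1, "b":2, "c":3}|{"a": 1, "b, "c":10}""".split('|')

// Assumed schema for the sample records
val schema = StructType(Seq(
  StructField("a", LongType),
  StructField("b", LongType),
  StructField("c", LongType)
))

// The action (collect) is inside the Try, so a parse failure is captured here
val result = Try {
  spark.read
    .schema(schema)
    .option("mode", "FAILFAST")
    .json(spark.createDataset(data.toSeq)(Encoders.STRING))
    .collect()
}

result match {
  case Success(rows) => println(s"Parsed ${rows.length} rows")
  case Failure(e)    => println(s"Read failed: ${e.getMessage}")
}
```

Since the schema is given, Spark does not scan the data when the DataFrame is defined; the malformed record is only reached when collect() runs, which is why the action sits inside the Try.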

So, that's how Apache Spark handles bad/corrupted records.

If you liked this blog, please show your appreciation by hitting the like button and sharing it. Also, leave a comment about the post and any improvements it needs. Till then, HAPPY LEARNING.