Apache Spark: Structured Streaming - Part I


In this blog post, we will discuss Structured Streaming along with the other concepts required to build a successful streaming application, one that processes all of its data without losing any.

As we all know, one of the most important points to take care of while designing a streaming application is to process every batch of data that gets streamed. But how?
To find the answer, continue reading this blog.

Structured Streaming

Structured Streaming is an efficient way to ingest large quantities of data from a variety of sources.

Problem

We have a stream of data coming in from a TCP/IP socket, Kafka, Kinesis, or other sources...

The data is coming in faster than it can be consumed.

How do we solve this problem?

Micro-Batch as a Solution

Many APIs use micro-batching to solve this problem. With this approach, we take our firehose of data and collect it for a set interval of time, called the trigger interval. Let's say the trigger interval is 2 seconds.



Now, when it comes to processing these micro-batches, our job for each interval is to process the data from the previous 2-second interval.

As we are processing data, the next batch of data is being collected for us.

In our example, we are processing two seconds' worth of data in about one second.


What happens if we don't process the data fast enough?

Simple: in that case, we will most likely lose data.

But our goal is to process the data for the previous interval before data from the next interval arrives.

From Micro-batch to Table

In Apache Spark, we treat such a stream of micro-batches as continuous updates to an unbounded input table, which can then be queried as if it were a static table.

The computation on the input table is then pushed to a result table. Finally, the result table is written to an output sink such as Kafka.

Now that we have a picture of streaming, let's discuss how to read and write a stream in Spark.

Read Stream

To read a stream, Spark provides a method called readStream, which returns a DataStreamReader used to configure the stream and read the data arriving from a source. As said, to read any stream a DataStreamReader needs to be configured, but the keys to be set differ depending on the type of stream.

Configuration to be set on the DataStreamReader:
1) Schema
2) Type of stream - can be a file source, a TCP/IP socket, or Kafka
3) Configuration based on the type of stream (a short sketch follows this list):
    a) File - file type, file path, maxFilesPerTrigger, etc.
    b) TCP/IP socket - server address, port number, etc.
    c) Kafka - server address, port, topic, partitions, etc.
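
To make this concrete, here is a minimal sketch of configuring a socket source and a Kafka source. The application name, host, port, broker address, and topic name are placeholder values, and the Kafka source additionally requires the spark-sql-kafka connector package on the classpath.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("ReadStreamExamples").getOrCreate()

// TCP/IP socket source: only a host and a port need to be configured
val socketDF = spark
  .readStream
  .format("socket")
  .option("host", "localhost")    // placeholder server address
  .option("port", 9999)           // placeholder port number
  .load()

// Kafka source: broker address and the topic to subscribe to
val kafkaDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")   // placeholder broker
  .option("subscribe", "events")                          // placeholder topic
  .load()

Note that, unlike the file source shown below, the socket and Kafka sources come with their own fixed schemas, so no schema needs to be supplied for them.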

Configuring a Stream

Let's say we are consuming files that are being written continuously somewhere, but instead of consuming all of the data at once we want to control how much is pulled into Spark per trigger. That is very much possible with a simple option, maxFilesPerTrigger.

For instance, if we want to read only two files for every trigger interval, we can do it as below:
ds.option("maxFilesPerTrigger", 2) // here ds is the DataStreamReader

Example

val df = spark
  .readStream                            // Returns a DataStreamReader
  .option("maxFilesPerTrigger", 2)       // Force processing of only 2 files per trigger
  .schema(Schema)                        // Required for all streaming DataFrames
  .json(path)                            // The stream's source directory and file type


This was all about readStream. Now let's dive into writeStream.

Write Stream

Similar to the DataStreamReader returned by the readStream method, we have a writeStream method, which returns a DataStreamWriter for writing out the streaming DataFrame.

There are multiple parameters to be configured on the DataStreamWriter, such as:

Trigger

Trigger specifies when the system should process the next set of data.
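
As a rough sketch of how this is set (reusing the df from the read example above and a console sink purely for illustration), the trigger is configured on the DataStreamWriter:

import org.apache.spark.sql.streaming.Trigger

// Fixed-interval micro-batches: kick off a new batch every 2 seconds
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()

// Trigger.Once() is an alternative that processes all available data in a
// single micro-batch and then stops the query. If no trigger is set at all,
// each batch starts as soon as the previous one finishes.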


Checkpointing

A checkpoint stores the current state of your streaming job to a reliable storage system; it does not store that state on the local file system of any node in your cluster.

Together with write-ahead logs, a terminated stream can be restarted and it will continue from where it left off.

To enable this feature, we only need to specify the location of a checkpoint directory:

ds.option("checkpointLocation", checkpointPath)

OutputModes

The output mode specifies how the result table is written out on each trigger: append (only new rows), complete (the entire result table is rewritten, typically required for aggregations), or update (only the rows that changed since the last trigger).

OutputSinks

The output sink specifies where the result is written, for example file sinks such as Parquet, JSON, or CSV, Kafka, the console, an in-memory table, or the foreach/foreachBatch sinks for custom logic.
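
To make these two concrete, here is a small sketch (reusing the socketDF from the earlier source example) that aggregates the stream and writes the result to the console sink in complete mode:

// Count occurrences of each line received on the socket
val lineCounts = socketDF
  .groupBy("value")
  .count()

lineCounts.writeStream
  .format("console")          // console sink, handy for debugging
  .outputMode("complete")     // rewrite the whole result table on every trigger
  .start()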


These are the main configurations needed to set up a DataStreamWriter. Below is a simple example that applies them to your Spark job.

val stream = df                                        // Start with our "streaming" DataFrame
  .writeStream                                         // Get the DataStreamWriter
  .queryName(StreamName)                               // Name the query
  .trigger(Trigger.ProcessingTime("3 seconds"))        // Configure a 3-second micro-batch interval
  .format("parquet")                                   // Specify the sink type, a Parquet file
  .option("checkpointLocation", checkpointPath)        // Specify the checkpoint location
  .outputMode("append")                                // Write only new data to the "file"
  .start(outputPathDir)                                // Output path
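
One follow-up note: start() returns a StreamingQuery handle, so in a standalone application you typically block the driver on it; otherwise the program may exit while the stream is still running.

stream.awaitTermination()   // block until the query is stopped or fails

// To shut the query down gracefully from another thread:
// stream.stop()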

I hope the above explanation has given you a good understanding of Spark Structured Streaming. To learn more about streaming concepts such as windowing operations, don't forget to read Structured Streaming - Part II.

If you like this blog, please show your appreciation by hitting the like button and sharing it. Also, drop a comment with any feedback or suggested improvements. Till then, HAPPY LEARNING.
