Apache Spark: Structured Streaming - Part II


In our previous blog, we covered the basics of Structured Streaming: what Structured Streaming is, how to read a stream, how to write a stream & more.
In this blog, we will dig into streaming aggregations, windowing, watermarking & a few more advanced topics needed to build a successful streaming application.

Streaming Aggregations

A continuous application, i.e. an application consuming a continuous stream, often needs to make decisions in near real time. In such an application we usually do not want to aggregate over the entire stream, and doing so is not even straightforward: a stream has a definite start but no end. So, how do we handle this? How do we aggregate over continuous data?

To handle this and aggregate the stream, one has to define a window (let's say a 2-hour window). This window collects 2 hours' worth of data, and the aggregation is then performed over that window.

Windowing

If we were using a static DataFrame to produce an aggregate count, we could simply use groupBy() & count().
With a stream, we instead accumulate counts within a sliding window, answering questions like "How many records are we getting every second?"
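For contrast, here is a minimal sketch of the static case; staticDF, its path and its "action" column are hypothetical, and an existing SparkSession named spark (with import spark.implicits._) is assumed:

// Static aggregation: the dataset has a definite beginning and end,
// so a plain groupBy + count computed once is enough.
val staticDF = spark.read.json("/path/to/batch-data")   // hypothetical batch input

val staticCounts = staticDF
  .groupBy($"action")   // group by a column of the static DataFrame
  .count()              // one total count per action

staticCounts.show()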

So, let's understand this with the help of a diagram shown below:

In window-based aggregations, aggregate values are maintained for each window. Imagine the stream contains lines along with the time each line was generated. Instead of running a plain word count, we want to count words within 10-minute windows, updating every 5 minutes. That is, word counts for words received in the 10-minute windows 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20, and so on. Note that 12:00 - 12:10 means data that arrived after 12:00 but before 12:10.

Now, consider a word that was received at 12:07. This word should increment the counts for two windows: 12:00 - 12:10 and 12:05 - 12:15. So the counts are indexed by both the grouping key (i.e. the word) and the window (which can be calculated from the event time).

Since this windowing is similar to grouping, in code we can use the groupBy() and window() operations to express windowed aggregations, as in the sketch below.
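A rough sketch of the 10-minute window / 5-minute slide example above; the streaming words DataFrame with its word and timestamp columns is only an assumption for illustration:

import org.apache.spark.sql.functions.window

// Group by a 10-minute window sliding every 5 minutes, and by the word itself.
val windowedCounts = words
  .groupBy(
    window($"timestamp", "10 minutes", "5 minutes"),   // event-time window
    $"word")                                           // grouping key
  .count()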

Before moving on to a fuller example, let's first understand what event time is.

Event Time v/s Receipt Time 

Event time is the time at which the event actually occurred. Note that event time is not something the Structured Streaming framework maintains on its own; it has to come from the data itself (e.g. a timestamp column).

Out of the box, the stream only knows the receipt time, i.e. the time at which the data arrived in Spark. The main concern with receipt time is accuracy, since it can differ from when the event really happened. So, whenever accuracy is not a concern, receipt time can be used instead of event time, for example as in the sketch below.
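A minimal sketch of falling back to receipt time by stamping each record with the processing-time clock when it is read; inputDF is just an assumed streaming DataFrame:

import org.apache.spark.sql.functions.current_timestamp

// Tag each record with the time Spark processed it, as an approximation of
// receipt time when no reliable event-time column is available.
val withReceiptTime = inputDF.withColumn("receipt_time", current_timestamp())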

Now let's discuss the concept by taking an example. 

val inputDF = spark
  .readStream                          // Returns an instance of DataStreamReader
  .schema(jsonSchema)                  // Set the schema of the JSON data
  .option("maxFilesPerTrigger", 1)     // Treat a sequence of files as a stream, one file at a time
  .json(inputPath)                     // Specify the format and path; returns a streaming DataFrame

import org.apache.spark.sql.functions.window   // window() for event-time based grouping
import spark.implicits._                       // $"..." column syntax

val countsDF = inputDF
  .groupBy($"action",                    // Aggregate by action...
    window($"time", "1 hour"))           // ...then by a 1-hour window over the event-time column
  .count()                               // For each group, produce a count
  .select($"window.start".as("start"),   // Elevate the window start field to a column
    $"count",                            // Include the count
    $"action")                           // Include the action
  .orderBy($"start")                     // Sort by the window start time

On running the above query, we noticed some performance issues. So what exactly is the issue?

The first issue is that the job takes a long time to produce results, and this is because of the groupBy operation.
As we all know, groupBy is a shuffle operation, and shuffling is expensive and time-consuming. To reduce this cost, try lowering the number of shuffle partitions using spark.conf.set("spark.sql.shuffle.partitions", <number>), replacing the number according to your use case, as in the sketch below.
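A minimal sketch; the value 8 is only an example and should be tuned to the cluster and the data volume:

// Reduce the number of shuffle partitions (the default is 200) before starting the query.
spark.conf.set("spark.sql.shuffle.partitions", 8)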

Another problem comes from the window defined in the query above. Here, a new 1-hour window is generated every hour, and the state for each window has to be persisted and maintained separately. Over time this can lead to a massive slowdown, if not an OUT OF MEMORY error.

Solution

One of the solutions is to increase the window size, but if the job or stream runs for a long time, the state can still grow without bound and cause resource issues.

Best Solution - Watermarking

Watermarking allows us to throw old, saved window state away. For that, we need to enable watermarking on the DataFrame as below:
.withWatermark("time", "2 hours")

This defines a 2-hour watermark: data that arrives more than 2 hours behind the watermark may be dropped, while data received within 2 hours of the watermark will never be dropped.

More specifically, any data less than 2 hours behind the latest data processed till then is guaranteed to be aggregated. 
However, the guarantee is strict only in one direction.
Data delayed by more than 2 hours is not guaranteed to be dropped; it may or may not get aggregated. The more delayed the data is, the less likely the engine is to process it.
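Putting it together, a sketch of the earlier aggregation with a 2-hour watermark could look like this (inputDF and the window import come from the example above):

// Declare "time" as the event-time column and tolerate up to 2 hours of lateness,
// so that state for windows older than the watermark can be dropped.
val watermarkedCountsDF = inputDF
  .withWatermark("time", "2 hours")
  .groupBy($"action", window($"time", "1 hour"))
  .count()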

We hope that after reading Spark Structured Streaming Part 1 & Part 2 you now have a clear picture of Structured Streaming.

If you like this blog, please show your appreciation by hitting the like button and sharing it. Also, drop a comment about the post and any improvements you'd suggest. Till then, HAPPY LEARNING.
