Apache Spark: Accumulators & Broadcast Variables

 
At a high level, every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster. The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures. 

The second abstraction in Spark is shared variables that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.

Accumulators

Accumulators are shared variables used to implement counters and sums. In layman's terms, they let worker nodes write some data back to the driver. If we want to maintain some state, such as a running count, across tasks, we can use an accumulator, and all worker nodes can add to it.

When a user submits a Spark job, the driver program gets launched. The driver tracks the status of the application and schedules tasks on the worker nodes. The worker nodes then do the processing and finally return their results to the driver program.

Now, let's understand accumulators using a small example.

Let's say there are multiple tasks running on different workers, but we want to maintain a single counter across all of them. How would you do this?

Here comes the use of the accumulator. First, we read the file, which returns an RDD:

Each partition of that RDD is processed by a worker: P1 by W1, P2 by W2, and so on. Since the data is distributed over different workers, counting the number of blank lines calls for an accumulator variable, as below:


 val fileRdd = sc.textFile("file.txt")
 


 val a1 = sc.longAccumulator("blankLines")
 // Created on the SparkContext, initialized to 0

Now, whenever a blank line appears, we increment the accumulator:


 fileRdd.foreach { line =>
   if (line.trim.isEmpty) a1.add(1)
 }

So, first W1 increments the count, then W2, and so on, until all lines are covered. The accumulator a1 is a shared variable across all worker nodes: every worker adds to it, but its final value can only be read back in the driver program.
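Once the foreach action has completed, the driver reads the final count through value. A small sketch continuing the example above (remember that tasks can only add to an accumulator, not read it):

 // Read the accumulated total back on the driver.
 println(s"Number of blank lines: ${a1.value}")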

Hence, this is how Accumulators are used.

Broadcast

Broadcast variables are exactly the opposite of accumulators: where accumulators are used for writing data, broadcast variables are used for reading data across worker nodes.

They are read-only variables that are cached on every executor instead of being shipped with each task. Basically, broadcast variables are used as lookup tables without any shuffle: each executor keeps a local copy, so no per-task network I/O overhead is involved. (Do remember this last point.)
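The core API is small: wrap a value with sc.broadcast on the driver and read it through value inside tasks. A minimal sketch (the lookup map here is made up for illustration):

 // Driver side: ship the value to every executor once.
 val lookup = sc.broadcast(Map("USA" -> "+1", "INDIA" -> "+91"))

 // Task side: read the executor-local copy; nothing is re-shipped per task.
 val codes = sc.parallelize(Seq("USA", "INDIA"))
   .map(country => lookup.value.getOrElse(country, "unknown"))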

Let's say we have 2 datasets:

 dataset1 (Id~Country~Name)     dataset2 (Country~Code)
 1~USA~ABC                      USA~+1
 2~INDIA~XYZ                    INDIA~+91
 3~AUSTRALIA~QWE                AUSTRALIA~+61

This is just raw sample data, but in practice dataset1 can be very large compared to dataset2, because dataset2 contains only one row per country code.

So, in that case, if you want to retrieve the country code along with each name, you need to join the two datasets. But a plain join is not the optimized way, because it will do a lot of SHUFFLING.
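For contrast, here is roughly what the shuffle-heavy version looks like (a sketch; the file names dataset1.txt and dataset2.txt are assumed):

 // Naive approach: key both RDDs by country and join them.
 // join repartitions both RDDs by key, shuffling data across the network.
 val byCountry1 = sc.textFile("dataset1.txt")
   .map(_.split("~"))
   .map(cols => (cols(1), (cols(0), cols(2))))   // Country -> (Id, Name)
 val byCountry2 = sc.textFile("dataset2.txt")
   .map(_.split("~"))
   .map(cols => (cols(0), cols(1)))              // Country -> Code
 val joined = byCountry1.join(byCountry2)        // RDD[(Country, ((Id, Name), Code))]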

So, what is the optimized way to query this? This is where a broadcast variable comes in.

The approach is to build a map from dataset2, where the key is the country and the value is the code. We then load a copy of this map onto each worker node of our cluster.

With dataset2 already present on each worker node, there is no shuffling: each task simply looks up the local copy of the map, which Spark distributes using a broadcast variable as below:


 val countryCodes = sc.broadcast(dataset2Map)
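Putting the whole flow together, here is a minimal sketch (again assuming the ~-delimited files shown above):

 // Build a small Country -> Code map on the driver from dataset2.
 val dataset2Map: Map[String, String] = sc.textFile("dataset2.txt")
   .map(_.split("~"))
   .map(cols => (cols(0), cols(1)))
   .collect()
   .toMap

 // Broadcast it once; every executor caches a local, read-only copy.
 val countryCodes = sc.broadcast(dataset2Map)

 // Map-side "join": each task reads the local copy, so no shuffle occurs.
 val withCodes = sc.textFile("dataset1.txt").map { line =>
   val Array(id, country, name) = line.split("~")
   s"$id~$country~$name~${countryCodes.value.getOrElse(country, "N/A")}"
 }
 withCodes.collect().foreach(println)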

Hence, the broadcast variable solves this problem in an optimized way, with no shuffle at all.

Hope you now have a clear picture of Accumulators & Broadcast Variables.

If you like this blog, please show your appreciation by hitting the like button and sharing it. Also, drop a comment with any feedback or suggested improvements. Till then, HAPPY LEARNING.

