What is a Spark Accumulator?
In this post we will understand what a Spark Accumulator is, but before diving into Accumulators we should understand the concepts of shared variables and closures. Let's take a scenario: we want to maintain a shared counter variable that is incremented by multiple tasks running in different executors. Let's write some simple code to attempt this.
// Driver-side variable that we intend to share across tasks
var counter = 0
val data = (1 to 1000000)
val orders = sc.parallelize(data)

// Each task increments its own copy of counter on the executor
orders.foreach(order => counter += order)

// Printed on the driver
println("Counter Value: " + counter)
Here we initialize the counter variable to 0 and then increment it inside the foreach action.
Is the above code correct?
The answer is NO. The behavior of the above code is undefined and it may not work as intended. (In local mode the tasks may run in the same JVM as the driver and appear to update the counter, but on a cluster they will not.)
But why ?
In the above example, Spark breaks the processing up into stages and then into tasks, each of which runs on an executor. Prior to execution, Spark computes the task's closure. The closure is the set of variables and methods that must be visible to the executor to perform its computation on the RDD (in this case, foreach()).
Spark serializes this closure and sends it to each executor. That means these variables are copied to each machine, and no updates to the variables on the remote machines are propagated back to the driver program. The counter variable created in the driver is therefore no longer visible to the executor tasks; each task has its own copy of counter from the serialized closure.
Thus, the final value of the counter variable in the driver will still be 0.
Closures – constructs like loops or locally defined methods – should not be used to maintain global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures.
So how can we maintain a shared variable in Spark?
The answer is an Accumulator. Accumulators are a good fit in situations like the following (a minimal sketch follows the list):
- We need to collect some simple data across all worker nodes, such as a count of the total records consumed and published by a Spark job, the number of errors encountered in a pipeline, etc.
- The operation used to aggregate the data is both associative and commutative, because in distributed computing the order and grouping of the data cannot be guaranteed.
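To make the earlier counter example work, we can replace the plain variable with a LongAccumulator. The following is a minimal spark-shell sketch (the accumulator name "counter" is just an illustrative label):

// Register a LongAccumulator with the SparkContext so that updates
// made by tasks on the executors are merged back into the driver.
val counter = sc.longAccumulator("counter")
val data = (1 to 1000000)
val orders = sc.parallelize(data)

// Each task calls add(); Spark sends the partial counts back to the driver.
orders.foreach(order => counter.add(order))

// Reading .value on the driver now returns the expected sum.
println("Counter Value: " + counter.value)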
The other type of shared variable provided by Spark is the Broadcast variable.
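For contrast, here is a tiny hypothetical sketch of a broadcast variable: it is shipped to every executor once and is read-only from the tasks' point of view (the statusLabels map below is made up for illustration):

// Broadcast a small lookup map to all executors instead of shipping
// it inside every task's closure.
val statusLabels = Map("COMPLETE" -> "Completed", "CLOSED" -> "Closed")
val broadcastLabels = sc.broadcast(statusLabels)

// Tasks only read the broadcast value; they never modify it.
val labelled = sc.parallelize(Seq("COMPLETE", "CLOSED", "PENDING"))
  .map(status => broadcastLabels.value.getOrElse(status, "Other"))

labelled.collect().foreach(println)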
Caveats
There are some caveats we should be aware of. We should use Accumulators in Spark actions rather than transformations, for the following reasons:
- Transformations are lazily evaluated and are only executed when an action is encountered. As a result, accumulators used inside transformations like map() won't be updated unless some action runs on the RDD.
- Spark guarantees that accumulators updated inside actions are applied only once. Even if a task is restarted and the lineage is recomputed, the accumulator will be updated only once.
- Spark does not give the same guarantee for transformations. If a task is restarted and the lineage is recomputed, the accumulator can be updated more than once, producing unwanted results (see the sketch after this list).
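The first and third caveats can be seen in a small spark-shell sketch (variable names are illustrative). The accumulator sits inside a map() transformation, so its value only changes when an action runs, and it is updated again every time the lineage is recomputed:

val acc = sc.longAccumulator("seen")
val nums = sc.parallelize(1 to 100)

// The accumulator is incremented inside a transformation...
val mapped = nums.map { n => acc.add(1); n * 2 }

// ...but nothing has executed yet, so the value is still 0 here.
println("Before action: " + acc.value)        // 0

// The action triggers the computation and the accumulator is updated.
mapped.count()
println("After first action: " + acc.value)   // 100

// Without caching, a second action recomputes the lineage and the
// accumulator inside the transformation is updated again.
mapped.count()
println("After second action: " + acc.value)  // 200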
Example of Spark Accumulators – Scala
We have orders data and we want to keep a count of all COMPLETE orders in an Accumulator. The data looks like the sample below and can be downloaded from our Github repository
https://github.com/proedu-organisation/CCA-175-practice-tests-resource/tree/master/retail_db/orders_parquet
+--------+-------------+-----------------+--------------+
|order_id|order_date |order_customer_id|order_status |
+--------+-------------+-----------------+--------------+
|50977 |1402272000000|2856 |COMPLETE |
|10947 |1380499200000|1973 |PAYMENT_REVIEW|
|41548 |1396828800000|12225 |PENDING |
|43659 |1398038400000|3830 |COMPLETE |
|43530 |1398038400000|9802 |CLOSED |
+--------+-------------+-----------------+--------------+
Scala code
The example is present in our Github repository
https://github.com/proedu-organisation/spark-scala-examples/blob/main/src/main/scala/broadcast/BroadcastExample1.scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.LongAccumulator

object AccumulatorExample1 extends Serializable {

  def main(args: Array[String]): Unit = {

    // Create SparkSession
    val spark = SparkSession.builder()
      .appName("AccumulatorExample1")
      .master("local[5]")
      .getOrCreate()

    // Create LongAccumulator
    val completeAccum: LongAccumulator = spark.sparkContext.longAccumulator("COMPLETED")

    // Read the parquet file
    val orders = spark.read
      .parquet("src/main/resources/retail_db/orders_parquet")
      .repartition(10)

    // Increment the accumulator in the foreach action. This runs on the executors.
    orders.foreach(row => {
      val status = row.getAs[String]("order_status")
      if (status.equals("COMPLETE")) {
        completeAccum.add(1L)
      }
    })

    // Print the current value of the accumulator. This piece of code runs in the driver.
    println("Accumulator Value: " + completeAccum.value)
  }
}
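As a quick sanity check (not part of the original example, and assuming the same parquet path), the accumulator value should match a plain DataFrame count of COMPLETE orders:

import org.apache.spark.sql.functions.col

val completeCount = spark.read
  .parquet("src/main/resources/retail_db/orders_parquet")
  .filter(col("order_status") === "COMPLETE")
  .count()

println("DataFrame count of COMPLETE orders: " + completeCount)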
Happy Learning 🙂