What is Spark Accumulator with example

  • Post category:Spark
  • Reading time:7 mins read

What is Spark Accumulator

apache_spark_logo
apache-spark-logo

In this post we will understand what is Spark Accumulator but before understanding Accumulators, we should understand the concept of Shared variables and closure. Lets take a scenario. We want to maintain a shared variable counter which we will increment by multiple tasks running in different executors. Lets write a simple code to achieve this.

var counter = 0
val data = (1 to 1000000)
var orders = sc.parallelize(data)
orders.foreach(order => counter += order)
println("Counter Value: " + counter)

Here we have initialized the counter variable as 0 and then we are incrementing the counter in foreach action.


Is the above code correct?

The answer is NO. The behavior of the above code is undefined, and may not work as intended.

But why ?

In the above example, Spark will break up the processing into stages and then into tasks, each one of them runs in executor. Prior to execution, Spark computes the task’s closure. The closure are those variables and methods which must be visible for the executor to perform its computations on the RDD (in this case foreach()).

Spark serialize this closure and send to each executor. That means these variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program and thus, the counter variable created in the driver is no longer visible to the executor tasks; but they have their own copy of counter from the serialized closure.

Thus, the final value of counter variable in driver will still be 0.

closures – constructs like loops or locally defined methods, should not be used to maintain some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures.

So how can we maintain a shared variable in Spark ?

And the answer is Accumulator.

Accumulators are shared variables provided by Spark that can be mutated by multiple tasks running in different executors. Any task can write to an accumulator but only the application driver can see its value. We should use Accumulators in below scenarios  

  • We need to collect some simple data across all worker nodes such as maintaining a counter of total records consumed and published by a Spark job , count of total errors encountered in pipeline etc.
  • The operation used to aggregate the data is both associative and commutative. This is because in a distributed computing, the order and grouping of the data cannot be guaranteed.

The other type of shared variable provided by Spark is Broadcast variable.

Caveats

There are some caveats that we should be aware of. We should use Accumulators in Spark actions and not transformations because of following reasons

  • Transformations are lazily evaluated and are only executed when an action is encountered. As a result, accumulators used inside transformations like map() wont get executed unless some action happens on the RDD.
  • Spark takes guarantee to update accumulators inside actions only once. Even if a task is restarted and the lineage is recomputed, the accumulators will be updated only once.
  • Spark does not guarantee the above for transformations. So if a task is restarted and the lineage is recomputed, there are chances of unwanted results and the accumulators can be updated more than once.

Example of Spark Accumulators – Scala

We have orders data and we want to keep a count of all COMPLETED orders in Accumulator. The data looks like below. The data can be downloaded from our Github repository

https://github.com/proedu-organisation/CCA-175-practice-tests-resource/tree/master/retail_db/orders_parquet

+--------+-------------+-----------------+--------------+
|order_id|order_date   |order_customer_id|order_status  |
+--------+-------------+-----------------+--------------+
|50977   |1402272000000|2856             |COMPLETE      |
|10947   |1380499200000|1973             |PAYMENT_REVIEW|
|41548   |1396828800000|12225            |PENDING       |
|43659   |1398038400000|3830             |COMPLETE      |
|43530   |1398038400000|9802             |CLOSED        |
+--------+-------------+-----------------+--------------+

Scala code

The example is present in our Github repository

https://github.com/proedu-organisation/spark-scala-examples/blob/main/src/main/scala/broadcast/BroadcastExample1.scala

import org.apache.spark.sql.SparkSession
import org.apache.spark.util.LongAccumulator
object AccumulatorExample1 extends Serializable {
  def main(args: Array[String]): Unit = {
    // Create SparkSession
    val spark = SparkSession.builder()
      .master("local[5]")
      .getOrCreate()
    // Create LongAccumulator
    val completeAccum: LongAccumulator = spark.sparkContext.longAccumulator("COMPLETED")
    // Reading the parquet file
    val orders = spark.read
      .parquet("src/main/resources/retail_db/orders_parquet")
      .repartition(10)
    // Incrementing the accumulator count in foreach action.
    orders.foreach(
      row => {
        val status = row.getAs[String]("order_status")
        if (status.equals("COMPLETE")) {
          completeAccum.add(1L)
        }
      }
    )
    // Printing the current value of Accumulator. This piece of code runs in Driver.
    println("Accumulator Value: " + completeAccum.value)
  }
}

Happy Learning 🙂