Apache Spark RDD reduceByKey transformation

In our previous posts we talked about the mapPartitions / mapPartitionsWithIndex functions. In this post we will learn about the RDD reduceByKey transformation in Apache Spark.

As per the Apache Spark documentation, reduceByKey(func) converts a dataset of (K, V) pairs into a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V, V) => V. The function should be commutative and associative, since Spark applies it both within and across partitions.
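
Before the full walkthrough, here is a minimal, self-contained sketch of that definition, assuming a SparkContext named sparkContext (created as in the example further below); the data is made up purely for illustration:

// A tiny (K, V) dataset built with parallelize, purely for illustration.
val pairs = sparkContext.parallelize(Seq(("a", 1), ("a", 2), ("b", 5)))

// func of type (V, V) => V is applied per key, yielding (a,3) and (b,5).
pairs.reduceByKey((x, y) => x + y).collect.foreach(println)

// Output
(a,3)
(b,5)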

We have three variants of the reduceByKey transformation (a short sketch of all three follows the list):

  • The first variant, reduceByKey(func), generates hash-partitioned output using the existing partitioner (or, when none is set, the default level of parallelism).
  • The second variant, reduceByKey(func, numPartitions), generates hash-partitioned output with the number of partitions given by numPartitions.
  • The third variant, reduceByKey(partitioner, func), generates output partitioned according to the Partitioner object referenced by partitioner.

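To make the variants concrete, here is a hedged sketch of all three calls on a hypothetical pairRDD of (String, Int) pairs (pairRDD and the partition counts are illustrative assumptions):

import org.apache.spark.HashPartitioner

// Variant 1: uses the existing partitioner, or the default parallelism level.
val v1 = pairRDD.reduceByKey((x, y) => x + y)

// Variant 2: the same aggregation, but with 8 output partitions.
val v2 = pairRDD.reduceByKey((x, y) => x + y, 8)

// Variant 3: the same aggregation with an explicit Partitioner object.
val v3 = pairRDD.reduceByKey(new HashPartitioner(8), (x, y) => x + y)
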
Let’s take an example. We will write a word count program using the reduceByKey transformation. In the first step, we will read a server log file as shown below.

// File Location
val file = "src/main/resources/server_log"

// Creating a SparkContext via a SparkSession.
val sparkContext = SparkSession.builder()
  .master("local[*]")
  .appName("Proedu.co examples")
  .getOrCreate()
  .sparkContext

// Reading Server Log file as RDD.
val serverLogRDD = sparkContext.textFile(file)

// Let's print some data.
serverLogRDD.collect.foreach(println)

// Output
ERROR	php	  
DONE	php	  
ERROR	RailsApp  
ERROR	php
DONE	php
...

As a result, we have an RDD of Strings. Now we will split each line by the tab (“\t”) delimiter.

// Split each line by TAB delimiter.
val flattenedRDD = serverLogRDD.flatMap(line => line.split("\t"))

// Let's print some data.
flattenedRDD.collect.foreach(println)

// Output
ERROR
php
DONE
php
ERROR
RailsApp

After flattening the RDD as shown in the above step, we will convert each word into a (word, 1) tuple.

// Convert each word into a tuple.
val tupleRDD = flattenedRDD.map(word => (word, 1))

// Let's print some data.
tupleRDD.collect.foreach(println)

// Output
(ERROR,1)
(php,1)
(DONE,1)
(php,1)
(ERROR,1)
...

Finally, we will reduce the tuples by key using the reduceByKey function, as shown below.

// Reducing the tuple to calculate the word count.
val reducedRDD = tupleRDD.reduceByKey((x, y) => x + y)

// Let's print some data.
reducedRDD.collect.foreach(println)

// Output
(DONE,4)
(ERROR,9)
(php,7)
(RailsApp,3)
(mysql,3)
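
The same aggregation works with the other two variants as well. For instance, here is a hedged sketch of the second variant, requesting an explicit number of output partitions (4 is an arbitrary choice):

// Same word count, but asking for 4 output partitions.
val reducedRDD4 = tupleRDD.reduceByKey((x, y) => x + y, 4)

// Check the resulting partition count.
println(reducedRDD4.getNumPartitions) // 4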

Complete example in Scala

The complete example is also present in our GitHub repository: https://github.com/proedu-organisation/spark-scala-examples/blob/main/src/main/scala/rdd/transformations/ReduceByKeyExample.scala

import org.apache.spark.sql.SparkSession

object ReduceByKeyExample extends App {

  val file = "src/main/resources/server_log"

  // Creating a SparkContext via a SparkSession.
  val sparkContext = SparkSession.builder()
    .master("local[*]")
    .appName("Proedu.co examples")
    .getOrCreate()
    .sparkContext

  // Reading Server Log file as RDD.
  val serverLogRDD = sparkContext.textFile(file)

  // Let's print some data.
  serverLogRDD.collect.foreach(println)

  /** Output
   * ERROR	php
   * DONE	php
   * ERROR	RailsApp
   * ERROR	php
   * DONE	mysql
   */

  // Split each line by TAB delimiter.
  val flattenedRDD = serverLogRDD.flatMap(line => line.split("\t"))

  // Let's print some data.
  flattenedRDD.collect.foreach(println)

  /** Output
   * DONE
   * mysql
   * ERROR
   * php
   * ERROR
   * php
   * ERROR
   */

  // Convert each word into a tuple.
  val tupleRDD = flattenedRDD.map(word => (word, 1))

  // Let's print some data.
  tupleRDD.collect.foreach(println)

  /** Output
   * (ERROR,1)
   * (php,1)
   * (DONE,1)
   * (php,1)
   * (ERROR,1)
   */

  // Reducing the tuple to calculate the word count.
  val reducedRDD = tupleRDD.reduceByKey((x, y) => x + y)

  // Let's print some data.
  reducedRDD.collect.foreach(println)

  /** Output
   * (DONE,4)
   * (ERROR,9)
   * (php,7)
   * (RailsApp,3)
   * (mysql,3)
   */
  
}
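
To compile and run the example locally, Spark must be on the classpath. A minimal sbt sketch follows (the Scala and Spark versions below are assumptions; use the versions that match your environment):

// build.sbt (versions are illustrative assumptions)
scalaVersion := "2.12.18"

// spark-sql brings in SparkSession along with spark-core.
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.1"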

Happy Learning 🙂