In our previous posts we talked about the mapPartitions and mapPartitionsWithIndex functions. In this post we will learn the RDD's reduceByKey transformation in Apache Spark.
As per the Apache Spark documentation, reduceByKey(func), when called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V, V) => V.
We have three variants of the reduceByKey transformation:
- The first variant, reduceByKey(func), generates hash-partitioned output using the existing partitioner (or the default parallelism level if none is set).
- The second variant, reduceByKey(func, numPartitions), generates hash-partitioned output with the number of partitions given by numPartitions.
- The third variant, reduceByKey(partitioner, func), generates output partitioned by the given Partitioner object. All three call forms are sketched below.
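Here is a minimal, self-contained sketch of the three call forms. The ReduceByKeyVariants object name and the sample pairs are our own, chosen only for illustration. Note that the reduce function must be associative and commutative, since Spark applies it both within each partition and across partitions.

import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

object ReduceByKeyVariants extends App {

  val sc = SparkSession.builder()
    .master("local[*]")
    .appName("reduceByKey variants")
    .getOrCreate()
    .sparkContext

  // Arbitrary sample pairs, used only for illustration.
  val pairs = sc.parallelize(Seq(("php", 1), ("mysql", 1), ("php", 1)))

  // Variant 1: reduce function only. The output keeps the existing
  // partitioner, or the default parallelism level if none is set.
  val v1 = pairs.reduceByKey(_ + _)

  // Variant 2: reduce function plus an explicit number of output partitions.
  val v2 = pairs.reduceByKey(_ + _, 4)

  // Variant 3: an explicit Partitioner object controls the output partitioning.
  val v3 = pairs.reduceByKey(new HashPartitioner(4), _ + _)

  v1.collect.foreach(println) // (php,2) and (mysql,1)
}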
Let’s take an example. We will write a word count program using reduceByKey transformation. In the first step, we will read a server log file as shown below.
// File Location
val file = "src/main/resources/server_log"

// Creating a SparkContext object.
val sparkContext = SparkSession.builder()
  .master("local[*]")
  .appName("Proedu.co examples")
  .getOrCreate()
  .sparkContext

// Reading Server Log file as RDD.
val serverLogRDD = sparkContext.textFile(file)

// Let's print some data.
serverLogRDD.collect.foreach(println)

// Output
ERROR   php
DONE    php
ERROR   RailsApp
ERROR   php
DONE    php
...
As a result, we have an RDD of String. Now we will split each line by the tab ("\t") delimiter.
// Split each line by TAB delimiter.
val flattenedRDD = serverLogRDD.flatMap(line => line.split("\t"))
// Let's print some data.
flattenedRDD.collect.foreach(println)
// Output
ERROR
php
DONE
php
ERROR
RailsApp
After flattening the RDD as shown in the above step, we will convert each word into a tuple of (word, 1).
// Convert each word into a tuple.
val tupleRDD = flattenedRDD.map(word => (word, 1))

// Let's print some data.
tupleRDD.collect.foreach(println)

// Output
(ERROR,1)
(php,1)
(DONE,1)
(php,1)
(ERROR,1)
...
Finally, we will reduce the tuples by key using the reduceByKey function as shown below.
// Reducing the tuple to calculate the word count.
val reducedRDD = tupleRDD.reduceByKey((x, y) => x + y)

// Let's print some data.
reducedRDD.collect.foreach(println)

// Output
(DONE,4)
(ERROR,9)
(php,7)
(RailsApp,3)
(mysql,3)
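For comparison, the same counts could also be produced with groupByKey followed by a sum, but reduceByKey is usually preferred: it combines values within each partition before the shuffle, so less data moves across the network. A minimal sketch reusing the tupleRDD from above:

// groupByKey ships every (word, 1) pair across the network before summing,
// whereas reduceByKey pre-aggregates within each partition first.
val viaGroupByKey = tupleRDD.groupByKey().mapValues(_.sum)
viaGroupByKey.collect.foreach(println) // same counts as reducedRDD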
Complete example in Scala
The complete example is also present in our GitHub repository: https://github.com/proedu-organisation/spark-scala-examples/blob/main/src/main/scala/rdd/transformations/ReduceByKeyExample.scala
import org.apache.spark.sql.SparkSession

object ReduceByKeyExample extends App {

  val file = "src/main/resources/server_log"

  // Creating a SparkContext object.
  val sparkContext = SparkSession.builder()
    .master("local[*]")
    .appName("Proedu.co examples")
    .getOrCreate()
    .sparkContext

  // Reading Server Log file as RDD.
  val serverLogRDD = sparkContext.textFile(file)

  // Let's print some data.
  serverLogRDD.collect.foreach(println)

  /** Output
   * ERROR  php
   * DONE   php
   * ERROR  RailsApp
   * ERROR  php
   * DONE   mysql
   */

  // Split each line by TAB delimiter.
  val flattenedRDD = serverLogRDD.flatMap(line => line.split("\t"))

  // Let's print some data.
  flattenedRDD.collect.foreach(println)

  /** Output
   * DONE
   * mysql
   * ERROR
   * php
   * ERROR
   * php
   * ERROR
   */

  // Convert each word into a tuple.
  val tupleRDD = flattenedRDD.map(word => (word, 1))

  // Let's print some data.
  tupleRDD.collect.foreach(println)

  /** Output
   * (ERROR,1)
   * (php,1)
   * (DONE,1)
   * (php,1)
   * (ERROR,1)
   */

  // Reducing the tuple to calculate the word count.
  val reducedRDD = tupleRDD.reduceByKey((x, y) => x + y)

  // Let's print some data.
  reducedRDD.collect.foreach(println)

  /** Output
   * (DONE,4)
   * (ERROR,9)
   * (php,7)
   * (RailsApp,3)
   * (mysql,3)
   */
}
Happy Learning 🙂