Apache Spark RDD groupByKey transformation

In our previous posts we covered the map, flatMap, and filter functions. In this post we will learn about the RDD groupByKey transformation in Apache Spark.

As per the Apache Spark documentation, groupByKey([numPartitions]) is called on a dataset of (K, V) pairs and returns a dataset of (K, Iterable<V>) pairs. It is an expensive operation and consumes a lot of memory if the dataset is large. There are three variants –

  • The first variant, def groupByKey(): RDD[(K, Iterable[V])], groups the values for each key in the RDD into a single sequence.
  • The second variant, def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])], additionally lets us specify the number of partitions in the resulting RDD.
  • In the third variant, def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])], we can provide our own partitioner, as shown in the sketch below.
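
Below is a minimal sketch of the second and third variants. It assumes an existing SparkContext named sparkContext, as in the examples that follow; HashPartitioner comes from org.apache.spark.

import org.apache.spark.HashPartitioner

// Sample pair RDD.
val pairs = sparkContext.parallelize(Seq(("Apple", 1), ("Banana", 1), ("Apple", 1)))

// Second variant: specify the number of partitions in the result RDD.
val groupedByCount = pairs.groupByKey(4)
println(groupedByCount.getNumPartitions) // 4

// Third variant: supply a custom partitioner, here a HashPartitioner with 4 partitions.
val groupedByPartitioner = pairs.groupByKey(new HashPartitioner(4))
println(groupedByPartitioner.getNumPartitions) // 4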

Let’s start with a very simple example. First we will create a pair RDD as shown below.

// Local scala collection containing tuples/ Key-Value pair.
val data = Seq(("Apple",1),("Banana",1),("Apple",1),("Apple",1),("Orange",1),("Orange",1))

// Converting collection to RDD.
val dataRDD = sparkContext.parallelize(data)

// Let's print some data.
dataRDD.collect.foreach(println)

// Output
(Apple,1)
(Banana,1)
(Apple,1)
(Apple,1)
(Orange,1)
(Orange,1)

After creating the pair RDD, we will collect all values corresponding to the same key using the groupByKey method.

// Calling transformation on Pair RDD.
val groupRDD = dataRDD.groupByKey()

// Let's print some data.
groupRDD.collect.foreach(println)

// Output
(Apple,CompactBuffer(1, 1, 1))
(Banana,CompactBuffer(1))
(Orange,CompactBuffer(1, 1))

In the above example, the groupByKey function grouped all values belonging to the same key. Unlike reduceByKey, it doesn't perform any aggregation on the grouped values.

As a result, it just groups the data and returns it in the form of an Iterable. We can convert this Iterable to any collection, such as a List or a Set.
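
For instance, the small sketch below (using the groupRDD created above) converts each value sequence to a Set, keeping only the distinct values per key.

// Converting the Iterable of values to a Set keeps only distinct values per key.
val distinctValuesRDD = groupRDD.map(tuple => (tuple._1, tuple._2.toSet))
distinctValuesRDD.collect.foreach(println)

// Output
(Apple,Set(1))
(Banana,Set(1))
(Orange,Set(1))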

For example, we will sum the occurrences of each word. We will convert the CompactBuffer to a List as shown below.

// Converting a CompactBuffer to a List to calculate the sum.
val wordCountRDD = groupRDD.map(tuple => (tuple._1, tuple._2.toList.sum))
wordCountRDD.collect.foreach(println)

// Output
(Apple,3)
(Banana,1)
(Orange,2)

Performance Considerations

While working with a large dataset, we should prefer reduceByKey over groupByKey. Both functions produce the correct answer, but reduceByKey performs much better on a large dataset.

The reason is that with reduceByKey, Spark knows it can combine values with a common key on each partition before shuffling the data, whereas groupByKey shuffles every value across the network.
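
For comparison, the same word count written with reduceByKey (using the dataRDD created above) could look like the sketch below. It produces the same result without first materializing every value for a key.

// reduceByKey combines the values for each key on every partition before the shuffle,
// so far less data moves across the network.
val reducedRDD = dataRDD.reduceByKey(_ + _)
reducedRDD.collect.foreach(println)

// Output
(Apple,3)
(Banana,1)
(Orange,2)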


Complete Code

Finally, let's look at the complete example, which is also available in our GitHub repository linked below.

 https://github.com/proedu-organisation/spark-scala-examples/blob/main/src/main/scala/rdd/transformations/GroupByKeyExample.scala

import org.apache.spark.sql.SparkSession
object GroupByKeyExample extends App {
  // Creating a SparkSession and extracting its SparkContext.
  val sparkContext = SparkSession.builder()
    .master("local[*]")
    .appName("Proedu.co examples")
    .getOrCreate()
    .sparkContext
  val data = Seq(("Apple", 1), ("Banana", 1), ("Apple", 1), ("Apple", 1), ("Orange", 1), ("Orange", 1))
  val dataRDD = sparkContext.parallelize(data)
  // Let's print some data.
  dataRDD.collect.foreach(println)
  /** Output
   * (Apple,1)
   * (Banana,1)
   * (Apple,1)
   * (Apple,1)
   * (Orange,1)
   * (Orange,1)
   */
  // Calling groupByKey transformation in Pair RDD.
  val groupRDD = dataRDD.groupByKey()
  // Let's print some data.
  groupRDD.collect.foreach(println)
  /** Output
   * (Apple,CompactBuffer(1, 1, 1))
   * (Banana,CompactBuffer(1))
   * (Orange,CompactBuffer(1, 1))
   */
  val wordCountRDD = groupRDD.map(tuple => (tuple._1, tuple._2.toList.sum))
  wordCountRDD.collect.foreach(println)
  /**
   * (Apple,3)
   * (Banana,1)
   * (Orange,2)
   */
}

Happy Learning 🙂