Apache Spark RDD groupByKey transformation
In our previous posts we covered the map, flatMap, and filter functions. In this post we will learn about the RDD groupByKey transformation in Apache Spark.
As per the Apache Spark documentation, groupByKey([numPartitions]) is called on a dataset of (K, V) pairs and returns a dataset of (K, Iterable<V>) pairs. It is an expensive operation and consumes a lot of memory if the dataset is large. There are three variants:
- First variant def groupByKey(): RDD[(K, Iterable[V])] groups the values for each key in the RDD into a single sequence.
- Second variant def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] also takes the number of partitions in the result RDD.
- Third variant def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] lets us provide the partitioner.
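The three variants listed above can be sketched side by side as follows. This is a minimal illustration, assuming a local SparkSession; the object name, the sample data, and the partition counts 4 and 2 are arbitrary choices made for this example.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

// Hypothetical object name, used only for this illustration.
object GroupByKeyVariants {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("groupByKey variants")
      .getOrCreate()
    val sc = spark.sparkContext

    val pairs = sc.parallelize(Seq(("Apple", 1), ("Banana", 1), ("Apple", 1)))

    // Variant 1: no arguments, Spark decides how the result is partitioned.
    val byDefault = pairs.groupByKey()
    println(byDefault.getNumPartitions)

    // Variant 2: request a specific number of result partitions (4 is arbitrary).
    val byCount = pairs.groupByKey(4)
    println(byCount.getNumPartitions) // 4

    // Variant 3: pass a Partitioner explicitly (HashPartitioner ships with Spark).
    val byPartitioner = pairs.groupByKey(new HashPartitioner(2))
    println(byPartitioner.getNumPartitions) // 2

    spark.stop()
  }
}
```

All three calls return an RDD[(String, Iterable[Int])]; only the partitioning of the result differs.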
Let’s start with a very simple example. First we will create a pair RDD as shown below.
// Local Scala collection containing tuples (key-value pairs).
val data = Seq(("Apple",1),("Banana",1),("Apple",1),("Apple",1),("Orange",1),("Orange",1))

// Converting collection to RDD.
val dataRDD = sparkContext.parallelize(data)

// Let's print some data.
dataRDD.collect.foreach(println)

// Output
// (Apple,1)
// (Banana,1)
// (Apple,1)
// (Apple,1)
// (Orange,1)
// (Orange,1)
After creating the pair RDD, we will collect all values corresponding to the same key using the groupByKey method.
// Calling transformation on Pair RDD.
val groupRDD = dataRDD.groupByKey()

// Let's print some data.
groupRDD.collect.foreach(println)

// Output
// (Apple,CompactBuffer(1, 1, 1))
// (Banana,CompactBuffer(1))
// (Orange,CompactBuffer(1, 1))
In the above example, the groupByKey function grouped all values that share the same key. Unlike reduceByKey, it doesn’t apply any operation to the grouped values.
As a result, it just groups the data and returns the values for each key as an Iterable (the CompactBuffer in the output above). We can convert this Iterable to any collection, such as a List or a Set.
For example, we can sum the occurrences of each word by converting the CompactBuffer to a List, as shown below.
// Converting a CompactBuffer to a List to calculate the sum.
val wordCountRDD = groupRDD.map(tuple => (tuple._1, tuple._2.toList.sum))
wordCountRDD.collect.foreach(println)

// Output
// (Apple,3)
// (Banana,1)
// (Orange,2)
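As a variation on the snippet above, the same sum can be written with mapValues, since an Iterable[Int] already has a sum method and no intermediate List is required. This is only a sketch under the same assumptions as the rest of the post (a local SparkSession and the fruit data set); the object name is made up for the example, and the result is sorted before printing so the order is stable.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical object name, used only for this illustration.
object MapValuesSum {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("mapValues sum")
      .getOrCreate()
    val sc = spark.sparkContext

    val data = Seq(("Apple", 1), ("Banana", 1), ("Apple", 1),
      ("Apple", 1), ("Orange", 1), ("Orange", 1))

    // Group the values per key, then sum each group in place.
    val grouped = sc.parallelize(data).groupByKey()
    val counts = grouped.mapValues(_.sum)

    // Sort on the driver so the printed order does not depend on partitioning.
    counts.collect.sortBy(_._1).foreach(println)
    // (Apple,3)
    // (Banana,1)
    // (Orange,2)

    spark.stop()
  }
}
```

Because mapValues touches only the values, Spark keeps the partitioner of the grouped RDD, which can avoid an extra shuffle if further key-based operations follow.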
Performance Considerations
When working with a large dataset, we should prefer reduceByKey over groupByKey. While both functions produce the correct answer, reduceByKey works much better on a large dataset.
The reason is that Spark knows it can combine values with a common key on each partition before shuffling the data, so far less data moves across the network. With groupByKey, every key-value pair is shuffled.
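To make the comparison concrete, here is a minimal sketch of the same word count written with reduceByKey. It assumes a local SparkSession and the fruit data set used in this post; the object name is hypothetical, and the result is sorted before printing so the order is stable.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical object name, used only for this illustration.
object ReduceByKeyCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("reduceByKey count")
      .getOrCreate()
    val sc = spark.sparkContext

    val data = Seq(("Apple", 1), ("Banana", 1), ("Apple", 1),
      ("Apple", 1), ("Orange", 1), ("Orange", 1))

    // Values with the same key are added together within each partition
    // first, and only the partial sums are shuffled.
    val counts = sc.parallelize(data).reduceByKey(_ + _)

    // Sort on the driver so the printed order does not depend on partitioning.
    counts.collect.sortBy(_._1).foreach(println)
    // (Apple,3)
    // (Banana,1)
    // (Orange,2)

    spark.stop()
  }
}
```

The result matches the groupByKey version, but no per-key collection of values is ever materialized.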
Complete Code
Finally, let’s look at the complete example, which is also available in our GitHub repository mentioned below.
import org.apache.spark.sql.SparkSession

object GroupByKeyExample extends App {

  // Creating a SparkContext object.
  val sparkContext = SparkSession.builder()
    .master("local[*]")
    .appName("Proedu.co examples")
    .getOrCreate()
    .sparkContext

  val data = Seq(("Apple", 1), ("Banana", 1), ("Apple", 1), ("Apple", 1), ("Orange", 1), ("Orange", 1))
  val dataRDD = sparkContext.parallelize(data)

  // Let's print some data.
  dataRDD.collect.foreach(println)

  /** Output
   * (Apple,1)
   * (Banana,1)
   * (Apple,1)
   * (Apple,1)
   * (Orange,1)
   * (Orange,1)
   */

  // Calling groupByKey transformation in Pair RDD.
  val groupRDD = dataRDD.groupByKey()

  // Let's print some data.
  groupRDD.collect.foreach(println)

  /** Output
   * (Apple,CompactBuffer(1, 1, 1))
   * (Banana,CompactBuffer(1))
   * (Orange,CompactBuffer(1, 1))
   */

  val wordCountRDD = groupRDD.map(tuple => (tuple._1, tuple._2.toList.sum))
  wordCountRDD.collect.foreach(println)

  /** Output
   * (Apple,3)
   * (Banana,1)
   * (Orange,2)
   */
}
Happy Learning 🙂