Hello friends. In this post we will look at the difference between repartition and coalesce in Apache Spark. Spark splits an RDD into partitions and distributes them across multiple worker nodes so that multiple tasks can read or process the data in parallel.
You may also run into situations where you need to change the number of partitions manually, either to tune your Spark program or to run Spark applications more efficiently. Let's look at what repartition and coalesce do and how they differ.
Understanding Repartition
Repartition redistributes the data in an RDD to create either more or fewer partitions. It performs a full shuffle, moving all of the data over the network, and balances the records roughly evenly across the new partitions.
Understanding Coalesce
Coalesce also changes the number of partitions, but by default it can only decrease it. Instead of shuffling, it merges existing partitions into fewer ones, so most of the data stays where it already is. It is better in terms of performance because it avoids the full shuffle, although the resulting partitions may be uneven in size.
Keep in mind that repartitioning your data is a fairly expensive operation. Spark also has an optimized version of repartition() called coalesce() that allows minimizing data movement, but only if you are decreasing the number of RDD partitions.
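The difference shows up in an RDD's lineage. Below is a minimal sketch, assuming a local Spark session; the `numbers` RDD and the `partitionSizes` helper are made up for illustration and are not part of the customers example used in this post.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration. In spark-shell, `spark` and `sc`
// already exist and getOrCreate() simply returns the running session.
val spark = SparkSession.builder()
  .master("local[4]")
  .appName("coalesce-vs-repartition")
  .getOrCreate()
val sc = spark.sparkContext

// Hypothetical sample data: 100 numbers spread over 8 partitions.
val numbers = sc.parallelize(1 to 100, numSlices = 8)

// Hypothetical helper: how many records each partition holds.
def partitionSizes[T](rdd: RDD[T]): Seq[Int] =
  rdd.glom().map(_.length).collect().toSeq

val merged = numbers.coalesce(2)         // merges existing partitions, no shuffle
val reshuffled = numbers.repartition(2)  // equivalent to coalesce(2, shuffle = true)

// The lineage of `reshuffled` contains a ShuffledRDD; the lineage of `merged` does not.
println(merged.toDebugString)
println(reshuffled.toDebugString)

// Records per partition after each operation.
println(partitionSizes(merged).mkString(", "))
println(partitionSizes(reshuffled).mkString(", "))
```

Both results have two partitions. What differs is how the data got there: only the repartition lineage includes a shuffle stage.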
Partitioning the data in RDD
RDD – repartition()
The RDD repartition method can increase or decrease the number of partitions. Let's start by reading the customers data as shown below. The RDD has two partitions. The dataset can be downloaded from our GitHub repo mentioned below.
scala> val customers = sc.textFile("C:\\Users\\proedu\\input\\customers-json")
customers: org.apache.spark.rdd.RDD[String] = C:\Users\proedu\input\customers-json MapPartitionsRDD[8] at textFile at <console>:24
scala> customers.getNumPartitions
res9: Int = 2 // Initial number of partitions is 2.
Let's increase the number of partitions using the repartition method, as shown below.
scala> val custNew = customers.repartition(10)
custNew: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at repartition at <console>:25
scala> custNew.getNumPartitions
res1: Int = 10 // Repartition method has changed the partitions to 10.
If we save this RDD as a text file, Spark creates ten part files, one per partition.
scala> custNew.saveAsTextFile("C:\\Users\\proedu\\output\\customers-json")
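To check the one-file-per-partition behaviour without the customers dataset, here is a small sketch that writes to a temporary directory and counts the part files. The sample records and the `repartition-demo` directory name are assumptions made for this example.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; spark-shell already provides one.
val spark = SparkSession.builder()
  .master("local[4]")
  .appName("part-files-demo")
  .getOrCreate()
val sc = spark.sparkContext

// Hypothetical sample data standing in for the customers file.
val lines = sc.parallelize((1 to 100).map(i => s"record-$i"), numSlices = 2)
val tenParts = lines.repartition(10)

// saveAsTextFile fails if the target directory already exists,
// so write to a fresh subdirectory of a temporary folder.
val outDir = new File(Files.createTempDirectory("repartition-demo").toFile, "out")
tenParts.saveAsTextFile(outDir.getPath)

// Count only the data files; Spark also writes _SUCCESS and hidden .crc files.
val partFiles = outDir.listFiles().map(_.getName).filter(_.startsWith("part-")).sorted
println(s"${partFiles.length} part files: ${partFiles.mkString(", ")}")
```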
RDD – coalesce()
By default, the RDD coalesce method can only decrease the number of partitions. As stated earlier, coalesce is the optimized version of repartition. Let's reduce the custNew RDD (created above) from 10 partitions to 5 using the coalesce method.
scala> custNew.getNumPartitions
res4: Int = 10
scala> val custCoalesce = custNew.coalesce(5)
custCoalesce: org.apache.spark.rdd.RDD[String] = CoalescedRDD[7] at coalesce at <console>:25
scala> custCoalesce.getNumPartitions
res5: Int = 5
If we save the custCoalesce RDD as a text file, Spark creates five part files.
scala> custCoalesce.saveAsTextFile("C:\\Users\\proedu\\output\\customers-json-new")
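The "decrease only" behaviour is easy to verify. In the sketch below (the sample RDD is an assumption, not the customers data), asking coalesce for more partitions than the RDD has leaves the count unchanged, unless the optional `shuffle = true` argument is passed, which makes it behave like repartition.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; spark-shell already provides one.
val spark = SparkSession.builder()
  .master("local[4]")
  .appName("coalesce-limits")
  .getOrCreate()
val sc = spark.sparkContext

// Hypothetical sample data with 10 partitions.
val rdd = sc.parallelize(1 to 1000, numSlices = 10)

val shrunk = rdd.coalesce(5)                   // 10 -> 5, no shuffle
val unchanged = rdd.coalesce(20)               // cannot grow without a shuffle: stays at 10
val grown = rdd.coalesce(20, shuffle = true)   // with a shuffle it can grow: 20

println(shrunk.getNumPartitions)
println(unchanged.getNumPartitions)
println(grown.getNumPartitions)
```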
We can also specify the number of partitions at the time an RDD is created. The methods below accept an additional argument for this:
def parallelize[T](seq: Seq[T], numSlices: Int)
def textFile(path: String, minPartitions: Int)
def wholeTextFiles(path: String, minPartitions: Int)
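As a rough illustration of these arguments, the sketch below sets the partition count at creation time. The sample sequence and the temporary file are assumptions made so the example is self-contained. Note that `minPartitions` is a lower-bound hint, so the actual number of partitions can be higher.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; spark-shell already provides one.
val spark = SparkSession.builder()
  .master("local[4]")
  .appName("partitions-at-creation")
  .getOrCreate()
val sc = spark.sparkContext

// parallelize: numSlices sets the exact number of partitions.
val fromSeq = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8), numSlices = 4)
println(fromSeq.getNumPartitions)

// textFile: write a small hypothetical input file, then read it back
// while asking for at least 4 partitions.
val tmp = Files.createTempFile("customers-sample", ".txt")
Files.write(tmp, (1 to 1000).map(i => s"line $i").mkString("\n").getBytes("UTF-8"))
val fromFile = sc.textFile(tmp.toString, minPartitions = 4)
println(fromFile.getNumPartitions)
```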
Partitioning the data in a DataFrame
DataFrame – repartition()
Let's talk about repartition and coalesce in Spark DataFrames. As with RDDs, the DataFrame repartition method can increase or decrease the number of partitions. Let's start by reading the customers data as a DataFrame.
scala> val custDF = spark.read.json("C:\\Users\\proedu\\input\\customers-json")
custDF: org.apache.spark.sql.DataFrame = [customer_city: string, customer_email: string ... 7 more fields]
Let's check the number of partitions in the custDF DataFrame, as shown below.
scala> custDF.rdd.getNumPartitions
res6: Int = 1 // Initial number of partitions is 1.
Now we will increase the partitions to ten using repartition method as shown below. Note that the repartition method will cause full shuffling of data across all the partitions.
scala> val custDFNew = custDF.repartition(10)
scala> custDFNew.rdd.getNumPartitions
res8: Int = 10 // Repartition method has changed the partitions to 10.
DataFrame – coalesce()
The DataFrame coalesce method can only decrease the number of partitions. Let's reduce the custDFNew DataFrame (created above) from 10 partitions to 5 using the coalesce method.
scala> val custCoalesce = custDFNew.coalesce(5)
custCoalesce: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [customer_city: string, customer_email: string ... 7 more fields]
scala> custCoalesce.rdd.getNumPartitions
res2: Int = 5 // Coalesce method has changed the partitions to 5.
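The physical plan is another place where the two methods differ. The sketch below is an illustration under assumptions: it uses a generated `id` column from `spark.range` instead of the customers data, and the variable names are made up. The repartition plan should contain an Exchange (shuffle) step, while the coalesce plan should not.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; spark-shell already provides one.
val spark = SparkSession.builder()
  .master("local[4]")
  .appName("dataframe-plans")
  .getOrCreate()

// Hypothetical DataFrame with 20 partitions.
val df = spark.range(0, 1000, 1, 20).toDF("id")

val viaRepartition = df.repartition(5)  // full shuffle down to 5 partitions
val viaCoalesce = df.coalesce(5)        // merges partitions without a shuffle
val tooMany = df.coalesce(40)           // a larger request keeps the current 20

// Print the physical plans to compare them.
viaRepartition.explain()
viaCoalesce.explain()

println(viaRepartition.rdd.getNumPartitions)
println(viaCoalesce.rdd.getNumPartitions)
println(tooMany.rdd.getNumPartitions)
```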
Which one is faster: repartition or coalesce?
Let's compare the execution time of the repartition and coalesce methods. For this comparison, custDFNew is a DataFrame with 20 partitions, as shown below.
scala> custDFNew.rdd.getNumPartitions
res3: Int = 20 // Dataframe has 20 partitions.
scala> custDFNew.count
res6: Long = 12435 // Total records in Dataframe.
Starting with Spark 2, we can use spark.time(<command>) (so far available only in Scala) to measure the time taken to run a command. We will reduce the partitions to 5 using both the repartition and coalesce methods.
scala> spark.time(custDFNew.repartition(5))
Time taken: 2 ms
res4: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]...
scala> spark.time(custDFNew.coalesce(5))
Time taken: 1 ms
res5: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]...
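In this run, coalesce returned in 1 ms against 2 ms for repartition. Keep in mind, though, that both methods are lazy transformations: no data is moved until an action runs, so these numbers only reflect the time needed to build the new Dataset, and a 1 ms gap is too small to draw conclusions from. The real saving from coalesce comes from skipping the shuffle when an action is finally executed.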
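One way to include the actual data movement in the measurement is to time an action such as count on each result. The sketch below assumes a generated one-million-row DataFrame rather than the customers data. Timings depend heavily on the machine, data size and JVM warm-up, so no expected numbers are shown.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; spark-shell already provides one.
val spark = SparkSession.builder()
  .master("local[4]")
  .appName("timing-with-actions")
  .getOrCreate()

// Hypothetical DataFrame: one million rows in 20 partitions.
val df = spark.range(0, 1000000, 1, 20).toDF("id")

// Timing the transformation alone only measures building the plan.
val planOnly = spark.time(df.repartition(5))

// Timing an action forces the job to run, including the shuffle for repartition.
val repartitionCount = spark.time(df.repartition(5).count())
val coalesceCount = spark.time(df.coalesce(5).count())

println(s"rows counted: $repartitionCount and $coalesceCount")
```

Running each measurement several times and ignoring the first run gives a more reliable picture. Note also that coalesce reduces the parallelism of the stage it is part of, so on large inputs the trade-off is not always in its favour.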