Understanding Spark RDD’s Map transformation
In this post, we will talk about the RDD map transformation in Apache Spark. RDD (Resilient Distributed Dataset) is the most basic building block in Apache Spark: a collection of objects that is partitioned and distributed across the nodes of a cluster.
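To make the idea of partitioning concrete, here is a minimal sketch. It assumes a spark-shell style SparkContext available as sc (the same sc used in the examples below); getNumPartitions is only used here to illustrate that the data is split across partitions.
// Creating an RDD with an explicit number of partitions.
val sampleRDD = sc.parallelize(Seq(1, 2, 3, 4, 5), numSlices = 2)
// Printing the number of partitions the data is distributed over.
println(sampleRDD.getNumPartitions) // 2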
As per the Apache Spark documentation, the map(func) transformation returns a new distributed dataset formed by passing each element of the source through a function func. For example, suppose we have an RDD of numbers as shown below.
// Creating RDD contains numbers.
val numRDD = sc.parallelize(1 to 10)
numRDD.collect.foreach(println)
1
2
3
4
5
6
7
8
9
10
After applying the transformation numRDD.map(num => num + 1) to the above RDD, each element is incremented by 1.
// Applying map function on numRDD.
val newRDD = numRDD.map(num => num + 1)
newRDD.collect.foreach(println)
2
3
4
5
6
7
8
9
10
11
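The function passed to map does not have to be an anonymous function. As a small sketch (using the same numRDD as above), a named method works just as well:
// Defining a named function and passing it to map.
def addOne(num: Int): Int = num + 1

val incrementedRDD = numRDD.map(addOne)
incrementedRDD.collect.foreach(println) // prints 2 to 11, as above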
Let’s take one more example. Here we will read the orders dataset present in our GitHub repository. The data looks like below:
1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE
4,2013-07-25 00:00:00.0,8827,CLOSED
5,2013-07-25 00:00:00.0,11318,COMPLETE
Let’s identify all distinct order statuses, such as CLOSED and COMPLETE.
In the first step, we will read the dataset using SparkContext’s textFile method.
// Creating a SparkContext object.
val sparkContext = SparkSession.builder()
  .master("local[*]")
  .appName("Proedu.co examples")
  .getOrCreate()
  .sparkContext

// Reading the Orders file. File may be present in HDFS.
val ordersRDD = sparkContext.textFile(file)

// Let's print some records.
ordersRDD.take(10).foreach(println)

// Output
1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE
4,2013-07-25 00:00:00.0,8827,CLOSED
5,2013-07-25 00:00:00.0,11318,COMPLETE
The textFile method returns an RDD where every record is a String value containing four fields separated by commas. We will split each record on the comma separator as shown below.
val statusRDD = ordersRDD.map(order => order.split(",")(3))
statusRDD.collect.foreach(println)
// Output
COMPLETE
COMPLETE
COMPLETE
PENDING
PENDING_PAYMENT
PROCESSING
CLOSED
Here we first split each record by comma. The split method returns an Array of String, so we use order.split(",")(3) to fetch the last element of the array, which in our case is the order status.
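To see what the split produces, here is a small sketch on a single record taken from the sample data above:
// Splitting one order record by comma.
val sampleOrder = "1,2013-07-25 00:00:00.0,11599,CLOSED"
val fields = sampleOrder.split(",") // Array("1", "2013-07-25 00:00:00.0", "11599", "CLOSED")

// Index 3 is the fourth and last field, the order status.
println(fields(3)) // CLOSED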
Finally, we apply the distinct function on statusRDD, which returns a new RDD containing only the distinct elements.
// Applying distinct function on RDD.
val distinctRDD = statusRDD.distinct()
distinctRDD.collect.foreach(println)

// Output
PENDING_PAYMENT
CLOSED
CANCELED
PAYMENT_REVIEW
PENDING
ON_HOLD
PROCESSING
SUSPECTED_FRAUD
COMPLETE
Complete Code
The complete code is available in our GitHub repository: https://github.com/proedu-organisation/spark-scala-examples/blob/main/src/main/scala/rdd/transformations/MapExample2.scala
import org.apache.spark.sql.SparkSession

object MapExample2 extends App {

  val file = "src/main/resources/retail_db/orders"

  // Creating a SparkContext object.
  val sparkContext = SparkSession.builder()
    .master("local[*]")
    .appName("Proedu.co examples")
    .getOrCreate()
    .sparkContext

  // Reading the orders file and printing a few records.
  val ordersRDD = sparkContext.textFile(file)
  ordersRDD.take(10).foreach(println)

  // Extracting the order status (fourth field) from each record.
  val statusRDD = ordersRDD.map(order => order.split(",")(3))
  statusRDD.collect.foreach(println)

  // Keeping only the distinct statuses.
  val distinctRDD = statusRDD.distinct()
  distinctRDD.collect.foreach(println)
}
Happy Learning 🙂