Understanding Apache Spark Map transformation

In this post, we will talk about the RDD map transformation in Apache Spark. An RDD (Resilient Distributed Dataset) is the most basic building block in Apache Spark: a collection of objects that is partitioned and distributed across the nodes of a cluster.
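
To make the "partitioned and distributed" part concrete, here is a minimal sketch (assuming a SparkContext named sc is already available, as in spark-shell); the variable name partitionedRDD is just illustrative.

// Creating an RDD with an explicit number of partitions.
val partitionedRDD = sc.parallelize(1 to 10, numSlices = 4)
println(partitionedRDD.getNumPartitions)   // 4

// glom() groups the elements of each partition into an array, so we can inspect how the data is laid out.
partitionedRDD.glom().collect().foreach(part => println(part.mkString(",")))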

As per the Apache Spark documentation, the map(func) transformation returns a new distributed dataset formed by passing each element of the source through a function func. For example, suppose we have an RDD of numbers as shown below.

// Creating RDD contains numbers.
val numRDD = sc.parallelize(1 to 10)
numRDD.collect.foreach(println)
1
2
3
4
5
6
7
8
9
10

After applying the transformation numRDD.map(num => num + 1) to the above RDD, each element is incremented by 1.

// Applying map function on numRDD.
val newRDD = numRDD.map(num => num + 1)
newRDD.collect.foreach(println)
2
3
4
5
6
7
8
9
10
11

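Note that map produces exactly one output element for every input element, and the output type does not have to match the input type. As a small illustrative sketch reusing numRDD from above (the variable name labelRDD is an assumption):

// Mapping numbers to Strings - the element type changes from Int to String.
val labelRDD = numRDD.map(num => "number_" + num)
labelRDD.take(3).foreach(println)

// Output
number_1
number_2
number_3
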
Let's take one more example. Here we will read the orders dataset present in our GitHub repository. The data looks like this:

1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE
4,2013-07-25 00:00:00.0,8827,CLOSED
5,2013-07-25 00:00:00.0,11318,COMPLETE

Let's identify all the distinct order statuses, such as CLOSED and COMPLETE.

In the first step, we will read the dataset using SparkContext’s textFile method.

// Creating a SparkSession and getting its SparkContext.
val sparkContext = SparkSession.builder()
  .master("local[*]")
  .appName("Proedu.co examples")
  .getOrCreate()
  .sparkContext

// Reading the orders file. The path could also point to a file in HDFS.
val file = "src/main/resources/retail_db/orders"
val ordersRDD = sparkContext.textFile(file)

// Let's print some records.
ordersRDD.take(10).foreach(println)

// Output
1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE
4,2013-07-25 00:00:00.0,8827,CLOSED
5,2013-07-25 00:00:00.0,11318,COMPLETE

The textFile method returns an RDD where every record is a String containing four comma-separated fields. We will split each record on the comma separator as shown below.

val statusRDD = ordersRDD.map(order => order.split(",")(3))
statusRDD.collect.foreach(println)

// Output
COMPLETE
COMPLETE
COMPLETE
PENDING
PENDING_PAYMENT
PROCESSING
CLOSED

Here we first split each record on the comma. The split method returns an Array[String], so we use order.split(",")(3) to fetch the fourth (last) element of the array, which is the order status in our case.
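
If you need more than the status column, the same map call can return the entire parsed record. Here is a hedged sketch that maps every line to a tuple of all four fields; the field names in the comment (order id, date, customer id, status) are inferred from the sample data above.

// Splitting once and keeping all four fields: (orderId, orderDate, customerId, status).
val parsedRDD = ordersRDD.map { order =>
  val fields = order.split(",")
  (fields(0).toInt, fields(1), fields(2).toInt, fields(3))
}
parsedRDD.take(2).foreach(println)

// Output
(1,2013-07-25 00:00:00.0,11599,CLOSED)
(2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT)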

Finally, we apply the distinct function on statusRDD, which returns a new RDD containing only the distinct elements.

// Applying distinct function on RDD.
val distinctRDD = statusRDD.distinct()
distinctRDD.collect.foreach(println)

// Output
PENDING_PAYMENT
CLOSED
CANCELED
PAYMENT_REVIEW
PENDING
ON_HOLD
PROCESSING
SUSPECTED_FRAUD
COMPLETE

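As a related aside, if you also want to know how many orders fall under each status, the countByValue action returns the counts as a local Map on the driver. A minimal sketch:

// Counting how many orders carry each status. countByValue returns a Map[String, Long] to the driver.
val statusCounts = statusRDD.countByValue()
statusCounts.foreach { case (status, count) => println(s"$status -> $count") }
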
Complete Code

The complete code is available in our GitHub repository: https://github.com/proedu-organisation/spark-scala-examples/blob/main/src/main/scala/rdd/transformations/MapExample2.scala

import org.apache.spark.sql.SparkSession

object MapExample2 extends App {

  // Path to the orders dataset.
  val file = "src/main/resources/retail_db/orders"

  // Creating a SparkSession and getting its SparkContext.
  val sparkContext = SparkSession.builder()
    .master("local[*]")
    .appName("Proedu.co examples")
    .getOrCreate()
    .sparkContext

  // Reading the orders file as an RDD of lines.
  val ordersRDD = sparkContext.textFile(file)
  ordersRDD.take(10).foreach(println)

  // Extracting the order status (fourth field) from every record.
  val statusRDD = ordersRDD.map(order => order.split(",")(3))
  statusRDD.collect.foreach(println)

  // Keeping only the distinct statuses.
  val distinctRDD = statusRDD.distinct()
  distinctRDD.collect.foreach(println)
}

Happy Learning 🙂