Page Contents
Hello Friends. In this post we will see several examples of how to create a Spark dataframe in different ways. You might also be interested in our RDD tutorials like map, flatmap, reduceByKey.
Data Preparation
First of all, we will create some sample data and read that as RDD.
// Lets prepare some data.
val records = List( ("Prakash", "30", "Male", "Uttrakhand"),
("Amit", "35", "Male", "Bangalore"),
("Adarsh", "36", "Male", "Haryana"),
("Anuj", "37", "Male", "Delhi"))
// Creating a SparkSession object.
val spark = SparkSession.builder()
.master("local[*]")
.appName("Proedu.co examples")
.getOrCreate()
import spark.implicits._
val rdd = spark.sparkContext.parallelize(records)
// Let's print some records.
rdd.collect.foreach(println)
(Prakash,30,Male,Uttrakhand)
(Amit,35,Male,Bangalore)
(Adarsh,36,Male,Haryana)
(Anuj,37,Male,Delhi)
After reading the data as an RDD, we will convert it to a dataframe in different ways.
Spark Create DataFrame from RDD
Using toDF() function
Lets convert the RDD to a Dataframe using toDF function as mentioned below
val df1 = rdd.toDF()
df1.show()
// Output
+-------+---+----+----------+
| _1| _2| _3| _4|
+-------+---+----+----------+
|Prakash| 30|Male|Uttrakhand|
| Amit| 35|Male| Bangalore|
| Adarsh| 36|Male| Haryana|
| Anuj| 37|Male| Delhi|
+-------+---+----+----------+
If you notice the output above, the column names are _1, _2 and so on. toDF() fucntion comes in another flavor toDF(colNames: String*) which allow us to provide the logical column names. We will also print the default schema of the dataframe columns using printSchema function.
Using toDF() function
val df2 = rdd.toDF("Name", "Age", "Gender", "Location")
df2.show()
// Output
+-------+---+------+----------+
| Name|Age|Gender| Location|
+-------+---+------+----------+
|Prakash| 30| Male|Uttrakhand|
| Amit| 35| Male| Bangalore|
| Adarsh| 36| Male| Haryana|
| Anuj| 37| Male| Delhi|
+-------+---+------+----------+
df2.printSchema()
// Output
root
|-- Name: string (nullable = true)
|-- Age: string (nullable = true)
|-- Gender: string (nullable = true)
|-- Location: string (nullable = true)
From above code and output, we can see that now Dataframe columns have a meaningful name. Also if you see the schema , all columns have a default StringType. What if we want to provide an explicit schema to the columns. We can use createDataFrame() function which accepts an RDD and a Schema to create dataframe.
Using createDataFrame() with Schema
To use createDataFrame function, we will first create a schema using ScructType as shown below
// We will specify Age column as Integer and all other columns as String.
val schema = StructType(Array(
StructField("Name", StringType, true),
StructField("Age", IntegerType, true),
StructField("Gender", StringType, true),
StructField("Location", StringType, true)))
Also we need to convert our RDD to RDD[Row]. We can use a map transformation function for the same.
// We have input RDD where each element is a tuple containing four values. We have used a map transformation and converted the input RDD to a org.apache.spark.sql.Row object.
rdd.map(tuple => Row(tuple._1, tuple._2.toInt, tuple._3, tuple._4))
The complete solution will look as mentioned below
val df3 = spark.createDataFrame(rdd.map(tuple => Row(tuple._1, tuple._2.toInt, tuple._3, tuple._4)), schema)
df3.show()
// Output
+-------+---+------+----------+
| Name|Age|Gender| Location|
+-------+---+------+----------+
|Prakash| 30| Male|Uttrakhand|
| Amit| 35| Male| Bangalore|
| Adarsh| 36| Male| Haryana|
| Anuj| 37| Male| Delhi|
+-------+---+------+----------+
df3.printSchema()
// Output
root
|-- Name: string (nullable = true)
|-- Age: integer (nullable = true)
|-- Gender: string (nullable = true)
|-- Location: string (nullable = true)
If you notice the above output, the schema mentions the column “Age” as Integer and not String. Till not we have seen how an existing RDD can be converted to a Dataframe. Now let’s understand how we can create Dataframe from a local scala collection. We will use toDF() function to achieve the same.
Create DataFrame from local scala collection
Create DataFrame from scala list using toDF
As a part of data preparation, we created a records list. Here we will convert that list to a Dataframe using toDF()
// Create DataFrame from scala list.
val df4 = records.toDF("Name", "Age", "Gender", "Location")
df4.show()
// Output
+-------+---+------+----------+
| Name|Age|Gender| Location|
+-------+---+------+----------+
|Prakash| 30| Male|Uttrakhand|
| Amit| 35| Male| Bangalore|
| Adarsh| 36| Male| Haryana|
| Anuj| 37| Male| Delhi|
+-------+---+------+----------+
Similarly we can use createDataframe function to create a Dataframe from a list.
Create DataFrame from scala list using createDataframe
// Create dataframe from local collection
spark.createDataFrame(records)
.toDF("Name", "Age", "Gender", "Location")
.show()
// Output
+-------+---+------+----------+
| Name|Age|Gender| Location|
+-------+---+------+----------+
|Prakash| 30| Male|Uttrakhand|
| Amit| 35| Male| Bangalore|
| Adarsh| 36| Male| Haryana|
| Anuj| 37| Male| Delhi|
+-------+---+------+----------+
Create Dataframe from structured data sources
Creating Spark DataFrame from CSV
We can use spark.read.csv to create a Dataframe from CSV file.
// Creating DataFrame from CSV
spark.read
.csv("src/main/resources/retail_db/customers")
.toDF("customer_id", "customer_fname", "customer_lname", "email", "password", "street", "city", "state", "zip")
.show(10)
// Output
+-----------+--------------+--------------+---------+---------+--------------------+-----------+-----+-----+
|customer_id|customer_fname|customer_lname| email| password| street| city|state| zip|
+-----------+--------------+--------------+---------+---------+--------------------+-----------+-----+-----+
| 1| Richard| Hernandez|XXXXXXXXX|XXXXXXXXX| 6303 Heather Plaza|Brownsville| TX|78521|
| 2| Mary| Barrett|XXXXXXXXX|XXXXXXXXX|9526 Noble Embers...| Littleton| CO|80126|
| 3| Ann| Smith|XXXXXXXXX|XXXXXXXXX|3422 Blue Pioneer...| Caguas| PR|00725|
| 4| Mary| Jones|XXXXXXXXX|XXXXXXXXX| 8324 Little Common| San Marcos| CA|92069|
| 5| Robert| Hudson|XXXXXXXXX|XXXXXXXXX|10 Crystal River ...| Caguas| PR|00725|
| 6| Mary| Smith|XXXXXXXXX|XXXXXXXXX|3151 Sleepy Quail...| Passaic| NJ|07055|
| 7| Melissa| Wilcox|XXXXXXXXX|XXXXXXXXX|9453 High Concession| Caguas| PR|00725|
| 8| Megan| Smith|XXXXXXXXX|XXXXXXXXX|3047 Foggy Forest...| Lawrence| MA|01841|
| 9| Mary| Perez|XXXXXXXXX|XXXXXXXXX| 3616 Quaking Street| Caguas| PR|00725|
| 10| Melissa| Smith|XXXXXXXXX|XXXXXXXXX|8598 Harvest Beac...| Stafford| VA|22554|
+-----------+--------------+--------------+---------+---------+--------------------+-----------+-----+-----+
We can also read the entire data as a Text file using spark.read.text method. This method reads the entire row as a single String line.
// Creating DataFrame from text file
spark.read
.text("src/main/resources/retail_db/customers")
.show(10, false)
// Output
+--------------------------------------------------------------------------------+
|value |
+--------------------------------------------------------------------------------+
|1,Richard,Hernandez,XXXXXXXXX,XXXXXXXXX,6303 Heather Plaza,Brownsville,TX,78521 |
|2,Mary,Barrett,XXXXXXXXX,XXXXXXXXX,9526 Noble Embers Ridge,Littleton,CO,80126 |
|3,Ann,Smith,XXXXXXXXX,XXXXXXXXX,3422 Blue Pioneer Bend,Caguas,PR,00725 |
|4,Mary,Jones,XXXXXXXXX,XXXXXXXXX,8324 Little Common,San Marcos,CA,92069 |
|5,Robert,Hudson,XXXXXXXXX,XXXXXXXXX,10 Crystal River Mall ,Caguas,PR,00725 |
|6,Mary,Smith,XXXXXXXXX,XXXXXXXXX,3151 Sleepy Quail Promenade,Passaic,NJ,07055 |
|7,Melissa,Wilcox,XXXXXXXXX,XXXXXXXXX,9453 High Concession,Caguas,PR,00725 |
|8,Megan,Smith,XXXXXXXXX,XXXXXXXXX,3047 Foggy Forest Plaza,Lawrence,MA,01841 |
|9,Mary,Perez,XXXXXXXXX,XXXXXXXXX,3616 Quaking Street,Caguas,PR,00725 |
|10,Melissa,Smith,XXXXXXXXX,XXXXXXXXX,8598 Harvest Beacon Plaza,Stafford,VA,22554|
+--------------------------------------------------------------------------------+
So far so good. Let us see how we can read the JSON data as a Dataframe. We will read the JSON data and will only select few columns using select method.
// Create Dataframe from JSON file.
spark.read
.json("src/main/resources/retail_db/customers-json")
.select("customer_city", "customer_fname", "customer_lname")
.show(10)
// Output
+-------------+--------------+--------------+
|customer_city|customer_fname|customer_lname|
+-------------+--------------+--------------+
| Brownsville| Richard| Hernandez|
| Littleton| Mary| Barrett|
| Caguas| Ann| Smith|
| San Marcos| Mary| Jones|
| Caguas| Robert| Hudson|
| Passaic| Mary| Smith|
| Caguas| Melissa| Wilcox|
| Lawrence| Megan| Smith|
| Caguas| Mary| Perez|
| Stafford| Melissa| Smith|
+-------------+--------------+--------------+
Finally let’s see how we can read a parquet file as a Dataframe using spark.read.parquet
// Create Dataframe from Parquet file.
spark.read
.parquet("src/main/resources/retail_db/orders_parquet")
.show(10)
// Output
+--------+-------------+-----------------+---------------+
|order_id| order_date|order_customer_id| order_status|
+--------+-------------+-----------------+---------------+
| 1|1374710400000| 11599| CLOSED|
| 2|1374710400000| 256|PENDING_PAYMENT|
| 3|1374710400000| 12111| COMPLETE|
| 4|1374710400000| 8827| CLOSED|
| 5|1374710400000| 11318| COMPLETE|
| 6|1374710400000| 7130| COMPLETE|
| 7|1374710400000| 4530| COMPLETE|
| 8|1374710400000| 2911| PROCESSING|
| 9|1374710400000| 5657|PENDING_PAYMENT|
| 10|1374710400000| 5648|PENDING_PAYMENT|
+--------+-------------+-----------------+---------------+
I hope you guys have enjoyed the article. Happy learning 🙂
Nice Article Sir!
Could you please explain how to apply compression while saving CSV data to HDFS ?
Thanks!!!
Great Article Sir.