Hey folks 🙂

You must be eager to know how you can update your existing Big Data files using Spark. So, we'll try some hands-on work with Spark DataFrames. After working rigorously with Spark DataFrames and reading my previous articles, you must be familiar with the fact that:


IT'S DIFFICULT TO UPDATE EXISTING BIG DATA; WE SAY THAT BIG DATA IS WRITTEN ONCE AND READ MULTIPLE TIMES

Awful!! What to do now?

Spark offers some approaches to handle this issue. In today's article, let's learn to update Big Data files with a demo, along with the pros and cons of each proposed solution.


1. FOUR-STEP STRATEGY

What is the requirement?

We have a base_table with the initial records and an (external) incremental_table highlighting the changes that need to be applied.

GOAL: Update the base table with the incremental table.

The four-step strategy is a cyclic process. After each cycle, we obtain up-to-date data/records.

(Diagram: the four-step strategy cycle. Photo by Cloudera Community)
A) Ingest

In the very first step, we load the tables, or transfer them from an RDBMS into Hadoop storage with Sqoop, typically landing them in Apache Hive. Let's load the base_table and the incremental_table.

// Imports
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// Schema shared by both tables
val customSchema = StructType(Array(
  StructField("Id", IntegerType, false),
  StructField("Time", StringType, true),
  StructField("Name", StringType, true),
  StructField("Roll No", IntegerType, true)
))

// Read the base table
val df = spark.read.options(Map("header" -> "true")).schema(customSchema).csv("hdfs://localhost:9000/tables/base_table.csv")

// Convert the date into a proper format
val base = df.withColumn("Date", to_date(col("Time"), "MM/dd/yyyy"))
base.show()

// Read the incremental table
val df_incre = spark.read.options(Map("header" -> "true")).schema(customSchema).csv("hdfs://localhost:9000/tables/increment_table.csv")

// Convert the date into a proper format
val incre = df_incre.withColumn("Date", to_date(col("Time"), "MM/dd/yyyy"))
incre.show()

You can download the files from the given links: base.csv and incremental.csv
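If you can't grab the files, here is a minimal sketch that builds two DataFrames of the same shape in memory, so you can still follow along. The rows below are made up for illustration and won't match the downloadable files:

// Hypothetical stand-ins for the two CSV files (illustrative rows only)
import spark.implicits._

val base = Seq(
  (1, "01/15/2020", "Bunny", 5),
  (2, "01/15/2020", "Parth", 30)
).toDF("Id", "Time", "Name", "Roll No")
  .withColumn("Date", to_date(col("Time"), "MM/dd/yyyy"))

val incre = Seq(
  (1, "02/10/2020", "Bunny", 7)   // an updated record for Id 1
).toDF("Id", "Time", "Name", "Roll No")
  .withColumn("Date", to_date(col("Time"), "MM/dd/yyyy"))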


B) Reconcile

After ingesting the tables, we need to combine them to obtain up-to-date data. We can reconcile on a unique Id, on the maximum time at which a record was created, or by keeping a flag column that marks each row as "increment" or "base" data. Then, we can easily update the table and generate a reconciled view.

val appended = base.union(incre)
appended.show()

We received the appended DataFrame. union() merges the rows of two DataFrames into one.

Caution: both DataFrames must have the same schema.
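Also note that a plain union() matches columns by position, so if the column order differs between the two DataFrames, it will silently misalign them. Since Spark 2.3, unionByName() matches columns by name instead; a minimal sketch:

// Safer merge when the column order may differ: match columns by name
val appendedByName = base.unionByName(incre)
appendedByName.show()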

After merging, we need to overwrite the base data with the incremental data. We performed:

  • GroupBy on the unique Id
  • Keep the record with the maximum Date

val report = appended.groupBy("Id").agg(max("Date").as("Date"))
report.show()


C) Compact

After combining the tables and generating a reconciled view, we'll create a reporting table from that view (this step applies specifically to Apache Hive).

After each cycle, the reporting table gets truncated.
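For instance, assuming the Spark session was created with Hive support enabled (enableHiveSupport()), the reconciled data can be materialized into a reporting table like this; the view and table names are illustrative:

// Register the reconciled data as a temporary view
report.createOrReplaceTempView("reconcile_view")

// Rebuild the Hive reporting table from the view (recreated each cycle)
spark.sql("DROP TABLE IF EXISTS reporting_table")
spark.sql("CREATE TABLE reporting_table AS SELECT * FROM reconcile_view")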


D) Purge
  • Replace the base table with the reporting table data.
  • Delete the incremental table records.

Here in Scala, we inner-join the appended and report DataFrames and sort by Id. Since the aggregation kept only the Id and Date columns, joining back to appended recovers the remaining fields.

// Join on both Id and Date, so rows from different Ids that happen to share a date don't get mixed up
val base = appended.join(report, appended("Id") === report("Id") && appended("Date") === report("Date"), "inner").sort(appended("Id"))

// Select the required fields
base.select(appended("Id"), appended("Name"), appended("Roll No"), appended("Date")).show()

Amazing 😛

We have achieved updates to Big Data files using Spark. We obtained the overwritten data on the basis of the latest date.
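As a side note, the reconcile and purge steps can be collapsed into a single pass with a window function: rank each Id's records by Date and keep only the newest one. A minimal sketch, equivalent in spirit to the groupBy-plus-join above:

import org.apache.spark.sql.expressions.Window

// For each Id, keep only the row with the latest Date
// (if an Id has two rows with the same latest Date, row_number keeps an arbitrary one)
val w = Window.partitionBy("Id").orderBy(col("Date").desc)
val latest = appended
  .withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)
  .drop("rn")
latest.sort("Id").show()

This avoids the self-join back to the appended DataFrame entirely.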


2. APPEND SAVE MODE

Let's create our own base file using a DataFrame. A DataFrame's write() method returns a Spark DataFrameWriter, which can be used to write the DataFrame to CSV files.

// Imports
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// Named cols/rows so the Seq doesn't shadow the col() function
val cols = Seq("Id", "Name", "Roll No")
val rows = Seq((1, "Bunny", 5), (2, "Parth", 30), (3, "Ronak", 45))
val base = spark.createDataFrame(rows).toDF(cols: _*)
base.show()

// Write the DataFrame as CSV files into a new directory
base.write.option("header", "true")
  .csv("hdfs://localhost:9000/tables/new/")

So, we have the base file ready. Let's look at the increment table. Download the increment file from here: increment.csv

Now, we'll create a DataFrame from the increment table to work with.

val increment = spark.read.options(Map("header" -> "true")).csv("hdfs://localhost:9000/tables/increment.csv")
increment.show()

To combine both tables, we'll use a save mode. Spark's DataFrameWriter has a mode() method to specify the SaveMode.

SaveMode.Append appends the DataFrame's data to the existing files.

// SaveMode lives in org.apache.spark.sql
import org.apache.spark.sql.SaveMode

// Append the data to the existing directory
increment.write.mode(SaveMode.Append).option("header", "true").csv("hdfs://localhost:9000/tables/new/")

// Read the resultant CSV files
val result = spark.read.options(Map("header" -> "true", "inferSchema" -> "true")).csv("hdfs://localhost:9000/tables/new/")
result.show()

Yeah!! We did it. That's the appended table we were looking for. :))

Well, now we are able to update tables in Spark. But this functionality comes with a demerit: we might get caught in an exception mid-write, the flow breaks, and we lose data. Also, Spark writes aren't ACID transactions.

We'll look at all these issues, and how to resolve them, in upcoming articles. Stay tuned!!

Thank you for reading this far. Try overwriting the tables using "SaveMode.Overwrite". Have fun 😛
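As a head start on that exercise, a minimal sketch of Overwrite mode, which replaces the existing files instead of appending to them:

import org.apache.spark.sql.SaveMode

// Replace whatever is already in the output directory
increment.write.mode(SaveMode.Overwrite)
  .option("header", "true")
  .csv("hdfs://localhost:9000/tables/new/")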

Let me know your views in the comment section.

Happy Sparking!!

-Gargi Gupta


