Hey there!! Have you heard of a Data Lake, or used one in your projects? Maybe you have. Let me help you recall it.

A Data Lake is a repository that stores structured and unstructured data, for example HDFS, Azure Data Lake, or AWS S3.

But have you heard of DELTA LAKE? If not, let’s catch up and dig into Delta Lake in Spark.


Why Delta Lake for Spark?

Let’s take a look at Spark documentation.

What does it say? It’s crucial to understand that Spark writes aren’t atomic. If an exception occurs while files are being overwritten and the job fails, it can cause data loss or data corruption.

  • Spark doesn’t ensure ACID transactions
  • Missing schema enforcement
  • Data consistency isn’t guaranteed
  • Metadata is difficult to handle (it is BIG DATA in itself)
ACID:

Atomicity -> Either all or nothing.

Consistency -> Data is always in a valid state.

Isolation -> An operation must be isolated from other concurrent operations.

Durability -> Once committed, data is never lost.


How does the typical “Overwrite” save mode work in Spark?

Consider the worst case: in “overwrite” mode, Spark first deletes the existing files in the target path and only then writes the new ones. If the job fails partway through, the old data has already been removed and the new data is incomplete, leaving the directory corrupted.
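Here is a rough sketch of the kind of plain Parquet overwrite that carries this risk (the DataFrame df and the output path are placeholders for illustration):

// Plain Spark overwrite (no Delta): the existing files under the path are
// deleted before the new files are fully written, so a failure mid-job can
// leave the directory empty or only partially written.
df.write
  .mode("overwrite")
  .parquet("/FileStore/tables/employee_parquet")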


What is Delta Lake?

Delta Lake is a project developed by Databricks that runs on top of an existing data lake. It is an open-source storage layer that brings ACID transactions to Apache Spark.

It reads and writes data to storage (files) by introducing a commit log to Apache Spark, making write operations atomic.



Features of Delta Lake:
  1. Provides Schema Enforcement: avoids bad data by specifying a schema and raising an error before the data is ingested into the files (see the sketch after this list).
  2. Supports both batch and real-time (streaming) processing.
  3. Maintains a transaction log (commit log) to keep track of the various versions of the files.
  4. Time Travel: you can roll back to a previous version of the table, since the old files never get deleted, and use them again.
  5. Delta tables have an auto-refresh feature.
  6. Provides reliability to the existing data lake.
  7. Delta Lake treats metadata like data, handling it scalably using Spark’s processing power.

All the data in Delta Lake is stored in Apache Parquet format.
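As a minimal sketch of schema enforcement, assume a Delta table like the one we create later at /FileStore/delta; the badDf data here is made up purely for illustration:

import spark.implicits._

// A DataFrame that is missing the Salary and DeptNo columns
val badDf = Seq((5, "Eve")).toDF("Id", "Name")

// Appending it to the Delta table fails fast with a schema-mismatch error
// instead of silently writing bad records into the files
badDf.write.format("delta").mode("append").save("/FileStore/delta")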


How does “Overwrite” in Delta ensure ACID properties?

With Delta, an overwrite first writes the new Parquet files alongside the old ones, and only then records a single commit in the transaction log that switches to the new version. If the job fails before that commit, the log is untouched, so readers still see the previous version and nothing is lost.
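A minimal sketch, using the same df we load in the Implementation section below:

// Delta overwrite: the new Parquet files are written first, and the switch to
// the new version happens as one atomic commit appended to _delta_log.
// A failed job leaves the log, and therefore the visible table, unchanged.
df.write
  .format("delta")
  .mode("overwrite")
  .save("/FileStore/delta")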


Implementation

Now, let’s code and see how it works!!

I am reading an Employee CSV file with Id, Name, Salary, and DeptNo as column names. You can download the file from here: Employee.csv

// Imports
import org.apache.spark.sql.functions._

// Read the Employee CSV into a DataFrame
val df = spark.read
  .options(Map("header" -> "true", "inferSchema" -> "true"))
  .csv("/FileStore/tables/Employee.csv")

df.show()

As we have the DataFrame ready, it’s easy to create a Delta table. Just specify the format as “delta” and the path where the table should be stored. Then view what we have in the given directory.

df.write.format("delta").save("/FileStore/delta")
// Display files in delta directory
display(dbutils.fs.ls("/FileStore/delta"))



In the delta directory, we have a Parquet file of our Employee Delta table and a folder named “_delta_log” where all the information about the files is stored.

Let’s check _delta_log folder.

display(dbutils.fs.ls("/FileStore/delta/_delta_log"))



This is the first write to the table, so we have a 00000000000000000000.json file describing it. The file name is auto-incremented: a new, higher-numbered JSON file is created for each action we perform on the table.

I have downloaded this file. Let me show you:

The file contains commitInfo, metaData, protocol, and add JSON entries.

The add entry contains the path of the latest version’s data file, statistics about the data, and other fields.
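If you prefer to inspect it in the notebook instead of downloading it, a quick sketch like this should work:

// Read the first commit file; each line is one JSON action
// (commitInfo, protocol, metaData, add)
spark.read
  .json("/FileStore/delta/_delta_log/00000000000000000000.json")
  .show(false)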

Let’s perform the much-awaited operations and check how Delta Lake ensures ACID properties.


DELETE OPERATION ON DELTA TABLE:

Take a look!! We are deleting all the rows from the table having DeptNo “B”. Let’s check how the table changes and what happens to _delta_log.

// Imports
import io.delta.tables._

// Load the Delta table from its path
val deltaTable = DeltaTable.forPath(spark, "/FileStore/delta/")

// Delete operation
deltaTable.delete("DeptNo = 'B'")

// View the delta folder
display(dbutils.fs.ls("/FileStore/delta"))



Hey, we have two Parquet files, i.e., two versions of our data. Delete removes the data from the latest version of the Delta table but doesn’t remove it from physical storage. Let’s check the changes in the _delta_log folder.


display(dbutils.fs.ls("/FileStore/delta/_delta_log"))



We have 00000000000000000001.json created in _delta_log. If we view its contents, there is a remove JSON entry describing which data file is to be removed (along with an add entry for the rewritten file).


Let’s check the latest version of our Delta table by downloading the newer Parquet file and inspecting it.

How to read a Parquet file from the command line?

$ pip install parquet-cli

$ parq path_of_file.parquet --head no_of_rows (command to view the file)

Viewing the latest Parquet file with this command, you can easily see that the employees with DeptNo “B” have been successfully deleted. 🙂
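By the way, the deleted rows aren’t gone for good. As a small sketch of the Time Travel feature mentioned earlier, you can still read the previous version of the table:

// Time travel: read version 0 of the table (before the delete);
// the DeptNo "B" rows are still visible in that snapshot
val oldDf = spark.read
  .format("delta")
  .option("versionAsOf", 0)
  .load("/FileStore/delta")

oldDf.show()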


UPDATE OPERATION ON DELTA TABLE:

Let’s update the table and check the logs again. We are updating the DeptNo of the rows having “A” to “D”.

The lit() function creates a Column object holding a literal value.

// Update DeptNo from "A" to "D"
deltaTable.update(
  col("DeptNo") === "A",
  Map("DeptNo" -> lit("D")))

display(dbutils.fs.ls("/FileStore/delta"))

display(dbutils.fs.ls("/FileStore/delta/_delta_log"))


Now we have the next delta log file, named 00000000000000000002.json. Have a look!!

All the rows with DeptNo “A” got updated and set to “D”. Yeah!! We did it.
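If you don’t want to download the Parquet file again, a quick read of the table confirms the change:

// Read the latest version of the Delta table to verify the update
spark.read.format("delta").load("/FileStore/delta").show()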

Download all the files from here: delta files

In today’s article, we learnt:

  • Why we use Delta Lake
  • How Delta Lake ensures ACID properties
  • How to update and delete from a Delta table

That’s all for the day. Stay tuned for more Sparking!! Let me know in the comments section what you think about it.

Happy Sparking!!

-Gargi Gupta

