Have you ever wondered whether we can apply joins to Spark DataFrames the way we do to SQL tables?

Woohoo!! You guessed it right. Spark ships with a module called Spark SQL for structured data processing, and it supports all the standard SQL joins. A join combines two or more tables into a single result set based on the condition provided.

Let's learn the different types of joins by applying the join syntax to two or more DataFrames:

  • Inner Join
  • Left-Outer Join
  • Right-Outer Join
  • Full-Outer Join
  • Left-Semi Join
  • Left-Anti Join
  • Cross Join
  • Self Join

First, we create two DataFrames, "customer" and "order", sharing a common column, "Customer_Id".
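
If you are following along outside the spark-shell (where a spark session already exists), here is a minimal sketch of the setup; the app name and local master are illustrative choices:

import org.apache.spark.sql.SparkSession

// Minimal local session (spark-shell provides `spark` out of the box)
val spark = SparkSession.builder()
  .appName("SparkJoins")
  .master("local[*]")
  .getOrCreate()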

// Importing the implicits needed for the toDF syntax below
import spark.implicits._

// customer dataframe
val customerRows = Seq((101,"Berlin","Delhi","7656438889"), (102,"Rio","Kolkata","9694345576"), (103,"Tokyo","Jaipur","8965458890"), (104,"Denver","Mumbai","9090676676"))
// "Mobile_No" rather than "Mobile No.": spaces and dots in column names need backticks when referenced
val customer = customerRows.toDF("Customer_Id", "Name", "City", "Mobile_No")
// View Dataframe
customer.show()

//order dataframe
val orderRows = Seq(("#1",102,"Rs.500"), ("#2",101,"Rs.40"), ("#3",103,"Rs.250"), ("#4",105,"Rs.50"))
val order = orderRows.toDF("Order_Id", "Customer_Id", "Amount")
// View Dataframe
order.show()

1. INNER JOIN

Inner join combines two DataFrames on a common column and drops the rows whose values don't match. It uses the comparison operator "===" to match rows and returns all the data that has a match on the join condition.

a) When the common column has a different name in each table

Using the syntax below, we can join tables even when the common column is named differently in each of them.

// Inner Join
customer.join(order, customer("Customer_Id") === order("Customer_Id"), "inner")
        .show()
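
Because this is Spark SQL, the same inner join can also be expressed as a SQL query over temporary views. A minimal sketch, where the view names customer_v and order_v are our own choice:

// Register the dataframes as temporary views (names are illustrative)
customer.createOrReplaceTempView("customer_v")
order.createOrReplaceTempView("order_v")
// Same inner join, written in SQL
spark.sql("SELECT * FROM customer_v c JOIN order_v o ON c.Customer_Id = o.Customer_Id").show()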

b) When both tables share the same name for the common column

Here, the join function takes the other DataFrame as its first argument and the common column name as its second argument.

customer.join(order, "Customer_Id").show()

If we don't provide a join type, it defaults to "inner". With this form, Spark automatically drops one "Customer_Id" column to remove redundancy.
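
A quick way to see the difference between the two forms is to compare the resulting columns; the "===" form keeps both "Customer_Id" columns, while the string form keeps only one:

// "===" form: Customer_Id appears twice
customer.join(order, customer("Customer_Id") === order("Customer_Id"), "inner").columns.foreach(println)
// String form: Customer_Id appears once
customer.join(order, "Customer_Id").columns.foreach(println)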

2. LEFT-OUTER JOIN

If we wish to append order information to the "customer" table, whether or not a particular customer placed an order, we can leverage a left-outer join. It returns all records from the left table regardless of any match in the right table, and fills the right-side columns with NULL for unmatched records.

Here, we are joining on the "Customer_Id" column. We can perform a left-outer join in any of the following ways:

// Left join
customer.join(order, customer("Customer_Id") === order("Customer_Id"), "left")
    .show()

OR

customer.join(order, customer("Customer_Id") === order("Customer_Id"), "leftouter")
    .show()

OR

customer.join(order, Seq("Customer_Id"), "left").show()

Here (with the Seq form), we get a single "Customer_Id" column. ID 104 has NULL entries because the customer with ID 104 didn't place any order.
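
This makes left-outer joins handy for spotting such customers directly. A small sketch, filtering on a NULL right-side column:

// Customers with no matching order (Order_Id is NULL after the left join)
customer.join(order, Seq("Customer_Id"), "left")
    .filter($"Order_Id".isNull)
    .show()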

3. RIGHT-OUTER JOIN

Using a right-outer join, we can get a list of all orders appended with customer information. All the rows from the right table are returned regardless of any match on the left side, while left-table rows without a match are dropped.

// Right Join
customer.join(order, customer("Customer_Id") === order("Customer_Id"), "rightouter")
    .show()

OR

customer.join(order, customer("Customer_Id") === order("Customer_Id"), "right")
    .show()

OR

customer.join(order, Seq("Customer_Id"), "right").show()

ID 105 comes with NULL entries because the customer table (left) has no match for it.

4. FULL-OUTER JOIN

A full-outer join keeps all the records: it returns every row from both DataFrames and puts NULL wherever the join condition doesn't match.

Use case: finding which customers didn't order anything at all, identifiable by NULL entries (see the sketch after the join variants below).

customer.join(order, customer("Customer_Id") === order("Customer_Id"), "outer")
    .show()

OR

customer.join(order, customer("Customer_Id") === order("Customer_Id"), "fullouter")
    .show()

OR

customer.join(order, customer("Customer_Id") === order("Customer_Id"), "full")
    .show()
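
For the use case above, a sketch that keeps only the customers who never placed an order (the right-side Customer_Id is NULL for them):

// Customers that never placed an order
customer.join(order, customer("Customer_Id") === order("Customer_Id"), "full")
    .filter(order("Customer_Id").isNull)
    .select(customer("Customer_Id"), customer("Name"))
    .show()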

5. LEFT-SEMI JOIN

Left-semi is similar to an inner join, except that it returns records from the left table only and drops all the columns from the right table. When the join condition matches, the record from the left table is kept; when it doesn't, the record is dropped altogether.

customer.join(order, customer("Customer_Id") === order("Customer_Id"), "leftsemi")
    .show()

If you look closely at the output, every Customer_Id present there also appears in the order table; all the rest are ignored.
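
In SQL terms, a left-semi join behaves like an IN subquery. Reusing the temporary views registered earlier, a sketch:

// Same customers as the leftsemi join above
spark.sql("SELECT * FROM customer_v WHERE Customer_Id IN (SELECT Customer_Id FROM order_v)").show()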

6. LEFT-ANTI JOIN

As the name "Anti" suggests, it is the exact opposite of left-semi: it returns columns from the left table, and only for the unmatched records.

customer.join(order, customer("Customer_Id") === order("Customer_Id"), "leftanti")
    .show()

The Customer_Id present in the output doesn't appear in the right table (order), making it the unmatched record.
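
If only the unmatched keys are needed rather than the whole rows, except on the key column gives the same IDs (a sketch):

// IDs present in customer but absent from order — the same keys leftanti returns
customer.select("Customer_Id").except(order.select("Customer_Id")).show()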

7. CROSS JOIN

A cross join computes the Cartesian product of two tables: it produces every combination of rows.

If the first table has m rows and the second has n rows, the output has m*n rows.

Given how large that output can get, avoid cross joins on big tables, as they can easily cause out-of-memory errors.

customer.crossJoin(order).show()
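
With our 4-row customer and 4-row order dataframes, a quick count confirms the m*n rule:

// 4 customers x 4 orders = 16 combinations
println(customer.crossJoin(order).count())  // prints 16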

8. SELF JOIN

Joining two copies of the same table is called a self-join. In simpler terms, we join the DataFrame with itself. While joining, we need aliases to refer to and distinguish between the two copies.

An alias simply gives another name to an object for reference.

We create a new DataFrame to perform the self-join:

val rows = Seq((1,"Nelsinki",2), (2,"Associater",3), (3,"Manager",4))
// Create Dataframe
val df = rows.toDF("Employee_Id", "Employee_Name", "Dept_No")
// View Dataframe
df.show()

We alias the DataFrames as "one" and "sec" and apply the join condition:

val selfdf = df.as("one").join(df.as("sec"), $"one.Employee_Id" === $"sec.Dept_No")
selfdf.show()

Now, to refer to the columns and trim the redundant data, we select only the required columns and alias them; .as is used to rename them. Since sec.Dept_No holds the manager's Employee_Id, each employee (sec) ends up paired with their manager (one).

selfdf.select($"sec.Employee_Name".as("employee"),$"one.Employee_Name".as("Manager_Name")).show()

Thank you for reading this far. Comment below with any corrections or suggestions, or if you've found another way of joining. I hope you're enjoying the series.

Check out my other articles: Creating-dataframe-in-spark and Spark-Aggregate-functions.

Happy Joining!!

-Gargi Gupta

