After exploring Spark DataFrame creation, functions, and operations, let’s move on to doing some analysis on a real dataset.

These days, we are all fighting against Corona #COVID-19, so I opted for a COVID-19 dataset, which has columns for the number of cases, deaths, and a few other fields.

Question 1: Read the covid file and find the column names and their corresponding data types.


// Read the file
val df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("hdfs://localhost:9000/tables/covid.csv")
// View Dataframe
df.show()
//View Schema
df.printSchema()

When we set “inferSchema” to true, Spark automatically identifies the data type of each column.

You can download the CSV file from the given link: covid.csv
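If you would rather not rely on inferSchema (it costs an extra pass over the file), you can supply the schema by hand. The sketch below is only an assumption based on the columns used later in this post (dateRep, cases, deaths, countriesAndTerritories, continentExp); extend it to match the actual CSV before using it.

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

// Hand-written schema as an alternative to inferSchema; the column list is an
// assumption based on the fields used in this post -- add the remaining CSV
// columns before loading the real file.
val covidSchema = StructType(Seq(
  StructField("dateRep", StringType, nullable = true),
  StructField("cases", IntegerType, nullable = true),
  StructField("deaths", IntegerType, nullable = true),
  StructField("countriesAndTerritories", StringType, nullable = true),
  StructField("continentExp", StringType, nullable = true)
))

val dfExplicit = spark.read.format("csv")
  .option("header", "true")
  .schema(covidSchema)
  .load("hdfs://localhost:9000/tables/covid.csv")
dfExplicit.printSchema()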

Question 2: Extract the day, month, and year from the date column and add them to the dataframe as separate columns.


Let’s first convert the column into a proper date format, i.e. YYYY-MM-DD, using the to_date function. Once the date is in the correct format, we’ll extract the day, month, and year using the pre-defined dayofmonth(), month(), and year() functions.

val df_date=df.withColumn("Date",to_date(col("dateRep"), "dd/MM/yyyy"))

df_date.withColumn("Day",dayofmonth(col("Date"))).withColumn("Month",month(col("Date"))).withColumn("Year",year(col("Date"))).show()

Question 3: Analyse the cases month-wise. Which month has the maximum cases and deaths?


To analyse cases by month, we’ll reuse the df_date dataframe created in the previous question and keep the extracted Day, Month, and Year columns.

val df_ver1=df_date.withColumn("Day",dayofmonth(col("Date"))).withColumn("Month",month(col("Date"))).withColumn("Year",year(col("Date")))
// View dataframe
df_ver1.show()
// Group by using Month, sorting the total cases and deaths and getting the first one
print("[Number of month with Maximum Cases and Deaths,Total_Cases,Total_Deaths]: ")
print(df_date1.groupBy("Month").agg(sum("cases").as("Total_Cases"),sum("deaths").as("Total_Deaths")).sort(desc("Total_Cases"),desc("Total_Deaths")).first())

After grouping by month and sorting in descending order, we take the first row, which gives us the month with the maximum cases and deaths.
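If you want to see the full month-wise summary rather than only the top row, the same aggregation can be shown in full. This is a minimal sketch, assuming org.apache.spark.sql.functions._ is already imported as in the snippets above.

// Full month-wise totals, sorted so the month with maximum cases comes first
val df_monthly = df_ver1.groupBy("Month")
  .agg(sum("cases").as("Total_Cases"), sum("deaths").as("Total_Deaths"))
  .sort(desc("Total_Cases"), desc("Total_Deaths"))
df_monthly.show()
// The first row is the month with the maximum cases and deaths
print(df_monthly.first())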

Question 4: Which of the countriesAndTerritories has the maximum cases?


First, we compute the total cases in each country, then sort in descending order and take the first row to find the country with the maximum cases.

val df_new=df.groupBy("countriesAndTerritories").agg(sum("cases").as("Total_Cases"))
//View Dataframe
df_new.show()
// Get the country with max cases
df_new.sort(desc("Total_Cases")).first()

Question 5: Which of the countriesAndTerritories has the maximum deaths?


val df_death=df.groupBy("countriesAndTerritories").agg(sum("deaths").as("Total_Deaths"))
df_death.show()
df_death.sort(desc("Total_Deaths")).first()

Question 6: Find the country in each continent with the maximum deaths.


Step 1: Group by continent and country, compute the total deaths in each country, and store the result in a new dataframe (df_death).

Step 2: Find the maximum number of deaths in each continent and store it in another dataframe (df_new).

Step 3: Apply an inner join between the two dataframes on the continent and death-count conditions and select the required columns.

val df_death=df.groupBy("continentExp","countriesAndTerritories").agg(sum("deaths").as("Total_Deaths")).sort("continentExp")
df_death.show()

val df_new=df_death.groupBy("continentExp").agg(max("Total_Deaths").as("Max_Deaths"))
df_new.show()

df_death.join(df_new,
    (df_death("Total_Deaths") === df_new("Max_Deaths")) &&
    (df_death("continentExp") === df_new("continentExp")), "inner")
  .select(df_death("continentExp"), df_death("countriesAndTerritories"), df_death("Total_Deaths"))
  .show()
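The same result can also be obtained with a window function instead of the self-join. Here is a minimal sketch, assuming the df_death dataframe defined above.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, desc, rank}

// Rank countries by total deaths within each continent and keep the top-ranked one
val byContinent = Window.partitionBy("continentExp").orderBy(desc("Total_Deaths"))
df_death.withColumn("rank", rank().over(byContinent))
  .filter(col("rank") === 1)
  .select("continentExp", "countriesAndTerritories", "Total_Deaths")
  .show()

Like the inner join, rank() keeps ties, so if two countries in a continent share the maximum, both rows are returned.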

Question 7: How many distinct countries are present in the dataset?


Use the dropDuplicates function to remove duplicate countries from the dataframe, then select the country column to see the distinct countries and count them.

val df1=df.dropDuplicates("countriesAndTerritories")
df1.select("countriesAndTerritories").show()
// Count 
print("Distinct Counteries in Dataset" ,df1.count())

Question 8: Find the death percentage in each country using a Spark UDF.


First, we’ll find out the total cases and total deaths in each country.

val df_per=df.groupBy("countriesAndTerritories").agg(sum("cases").as("Total_Cases"),sum("deaths").as("Total_Deaths"))
// View Dataframe
df_per.show()

The result above shows only the top rows. Now we’ll create a UDF (User Defined Function) and register it so we can apply it to the dataframe. If you are not familiar with UDFs, check out the link: Spark UDF

import org.apache.spark.sql.functions.{udf,col}
// UDF function
def percent(d: Int, t: Int): Float = (d.toFloat * 100) / t
// Register UDF
val per=udf[Float,Int,Int](percent)
// Apply on dataframe
df_per.withColumn("Death_Percentage",per(col("Total_Deaths"),col("Total_Cases"))).show()

.toFloat converts the value to a Float (typecasting); at least one operand needs to be a float so the division produces a floating-point result instead of integer division.
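For completeness, the same percentage can be computed with built-in column arithmetic, which avoids the UDF altogether (Spark’s / on two columns already returns a floating-point result); a minimal sketch using the df_per dataframe from above:

// Death percentage with built-in column operations instead of a UDF
df_per.withColumn("Death_Percentage", col("Total_Deaths") * 100 / col("Total_Cases")).show()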

Today we solved some questions and analyzed a real dataset. We learned the following concepts:

  • UDF function
  • GroupBy
  • sum
  • max
  • Inner Join
  • to_date
  • month, dayofmonth, year
  • sort

Thanks for giving it a read. We’d appreciate it if you could leave some claps, comment on the blog below, and share it if you found it useful.

Happy Sparking!!

