In this article, we will discuss the GroupByKey, ReduceByKey and AggregateByKey transformations in Apache Spark.

(a) GroupByKey

  • Applying groupByKey to a dataset of (K, V) pairs returns a dataset of (K, Iterable[V]) pairs.
  • It causes a lot of unnecessary data transfer over the network, since no map-side combining takes place.

With groupByKey, every key and every individual value is shuffled across the network to the node that groups it.
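
To make the type change concrete, here is a minimal sketch with hypothetical in-memory data (not part of the original dataset):

//Hypothetical sample pairs, only to illustrate (K, V) -> (K, Iterable[V])
val pairs = spark.sparkContext.parallelize(Seq(("Bob", 35), ("Bob", 33), ("Joseph", 99)))
val grouped = pairs.groupByKey()
grouped.collect().foreach(println)
//e.g. (Joseph,CompactBuffer(99))
//     (Bob,CompactBuffer(35, 33))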

Q-1: Find the total marks of each student using the groupByKey transformation.

//Input dataset
// Joseph,Maths,99,100
// Joseph,English,85,100
// Bob,Chemistry,35,100
// Bob,Maths,33,100
// Bob,English,60,100
//..,..,..,..
//..,..,..,..
//Schema
//StudentName,SubjectName,ObtainedMarks,TotalMarks

val students_marks_rdd = spark.sparkContext.textFile("/FileStore/tables/student_marks.csv")

//Fetch StudentName and Marks only
val students_name_marks = students_marks_rdd.map { x =>
  val row = x.split(",")
  (row(0), row(2).toInt)
}

val group = students_name_marks.groupByKey()
group.collect().foreach(println)
//Output
//(Tina,CompactBuffer(100, 100, 100, 100, 100, 100))
//(Jimmy,CompactBuffer(90, 92, 100, 81, 92))
//(Harry,CompactBuffer(40, 42, 52, 53, 20, 30))
//(Rocky,CompactBuffer(99, 98, 98, 99, 98, 99))
//(Stephanie,CompactBuffer(70, 72, 67, 80, 90, 86))
//(Ron,CompactBuffer(70, 72, 73, 78))
//(Williamson,CompactBuffer(90, 91, 92, 93))
//(Joseph,CompactBuffer(80, 82, 99, 85, 85, 95))
//(Bob,CompactBuffer(45, 35, 33, 60, 65))


val group_sum = group.map(x => (x._1, x._2.sum))
group_sum.collect().foreach(println)
//Output
//(Tina,600)
//(Jimmy,455)
//(Harry,237)
//(Rocky,591)
//(Stephanie,465)
//(Ron,293)
//(Williamson,366)
//(Joseph,526)
//(Bob,238)
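
Because only the values change in the summing step, mapValues would also work here and, unlike map, preserves the RDD's partitioner. This alternative is a small aside, not part of the original example:

val group_sum_alt = group.mapValues(marks => marks.sum)
group_sum_alt.collect().foreach(println)
//Same (student, total) pairs as above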

(b) ReduceByKey: This transformation is applied to a dataset of (K, V) pairs and returns a dataset of (K, V) pairs in which the values for each key are aggregated using the given reduce function. Values are first combined within each partition, so only partial results travel over the network.

  • Used when the combiner and reducer logic are the same.
  • Used when the input and output value datatypes are the same.
  • Suitable for operations such as sum, min and max (see the sketch after this list).
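
To illustrate the min/max case from the list above, the following sketch finds each student's highest marks; it assumes the students_name_marks RDD that is built in the next example:

//math.max serves as both the combiner and the reducer
val max_marks_perStudent = students_name_marks.reduceByKey((a, b) => math.max(a, b))
max_marks_perStudent.collect().foreach(println)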

Q-2: Find the total marks of each student using the reduceByKey transformation.

//Input dataset
// Joseph,Maths,99,100
// Joseph,English,85,100
// Bob,Chemistry,35,100
// Bob,Maths,33,100
// Bob,English,60,100
//..,..,..,..
//..,..,..,..
//Schema
//StudentName,SubjectName,ObtainedMarks,TotalMarks

val students_marks_rdd = spark.sparkContext.textFile("/FileStore/tables/student_marks.csv")

//Fetch StudentName and Marks only
val students_name_marks = students_marks_rdd.map { x =>
  val row = x.split(",")
  (row(0), row(2).toInt)
}

val totalmarks_perStudent = students_name_marks.reduceByKey((accum, marks) => accum + marks)
totalmarks_perStudent.collect().foreach(println)
//Output
// (Tina,600)
// (Harry,237)
// (Jimmy,455)
// (Rocky,591)
// (Stephanie,465)
// (Ron,293)
// (Williamson,366)
// (Joseph,526)
// (Bob,238)

(c) AggregateByKey: This transformation aggregates the values of each key using given combine functions and a neutral "zero value". Unlike reduceByKey, it allows the aggregated result to have a different type than the input values.

  • Used when the combiner logic and the reducer logic are different.
  • Used when the input and output datatypes are different.
  • zeroValue: the initial accumulator value for each key (here (0, 0), a running total and a subject count).
  • seqOp: the sequential operation merges one value into the accumulator within a partition (here, add the marks to the total and increment the count).
  • combOp: the combiner operation merges two accumulators coming from different partitions (here, add the totals and add the counts).

Q-3: Find the total marks and the number of subjects of each student using the aggregateByKey transformation.

//Input dataset
// Joseph,Maths,99,100
// Joseph,English,85,100
// Bob,Chemistry,35,100
// Bob,Maths,33,100
// Bob,English,60,100
//..,..,..,..
//..,..,..,..
//Schema
//StudentName,SubjectName,ObtainedMarks,TotalMarks

val students_marks_rdd = spark.sparkContext.textFile("/FileStore/tables/student_marks.csv")

//Fetch StudentName and Marks only
val students_name_marks = students_marks_rdd.map { x =>
  val row = x.split(",")
  (row(0), row(2).toInt)
}

val aggregate_sum = students_name_marks.aggregateByKey((0, 0))(
  //seqOp: fold one mark into the (total, count) accumulator within a partition
  (valueCounter, number) => (valueCounter._1 + number, valueCounter._2 + 1),
  //combOp: merge two (total, count) accumulators from different partitions
  (valueCounter, nextValueCounter) => (valueCounter._1 + nextValueCounter._1, valueCounter._2 + nextValueCounter._2)
)
aggregate_sum.collect().foreach(println)
//Output
//(Tina,(600,6))
//(Harry,(237,6))
//(Jimmy,(455,5))
//(Rocky,(591,6))
//(Stephanie,(465,6))
//(Ron,(293,4))
//(Williamson,(366,4))
//(Joseph,(526,6))
//(Bob,(238,5))
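
Because each value is now a (total, count) pair, the average marks per student follow with one more mapValues step. This extension is an assumption about the likely next step, not part of the original example:

//Average marks per student, derived from the (total, count) accumulator
val average_marks = aggregate_sum.mapValues { case (total, count) => total.toDouble / count }
average_marks.collect().foreach(println)
//e.g. (Tina,100.0)
//     (Bob,47.6)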

What are the differences between groupByKey, reduceByKey and aggregateByKey?

  • groupByKey: no map-side combining; every key and value is shuffled across the network, making it the most expensive of the three.
  • reduceByKey: values are combined within each partition first; the combiner and reducer logic are the same, and the input and output value types must match.
  • aggregateByKey: also combines within each partition, but takes a zero value with separate seqOp and combOp functions, so the output type can differ from the input value type.

