Here we will discuss the mapPartitions, mapPartitionsWithIndex, and filter operations.

(a) mapPartitions: This transformation is similar to map, but runs once per partition (block) of the RDD rather than once per element. mapPartitions() can be used as an alternative to map(), and is especially useful when per-partition setup work (such as opening a database connection) should happen once per partition instead of once per element.

Q-1 How to iterate over each partition using the mapPartitions transformation and convert each element in the RDD to (value, 1)?

data_rdd.mapPartitions { partition =>
  val words = partition.flatMap(x => x.split(" ")) // an Iterator[String], not an RDD
  words.map(y => (y, 1))
}.collect().foreach(println)
//Output
//(The,1)
//(Sun,1)
//(rises,1)
//(in,1)
//(the,1)
//(East,1)
//(and,1)
//..
//..
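One point worth noting: inside mapPartitions, `partition` is a plain Scala Iterator, not an RDD, so the flatMap and map calls above are ordinary Iterator methods. That means the per-partition function can be written and checked without Spark at all. A minimal sketch (the name `pairWords` is illustrative, not from the original):

```scala
// Plain-Scala sketch of the per-partition function from Q-1.
// mapPartitions would pass in exactly this kind of Iterator[String].
def pairWords(partition: Iterator[String]): Iterator[(String, Int)] =
  partition.flatMap(x => x.split(" ")).map(word => (word, 1))

val pairs = pairWords(Iterator("The Sun rises")).toList
// pairs == List(("The", 1), ("Sun", 1), ("rises", 1))
```

In Spark, the same function could be passed directly as `data_rdd.mapPartitions(pairWords)`.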

Q-2 How to sum the numbers in each partition using the mapPartitions transformation?

val data = spark.sparkContext.parallelize(List(10, 11, 12, 13, 14), 2) // two partitions

def mapPartitionSum(numbers: Iterator[Int]): Iterator[Int] = {
  var sum = 0
  while (numbers.hasNext) {
    sum = sum + numbers.next()
  }
  Iterator(sum)
}

data.mapPartitions(mapPartitionSum).collect().foreach(println)

//Output: 21, 39
//The first partition holds 10 and 11; the second holds 12, 13, and 14.
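Since `numbers` is an ordinary Iterator, the while loop can also be replaced by Iterator's built-in `sum`. A shorter equivalent, verifiable without Spark:

```scala
// One-liner equivalent of mapPartitionSum: consume the partition's
// iterator and emit a single-element iterator holding its sum.
def partitionSum(numbers: Iterator[Int]): Iterator[Int] =
  Iterator(numbers.sum)

partitionSum(Iterator(10, 11)).toList      // List(21)
partitionSum(Iterator(12, 13, 14)).toList  // List(39)
```

Passing this to `data.mapPartitions(partitionSum)` produces the same 21, 39 output as above.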

(b) mapPartitionsWithIndex: This transformation is similar to mapPartitions, but also provides the function with an integer value representing the index of the partition.

Q-3: How to attach the partition index to each element in an RDD using the mapPartitionsWithIndex transformation?

val cars_rdd = spark.sparkContext.parallelize(List("Audi", "Verna", "BMW", "Baleno", "Mercedes-Benz", "Ferrari"), 4)
//There will be four partitions

cars_rdd.mapPartitionsWithIndex {
  (index, iterator) => {
    val cars_list = iterator.toList
    cars_list.map(x => x + "-->" + index).iterator
  }
}.collect().foreach(println)

//Output: here 0, 1, 2, 3 represent the partition index
//Audi-->0
//Verna-->1
//BMW-->1
//Baleno-->2
//Mercedes-Benz-->3
//Ferrari-->3
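A common practical use of the partition index is dropping a header line that lives at the start of partition 0, for example after reading a CSV file with textFile. A sketch using plain iterators (the function name and sample data are illustrative); in Spark you would pass it to `rdd.mapPartitionsWithIndex`:

```scala
// Drop the first line of partition 0 (the header); leave other partitions as-is.
def dropHeader(index: Int, partition: Iterator[String]): Iterator[String] =
  if (index == 0) partition.drop(1) else partition

dropHeader(0, Iterator("name,price", "Audi,70000")).toList // List("Audi,70000")
dropHeader(1, Iterator("BMW,60000")).toList                // List("BMW,60000")
```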

(c) filter: This transformation returns a new dataset formed by selecting those elements of the source for which the function returns true.

Q-4: How many times does the keyword "Sun" appear in the input dataset?

//Input data.txt contains
//The Sun rises in the East and sets in the West.
//The Sun sets in the West and rises in the East

val data_rdd = spark.sparkContext.textFile("/FileStore/tables/data.txt")
val flat_rdd = data_rdd.flatMap(x => x.split(" "))
flat_rdd.filter(x => x == "Sun").count()
//Output: 2
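RDD filter behaves just like filter on any Scala collection, so the same count can be checked locally with plain Scala (using the two lines of data.txt shown above):

```scala
// Same flatMap + filter + count pipeline on a local List instead of an RDD.
val lines = List(
  "The Sun rises in the East and sets in the West.",
  "The Sun sets in the West and rises in the East")
val sunCount = lines.flatMap(_.split(" ")).count(_ == "Sun")
// sunCount == 2
```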
