Nowadays, many companies are moving to the cloud and keeping their data in AWS S3 storage.

Amazon Simple Storage Service (Amazon S3) is a scalable, high-speed, web-based cloud storage service.

Let's discuss how Spark connects to S3. First we will read a file from an S3 bucket as an RDD, apply a few transformations on it, and then save the transformed RDD back into S3.

Step-1: Define the access key, secret key, and S3 paths:



val myAccessKey = "xxxxxxx"
val mySecretKey = "aaaaaaaa"
val filePath = "s3n://data-stats/input/log.parquet"
val outFilePath = "s3n://data-stats/output/"
// data-stats is the name of the S3 bucket

In the above code, myAccessKey and mySecretKey are the credentials used to access S3; they can be retrieved from the AWS portal.
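The keys are hard-coded here only to keep the example simple. A safer variant (a sketch, assuming the standard AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables are set) reads them from the environment instead:

// Read the credentials from environment variables rather than hard-coding them
val myAccessKey = sys.env("AWS_ACCESS_KEY_ID")
val mySecretKey = sys.env("AWS_SECRET_ACCESS_KEY")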

outFilePath="s3n://data-stats/input/log.parquet",Here s3n can be replaced to s3a. 

Now, one may raise a question: what is the difference between s3n and s3a?

The differences between s3n and s3a are listed below (a sketch of the corresponding s3a configuration follows the list):

  • s3 is a block-based overlay on top of Amazon S3, while s3n/s3a are not (they are object-based).
  • s3n supports objects up to 5 GB in size, while s3a supports objects up to 5 TB and has higher performance.
  • s3a is the successor to s3n.
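If you decide to use s3a, the Hadoop configuration keys differ from the s3n ones shown in Step-3 below. A minimal sketch, assuming the hadoop-aws module that provides S3AFileSystem is on the classpath and that spark/hadoopConf are set up as in Steps 2 and 3:

// s3a equivalents of the s3n settings used in Step-3
hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoopConf.set("fs.s3a.access.key", myAccessKey)
hadoopConf.set("fs.s3a.secret.key", mySecretKey)

// the paths then use the s3a scheme (filePathS3a is just an illustrative name)
val filePathS3a = "s3a://data-stats/input/log.parquet"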

Step-2: Define the Spark Session:



val spark = SparkSession
  .builder()
  .master("local")
  .appName("Spark AWS Example")
  .getOrCreate()

Step-3: Set the Hadoop configurations:



val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3n.awsAccessKeyId", myAccessKey)
hadoopConf.set("fs.s3n.awsSecretAccessKey", mySecretKey)

Step-4: Read the parquet file as an RDD:



val rdd_parquet = spark.sparkContext.textFile(filePath)
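Note that textFile reads the object as plain text lines. If the file really is in Parquet format, it would normally be loaded through the DataFrame API instead; a minimal sketch:

// Load the Parquet file as a DataFrame
val df = spark.read.parquet(filePath)
// Convert to an RDD of Rows if RDD-style operations are needed
val rddOfRows = df.rdd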

Step-5: Apply an RDD filter transformation:



val rdd_transformed = rdd_parquet.filter(line => line.contains("Error"))
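As a quick illustration of what this filter keeps (a toy example with made-up log lines):

// Only lines containing "Error" survive the filter
val sample = spark.sparkContext.parallelize(Seq(
  "2020-01-01 10:00 Error: connection timed out",
  "2020-01-01 10:01 Info: job started"))
sample.filter(line => line.contains("Error")).collect()
// Array("2020-01-01 10:00 Error: connection timed out")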

Step-6: Save the transformed RDD back into S3:



rdd_transformed.saveAsTextFile(outFilePath)
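Note that saveAsTextFile writes one part-file per partition under outFilePath and fails if that path already exists. If a single output file is preferred, the RDD can be coalesced to one partition first (a sketch; coalescing funnels all data through a single task, so it is only suitable for small outputs):

// Write a single part-file instead of one per partition
rdd_transformed.coalesce(1).saveAsTextFile(outFilePath)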

There are two dependencies that should be added to pom.xml:

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-aws</artifactId>
  <version>2.7.3</version>
</dependency>

<dependency>
  <groupId>com.amazonaws</groupId>
  <artifactId>aws-java-sdk</artifactId>
  <version>1.11.693</version>
</dependency>

Let's consolidate the above steps; below is the complete code to access S3 from Spark.

import org.apache.spark.sql.{ DataFrame, SparkSession }

object SparkAWS {
  val myAccessKey = "xxxxxxx"
  val mySecretKey = "aaaaaaaa"
  val filePath = "s3n://data-stats/input/log.parquet"
  val outFilePath = "s3n://data-stats/output/"
  //data-stats is bucket name of s3
  def main(args: Array[String]):Unit = {
  
   val spark = SparkSession
      .builder()
      .master("local")
      .appName("Spark S3 Example")
      .getOrCreate()

 
   val hadoopConf = spark.sparkContext.hadoopConfiguration
   hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
   hadoopConf.set("fs.s3n.awsAccessKeyId", myAccessKey)
   hadoopConf.set("fs.s3n.awsSecretAccessKey", mySecretKey)

   val rdd_parquet = spark.sparkContext.textFile(filePath)

   val rdd_transformed = rdd_parquet.filter(line => line.contains("Error"))

   rdd_transformed.saveAsTextFile(outFilePath)

   spark.stop()
  }
}
Hopefully the steps and code for connecting Spark to S3 are clear now. Feel free to comment on the post if you run into any issues connecting to S3, or any other Spark-related issues, and I will try to get back to you as soon as possible.

Your feedback is always welcome… 🙂
