from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import current_timestamp
import sys
import os

def spark_script():
    print("start...................")

    # S3 locations and temporary credentials are supplied through environment variables.
    input_path = os.environ['input_path']
    target_path = os.environ['output_path']
    s3_bucket = os.environ['s3_bucket']

    aws_region = os.environ['REGION']  # read for completeness; not used by the S3A settings below
    aws_access_key_id = os.environ['ACCESS_KEY_ID']
    aws_secret_access_key = os.environ['SECRET_ACCESS_KEY']
    session_token = os.environ['SESSION_TOKEN']

    # Build fully qualified s3a:// URIs for the source CSV and the target Hudi table.
    input_path = "s3a://" + s3_bucket + "/" + input_path
    target_path = "s3a://" + s3_bucket + "/" + target_path

    print(" ******* Input path ", input_path)
    print(" ******* Target path ", target_path)
    # Local-mode Spark session sized for the Lambda container, reading S3 through
    # the s3a connector with the temporary (session) credentials supplied above.
    spark = SparkSession.builder \
        .appName("Spark-on-AWS-Lambda") \
        .master("local[*]") \
        .config("spark.driver.bindAddress", "127.0.0.1") \
        .config("spark.driver.memory", "5g") \
        .config("spark.executor.memory", "5g") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.sql.hive.convertMetastoreParquet", "false") \
        .config("spark.hadoop.hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
        .config("hoodie.meta.sync.client.tool.class", "org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool") \
        .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \
        .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key) \
        .config("spark.hadoop.fs.s3a.session.token", session_token) \
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider") \
        .enableHiveSupport() \
        .getOrCreate()

    print("Started Reading the CSV file from S3 location ", input_path)

    # Read the CSV (with a header row) and stamp each record with the load time,
    # which serves as the Hudi precombine field below.
    df = spark.read.option('header', 'true').csv(input_path)
    df = df.withColumn("last_upd_timestamp", current_timestamp())
    df.show()

    # Hudi write configuration: records are keyed on Customer_ID and precombined
    # on last_upd_timestamp; Hive/Glue catalog sync is disabled for this run.
    hudi_options = {
        'hoodie.table.name': 'customer_table',
        'hoodie.datasource.write.recordkey.field': 'Customer_ID',
        'hoodie.datasource.write.precombine.field': 'last_upd_timestamp',
        'hoodie.insert.shuffle.parallelism': 2,
        "hoodie.datasource.hive_sync.enable": "false",
        "hoodie.datasource.hive_sync.database": "default",
        "hoodie.datasource.hive_sync.table": "customer_table",
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms",
        "hoodie.write.markers.type": "direct",  # It's not advisable to use this configuration. Working on a workaround without using this config.
        "hoodie.embed.timeline.server": "false"  # It's not advisable to use this configuration. Working on a workaround without using this config.
    }
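    # Context (assumption, not stated in the original comments): "direct" markers write
    # marker files straight to S3 instead of routing them through Hudi's embedded
    # timeline server, and "hoodie.embed.timeline.server": "false" turns that
    # driver-side service off entirely; both are the Lambda workarounds flagged above.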

    print("Started Writing the dataframe to the target Hudi table ", target_path)
    # SaveMode "overwrite" rewrites the target table on every run; "append" would
    # typically be used instead for incremental upserts on the record key.
    df.write.format("hudi").options(**hudi_options).mode("overwrite").save(target_path)
    # df.write.format("csv").save(target_path)

if __name__ == '__main__':
    spark_script()