Commit afa23cc: Initial Commit
1 parent 246f6a8 commit afa23cc

4 files changed, +109 -0 lines

18-OuterJoinDemo/OuterJoinDemo.py (new file, +50 lines)
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

from lib.logger import Log4j

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Spark Join Demo") \
        .master("local[3]") \
        .getOrCreate()

    logger = Log4j(spark)

    # (order_id, prod_id, unit_price, qty)
    orders_list = [("01", "02", 350, 1),
                   ("01", "04", 580, 1),
                   ("01", "07", 320, 2),
                   ("02", "03", 450, 1),
                   ("02", "06", 220, 1),
                   ("03", "01", 195, 1),
                   ("04", "09", 270, 3),
                   ("04", "08", 410, 2),
                   ("05", "02", 350, 1)]

    order_df = spark.createDataFrame(orders_list).toDF("order_id", "prod_id", "unit_price", "qty")

    # (prod_id, prod_name, list_price, qty) -- note there is no product "09",
    # even though one order references it
    product_list = [("01", "Scroll Mouse", 250, 20),
                    ("02", "Optical Mouse", 350, 20),
                    ("03", "Wireless Mouse", 450, 50),
                    ("04", "Wireless Keyboard", 580, 50),
                    ("05", "Standard Keyboard", 360, 10),
                    ("06", "16 GB Flash Storage", 240, 100),
                    ("07", "32 GB Flash Storage", 320, 50),
                    ("08", "64 GB Flash Storage", 430, 25)]

    product_df = spark.createDataFrame(product_list).toDF("prod_id", "prod_name", "list_price", "qty")

    product_df.show()
    order_df.show()

    join_expr = order_df.prod_id == product_df.prod_id

    # Rename the product-side "qty" so it cannot collide with the order-side "qty"
    product_renamed_df = product_df.withColumnRenamed("qty", "reorder_qty")

    # Left outer join: keep every order row, even when no matching product exists,
    # then backfill the resulting nulls with coalesce
    order_df.join(product_renamed_df, join_expr, "left") \
        .drop(product_renamed_df.prod_id) \
        .select("order_id", "prod_id", "prod_name", "unit_price", "list_price", "qty") \
        .withColumn("prod_name", expr("coalesce(prod_name, prod_id)")) \
        .withColumn("list_price", expr("coalesce(list_price, unit_price)")) \
        .sort("order_id") \
        .show()
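Because this is a left outer join, every order row survives: order "04" references prod_id "09", which has no product record, so the product-side columns come back null and the two coalesce expressions backfill prod_name with prod_id and list_price with unit_price. Product "05" (Standard Keyboard) is never ordered and drops out entirely. The final show() should print roughly the following (row order within order_id "04" is not guaranteed, since the sort is on order_id only):

+--------+-------+-------------------+----------+----------+---+
|order_id|prod_id|          prod_name|unit_price|list_price|qty|
+--------+-------+-------------------+----------+----------+---+
|      01|     02|      Optical Mouse|       350|       350|  1|
|      01|     04|  Wireless Keyboard|       580|       580|  1|
|      01|     07|32 GB Flash Storage|       320|       320|  2|
|      02|     03|     Wireless Mouse|       450|       450|  1|
|      02|     06|16 GB Flash Storage|       220|       240|  1|
|      03|     01|       Scroll Mouse|       195|       250|  1|
|      04|     09|                 09|       270|       270|  3|
|      04|     08|64 GB Flash Storage|       410|       430|  2|
|      05|     02|      Optical Mouse|       350|       350|  1|
+--------+-------+-------------------+----------+----------+---+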

18-OuterJoinDemo/lib/__init__.py (new file)

Whitespace-only changes (an empty file that marks lib as a Python package).

18-OuterJoinDemo/lib/logger.py (new file, +21 lines)
class Log4j:
    """Thin wrapper around the JVM-side log4j logger, reached via py4j."""

    def __init__(self, spark):
        log4j = spark._jvm.org.apache.log4j

        root_class = "guru.learningjournal.spark.examples"
        conf = spark.sparkContext.getConf()
        app_name = conf.get("spark.app.name")

        # The logger name matches the application-log entry in log4j.properties
        self.logger = log4j.LogManager.getLogger(root_class + "." + app_name)

    def warn(self, message):
        self.logger.warn(message)

    def info(self, message):
        self.logger.info(message)

    def error(self, message):
        self.logger.error(message)

    def debug(self, message):
        self.logger.debug(message)
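The wrapper goes through PySpark's spark._jvm gateway into the driver JVM, so messages logged from Python land in the same log4j logger (and appenders) configured in log4j.properties below. A minimal usage sketch; the app name and messages are illustrative, and note that spark._jvm is a private attribute of classic PySpark, not available under Spark Connect:

from pyspark.sql import SparkSession
from lib.logger import Log4j

spark = SparkSession.builder \
    .appName("Hello Spark") \
    .master("local[1]") \
    .getOrCreate()

logger = Log4j(spark)        # logger name: guru.learningjournal.spark.examples.Hello Spark
logger.info("demo started")  # INFO and above pass the app logger's threshold
logger.debug("not shown")    # filtered out, since the app logger level is INFO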

18-OuterJoinDemo/log4j.properties (new file, +38 lines)
# Set everything to be logged to the console
log4j.rootCategory=WARN, console

# define console appender
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# application log
log4j.logger.guru.learningjournal.spark.examples=INFO, console, file
log4j.additivity.guru.learningjournal.spark.examples=false

# define rolling file appender
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=${spark.yarn.app.container.log.dir}/${logfile.name}.log
# define the following as Java system properties:
# -Dlog4j.configuration=file:log4j.properties
# -Dlogfile.name=hello-spark
# -Dspark.yarn.app.container.log.dir=app-logs
log4j.appender.file.ImmediateFlush=true
log4j.appender.file.Append=false
log4j.appender.file.MaxFileSize=500MB
log4j.appender.file.MaxBackupIndex=2
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Recommendations from Spark template
log4j.logger.org.apache.spark.repl.Main=WARN
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
