Skip to content

Commit 08f84fa

Browse files
Initial Commit
1 parent 03ccf6d commit 08f84fa

File tree

7 files changed

+770599
-0
lines changed

7 files changed

+770599
-0
lines changed

Diff for: 04-SparkSchemaDemo/SparkSchemaDemo.py

+62
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
from pyspark.sql import SparkSession
2+
from pyspark.sql.types import StructType, StructField, DateType, StringType, IntegerType
3+
4+
from lib.logger import Log4j
5+
6+
if __name__ == "__main__":
    # Schema demo: read the same flight-time dataset from three formats.
    # CSV and JSON need an explicit schema (StructType and DDL string,
    # respectively); Parquet carries its own schema in the file footer.
    spark = (SparkSession.builder
             .master("local[3]")
             .appName("SparkSchemaDemo")
             .getOrCreate())

    logger = Log4j(spark)

    # Programmatic schema: one (column name, Spark SQL type) pair per field.
    # StructField defaults to nullable=True, matching the original definition.
    schema_fields = [
        ("FL_DATE", DateType()),
        ("OP_CARRIER", StringType()),
        ("OP_CARRIER_FL_NUM", IntegerType()),
        ("ORIGIN", StringType()),
        ("ORIGIN_CITY_NAME", StringType()),
        ("DEST", StringType()),
        ("DEST_CITY_NAME", StringType()),
        ("CRS_DEP_TIME", IntegerType()),
        ("DEP_TIME", IntegerType()),
        ("WHEELS_ON", IntegerType()),
        ("TAXI_IN", IntegerType()),
        ("CRS_ARR_TIME", IntegerType()),
        ("ARR_TIME", IntegerType()),
        ("CANCELLED", IntegerType()),
        ("DISTANCE", IntegerType()),
    ]
    flight_schema_struct = StructType(
        [StructField(name, dtype) for name, dtype in schema_fields]
    )

    # Equivalent schema expressed as a SQL DDL string (parsed by Spark).
    flight_schema_ddl = """FL_DATE DATE, OP_CARRIER STRING, OP_CARRIER_FL_NUM INT, ORIGIN STRING,
    ORIGIN_CITY_NAME STRING, DEST STRING, DEST_CITY_NAME STRING, CRS_DEP_TIME INT, DEP_TIME INT,
    WHEELS_ON INT, TAXI_IN INT, CRS_ARR_TIME INT, ARR_TIME INT, CANCELLED INT, DISTANCE INT"""

    # CSV: header row present; FAILFAST aborts on any malformed record;
    # dateFormat tells Spark how to parse FL_DATE into a DATE column.
    flight_time_csv_df = (spark.read
                          .format("csv")
                          .option("header", "true")
                          .schema(flight_schema_struct)
                          .option("mode", "FAILFAST")
                          .option("dateFormat", "M/d/y")
                          .load("data/flight*.csv"))

    flight_time_csv_df.show(5)
    logger.info("CSV Schema:" + flight_time_csv_df.schema.simpleString())

    # JSON: same columns, but the schema is supplied as a DDL string.
    flight_time_json_df = (spark.read
                           .format("json")
                           .schema(flight_schema_ddl)
                           .option("dateFormat", "M/d/y")
                           .load("data/flight*.json"))

    flight_time_json_df.show(5)
    logger.info("JSON Schema:" + flight_time_json_df.schema.simpleString())

    # Parquet: self-describing format, so no schema is supplied.
    flight_time_parquet_df = (spark.read
                              .format("parquet")
                              .load("data/flight*.parquet"))

    flight_time_parquet_df.show(5)
    logger.info("Parquet Schema:" + flight_time_parquet_df.schema.simpleString())

Diff for: 04-SparkSchemaDemo/data/flight-time.csv

+470,478
Large diffs are not rendered by default.

Diff for: 04-SparkSchemaDemo/data/flight-time.json

+300,000
Large diffs are not rendered by default.

Diff for: 04-SparkSchemaDemo/data/flight-time.parquet

4.77 MB
Binary file not shown.

Diff for: 04-SparkSchemaDemo/lib/__init__.py

Whitespace-only changes.

Diff for: 04-SparkSchemaDemo/lib/logger.py

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
class Log4j:
    """Thin wrapper over the Spark driver JVM's log4j logger.

    Delegates warn/info/error/debug calls to a JVM-side logger named
    "guru.learningjournal.spark.examples.<app name>", so that
    log4j.properties can route the application's log output separately
    from Spark's own logging.
    """

    def __init__(self, spark):
        # Reach through py4j into the driver JVM to get the log4j package.
        jvm_log4j = spark._jvm.org.apache.log4j

        # Qualify the logger name with the running app's name so each
        # application gets its own logger under the shared root prefix.
        prefix = "guru.learningjournal.spark.examples"
        app_name = spark.sparkContext.getConf().get("spark.app.name")

        self.logger = jvm_log4j.LogManager.getLogger(f"{prefix}.{app_name}")

    def warn(self, message):
        """Log *message* at WARN level."""
        self.logger.warn(message)

    def info(self, message):
        """Log *message* at INFO level."""
        self.logger.info(message)

    def error(self, message):
        """Log *message* at ERROR level."""
        self.logger.error(message)

    def debug(self, message):
        """Log *message* at DEBUG level."""
        self.logger.debug(message)

Diff for: 04-SparkSchemaDemo/log4j.properties

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# Set everything to be logged to the console
log4j.rootCategory=WARN, console

# define console appender
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# application log: route the demo's own logger to console + rolling file,
# and stop it bubbling up to the root logger (additivity=false) so each
# message is emitted once
log4j.logger.guru.learningjournal.spark.examples=INFO, console, file
log4j.additivity.guru.learningjournal.spark.examples=false

# define rolling file appender
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=${spark.yarn.app.container.log.dir}/${logfile.name}.log
# define the following as Java system properties when launching:
# -Dlog4j.configuration=file:log4j.properties
# -Dlogfile.name=hello-spark
# -Dspark.yarn.app.container.log.dir=app-logs
log4j.appender.file.ImmediateFlush=true
log4j.appender.file.Append=false
log4j.appender.file.MaxFileSize=500MB
log4j.appender.file.MaxBackupIndex=2
log4j.appender.file.layout=org.apache.log4j.PatternLayout
# fix: key capitalized to "ConversionPattern" (log4j's documented property
# name), consistent with the console appender's layout key above
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Recommendations from Spark template
log4j.logger.org.apache.spark.repl.Main=WARN
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
38+

0 commit comments

Comments
 (0)