fix: Add missing files and comments to better match the course content #12

Open · wants to merge 1 commit into base: master
15 changes: 15 additions & 0 deletions .gitignore
@@ -9,3 +9,18 @@
*.crc
*.parquet
*/*/_SUCCESS

# JetBrains IDE
.idea/

# Spark-Programming-In-Python
app-logs/
metastore_db/
spark-warehouse/
derby.log

# 05-DataSinkDemo
05-DataSinkDemo/dataSink/

# 14-GroupingDemo
14-GroupingDemo/output/
25 changes: 25 additions & 0 deletions .run/runConfigurations/HelloRDD.run.xml
@@ -0,0 +1,25 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="HelloRDD" type="PythonConfigurationType" factoryName="Python" nameIsGenerated="true">
<module name="Spark-Programming-In-Python" />
<option name="ENV_FILES" value="" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/02-HelloRDD" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/02-HelloRDD/HelloRDD.py" />
<option name="PARAMETERS" value="data/sample.csv" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>
25 changes: 25 additions & 0 deletions .run/runConfigurations/HelloSpark.run.xml
@@ -0,0 +1,25 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="HelloSpark" type="PythonConfigurationType" factoryName="Python" nameIsGenerated="true">
<module name="Spark-Programming-In-Python" />
<option name="ENV_FILES" value="" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/01-HelloSpark" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/01-HelloSpark/HelloSpark.py" />
<option name="PARAMETERS" value="data/sample.csv" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>
25 changes: 25 additions & 0 deletions .run/runConfigurations/HelloSparkSQL.run.xml
@@ -0,0 +1,25 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="HelloSparkSQL" type="PythonConfigurationType" factoryName="Python" nameIsGenerated="true">
<module name="Spark-Programming-In-Python" />
<option name="ENV_FILES" value="" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/03-HelloSparkSQL" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/03-HelloSparkSQL/HelloSparkSQL.py" />
<option name="PARAMETERS" value="data/sample.csv" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>
4 changes: 4 additions & 0 deletions 01-HelloSpark/test_utile.py → 01-HelloSpark/test_utils.py
@@ -13,6 +13,10 @@ def setUpClass(cls) -> None:
.appName("HelloSparkTest") \
.getOrCreate()

@classmethod
def tearDownClass(cls) -> None:
cls.spark.stop()

def test_datafile_loading(self):
sample_df = load_survey_df(self.spark, "data/sample.csv")
result_count = sample_df.count()
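
For context, the tearDownClass hook added above completes the usual unittest lifecycle for a shared SparkSession. A minimal, self-contained sketch of that pattern (the class and test names below are illustrative, not taken from the course files):

import unittest
from pyspark.sql import SparkSession

class SparkTestCase(unittest.TestCase):

    @classmethod
    def setUpClass(cls) -> None:
        # One shared session for the whole class keeps the test run fast
        cls.spark = SparkSession.builder \
            .master("local[3]") \
            .appName("HelloSparkTest") \
            .getOrCreate()

    @classmethod
    def tearDownClass(cls) -> None:
        # Stop the session once all tests finish so local resources are released
        cls.spark.stop()

    def test_session_is_usable(self):
        self.assertEqual(self.spark.range(3).count(), 3)

if __name__ == "__main__":
    unittest.main()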
16 changes: 16 additions & 0 deletions 04-SparkSchemaDemo/SparkSchemaDemo.py
@@ -34,6 +34,13 @@
ORIGIN_CITY_NAME STRING, DEST STRING, DEST_CITY_NAME STRING, CRS_DEP_TIME INT, DEP_TIME INT,
WHEELS_ON INT, TAXI_IN INT, CRS_ARR_TIME INT, ARR_TIME INT, CANCELLED INT, DISTANCE INT"""

# CSV with inferred schema; FL_DATE is inferred with an incorrect type (string instead of date)
# flightTimeCsvDF = spark.read \
# .format("csv") \
# .option("header", "true") \
# .option("inferSchema", "true") \
# .load("data/flight*.csv")

flightTimeCsvDF = spark.read \
.format("csv") \
.option("header", "true") \
@@ -45,6 +52,11 @@
flightTimeCsvDF.show(5)
logger.info("CSV Schema:" + flightTimeCsvDF.schema.simpleString())

# JSON with inferred schema; FL_DATE is inferred with an incorrect type (string instead of date)
# flightTimeJsonDF = spark.read \
# .format("json") \
# .load("data/flight*.json")

flightTimeJsonDF = spark.read \
.format("json") \
.schema(flightSchemaDDL) \
@@ -54,9 +66,13 @@
flightTimeJsonDF.show(5)
logger.info("JSON Schema:" + flightTimeJsonDF.schema.simpleString())

# Parquet files include the schema, so there is no need to specify it
# The PyCharm plugin "Avro and Parquet Viewer" can be installed to view a Parquet file's schema and data
flightTimeParquetDF = spark.read \
.format("parquet") \
.load("data/flight*.parquet")

flightTimeParquetDF.show(5)
logger.info("Parquet Schema:" + flightTimeParquetDF.schema.simpleString())

spark.stop()
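
As a side note on the schema change above, spark.read.schema() accepts either a DDL string (as used in this file) or a programmatic StructType. Below is a small self-contained sketch showing the two forms agree; the inline data and the /tmp path are made up for illustration and are not part of this PR:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DateType

spark = SparkSession.builder.master("local[1]").appName("SchemaSketch").getOrCreate()

ddl_schema = "FL_DATE DATE, OP_CARRIER STRING"
struct_schema = StructType([
    StructField("FL_DATE", DateType()),
    StructField("OP_CARRIER", StringType())
])

# Write a tiny CSV so the example is runnable on its own
rows = [("2000-01-01", "AA"), ("2000-01-02", "DL")]
spark.createDataFrame(rows, "FL_DATE STRING, OP_CARRIER STRING") \
    .write.mode("overwrite").option("header", "true").csv("/tmp/schema-sketch-csv")

for schema in (ddl_schema, struct_schema):
    df = spark.read \
        .format("csv") \
        .option("header", "true") \
        .schema(schema) \
        .load("/tmp/schema-sketch-csv")
    print(df.schema.simpleString())  # struct<FL_DATE:date,OP_CARRIER:string> in both cases

spark.stop()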
24 changes: 17 additions & 7 deletions 05-DataSinkDemo/DataSinkDemo.py
@@ -19,6 +19,13 @@
logger.info("Num Partitions before: " + str(flightTimeParquetDF.rdd.getNumPartitions()))
flightTimeParquetDF.groupBy(spark_partition_id()).count().show()

# Writes only 1 file because all the records are in partition 0 (even though there are 2 partitions)
# flightTimeParquetDF.write \
# .format("avro") \
# .mode("overwrite") \
# .option("path", "dataSink/avro/") \
# .save()

partitionedDF = flightTimeParquetDF.repartition(5)
logger.info("Num Partitions after: " + str(partitionedDF.rdd.getNumPartitions()))
partitionedDF.groupBy(spark_partition_id()).count().show()
@@ -29,10 +36,13 @@
.option("path", "dataSink/avro/") \
.save()

flightTimeParquetDF.write \
.format("json") \
.mode("overwrite") \
.option("path", "dataSink/json/") \
.partitionBy("OP_CARRIER", "ORIGIN") \
.option("maxRecordsPerFile", 10000) \
.save()
# This will take some time to complete, since partitionBy creates one directory per OP_CARRIER/ORIGIN combination
(flightTimeParquetDF.write
.format("json")
.mode("overwrite")
.option("path", "dataSink/json/")
.partitionBy("OP_CARRIER", "ORIGIN")
.option("maxRecordsPerFile", 10000) # Use this to control the number of records per file and file size
.save())

spark.stop()
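
For readers following the comments above, here is a self-contained sketch of how partitionBy and maxRecordsPerFile shape the output directory; the toy data and the /tmp path are invented for illustration and are not part of this PR:

import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.master("local[2]").appName("SinkSketch").getOrCreate()

# 100 rows split across two carrier values
df = spark.range(0, 100) \
    .withColumn("OP_CARRIER", f.when(f.col("id") % 2 == 0, "AA").otherwise("DL"))

df.write \
    .format("json") \
    .mode("overwrite") \
    .partitionBy("OP_CARRIER") \
    .option("maxRecordsPerFile", 20) \
    .save("/tmp/sink-sketch/")

# Each OP_CARRIER=... directory now holds several part files of at most 20 records each
for root, _, files in os.walk("/tmp/sink-sketch/"):
    parts = [name for name in files if name.endswith(".json")]
    if parts:
        print(root, "->", len(parts), "part files")

spark.stop()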
26 changes: 16 additions & 10 deletions 06-SparkSQLTableDemo/SparkSQLTableDemo.py
@@ -3,12 +3,12 @@
from lib.logger import Log4j

if __name__ == "__main__":
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("SparkSQLTableDemo") \
.enableHiveSupport() \
.getOrCreate()
spark = (SparkSession
.builder
.master("local[3]")
.appName("SparkSQLTableDemo")
.enableHiveSupport()  # Required to connect to a persistent Hive metastore
.getOrCreate())

logger = Log4j(spark)

@@ -17,10 +17,16 @@
.load("dataSource/")

spark.sql("CREATE DATABASE IF NOT EXISTS AIRLINE_DB")
spark.catalog.setCurrentDatabase("AIRLINE_DB")
spark.catalog.setCurrentDatabase("AIRLINE_DB")  # If not set, tables are created in the default database

flightTimeParquetDF.write \
.mode("overwrite") \
.saveAsTable("flight_data_tbl")
(flightTimeParquetDF.write
# .format("csv") # Uncomment if you would like to inspect the records, or use parquet plugin for PyCharm
.mode("overwrite")
# .partitionBy("ORIGIN", "OP_CARRIER") # You do not want to partition on a column with too many unique values
.bucketBy(5, "OP_CARRIER", "ORIGIN") # Bucketing is a way to distribute data across a fixed number of files
.sortBy("OP_CARRIER", "ORIGIN") # Companion for bucketBy, it allows the files to be ready by certain operations
.saveAsTable("flight_data_tbl")) # Alternatively you can use saveAsTable("AIRLINE_DB.flight_data_tbl")

logger.info(spark.catalog.listTables("AIRLINE_DB"))

spark.stop()
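
To complement the bucketed write above, a sketch of reading the managed table back through the metastore; the aggregate query is only an assumed example of what you might inspect, not part of this PR:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[3]") \
    .appName("TableReadSketch") \
    .enableHiveSupport() \
    .getOrCreate()

# Run from the same working directory as the write so the same local metastore_db is used
spark.catalog.setCurrentDatabase("AIRLINE_DB")
print(spark.catalog.listTables())  # flight_data_tbl should be listed as a MANAGED table

spark.sql("SELECT OP_CARRIER, ORIGIN, count(*) AS flights "
          "FROM flight_data_tbl GROUP BY OP_CARRIER, ORIGIN") \
    .show(5)

spark.stop()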
103 changes: 102 additions & 1 deletion 07-Notebook/MyPythonNotebook.ipynb
@@ -1 +1,102 @@
{"cells":[{"cell_type":"code","source":["from pyspark.sql import *\nfrom pyspark.sql.functions import *\nfrom pyspark.sql.types import *\n\ndef to_date_df(df, fmt, fld):\n return df.withColumn(fld, to_date(col(fld), fmt))"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["<style scoped>\n .ansiout {\n display: block;\n unicode-bidi: embed;\n white-space: pre-wrap;\n word-wrap: break-word;\n word-break: break-all;\n font-family: \"Source Code Pro\", \"Menlo\", monospace;;\n font-size: 13px;\n color: #555;\n margin-left: 4px;\n line-height: 19px;\n }\n</style>\n<div class=\"ansiout\"></div>"]}}],"execution_count":1},{"cell_type":"code","source":["my_schema = StructType([\n StructField(\"ID\", StringType()),\n StructField(\"EventDate\", StringType())])\n\nmy_rows = [Row(\"123\", \"04/05/2020\"), Row(\"124\", \"4/5/2020\"), Row(\"125\", \"04/5/2020\"), Row(\"126\", \"4/05/2020\")]\nmy_rdd = spark.sparkContext.parallelize(my_rows, 2)\nmy_df = spark.createDataFrame(my_rdd, my_schema)"],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["<style scoped>\n .ansiout {\n display: block;\n unicode-bidi: embed;\n white-space: pre-wrap;\n word-wrap: break-word;\n word-break: break-all;\n font-family: \"Source Code Pro\", \"Menlo\", monospace;;\n font-size: 13px;\n color: #555;\n margin-left: 4px;\n line-height: 19px;\n }\n</style>\n<div class=\"ansiout\"></div>"]}}],"execution_count":2},{"cell_type":"code","source":["my_df.printSchema()\nmy_df.show()\nnew_df = to_date_df(my_df, \"M/d/y\", \"EventDate\")\nnew_df.printSchema()\nnew_df.show() "],"metadata":{},"outputs":[{"metadata":{},"output_type":"display_data","data":{"text/html":["<style scoped>\n .ansiout {\n display: block;\n unicode-bidi: embed;\n white-space: pre-wrap;\n word-wrap: break-word;\n word-break: break-all;\n font-family: \"Source Code Pro\", \"Menlo\", monospace;;\n font-size: 13px;\n color: #555;\n margin-left: 4px;\n line-height: 19px;\n }\n</style>\n<div class=\"ansiout\">root\n-- ID: string (nullable = true)\n-- EventDate: string (nullable = true)\n\n+---+----------+\n ID| EventDate|\n+---+----------+\n123|04/05/2020|\n124| 4/5/2020|\n125| 04/5/2020|\n126| 4/05/2020|\n+---+----------+\n\nroot\n-- ID: string (nullable = true)\n-- EventDate: date (nullable = true)\n\n+---+----------+\n ID| EventDate|\n+---+----------+\n123|2020-04-05|\n124|2020-04-05|\n125|2020-04-05|\n126|2020-04-05|\n+---+----------+\n\n</div>"]}}],"execution_count":3}],"metadata":{"name":"MyPythonNotebook","notebookId":2858727166054233},"nbformat":4,"nbformat_minor":0}
{
"cells": [
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "cc1e9686-728e-460d-83c7-570ed284664d",
"showTitle": false,
"title": ""
}
},
"outputs": [],
"source": [
"from pyspark.sql import *\n",
"from pyspark.sql.functions import *\n",
"from pyspark.sql.types import *\n",
"\n",
"def to_date_df(df, fmt, fld):\n",
" return df.withColumn(fld, to_date(col(fld), fmt))"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "1f7e7d9c-53fc-452d-af00-f49c38616c76",
"showTitle": false,
"title": ""
}
},
"outputs": [],
"source": [
"my_schema = StructType([\n",
" StructField(\"ID\", StringType()),\n",
" StructField(\"EventDate\", StringType())])\n",
"\n",
"my_rows = [Row(\"123\", \"04/05/2020\"), Row(\"124\", \"4/5/2020\"), Row(\"125\", \"04/5/2020\"), Row(\"126\", \"4/05/2020\")]\n",
"my_rdd = spark.sparkContext.parallelize(my_rows, 2)\n",
"my_df = spark.createDataFrame(my_rdd, my_schema)"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "61a01972-dea8-41d2-a05b-82311b420bd8",
"showTitle": false,
"title": ""
}
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"output_type": "stream",
"text": [
"root\n |-- ID: string (nullable = true)\n |-- EventDate: string (nullable = true)\n\n+---+----------+\n| ID| EventDate|\n+---+----------+\n|123|04/05/2020|\n|124| 4/5/2020|\n|125| 04/5/2020|\n|126| 4/05/2020|\n+---+----------+\n\nroot\n |-- ID: string (nullable = true)\n |-- EventDate: date (nullable = true)\n\n+---+----------+\n| ID| EventDate|\n+---+----------+\n|123|2020-04-05|\n|124|2020-04-05|\n|125|2020-04-05|\n|126|2020-04-05|\n+---+----------+\n\n"
]
}
],
"source": [
"my_df.printSchema()\n",
"my_df.show()\n",
"new_df = to_date_df(my_df, \"M/d/y\", \"EventDate\")\n",
"new_df.printSchema()\n",
"new_df.show() "
]
}
],
"metadata": {
"application/vnd.databricks.v1+notebook": {
"dashboards": [],
"environmentMetadata": null,
"language": "python",
"notebookMetadata": {
"pythonIndentUnit": 4
},
"notebookName": "MyPythonNotebook",
"widgets": {}
}
},
"nbformat": 4,
"nbformat_minor": 0
}
2 changes: 2 additions & 0 deletions 08-RowDemo/RowDemo.py
@@ -31,3 +31,5 @@ def to_date_df(df, fmt, fld):
new_df = to_date_df(my_df, "M/d/y", "EventDate")
new_df.printSchema()
new_df.show()

spark.stop()
4 changes: 4 additions & 0 deletions 08-RowDemo/RowDemo_Test.py
@@ -24,6 +24,10 @@ def setUpClass(cls) -> None:
my_rdd = cls.spark.sparkContext.parallelize(my_rows, 2)
cls.my_df = cls.spark.createDataFrame(my_rdd, my_schema)

@classmethod
def tearDownClass(cls) -> None:
cls.spark.stop()

def test_data_type(self):
rows = to_date_df(self.my_df, "M/d/y", "EventDate").collect()
for row in rows:
9 changes: 9 additions & 0 deletions 09-LogFileDemo/LogFileDemo.py
@@ -18,9 +18,18 @@
regexp_extract('value', log_reg, 6).alias('request'),
regexp_extract('value', log_reg, 10).alias('referrer'))

logs_df.printSchema()

# Pre-transformation view: referrer hosts are not yet grouped, so each full URL is counted separately
# logs_df.groupBy("referrer") \
# .count() \
# .show(100, truncate=False)

logs_df \
.where("trim(referrer) != '-' ") \
.withColumn("referrer", substring_index("referrer", "/", 3)) \
.groupBy("referrer") \
.count() \
.show(100, truncate=False)

spark.stop()
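
As an aside on the referrer clean-up above, a tiny sketch of what substring_index(referrer, "/", 3) keeps; the sample URLs are made up and are not part of this PR:

from pyspark.sql import SparkSession
from pyspark.sql.functions import substring_index

spark = SparkSession.builder.master("local[1]").appName("ReferrerSketch").getOrCreate()

df = spark.createDataFrame(
    [("http://www.example.com/some/deep/page.html",),
     ("https://search.example.org/results?q=spark",)],
    ["referrer"])

# Everything before the third "/" survives, i.e. the scheme plus host such as "http://www.example.com"
df.withColumn("referrer_host", substring_index("referrer", "/", 3)).show(truncate=False)

spark.stop()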
177 changes: 176 additions & 1 deletion 10-ExploringColumns/ColumnsNotebook.ipynb

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions 11-UDFDemo/UDFDemo.py
@@ -46,3 +46,5 @@ def parse_gender(gender):

survey_df3 = survey_df.withColumn("Gender", expr("parse_gender_udf(Gender)"))
survey_df3.show(10)

spark.stop()
2 changes: 2 additions & 0 deletions 13-AggDemo/AggDemo.py
@@ -49,3 +49,5 @@
)

summary_df.show()

spark.stop()
2 changes: 2 additions & 0 deletions 14-GroupingDemo/GroupingDemo.py
@@ -36,3 +36,5 @@
.save("output")

exSummary_df.sort("Country", "WeekNumber").show()

spark.stop()