SparkSQLTableDemo.py
from pyspark.sql import SparkSession

from lib.logger import Log4j

if __name__ == "__main__":
    spark = (SparkSession
             .builder
             .master("local[3]")
             .appName("SparkSQLTableDemo")
             .enableHiveSupport()  # Needed to connect to a persistent Hive metastore
             .getOrCreate())

    logger = Log4j(spark)
    flightTimeParquetDF = spark.read \
        .format("parquet") \
        .load("dataSource/")
    spark.sql("CREATE DATABASE IF NOT EXISTS AIRLINE_DB")
    spark.catalog.setCurrentDatabase("AIRLINE_DB")  # If not set, Spark falls back to the default database
    (flightTimeParquetDF.write
     # .format("csv")  # Uncomment to inspect the records as text, or use the Parquet plugin for PyCharm
     .mode("overwrite")
     # .partitionBy("ORIGIN", "OP_CARRIER")  # Avoid partitioning on columns with too many unique values
     .bucketBy(5, "OP_CARRIER", "ORIGIN")  # Bucketing distributes data across a fixed number of files
     .sortBy("OP_CARRIER", "ORIGIN")  # Companion to bucketBy; keeps each bucket sorted so certain operations can read it efficiently
     .saveAsTable("flight_data_tbl"))  # Alternatively, saveAsTable("AIRLINE_DB.flight_data_tbl")
    logger.info(spark.catalog.listTables("AIRLINE_DB"))
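
    # A minimal verification sketch (added here, not part of the original script):
    # query the saved table back through Spark SQL to confirm saveAsTable registered
    # it in the metastore. The table and column names follow the calls above; the
    # aggregation itself is only illustrative.
    spark.sql("SELECT OP_CARRIER, COUNT(*) AS flight_count "
              "FROM flight_data_tbl "
              "GROUP BY OP_CARRIER").show()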
    spark.stop()