-
Notifications
You must be signed in to change notification settings - Fork 584
/
Copy pathGroupingDemo.py
40 lines (31 loc) · 1.18 KB
/
GroupingDemo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from lib.logger import Log4j
if __name__ == "__main__":
    # Demo: weekly per-country invoice aggregation with Spark SQL functions.
    # Runs locally on two cores; input is a headered CSV of invoice lines.
    spark = SparkSession.builder \
        .appName("Agg Demo") \
        .master("local[2]") \
        .getOrCreate()

    logger = Log4j(spark)

    # Load raw invoice records, letting Spark infer column types from the data.
    invoice_df = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("data/invoices.csv")

    # Aggregation expressions applied per (Country, WeekNumber) group below.
    num_invoices = f.countDistinct("InvoiceNo").alias("NumInvoices")
    total_quantity = f.sum("Quantity").alias("TotalQuantity")
    invoice_value = f.expr("round(sum(Quantity * UnitPrice),2) as InvoiceValue")

    # Parse the timestamp string, keep only 2010 invoices, tag each row with
    # its ISO week number, then aggregate per country and week.
    exSummary_df = invoice_df \
        .withColumn("InvoiceDate", f.to_date(f.col("InvoiceDate"), "dd-MM-yyyy H.mm")) \
        .where("year(InvoiceDate) == 2010") \
        .withColumn("WeekNumber", f.weekofyear(f.col("InvoiceDate"))) \
        .groupBy("Country", "WeekNumber") \
        .agg(num_invoices, total_quantity, invoice_value)

    # coalesce(1) forces a single output part-file for the demo.
    exSummary_df.coalesce(1) \
        .write \
        .format("parquet") \
        .mode("overwrite") \
        .save("output")

    exSummary_df.sort("Country", "WeekNumber").show()

    spark.stop()