Skip to content

Commit 32169e0

Browse files
Initial Commit
1 parent 455fcef commit 32169e0

File tree

5 files changed

+542020
-0
lines changed

5 files changed

+542020
-0
lines changed

Diff for: 13-AggDemo/AggDemo.py

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
from pyspark.sql import SparkSession
2+
from pyspark.sql import functions as f
3+
4+
from lib.logger import Log4j
5+
6+
if __name__ == "__main__":
7+
spark = SparkSession \
8+
.builder \
9+
.appName("Agg Demo") \
10+
.master("local[2]") \
11+
.getOrCreate()
12+
13+
logger = Log4j(spark)
14+
15+
invoice_df = spark.read \
16+
.format("csv") \
17+
.option("header", "true") \
18+
.option("inferSchema", "true") \
19+
.load("data/invoices.csv")
20+
21+
invoice_df.select(f.count("*").alias("Count *"),
22+
f.sum("Quantity").alias("TotalQuantity"),
23+
f.avg("UnitPrice").alias("AvgPrice"),
24+
f.countDistinct("InvoiceNo").alias("CountDistinct")
25+
).show()
26+
27+
invoice_df.selectExpr(
28+
"count(1) as `count 1`",
29+
"count(StockCode) as `count field`",
30+
"sum(Quantity) as TotalQuantity",
31+
"avg(UnitPrice) as AvgPrice"
32+
).show()
33+
34+
invoice_df.createOrReplaceTempView("sales")
35+
summary_sql = spark.sql("""
36+
SELECT Country, InvoiceNo,
37+
sum(Quantity) as TotalQuantity,
38+
round(sum(Quantity*UnitPrice),2) as InvoiceValue
39+
FROM sales
40+
GROUP BY Country, InvoiceNo""")
41+
42+
summary_sql.show()
43+
44+
summary_df = invoice_df \
45+
.groupBy("Country", "InvoiceNo") \
46+
.agg(f.sum("Quantity").alias("TotalQuantity"),
47+
f.round(f.sum(f.expr("Quantity * UnitPrice")), 2).alias("InvoiceValue"),
48+
f.expr("round(sum(Quantity * UnitPrice),2) as InvoiceValueExpr")
49+
)
50+
51+
summary_df.show()

0 commit comments

Comments
 (0)