-
Notifications
You must be signed in to change notification settings - Fork 160
/
Copy pathStreamingWC.py
36 lines (28 loc) · 1.05 KB
/
StreamingWC.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from lib.logger import Log4j
if __name__ == "__main__":
    # Streaming word count: read text lines from a TCP socket, split them into
    # words, and print a running count per word to the console.

    # Local session with 3 worker threads; shuffle partitions are pinned to 3
    # so the groupBy below does not use Spark's much larger default.
    spark = SparkSession \
        .builder \
        .appName("Streaming Word Count") \
        .master("local[3]") \
        .config("spark.streaming.stopGracefullyOnShutdown", "true") \
        .config("spark.sql.shuffle.partitions", 3) \
        .getOrCreate()

    logger = Log4j(spark)

    # Unbounded input from localhost:9999; each received line arrives as a row
    # whose text is in the "value" column (referenced by the split below).
    lines_df = spark.readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", "9999") \
        .load()

    # Tokenize on single spaces, one row per word. Splitting on ' ' yields
    # empty-string tokens for blank lines and runs of spaces; drop them so ""
    # is not counted as a word.
    # Column-API equivalent of the select:
    #   lines_df.select(explode(split("value", " ")).alias("word"))
    words_df = lines_df \
        .select(expr("explode(split(value,' ')) as word")) \
        .where("word != ''")

    counts_df = words_df.groupBy("word").count()

    # "complete" output mode re-emits the whole running-count table on every
    # trigger; the checkpoint directory holds the query's progress and state.
    word_count_query = counts_df.writeStream \
        .format("console") \
        .outputMode("complete") \
        .option("checkpointLocation", "chk-point-dir") \
        .start()

    logger.info("Listening to localhost:9999")
    # Block the driver until the streaming query stops or fails.
    word_count_query.awaitTermination()