forked from commoncrawl/cc-pyspark
-
Notifications
You must be signed in to change notification settings - Fork 0
/
linkmap2parquet.py
38 lines (29 loc) · 1.06 KB
/
linkmap2parquet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from sparkcc import CCSparkJob
from pyspark.sql.types import StructType, StructField, StringType
class LinkMapImportJob(CCSparkJob):
    """Import a map of link pairs <from, to> into SparkSQL and save as Parquet.

    Each input line is expected to hold a source and a target separated by a
    tab character. The pairs are de-duplicated, sorted within each output
    partition by (s, t), and written as a Parquet-format table whose name is
    taken from ``self.args.output``.
    """

    name = "LinkMapImport"

    # Two nullable string columns: "s" (link source, "from") and
    # "t" (link target, "to").
    output_schema = StructType([
        StructField("s", StringType(), True),
        StructField("t", StringType(), True)
    ])

    def map_line(self, line):
        """Split one tab-separated input line into its fields.

        :param line: a single line of the link map (without line terminator)
        :returns: list of the tab-separated fields; a well-formed line yields
            exactly two items, ``[source, target]``
        """
        # NOTE(review): lines with more or fewer than two fields do not match
        # output_schema; presumably createDataFrame rejects them when the rows
        # are materialised. Confirm whether such lines should be skipped.
        return line.split('\t')

    def run_job(self, sc, sqlc):
        """Read the link map, de-duplicate it and save it as a Parquet table.

        :param sc: Spark context used to read the text input (``textFile``)
        :param sqlc: object exposing ``createDataFrame`` (SQL context or
            Spark session) used to build the DataFrame
        :raises ValueError: if no input path is configured
        """
        # Without an input path there is no RDD to build the DataFrame from;
        # fail here with a clear message instead of passing None to Spark.
        if not self.args.input:
            raise ValueError(
                "no input given: expected a path to a tab-separated link map")

        input_data = sc.textFile(
            self.args.input,
            minPartitions=self.args.num_input_partitions)
        output = input_data.map(self.map_line)

        df = sqlc.createDataFrame(output, schema=self.output_schema)

        # Reduce to the requested number of output files and sort each one by
        # (s, t) before writing; presumably this keeps equal sources adjacent
        # for better Parquet compression — confirm.
        (df.dropDuplicates()
           .coalesce(self.args.num_output_partitions)
           .sortWithinPartitions('s', 't')
           .write
           .format("parquet")
           .saveAsTable(self.args.output))
if __name__ == "__main__":
    # Script entry point: build the link-map import job and let the
    # CCSparkJob base class drive its execution.
    LinkMapImportJob().run()