-
Notifications
You must be signed in to change notification settings - Fork 2
/
util.py
44 lines (36 loc) · 1.47 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# -*- coding: utf-8 -*-
from dateutil.relativedelta import relativedelta
import py4j
import pyspark
from pyspark import SparkConf, SparkContext, SparkFiles
from pyspark.sql import SQLContext, HiveContext
from pyspark import SparkFiles
from StringIO import StringIO
from pyspark.storagelevel import StorageLevel
from pyspark.sql.functions import *
from pyspark.sql.types import *
def create_vertices(edges, src_col, dst_col):
    """
    Build the vertex list of a graph from its edge data.

    :param edges: pyspark dataframe, edge data containing the source and destination columns
    :param src_col: str, source column name in edges
    :param dst_col: str, destination column name in edges
    :return: pyspark dataframe with a single column named "id", holding every
        distinct value found in either src_col or dst_col
    """
    sources = edges.select(src_col)
    destinations = edges.select(dst_col)
    # unionAll pairs columns by position, so the two single-column frames
    # stack into one column regardless of their original names.
    vertices = sources.unionAll(destinations).dropDuplicates()
    # Rename positionally instead of formatting the column name into a SQL
    # expression: names that are not valid bare SQL identifiers (spaces,
    # hyphens, nested-field references) would fail to parse in selectExpr.
    return vertices.toDF("id")
def set_spark_context(rundate, appname):
    """
    Create a SparkContext and an accompanying SQL context.

    :param rundate: value appended (via str()) to the application name
    :param appname: str, base application name
    :return: tuple (sc, sqlContext); sqlContext is a HiveContext when the
        HiveConf class can be instantiated through the JVM gateway, otherwise
        a plain SQLContext
    """
    # Imported explicitly so the except clause does not rely on the
    # py4j.protocol submodule having been loaded as a side effect elsewhere.
    from py4j.protocol import Py4JError

    conf = (
        SparkConf()
        .setAppName(appname + " " + str(rundate))
        .set('spark.hadoop.mapreduce.output.fileoutputformat.compress', 'false')
        .set('spark.sql.parquet.compression.codec', 'uncompressed')
    )
    sc = SparkContext(conf=conf)
    try:
        # Probe for Hive support: instantiating HiveConf fails when the Hive
        # classes are not available to the JVM.
        sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
        sqlContext = HiveContext(sc)
    except (Py4JError, TypeError):
        # TypeError covers the case where py4j resolves the missing class to a
        # JavaPackage object, which is not callable.
        sqlContext = SQLContext(sc)
    return sc, sqlContext