parallel.py
from collections import defaultdict

import pandas as pd
import spacy
from pyspark import SparkContext
from pyspark.accumulators import AccumulatorParam  # used only by the commented-out accumulator experiment below
import json  # used only by the optional JSON dump sketch near the bottom
# Load the test descriptions to compare pairwise.
df = pd.read_csv('data.csv')
saved_column = df.TestDescription

# Only similarity scoring is needed, so skip the parser/tagger/textcat pipes
# at load time (the original disabled them per call, which spaCy 3 no longer supports).
nlp = spacy.load('en_core_web_sm', disable=['parser', 'tagger', 'textcat'])

sc = SparkContext("spark://arv-WSG37555W7-0525:7077", "App")

# Seed dict for the abandoned accumulator experiment kept below.
dic = defaultdict(list)
dic[0].append([1, 1])
# Abandoned attempt: gather pairs through a dict accumulator instead of groupByKey.
# class ListParam(AccumulatorParam):
#     def zero(self, value):
#         print("value")
#         print(value)
#         return dic
#     def addInPlace(self, val1, val2):
#         for k, v in val2.items():
#             val1[k].append(v)
#         return val1
# dict1 = sc.accumulator(dic, ListParam())
# Score every pair of descriptions. Similarity is bucketed into an integer
# percentage so that all pairs with the same score share a key. (The original
# Python 2 str.decode('utf-8') calls are unnecessary in Python 3 and removed.)
rdd = sc.parallelize(saved_column)
rdd2 = rdd.cartesian(rdd)
tmp2 = rdd2.map(lambda x: (round(nlp(str(x[0])).similarity(nlp(str(x[1]))) * 100), x))
# tmp = rdd2.map(lambda x: (abs(x[1] - x[0]), x))
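# Sketch (assumption, not part of the original script): capturing the driver's
# `nlp` in the lambda above pickles the whole model into every task. Loading the
# model once per partition with mapPartitions avoids that cost:
def similarity_partition(pairs):
    # One model load per partition instead of one pickled copy per task.
    nlp_local = spacy.load('en_core_web_sm', disable=['parser', 'tagger', 'textcat'])
    for a, b in pairs:
        yield (round(nlp_local(str(a)).similarity(nlp_local(str(b))) * 100), (a, b))
# tmp2 = rdd2.mapPartitions(similarity_partition)  # drop-in replacement for the map above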
# Group pairs by their bucketed score and pull everything back to the driver.
var = tmp2.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()
# Write one (score, pairs) group per paragraph. Context managers replace the
# bare open() calls; the original never closed output3.txt.
with open("output3.txt", "w") as f2:
    for item in var:
        f2.write("%s\n\n" % item)

# Echo the groups to stdout and dump the whole result as a single repr string.
with open("output2.txt", "w") as f:
    for k, v in var:
        print("\n\n")
        print(k, v)
    f.write(str(var))
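# Optional sketch (assumption, not in the original): a structured dump that
# downstream tools can parse, using the json import above.
# with open("output.json", "w") as jf:
#     json.dump({str(k): [list(p) for p in v] for k, v in var}, jf, indent=2)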
# Unused debugging helpers, kept as in the original.
def l(x):
    print(x)

def g(x):
    print("\n\n\n\n\n\n\n\n")
    for k in x:
        print(k)
# Abandoned accumulator-based variant and its debug prints, kept for reference.
# def f(x1):
#     global dict1
#     val2 = defaultdict(list)
#     val2[abs(x1[0] - x1[1])].append(x1)
#     dict1 += val2
#
# for k in var:
#     print(k)
#
# print(type(dict1.value))
# di = dict1.value
# for k, v in di.items():
#     print(k, v)
#     print("\n\n")
#
# for i in dict1.value:
#     print(i)
#     print("\n\n\n")
# print(dict1.value)
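# Usage (assumption: a standalone Spark master reachable at the URL above, and
# a data.csv with a TestDescription column in the working directory):
#   spark-submit parallel.py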