wordcount.py
executable file · 105 lines (76 loc) · 3.11 KB
# Note: written in Python 2 style (tuple unpacking in lambdas and in the
# customSomething signature), targeting an older PySpark release.
import re
import string
import sys
from operator import add

from pyspark import SparkContext, SparkConf

from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer

APP_NAME = "Word Count"
def word_count(sc, base_location, folder_names):
    """Take the 40 most frequent stems from each folder and save their union as the feature set."""
    features = set()
    for folder_name in folder_names:
        textRDD = sc.textFile(base_location + folder_name)
        stemmer = PorterStemmer()
        # English stop words plus a few corpus-specific (already stemmed) extras.
        stop_words = stopwords.words('english')
        custom_stop_words = ["could", "one", "ms", "mr", "follow", "said", "ha", "wa", "would", "thi", "hi", "like", "also", "say", "take"]
        stop_words = stop_words + custom_stop_words
        punctuation = string.punctuation
        # Tokenize, stem, and drop punctuation, tokens that are empty once
        # non-ASCII characters are stripped, stop words, and non-alphabetic tokens.
        words = textRDD.flatMap(lambda s: word_tokenize(s.lower()))
        stems = words.map(lambda w: stemmer.stem(w))\
            .filter(lambda p: p not in punctuation)\
            .filter(lambda p: re.sub(r'[^\x00-\x7f]', r'', p))\
            .filter(lambda s: s not in stop_words)\
            .filter(lambda a: a.isalpha())\
            .map(lambda x: (x.encode('ascii', 'ignore'), 1))
        # Keep the 40 most frequent stems from this folder.
        wordcount = stems.reduceByKey(add).sortBy(lambda a: a[1], ascending=False).take(40)
        for (word, count) in wordcount:
            features.add(word)
    # Persist the combined feature vocabulary to a single part file.
    feature_file = sc.parallelize(list(features))
    feature_file.coalesce(1).saveAsTextFile("features")
    print(features)
    return features
def extract_feature(sc, base_location, folder_names, features):
    """Write one "<doc_id> <feature_index>:<count> ..." line per document, where doc_id is the folder index."""
    doc_id = -1
    input_matrix = sc.parallelize([])
    for folder in folder_names:
        doc_id = doc_id + 1
        textRDD = sc.wholeTextFiles(base_location + folder)
        # (file name, stemmed feature word) pairs for every feature occurrence in the folder.
        words = textRDD.flatMap(customSomething)\
            .map(lambda (k, v): ((k, v.encode('ascii', 'ignore')), 1))
        # Count occurrences per file, encode each as "feature_index:count", sort by index,
        # and concatenate into a single line per document prefixed with the folder's doc_id.
        wordcount = words.reduceByKey(add)\
            .map(lambda (k, v): (str(k[0]), str(features[str(k[1])]) + ":" + str(v)))\
            .sortBy(lambda (k, v): int(v.split(":")[0]), ascending=True)\
            .reduceByKey(lambda v1, v2: v1 + " " + v2)\
            .map(lambda (k, v): str(doc_id) + " " + v)
        input_matrix = input_matrix.union(wordcount)
    input_matrix.coalesce(1).saveAsTextFile("output")
def customSomething((file_name, article)):
    # For one (file name, article text) pair, return (file name, stem) for every
    # token whose stem is in the module-level feature set built by word_count().
    word_list = word_tokenize(article.lower())
    output = []
    stemmer = PorterStemmer()
    for word in word_list:
        stemmed = stemmer.stem(word)
        if stemmed in features:
            output.append((file_name, stemmed))
    return output
if __name__ == "__main__":
    conf = SparkConf().setAppName(APP_NAME).setMaster("local[*]")
    sc = SparkContext(conf=conf)
    # Arguments: <base_location> followed by four folder names.
    base_location = sys.argv[1]
    folder_names = [sys.argv[2], sys.argv[3], sys.argv[4], sys.argv[5]]
    # Build the feature vocabulary, then assign each feature a 1-based index.
    features = word_count(sc, base_location, folder_names)
    dic = dict()
    i = 1
    for feature in features:
        dic[feature] = i
        i = i + 1
    extract_feature(sc, base_location, folder_names, dic)
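# A minimal example invocation (the paths and folder names below are illustrative
# assumptions, not part of the original script). The first argument is the directory
# containing the corpus folders; the next four are the folder names to process:
#
#   spark-submit wordcount.py /data/news/ business/ sports/ tech/ politics/
#
# This writes the selected vocabulary to a "features" directory and the per-document
# "<doc_id> <feature_index>:<count> ..." lines to an "output" directory.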