-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathopen.py
105 lines (80 loc) · 3.69 KB
/
open.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#!/usr/bin/python
# coding: utf-8
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import NaiveBayesModel
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import DecisionTreeClassificationModel
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import LinearSVCModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
def parse_line(p):
    """Parse one space-separated CSV line into a labeled Row.

    The first column must be 'Y' (positive, label 1.0) or 'N' (negative,
    label 0.0); everything after the first space is kept as the sentence
    text. Returns None when the label column is neither 'Y' nor 'N'.
    """
    first, _, rest = p.partition(' ')
    if first == 'Y':
        numeric_label = 1.0
    elif first == 'N':
        numeric_label = 0.0
    else:
        # Malformed line — caller is expected to filter these out.
        return None
    return Row(label=numeric_label, sentence=rest)
def train(spark):
    """Train a Naive Bayes text classifier on part.csv and save it.

    Pipeline: tokenize -> hashing TF (8000 features) -> IDF. Fits
    NaiveBayes on a 90% random split, prints accuracy on the held-out
    10%, and saves the fitted model to 'Bayes20000' (loaded by test()).

    :param spark: an active SparkSession.
    """
    sc = spark.sparkContext
    tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
    hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=8000)
    idf = IDF(inputCol="rawFeatures", outputCol="features")

    # parse_line returns None for lines with a malformed label; drop those
    # before toDF(), which cannot infer a schema from None rows.
    srcdf = sc.textFile('part.csv').map(parse_line).filter(lambda r: r is not None)
    srcdf = srcdf.toDF()
    training, testing = srcdf.randomSplit([0.9, 0.1])

    wordsData = tokenizer.transform(training)
    featurizedData = hashingTF.transform(wordsData)
    idfModel = idf.fit(featurizedData)
    rescaledData = idfModel.transform(featurizedData)
    rescaledData.persist()
    trainDF = rescaledData.select("features", "label").rdd.map(
        lambda x: Row(label=float(x['label']), features=Vectors.dense(x['features']))
    ).toDF()
    naivebayes = NaiveBayes()
    model = naivebayes.fit(trainDF)

    # BUG FIX: transform the held-out split with the IDF model fitted on the
    # TRAINING data. The original re-fit IDF on the test set
    # (idf.fit(testFeaturizedData)), which yields different document-frequency
    # weights for train vs. test — inconsistent features and evaluation leakage.
    testWordsData = tokenizer.transform(testing)
    testFeaturizedData = hashingTF.transform(testWordsData)
    testRescaledData = idfModel.transform(testFeaturizedData)
    testRescaledData.persist()
    testDF = testRescaledData.select("features", "label").rdd.map(
        lambda x: Row(label=float(x['label']), features=Vectors.dense(x['features']))
    ).toDF()

    predictions = model.transform(testDF)
    predictions.show()
    evaluator = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
    print("The accuracy on test-set is " + str(accuracy))
    model.save('Bayes20000')
def test(spark):
    """Load the saved Naive Bayes model and score predict.csv.

    Writes the predicted labels to the 'submit' directory as CSV and
    prints accuracy against the labels present in predict.csv.

    :param spark: an active SparkSession.
    """
    sc = spark.sparkContext
    tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
    hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=8000)
    idf = IDF(inputCol="rawFeatures", outputCol="features")

    # Drop malformed rows (parse_line returns None for them) before toDF().
    srcdf = sc.textFile('predict.csv').map(parse_line).filter(lambda r: r is not None)
    testing = srcdf.toDF()

    # BUG FIX: the 'Bayes20000' directory is written by NaiveBayes in train(),
    # so it must be loaded with NaiveBayesModel. The original used
    # DecisionTreeClassificationModel.load, which cannot load a NaiveBayes
    # model's metadata.
    model = NaiveBayesModel.load('Bayes20000')

    testWordsData = tokenizer.transform(testing)
    testFeaturizedData = hashingTF.transform(testWordsData)
    # NOTE(review): IDF is re-fit on the prediction data because train() does
    # not persist its fitted IDFModel; the resulting weights differ from those
    # used at training time. Saving/reloading the IDFModel (or the full
    # pipeline) would be the proper fix — TODO for a follow-up.
    testIDFModel = idf.fit(testFeaturizedData)
    testRescaledData = testIDFModel.transform(testFeaturizedData)
    testRescaledData.persist()
    testDF = testRescaledData.select("features", "label").rdd.map(
        lambda x: Row(label=float(x['label']), features=Vectors.dense(x['features']))
    ).toDF()

    predictions = model.transform(testDF)
    predictions.select('prediction').write.csv(path='submit', header=True, sep=',', mode='overwrite')
    evaluator = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
    print("The accuracy on test-set is " + str(accuracy))
if __name__ == "__main__":
    # Entry point: build a local all-cores Spark session, run training,
    # then shut the session down.
    builder = SparkSession.builder.master("local[*]").appName("Bigdata")
    spark_session = builder.getOrCreate()
    train(spark_session)
    spark_session.stop()