"""
朴素贝叶斯分类模型(高斯分布、伯努利分布、多项式分布)
@author aldebran
@since 2020/8/15/ 22:16
"""
from collections import defaultdict
from collections.abc import Iterator
import jieba
import re
import numpy
import math
# Characters to split on (whitespace and punctuation). TODO: improve the stop-word list.
stop_words_pattern = re.compile(r"[\s,.\"'!?:;,。“‘!?:;]")

# Split an article into tokens. Combining several split methods should also work.
# For 'n_gram', args gives the range of n: n lies in [args[0], args[1]).
def split_text(text: str, method='n_gram', *args):
    sentences = list(filter(lambda it: it != '', re.split(stop_words_pattern, text)))
    result = []
    if method == 'n_gram':
        if len(args) == 0:
            args = [1, 2]  # default: 1-grams only
        for n in range(int(args[0]), int(args[1])):
            for sentence in sentences:
                if len(sentence) < n:
                    result.append(sentence)
                else:
                    for i in range(0, len(sentence) - n + 1):
                        result.append(sentence[i:i + n])
    elif method == 'jieba':
        for sentence in sentences:
            for word in list(filter(lambda it: it != '', jieba.cut(sentence))):
                result.append(word)
    else:
        raise Exception(f'unsupported method: {method}')
    return result
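
# A quick sanity check (a minimal sketch of the expected output):
# >>> split_text('hello, world', 'n_gram', 1, 2)
# ['h', 'e', 'l', 'l', 'o', 'w', 'o', 'r', 'l', 'd']
# >>> split_text('hello, world', 'n_gram', 2, 3)
# ['he', 'el', 'll', 'lo', 'wo', 'or', 'rl', 'ld']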

# TODO: automatically filter common words that disturb classification (e.g. via TF-IDF).

# Bookkeeping for one word: its occurrence count and TF-IDF weight.
class WordRef:
    def __init__(self, count: int, tf_idf: float = 0):
        self.count = count
        self.tf_idf = tf_idf

# A single article (document).
class Article:
    def __init__(self, article_id, text: str, class_name: str = None):
        self.article_id = article_id
        self.word_count_map = defaultdict(lambda: WordRef(0, 0))
        self.words_count = 0
        self.class_name = class_name
        self.feature = None
        for word in split_text(text):
            self.word_count_map[word].count += 1
            self.words_count += 1

    # Build (or rebuild) the feature vector: one count per feature word.
    def get_feature(self, feature_words, regenerate=False):
        if regenerate or self.feature is None:
            self.feature = numpy.array(list(map(lambda it: self.word_count_map[it].count, feature_words)))
        return self.feature
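
# Example (a minimal sketch): with the default 1-gram split, the feature vector
# holds the occurrence count of each requested feature word:
# >>> a = Article(1, 'abcabc', 'c1')
# >>> a.get_feature(['a', 'b', 'c', 'd'])
# array([2, 2, 2, 0])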

# A collection of articles, grouped by class, plus the global feature vocabulary.
class Articles:
    def __init__(self, filter_interval=50):
        self.id_article_map = dict()
        self.class_name_articles_map = defaultdict(list)
        self.feature_words = []
        self.feature_words_set = set()
        self.filter_interval = filter_interval

    def add_one(self, article: Article):
        self.id_article_map[article.article_id] = article
        self.class_name_articles_map[article.class_name].append(article)
        for word in article.word_count_map:
            if word not in self.feature_words_set:
                self.feature_words_set.add(word)
                self.feature_words.append(word)
        articles_count = len(self.id_article_map)
        if articles_count % self.filter_interval == 0:
            self.filter_disturb_items()

    def add_mul(self, articles: Iterator):
        for article in articles:
            self.add_one(article)

    # TODO: filter common words that disturb classification.
    def filter_disturb_items(self):
        pass
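
# Example (a minimal sketch): the vocabulary grows in first-seen order.
# >>> arts = Articles()
# >>> arts.add_mul([Article(1, 'ab', 'c1'), Article(2, 'bc', 'c2')])
# >>> arts.feature_words
# ['a', 'b', 'c']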

# Naive Bayes text classification model.
class NaiveBayesTextClassification:
    def __init__(self,
                 method: str,  # 'Bernoulli' or 'Gaussian' ('Multinomial' is not implemented yet)
                 filter_interval=50,  # period for Articles.filter_disturb_items
                 ):
        if method not in ['Bernoulli', 'Gaussian']:
            raise Exception(f'unsupported method: {method}')
        self.class_name_probability = None
        self.method = method
        self.articles = Articles(filter_interval)
        self.data_after_train = None

    def fit(self, article: Article):
        self.articles.add_one(article)

    def fit_mul(self, articles: list):
        self.articles.add_mul(articles)

    def train(self):
        # Build the feature vector of every article.
        for article in self.articles.id_article_map.values():
            article.get_feature(self.articles.feature_words, True)
        # Class priors with Laplace smoothing.
        self.class_name_probability = dict()
        class_count = len(self.articles.class_name_articles_map)
        all_articles_count = len(self.articles.id_article_map)
        for class_name in self.articles.class_name_articles_map:
            class_articles_count = len(self.articles.class_name_articles_map[class_name])
            self.class_name_probability[class_name] = (class_articles_count + 1) / \
                                                      (all_articles_count + 1 * class_count)
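        # Worked example (hypothetical numbers): 4 articles in 3 classes with
        # class counts [2, 1, 1] give priors (2+1)/(4+3) = 3/7 and (1+1)/(4+3) = 2/7
        # twice, which still sum to 1.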
        features_count = len(self.articles.feature_words)
        # Data needed by the conditional probabilities.
        if self.method == 'Bernoulli':
            # Aggregate word counts per class, then apply Laplace smoothing.
            self.data_after_train = defaultdict(lambda: numpy.zeros(shape=(features_count,)))
            for class_name in self.articles.class_name_articles_map:
                for a in self.articles.class_name_articles_map[class_name]:
                    self.data_after_train[class_name] += a.feature
                self.data_after_train[class_name] = (self.data_after_train[class_name] + 1) / \
                                                    (numpy.sum(self.data_after_train[class_name])
                                                     + 1 * features_count)
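            # Worked example (hypothetical numbers): 3 feature words with summed
            # counts [4, 1, 0] in one class give (4+1)/(5+3), (1+1)/(5+3), (0+1)/(5+3)
            # = [0.625, 0.25, 0.125], so no word gets probability 0.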
        elif self.method == 'Gaussian':
            # Per class, per feature: the mean and standard deviation of the counts.
            self.data_after_train = defaultdict(
                lambda: {'σ': numpy.zeros(shape=(features_count,)),  # standard deviation
                         'μ': numpy.zeros(shape=(features_count,))})  # mean
            min_o = 1e+8
            max_o = 0
            for class_name in self.articles.class_name_articles_map:
                articles = self.articles.class_name_articles_map[class_name]
                array = numpy.zeros(shape=(features_count, len(articles)))
                for i, a in enumerate(articles):
                    array[:, i] = a.feature.T
                for i in range(features_count):
                    u = numpy.average(array[i])  # mean
                    o = numpy.std(array[i])  # standard deviation
                    if o < min_o and o != 0:
                        min_o = o
                    if o > max_o:
                        max_o = o
                    self.data_after_train[class_name]['μ'][i] = u
                    self.data_after_train[class_name]['σ'][i] = o
            a, b = 5, 1
            # Replace zero standard deviations with a blend of the observed extremes.
            # The substitute must not be too small, or near-zero densities follow.
            for class_name in self.articles.class_name_articles_map:
                for i in range(features_count):
                    if self.data_after_train[class_name]['σ'][i] == 0:
                        self.data_after_train[class_name]['σ'][i] = (b * min_o + a * max_o) / (a + b)
        elif self.method == 'Multinomial':
            # Rejected in __init__; left as a placeholder.
            pass

    def predict(self, article: Article):
        feature = article.get_feature(self.articles.feature_words, True)
        result = defaultdict(lambda: 1.0)
        result.update(self.class_name_probability)
        if self.method == 'Bernoulli':
            # Multiply in P(word present / absent | class) for every feature word.
            for i in range(feature.shape[0]):
                x_i = 0 if feature[i] == 0 else 1
                for class_name in self.data_after_train:
                    p_1 = self.data_after_train[class_name][i]
                    result[class_name] *= x_i * p_1 + (1 - x_i) * (1 - p_1)
        elif self.method == 'Gaussian':
            # Multiply in the Gaussian density of each feature count:
            # p(x | class) = exp(-(x - μ)² / (2σ²)) / (√(2π) σ)
            # Note: for very large vocabularies this product may underflow;
            # summing log-probabilities would be safer.
            for i in range(feature.shape[0]):
                x_i = feature[i]
                for class_name in self.data_after_train:
                    u = self.data_after_train[class_name]['μ'][i]
                    o = self.data_after_train[class_name]['σ'][i]
                    result[class_name] *= math.exp(-math.pow(x_i - u, 2) / (2 * math.pow(o, 2))) / (
                            math.sqrt(2 * math.pi) * o)
        # Normalize so the class probabilities sum to 1.
        s = numpy.sum(numpy.array(list(result.values())))
        for class_name in result:
            result[class_name] = result[class_name] / s
        return result
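    # Worked example (hypothetical numbers) for one Gaussian factor: with μ = 2,
    # σ = 1 and observed count x = 2 the density is 1/√(2π) ≈ 0.3989; at x = 3 it
    # drops to exp(-0.5)/√(2π) ≈ 0.2420.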

    # Return the class with the highest posterior probability.
    def predict_label(self, article: Article):
        result = self.predict(article)
        result_class_name = None
        result_class_p = -1
        for class_name in result:
            class_p = result[class_name]
            if class_p > result_class_p:
                result_class_p = class_p
                result_class_name = class_name
        return result_class_name
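

# A minimal end-to-end sketch (hypothetical sample texts; real use would need a
# far larger corpus). It trains a Gaussian model on two tiny classes and then
# predicts the label of an unseen article.
if __name__ == '__main__':
    model = NaiveBayesTextClassification(method='Gaussian')
    model.fit_mul([
        Article(1, '今天天气很好,阳光明媚', 'weather'),
        Article(2, '明天可能下雨,记得带伞', 'weather'),
        Article(3, '这场比赛的比分是三比二', 'sports'),
        Article(4, '球队赢得了冠军', 'sports'),
    ])
    model.train()
    test = Article(5, '天气晴朗')
    print(model.predict(test))        # posterior probability per class
    print(model.predict_label(test))  # most probable class name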