# vx.py -- forked from MaxMcGlinnPoole/TermDocumentTensor
import csv
import os
import re
from collections import deque

import matplotlib.pyplot as plt
import numpy as np
import textmining
from scipy import spatial
from sklearn.feature_extraction.text import TfidfVectorizer
from tensorly.decomposition import parafac
from tensorly.tenalg import khatri_rao


class TermDocumentTensor:
    def __init__(self, directory, tensor_type="binary"):
        self.vocab = []
        self.tdt = []
        self.corpus_names = []
        self.directory = directory
        self.tensor_type = tensor_type
        self.rank_approximation = None
        self.factor_matrices = []
        # These are the output of our tensor decomposition.
        self.factors = []

    def create_factor_matrices(self):
        # Rebuild the three mode-n unfoldings of the approximated tensor
        # from the CP factors via transposed Khatri-Rao products.
        tdm_1 = np.matmul(self.factors[0], np.transpose(khatri_rao([self.factors[2], self.factors[1]])))
        tdm_2 = np.matmul(self.factors[1], np.transpose(khatri_rao([self.factors[2], self.factors[0]])))
        tdm_3 = np.matmul(self.factors[2], np.transpose(khatri_rao([self.factors[1], self.factors[0]])))
        # Store the unfoldings separately so the CP factors are not overwritten.
        self.factor_matrices = [tdm_1, tdm_2, tdm_3]
        return self.factor_matrices
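
    # A sketch of the identity the products above rely on (background, not
    # part of the original code): for a CP decomposition with factor
    # matrices A (I x R), B (J x R), C (K x R), the mode-1 unfolding of the
    # reconstructed tensor is
    #
    #     X_(1) ~= A @ (C (.) B).T
    #
    # where (.) is the column-wise Khatri-Rao product, and analogously for
    # modes 2 and 3. Each tdm_i above is therefore an unfolding of the
    # rank-R approximation, with one row per index along that mode.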

    def generate_cosine_similarity_matrix(self, matrix):
        # Pairwise cosine similarity between the rows of a factor matrix.
        cosine_sim = []
        for entry in matrix:
            sim = []
            for other_entry in matrix:
                sim.append(1 - spatial.distance.cosine(entry, other_entry))
            cosine_sim.append(sim)
        return cosine_sim
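
    # For reference (an illustration, not original code): scipy's
    # spatial.distance.cosine returns the cosine *distance*
    # 1 - (u . v) / (||u|| ||v||), so subtracting it from 1 recovers the
    # similarity. For example, u = (1, 0) and v = (1, 1) give
    # 1 - cosine(u, v) = 1/sqrt(2) ~= 0.707.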

    def get_estimated_rank(self):
        """
        Computing the exact rank of a tensor is NP-hard, so we use an
        estimate based on the sizes of the tensor's dimensions. The numbers
        come from Table 3.3 of Kolda and Bader's survey:
        http://www.sandia.gov/~tgkolda/pubs/pubfiles/TensorReview.pdf

        Note: the ranks this table yields can be too large for this machine
        or the tensorly library to handle in practice.
        :return: the estimated rank
        """
        I = len(self.tdt[0])
        J = len(self.tdt[0][0])
        K = len(self.tdt)
        if I == 1 or J == 1 or K == 1:
            return 1
        elif I == J == K == 2:
            return 2
        elif I == J == 3 and K == 2:
            return 3
        elif I == 5 and J == K == 3:
            return 5
        elif I >= 2 * J and K == 2:
            return 2 * J
        elif 2 * J > I > J and K == 2:
            return I
        elif I == J and K == 2:
            return I
        elif I >= J * K:
            return J * K
        elif J * K - J < I < J * K:
            return I
        elif I == J * K - I:
            return I
        else:
            print(I, J, K, "did not have an exact estimation")
            return min(I * J, I * K, J * K)
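
    # Worked example (illustrative, not from the original code): a corpus of
    # 100 documents over a 10-term vocabulary with 2 slices gives I = 100,
    # J = 10, K = 2. Since I >= 2 * J and K == 2, the table's maximal-rank
    # entry for I x J x 2 tensors applies and the estimate is 2 * J = 20.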

    def print_formatted_term_document_tensor(self):
        for matrix in self.tdt:
            print(self.vocab)
            for i in range(len(matrix)):
                print(self.corpus_names[i], matrix[i])

    def create_term_document_tensor(self, **kwargs):
        if self.tensor_type == "binary":
            return self.create_binary_term_document_tensor(**kwargs)
        else:
            return self.create_term_document_tensor_text()

    def create_binary_term_document_tensor(self, **kwargs):
        doc_content = []
        first_occurences_corpus = {}
        # Default to unigrams when no n-gram size is supplied.
        ngrams = kwargs.get("ngrams") or 1
        doc_names = os.listdir(self.directory)
        for file_name in doc_names:
            previous_bytes = deque()
            first_occurences = {}
            byte_count = 0
            with open(os.path.join(self.directory, file_name), "rb") as file:
                my_string = ""
                while True:
                    byte_count += 1
                    current_byte = file.read(1).hex()
                    if not current_byte:
                        break
                    if byte_count >= ngrams:
                        byte_gram = "".join(previous_bytes) + current_byte
                        if byte_gram not in first_occurences:
                            first_occurences[byte_gram] = byte_count
                        if byte_count % ngrams == 0:
                            my_string += byte_gram + " "
                        if ngrams > 1:
                            previous_bytes.popleft()
                    if ngrams > 1:
                        previous_bytes.append(current_byte)
            first_occurences_corpus[file_name] = first_occurences
            doc_content.append(my_string)
        # Convert the byte-gram documents to a matrix of L2-normalized term
        # frequencies (TfidfVectorizer with use_idf=False).
        vectorizer = TfidfVectorizer(use_idf=False)
        # Learn the vocabulary dictionary and return the term-document matrix.
        x1 = vectorizer.fit_transform(doc_content).toarray()
        # get_feature_names was removed in newer scikit-learn releases in
        # favour of get_feature_names_out.
        if hasattr(vectorizer, "get_feature_names_out"):
            self.vocab = list(vectorizer.get_feature_names_out())
        else:
            self.vocab = list(vectorizer.get_feature_names())
        tdm = [x1[i] for i in range(len(doc_names))]
        tdm_first_occurences = []
        self.corpus_names = doc_names
        # Create a first-occurrences matrix whose columns line up with the tdm.
        for doc_name in doc_names:
            this_tdm = []
            for word in self.vocab:
                this_tdm.append(first_occurences_corpus[doc_name].get(word, 0))
            tdm_first_occurences.append(this_tdm)
        self.tdt = [tdm, tdm_first_occurences]
        return self.tdt
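
    # Shape of the result (for illustration): with D documents and a V-term
    # byte-gram vocabulary, self.tdt is a 2 x D x V tensor. Slice 0 holds
    # the normalized byte-gram frequencies and slice 1 holds the byte offset
    # at which each gram first appears in each file (0 when it never does).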

    def convert_term_document_tensor_to_csv(self):
        # Writes the term-document matrix (the first slice of a 3-d tensor)
        # to test.csv.
        try:
            tdt = self.tdt
            # If the tensor is 3-d or greater, export its first slice.
            if isinstance(self.tdt[0][0], (list, np.ndarray)):
                tdt = self.tdt[0]
            with open("test.csv", "w", newline="") as csv_file:
                writer = csv.writer(csv_file)
                for entry in tdt:
                    writer.writerow(map(str, entry))
        except IndexError:
            print("You must create the term document tensor before exporting it")

    def create_term_document_tensor_text(self):
        tdm = textmining.TermDocumentMatrix()
        first_occurences_corpus = {}
        text_names = []
        for file_name in os.listdir(self.directory):
            first_occurences = {}
            words = 0
            with open(os.path.join(self.directory, file_name), "r") as text_file:
                contents = ""
                while True:
                    my_line = text_file.readline()
                    if not my_line:
                        break
                    # Strip punctuation but keep whitespace so words stay separated.
                    my_line = re.sub(r"[^\w\s]+", "", my_line)
                    for word in my_line.split():
                        words += 1
                        if word not in first_occurences:
                            first_occurences[word] = words
                    contents += my_line
            first_occurences_corpus[file_name] = first_occurences
            tdm.add_doc(contents)
            text_names.append(file_name)
        tdm = list(tdm.rows(cutoff=1))
        tdm_first_occurences = []
        # Create a first-occurrences matrix that corresponds with the tdm;
        # tdm[0] is still the vocabulary header row at this point.
        for item in text_names:
            this_tdm = []
            for word in tdm[0]:
                this_tdm.append(first_occurences_corpus[item].get(word, 0))
            tdm_first_occurences.append(this_tdm)
        self.vocab = tdm.pop(0)
        self.corpus_names = text_names
        self.tdt = np.asanyarray([tdm, tdm_first_occurences])
        return self.tdt

    def parafac_decomposition(self):
        decomposition = parafac(np.array(self.tdt, dtype=float), rank=self.get_estimated_rank())
        # Older tensorly releases return a list of factor matrices; newer
        # ones return a CPTensor carrying weights and factors attributes.
        self.factors = getattr(decomposition, "factors", decomposition)
        return self.factors
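
    # Background (illustrative): PARAFAC/CP approximates the tensor as a sum
    # of R rank-one terms,
    #
    #     X ~= sum_{r=1}^{R} a_r (o) b_r (o) c_r,
    #
    # where (o) is the outer product and a_r, b_r, c_r are the columns of
    # the factor matrices. For the 2 x D x V tensors built above, factors[1]
    # has one row per document, embedding the corpus in R dimensions.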


def main():
    tdt = TermDocumentTensor("zeus_binaries", tensor_type="binary")
    tdt.create_binary_term_document_tensor(ngrams=1)
    tdt.convert_term_document_tensor_to_csv()
    print(tdt.get_estimated_rank())
    tdt.parafac_decomposition()
    factor_matrices = tdt.create_factor_matrices()
    # The document-mode unfolding has one row per file, so its cosine
    # similarity matrix compares the binaries against each other.
    cos_sim = tdt.generate_cosine_similarity_matrix(factor_matrices[1])
    # tdt.print_formatted_term_document_tensor()
    fig, ax1 = plt.subplots(1, 1)
    ax1.imshow(cos_sim, cmap="hot")
    print(tdt.corpus_names)
    plt.show()


if __name__ == "__main__":
    main()