-
Notifications
You must be signed in to change notification settings - Fork 1
/
idecutils.py
85 lines (68 loc) · 2.75 KB
/
idecutils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from __future__ import division, print_function
import numpy as np
from sklearn.metrics import normalized_mutual_info_score, f1_score, adjusted_rand_score, cluster, accuracy_score, \
precision_score, recall_score
import sklearn.metrics as metrics
from munkres import Munkres
import random
pre = precision_score
rec = recall_score
Fscore = f1_score
def cluster_acc(y_true, y_pred):
nmi = normalized_mutual_info_score(y_true, y_pred)
ari = adjusted_rand_score(y_true, y_pred)
y_true = y_true - np.min(y_true)
l1 = list(set(y_true))
numclass1 = len(l1)
l2 = list(set(y_pred))
numclass2 = len(l2)
ind = 0
if numclass1 != numclass2:
for i in l1:
if i in l2:
pass
else:
y_pred[ind] = i
ind += 1
l2 = list(set(y_pred))
numclass2 = len(l2)
if numclass1 != numclass2:
print('error')
return
cost = np.zeros((numclass1, numclass2), dtype=int)
for i, c1 in enumerate(l1):
mps = [i1 for i1, e1 in enumerate(y_true) if e1 == c1]
for j, c2 in enumerate(l2):
mps_d = [i1 for i1 in mps if y_pred[i1] == c2]
cost[i][j] = len(mps_d)
# match two clustering results by Munkres algorithm
m = Munkres()
cost = cost.__neg__().tolist()
indexes = m.compute(cost)
# get the match results
new_predict = np.zeros(len(y_pred))
for i, c in enumerate(l1):
# correponding label in l2:
c2 = l2[indexes[i][1]]
# ai is the index with label==c2 in the pred_label list
ai = [ind for ind, elm in enumerate(y_pred) if elm == c2]
new_predict[ai] = c
acc = metrics.accuracy_score(y_true, new_predict)
f1_macro = metrics.f1_score(y_true, new_predict, average='macro')
precision_macro = metrics.precision_score(y_true, new_predict, average='macro')
recall_macro = metrics.recall_score(y_true, new_predict, average='macro')
# f1_micro = metrics.f1_score(y_true, new_predict, average='micro')
# precision_micro = metrics.precision_score(y_true, new_predict, average='micro')
# recall_micro = metrics.recall_score(y_true, new_predict, average='micro')
return acc, nmi, ari, f1_macro, precision_macro, recall_macro
def normalize(x):
x = (x-np.tile(np.min(x, axis=0), (x.shape[0], 1))) / np.tile((np.max(x, axis=0)-np.min(x, axis=0)), (x.shape[0], 1))
return x
def aligned_data_split(n_all, test_prop, seed):
random.seed(seed)
random_idx = random.sample(range(n_all), n_all)
train_num = np.ceil((1-test_prop) * n_all).astype(int)
train_idx = np.array(sorted(random_idx[0:train_num]))
test_num = np.floor(test_prop * n_all).astype(int)
test_idx = np.array(sorted(random_idx[-test_num:]))
return train_idx, test_idx