-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdistances.py
165 lines (134 loc) · 7.27 KB
/
distances.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# sklearn medoids
from abc import ABC, abstractmethod
from sklearn_extra.cluster import KMedoids
import numpy as np
from pyclustering.cluster.kmedoids import kmedoids
from pyclustering.cluster.clarans import clarans
from sklearn.decomposition import PCA
from sklearn.metrics.pairwise import euclidean_distances
from sklearn.preprocessing import StandardScaler
import kmedoids as km
# Names of the supported medoid-selection techniques.
DISTANCE_TECHNIQUES = ["fasterpam", "clarans", "kmedoids", "skmedoids"]
# Alias kept because ``__all__`` has always advertised the name ``TECHNIQUES``;
# without it ``from <module> import *`` fails with AttributeError.
TECHNIQUES = DISTANCE_TECHNIQUES
__all__ = ["get_distance_calculator", "DISTANCE_TECHNIQUES", "TECHNIQUES"]


def get_distance_calculator(technique, embedding_path, num_labels, use_reduced_for_medoids=True, use_reduced_for_dist=True):
    """Build the distance calculator that implements *technique*.

    Args:
        technique: One of the names listed in ``DISTANCE_TECHNIQUES``.
        embedding_path: Path forwarded to the calculator, which loads it
            with ``np.load``.
        num_labels: Number of class labels, forwarded to the calculator.
        use_reduced_for_medoids: Forwarded to the calculator unchanged.
        use_reduced_for_dist: Forwarded to the calculator unchanged.

    Returns:
        An instance of ``FasterPam``, ``Clarans``, ``Kmedoids`` or
        ``SKmedoids``.

    Raises:
        ValueError: If *technique* is not a supported name.
    """
    # Explicit raise instead of ``assert`` so the check survives ``python -O``.
    if technique not in DISTANCE_TECHNIQUES:
        raise ValueError(
            f"Technique {technique} not supported. Choose from {DISTANCE_TECHNIQUES}"
        )
    # Built at call time because the calculator classes are defined further
    # down in this module.
    calculators = {
        "fasterpam": FasterPam,
        "clarans": Clarans,
        "kmedoids": Kmedoids,
        "skmedoids": SKmedoids,
    }
    return calculators[technique](embedding_path, num_labels, use_reduced_for_medoids, use_reduced_for_dist)
class Distance(ABC):
    """Base class for calculators of the distance to the nearest medoid.

    Subclasses implement ``_get_medoids`` with a concrete clustering
    technique. The base class stores the raw embeddings (and optionally a
    reduced version of them) and turns medoid indices into distances.
    """

    def __init__(self, embedding_path, num_labels, use_reduced_for_medoids=True,
                 use_reduced_for_dist=True, num_clusters_per_class=5):
        """Load the raw embeddings and store the configuration.

        Args:
            embedding_path: File passed to ``np.load``.
            num_labels: Number of class labels; labels are matched against
                ``range(num_labels)`` in ``get_data_idxs_per_class``.
            use_reduced_for_medoids: Prefer the reduced embeddings when
                selecting medoids (only once they have been set).
            use_reduced_for_dist: Prefer the reduced embeddings when
                computing distances (see ``get_embeddings_for_distance``).
            num_clusters_per_class: Cluster count factor read by the
                subclasses. Defaults to 5, the previously hard-coded value.
        """
        self.raw_embeddings = np.load(embedding_path)
        self.reduced_embeddings = None
        self.num_clusters_per_class = num_clusters_per_class
        self.num_labels = num_labels
        self.use_reduced_for_medoids = use_reduced_for_medoids
        self.use_reduced_for_dist = use_reduced_for_dist

    def _num_samples(self):
        """Return the number of stored embedding rows."""
        # Reduced embeddings are optional; fall back to the raw ones so the
        # medoid search also works before set_reduced_embeddings() is called.
        # Presumably both arrays have one row per sample - TODO confirm.
        if self.reduced_embeddings is not None:
            return len(self.reduced_embeddings)
        return len(self.raw_embeddings)

    def get_embeddings_for_distance(self):
        """Return the embeddings used when measuring distances.

        The reduced embeddings are returned only when both ``use_reduced_*``
        flags are set and reduced embeddings exist; otherwise the raw ones.
        """
        if self.use_reduced_for_dist and self.use_reduced_for_medoids and self.reduced_embeddings is not None:
            return self.reduced_embeddings
        return self.raw_embeddings

    def get_embeddings_for_medoids(self):
        """Return the embeddings used when selecting medoids."""
        if self.use_reduced_for_medoids and self.reduced_embeddings is not None:
            return self.reduced_embeddings
        return self.raw_embeddings

    def pca(self, embeddings):
        """Standardize *embeddings* and project them with PCA.

        ``PCA(n_components=0.95)`` keeps as many components as scikit-learn
        needs to explain 95% of the variance. Progress is printed to stdout.

        Returns:
            The PCA-transformed embeddings.
        """
        print("staring pca")
        print("embeddings shape: ", embeddings.shape)
        scaler = StandardScaler()
        embeddings_standardized = scaler.fit_transform(embeddings)
        pca = PCA(n_components=0.95)
        embeddings_pca = pca.fit_transform(embeddings_standardized)
        print("Completed PCA")
        print("embeddings_pca shape: ", embeddings_pca.shape)
        return embeddings_pca

    def set_raw_embeddings(self, embeddings):
        """Replace the raw embeddings."""
        self.raw_embeddings = embeddings

    def set_reduced_embeddings(self, embeddings):
        """Store the reduced embeddings."""
        self.reduced_embeddings = embeddings

    def set_labels(self, labels):
        """Store the labels; required before any per-class method is used."""
        self.labels = labels

    def get_distances_with_medoids(self, medoids):
        """Return, per sample, the Euclidean distance to the closest medoid.

        Args:
            medoids: 2-D array-like with one medoid vector per row, expressed
                in the same space as ``get_embeddings_for_distance()``.

        Returns:
            1-D array with one entry per row of ``raw_embeddings``.
        """
        embeddings_for_dist = self.get_embeddings_for_distance()
        distances = np.zeros(self.raw_embeddings.shape[0])
        for i, embedding in enumerate(embeddings_for_dist):
            distances[i] = np.min(np.linalg.norm(embedding - medoids, axis=1))
        return distances

    def get_distances(self):
        """Return, per sample, the distance to the closest global medoid."""
        # Delegates to the shared loop; the previous copy passed ``dim=1`` to
        # np.linalg.norm, which NumPy rejects (the keyword is ``axis``).
        return self.get_distances_with_medoids(self.get_medoids())

    def get_data_idxs_per_class(self):
        """Return one index array per label in ``range(num_labels)``.

        Requires ``set_labels`` to have been called.
        """
        return [np.where(self.labels == label)[0] for label in range(self.num_labels)]

    @abstractmethod
    def _get_medoids(self, data_idxs):
        """Return medoid positions relative to the subset *data_idxs*."""

    def get_embeddings_from_subset_idxs(self, data_subset_idxs, subset_medoid_idxs):
        """Map subset-relative medoid positions to their embedding vectors."""
        data_idxs = data_subset_idxs[subset_medoid_idxs]
        return np.array(self.get_embeddings_for_distance()[data_idxs])

    def get_medoids(self):
        """Return the embedding vectors of the medoids over all samples."""
        data_idxs = np.arange(self._num_samples())
        medoids_idxs = self._get_medoids(data_idxs)
        return np.array(self.get_embeddings_for_distance()[medoids_idxs])

    def get_medoid_per_class(self):
        """Return the medoid vectors found separately within each class.

        The medoids of all classes are concatenated in label order.
        """
        medoids = []
        for data_idxs in self.get_data_idxs_per_class():
            new_medoid_idxs = self._get_medoids(data_idxs)
            medoids.extend(self.get_embeddings_from_subset_idxs(data_idxs, new_medoid_idxs))
        return np.array(medoids)
# FasterPAM: fastest of the available techniques; fixed number of medoids per call (used per class).
class FasterPam(Distance):
    """Medoid selection with ``kmedoids.fasterpam`` on a Euclidean distance matrix.

    The constructor is inherited from ``Distance`` unchanged.
    """

    def _get_medoids(self, data_idxs):
        """Return medoid positions relative to *data_idxs*.

        Always asks for ``num_clusters_per_class`` medoids, regardless of how
        many samples *data_idxs* selects.
        """
        relevant_embeddings = self.get_embeddings_for_medoids()[data_idxs]
        diss = euclidean_distances(relevant_embeddings)
        return km.fasterpam(diss, self.num_clusters_per_class).medoids

    def get_medoids(self):
        """Return the medoid indices computed over all samples.

        NOTE(review): this override returns indices, whereas
        ``Distance.get_medoids`` returns embedding vectors, so the inherited
        ``get_distances`` does not work with this class. The return value is
        kept as-is for existing callers - confirm which contract is intended.
        """
        # Reduced embeddings are optional; fall back to the raw ones instead
        # of failing on len(None).
        if self.reduced_embeddings is not None:
            num_samples = len(self.reduced_embeddings)
        else:
            num_samples = len(self.raw_embeddings)
        return self._get_medoids(np.arange(num_samples))
# CLARANS: very slow compared with the other techniques.
class Clarans(Distance):
    """Medoid selection with pyclustering's CLARANS implementation.

    The constructor is inherited from ``Distance`` unchanged.
    """

    def _get_medoids(self, data_idxs):
        """Return medoid positions relative to *data_idxs*.

        The cluster count is ``num_labels * num_clusters_per_class`` scaled by
        the fraction of all samples that *data_idxs* covers.
        """
        relevant_embeddings = self.get_embeddings_for_medoids()[data_idxs]
        # Reduced embeddings are optional; fall back to the raw ones instead
        # of failing on len(None).
        if self.reduced_embeddings is not None:
            total = len(self.reduced_embeddings)
        else:
            total = len(self.raw_embeddings)
        # int() truncates, so a small subset could otherwise ask for 0 clusters.
        num_clusters = max(1, int(self.num_labels * self.num_clusters_per_class * (len(data_idxs) / total)))
        model = clarans(relevant_embeddings, num_clusters, numlocal=5, maxneighbor=4).process()
        return model.get_medoids()
# pyclustering k-medoids: acceptable runtime.
class Kmedoids(Distance):
    """Medoid selection with pyclustering's k-medoids implementation.

    The constructor is inherited from ``Distance`` unchanged.
    """

    def _get_medoids(self, data_idxs):
        """Return medoid positions relative to *data_idxs*.

        The cluster count is ``num_labels * num_clusters_per_class`` scaled by
        the fraction of all samples that *data_idxs* covers. Initial medoids
        are drawn at random from the subset.
        """
        relevant_embeddings = self.get_embeddings_for_medoids()[data_idxs]
        # Reduced embeddings are optional; fall back to the raw ones instead
        # of failing on len(None).
        if self.reduced_embeddings is not None:
            total = len(self.reduced_embeddings)
        else:
            total = len(self.raw_embeddings)
        # int() truncates, so a small subset could otherwise ask for 0 clusters.
        num_clusters = max(1, int(self.num_labels * self.num_clusters_per_class * (len(data_idxs) / total)))
        num_points = len(relevant_embeddings)
        # Sample without replacement: randint() could pick the same point
        # twice, starting the algorithm with duplicate medoids.
        initial_index_medoids = np.random.choice(
            num_points, size=min(num_clusters, num_points), replace=False
        )
        model = kmedoids(relevant_embeddings, initial_index_medoids).process()
        return model.get_medoids()
class SKmedoids(Distance):
    """Medoid selection with ``sklearn_extra.cluster.KMedoids``.

    The constructor is inherited from ``Distance`` unchanged.
    """

    def _get_medoids(self, data_idxs):
        """Return medoid positions relative to *data_idxs*.

        The cluster count is ``num_labels * num_clusters_per_class`` scaled by
        the fraction of all samples that *data_idxs* covers. The estimator is
        seeded with ``random_state=0``.
        """
        embeddings = self.get_embeddings_for_medoids()[data_idxs]
        # Reduced embeddings are optional; fall back to the raw ones instead
        # of failing on len(None).
        if self.reduced_embeddings is not None:
            total = len(self.reduced_embeddings)
        else:
            total = len(self.raw_embeddings)
        # int() truncates, so a small subset could otherwise ask for 0 clusters.
        num_clusters = max(1, int(self.num_labels * self.num_clusters_per_class * (len(data_idxs) / total)))
        # Named ``model`` so the imported pyclustering ``kmedoids`` is not shadowed.
        model = KMedoids(n_clusters=num_clusters, random_state=0).fit(embeddings)
        return model.medoid_indices_