Skip to content

Commit

Permalink
Changed to use Pandas DataFrames instead of dictionaries #123
Browse files Browse the repository at this point in the history
  • Loading branch information
dpuenteramirez committed Mar 11, 2022
1 parent 682f51f commit c96cf94
Showing 1 changed file with 91 additions and 95 deletions.
186 changes: 91 additions & 95 deletions semisupervised/densitypeaks.py → semisupervised/DensityPeaks.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @Filename: densitypeaks.py
# @Filename: DensityPeaks.py
# @Author: Daniel Puente Ramírez
# @Time: 5/3/22 09:55
# @Version: 2.0

from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import math

import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.semi_supervised import SelfTrainingClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
import numpy as np
import math


class SemiSupervisedSelfTraining:
Expand All @@ -23,15 +22,18 @@ def __init__(self,
density_threshold=None,
distance_threshold=None,
anormal=True,
filtering=False
filtering=False,
classifier=None
):
self.y = None
self.dc = dc
self.distance_metric = distance_metric
self.gauss_cutoff = gauss_cutoff
self.density_threshold = density_threshold
self.distance_threshold = distance_threshold
self.anormal = anormal
self.filtering = filtering
self.classifier = classifier

def build_distance(self):
"""
Expand Down Expand Up @@ -132,111 +134,105 @@ def min_neighbor_and_distance(self):

return np.array(delta, np.float32), np.array(nneigh, np.float32)

def _fit_without(self, L, U, y):
self.data = np.concatenate((L, U), axis=0)
def __structure(self):
self.structure = dict.fromkeys(range(self.n_id))
for index, sample in enumerate(self.data):
self.structure[index] = [
sample,
int(self.nneigh[index]),
None,
self.y[index] if index < len(self.y) else -1
]

for index in range(self.n_id):
if self.structure[self.structure[index][1]][2] is None:
self.structure[self.structure[index][1]][2] = index

self.structure = pd.DataFrame(self.structure, index=['sample', 'next',
'previous',
'label']) \
.transpose()

def _fit_without(self, l, u, y):
self.data = np.concatenate((l, u), axis=0)
self.n_id = self.data.shape[0]
self.distances, self.max_dis, self.min_dis = self.build_distance()
self.dc = self.select_dc()
self.rho = self.local_density()
self.delta, self.nneigh = self.min_neighbor_and_distance()
self.dlabeled = {}
self.dunlabeled = {}
self.classifier = SVC()

for index, sample in enumerate(L):
self.dlabeled[tuple(sample)] = tuple(self.data[int(self.nneigh[
index])])
try:
self.dlabeled[tuple(sample)]
except KeyError:
print(tuple(sample))
print(tuple(self.data[int(self.nneigh[index])]))
self.dlabeled[tuple(sample)] = tuple(self.data[int(
self.nneigh[index])])

while len(self.dlabeled.keys()) != len(y):
y = y[:-1]

for index, sample in enumerate(U):
self.dunlabeled[tuple(sample)] = \
tuple(self.data[int(self.nneigh[index + len(L)])])
try:
self.dunlabeled[tuple(sample)]
except KeyError:
print(tuple(sample))
print(tuple(self.data[int(self.nneigh[index])]))
self.dunlabeled[tuple(sample)] = tuple(self.data[int(
self.nneigh[index + len(L)])])
if self.classifier is None:
self.classifier = SVC()

# Step 1
self.__structure()

# Step 2
while True:
L = list(self.dlabeled.keys())
self.classifier.fit(L, y)
samples_to_add = []
for sample, next in self.dunlabeled.items():
if next in self.dlabeled.keys():
samples_to_add.append(tuple(sample))
if len(samples_to_add) == 0:
# 2.a
samples_labeled = self.structure.loc[self.structure['label'] != -1]
l = samples_labeled['sample'].to_list()
self.y = samples_labeled['label'].to_list()
self.classifier.fit(l, self.y)

# 2.b
next_rows = samples_labeled['next'].to_numpy()
next_unlabeled = []
samples_labeled_index = samples_labeled.index.to_list()
for next_row in next_rows:
if next_row not in samples_labeled_index:
next_unlabeled.append(next_row)
if len(next_unlabeled) == 0:
break
next_pred = self.classifier.predict(samples_to_add)
y = np.concatenate((next_pred, y), axis=0)

for sample in samples_to_add:
self.dlabeled[tuple(sample)] = self.dunlabeled[tuple(sample)]
self.dunlabeled.pop(tuple(sample))
while len(self.dlabeled.keys()) != len(y):
y = y[:-1]

while len(self.dunlabeled) > 0:
L = list(self.dlabeled.keys())
self.classifier.fit(L, y)
samples_to_add = []
for prev, sample in self.dlabeled.items():
if tuple(sample) in self.dunlabeled.keys():
samples_to_add.append(sample)
if len(samples_to_add) == 0:
unlabeled_next_of_labeled = self.structure.loc[next_unlabeled]

lu = unlabeled_next_of_labeled['sample'].to_list()
y_pred = self.classifier.predict(lu)

# 2.c
for new_label, pos in zip(y_pred, next_unlabeled):
self.structure.at[pos, 'label'] = new_label

# Step 3
while True:
# 3.a
samples_labeled = self.structure.loc[self.structure['label'] != -1]
l = samples_labeled['sample'].to_list()
self.y = samples_labeled['label'].to_list()
self.classifier.fit(l, self.y)

# 3.b
prev_rows = samples_labeled['previous'].to_numpy()
prev_unlabeled = []
samples_labeled_index = samples_labeled.index.to_list()
for prev_row in prev_rows:
if prev_row not in samples_labeled_index and prev_row is not \
None:
prev_unlabeled.append(prev_row)
if len(prev_unlabeled) == 0:
break
next_pred = self.classifier.predict(samples_to_add)
y = np.concatenate((next_pred, y), axis=0)
for sample in samples_to_add:
self.dlabeled[tuple(sample)] = self.dunlabeled[tuple(sample)]
self.dunlabeled.pop(tuple(sample))
while len(self.dlabeled.keys()) != len(y):
y = y[:-1]

def fit(self, L, U, y):
unlabeled_prev_of_labeled = self.structure.loc[prev_unlabeled]

lu = unlabeled_prev_of_labeled['sample'].to_list()
y_pred = self.classifier.predict(lu)

# 3.c
for new_label, pos in zip(y_pred, prev_unlabeled):
self.structure.at[pos, 'label'] = new_label

def fit(self, l, u, y):
"""Fit method"""
if len(L) != len(y):
if len(l) != len(y):
raise ValueError(
f'The dimension of the labeled data must be the same as the '
f'number of labels given. {len(L)} != {len(y)}'
f'number of labels given. {len(l)} != {len(y)}'
)

le = LabelEncoder()
le.fit(y)
y = le.transform(y)
self.y = y
if self.filtering is False:
self._fit_without(L, U, y)
self._fit_without(l, u, y)

def predict(self, src):
return self.classifier.predict(src)


if __name__ == '__main__':
iris = load_iris()
X = iris.data
y = iris.target
X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=0.9)
choices = np.random.choice(len(X_train), math.ceil(len(X_train) * 0.5),
replace=False)
labeled = [x for x in range(len(X_train)) if x not in choices]
y = y[labeled]
X_labeled = X_train[labeled]
X_unlabeled = X_train[choices]

model = SemiSupervisedSelfTraining(filtering=False)
model.fit(X_labeled, X_unlabeled, y)
y_pred = model.predict(X_test)
from sklearn.metrics import accuracy_score
print(accuracy_score(y_test, y_pred))


0 comments on commit c96cf94

Please sign in to comment.