Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] - Feature/numba parallel #43

Open
wants to merge 7 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ htmlcov/
.coverage
*.coverage.*
.coveragerc
venv/
venv/
*.cache
216 changes: 197 additions & 19 deletions PyNomaly/loop.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from math import erf, sqrt
import os

from math import erf, factorial, sqrt
import multiprocessing as mp
import numpy as np
from python_utils.terminal import get_terminal_size
import sys
Expand All @@ -7,14 +10,46 @@

try:
import numba
dynamic_range = numba.prange
# dynamic_range = range
except ImportError:
dynamic_range = range
pass

__author__ = 'Valentino Constantinou'
__version__ = '0.3.4'
__version__ = '0.4.0'
__license__ = 'Apache License, Version 2.0'


def factorial_lil(n):
if n == 0:
return 1
else:
return n * factorial_lil(n-1)

# TODO: update docstring and move to proper location and add type hints
def _progress(iteration, total, prefix='', suffix='', decimals=1, length=100, fill='█', printEnd="\r") -> None:
"""
Call in a loop to create terminal progress bar
@params:
iteration - Required : current iteration (Int)
total - Required : total iterations (Int)
prefix - Optional : prefix string (Str)
suffix - Optional : suffix string (Str)
decimals - Optional : positive number of decimals in percent complete (Int)
length - Optional : character length of bar (Int)
fill - Optional : bar fill character (Str)
printEnd - Optional : end character (e.g. "\r", "\r\n") (Str)
"""
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
filledLength = int(length * iteration // total)
bar = fill * filledLength + '-' * (length - filledLength)
print(f'\r{prefix} |{bar}| {percent}% {suffix}', end=printEnd)
# Print New Line on Complete
if iteration == total:
print()


class Utils:

@staticmethod
Expand Down Expand Up @@ -321,6 +356,12 @@ def new_f(*args, **kwds):
},
'progress_bar': {
'type': types[8]
},
'parallel': {
'type': types[9]
},
'num_threads': {
'type': types[10]
}
}
for x in kwds:
Expand All @@ -341,10 +382,11 @@ def new_f(*args, **kwds):
return decorator

@accepts(object, np.ndarray, np.ndarray, np.ndarray, (int, np.integer),
(int, np.integer), list, bool, bool)
(int, np.integer), list, bool, bool, bool, int)
def __init__(self, data=None, distance_matrix=None, neighbor_matrix=None,
extent=3, n_neighbors=10, cluster_labels=None,
use_numba=False, progress_bar=False) -> None:
use_numba=False, progress_bar=False, parallel=False,
num_threads=mp.cpu_count()) -> None:
self.data = data
self.distance_matrix = distance_matrix
self.neighbor_matrix = neighbor_matrix
Expand All @@ -359,6 +401,8 @@ def __init__(self, data=None, distance_matrix=None, neighbor_matrix=None,
self.local_outlier_probabilities = None
self._objects = {}
self.progress_bar = progress_bar
self.parallel = parallel
self.num_threads = num_threads
self.is_fit = False

if self.use_numba is True and 'numba' not in sys.modules:
Expand Down Expand Up @@ -514,7 +558,8 @@ def _compute_distance_and_neighbor_matrix(
clust_points_vector: np.ndarray,
indices: np.ndarray,
distances: np.ndarray,
indexes: np.ndarray
indexes: np.ndarray,
total: int
) -> Tuple[np.ndarray, np.ndarray, int]:
"""
This helper method provides the heavy lifting for the _distances
Expand All @@ -523,7 +568,17 @@ def _compute_distance_and_neighbor_matrix(
desired.
"""

for i in range(clust_points_vector.shape[0]):
counter = 0
# n = clust_points_vector.shape[0]
# total = (n * (n + 1)) / 16

progress = "█"
# togo = '░' * 20
w = 0
blocks = 0

for i in dynamic_range(clust_points_vector.shape[0]):
# for i in range(clust_points_vector.shape[0]):
for j in range(i + 1, clust_points_vector.shape[0]):
p = ((i,), (j,))

Expand All @@ -544,9 +599,116 @@ def _compute_distance_and_neighbor_matrix(
distances[idx][idx_max] = d
indexes[idx][idx_max] = p[0][0]

yield distances, indexes, i

def _distances(self, progress_bar: bool = False) -> None:
counter += 1

percent = round(100 * (counter / total), 2)

if percent - w >= 5:
w += 5
blocks += 1
progress += "█"

# os.system("cls")

# print(percent, bars)
# print("\e[1;1H\e[2J" + progress)
# print("\x1b[1K\r" + progress)
# print(chr(27) + "[2J", w)

# print("\033[H")
# print("\033[2J")
# print("\033c")
# print('Test\033[K')
# print("meep" + chr(27) + "[2J")

# progress = progress + (20 - blocks) * "░"
bar = progress + (20 - blocks) * "░"
print(bar, percent)

bar = "█" * 21
print(bar, "100.")

# print(bars * progress)

# print("\x1b[2J\x1b[H")

# if total < w:
# block_size = int(w / total)
# else:
# block_size = int(total / w)

# print(block_size)

# if i % block_size in [0, 1, 2]:
# progress += "="
#
# print(progress + " %")
# print(progress + " " + str(percent) + "%")

# print(counter)

# 200010000
# 24218125



# if i % (total/50) == 0:
#
# iteration = counter
# decimals = 1
# length = 100
# prefix = '|'
# suffix = '|'
# fill = "="
# printEnd = "\r"
#
# print(iteration / float(total))

# # percent = str(100 * (iteration / float(total)))
# filledLength = int(length * iteration // total)
# bar = fill * filledLength + '-' * (length - filledLength)
# print(prefix + " | " + bar + " | " + "" + "% " + suffix)
# # Print New Line on Complete
# if iteration == total:
# print()


# print(counter)
# # update the progress bar
# # if progress_bar is True:
# progress = Utils.emit_progress_bar(
# progress, i+1, clust_points_vector.shape[0])

# yield distances, indexes, i
# # yield i
# # print(
# # progress, i + 1, clust_points_vector.shape[0])
# # w, h = get_terminal_size()
# w = 200
# h = 50
# print("\r")
# total = clust_points_vector.shape[0]
# if total < w:
# block_size = int(w / total)
# else:
# block_size = int(total / w)
# if completed % block_size == 0:
# progress += "="
# percent = completed / total
# print(percent)
# # print("[ " + progress + " ]")
# # print("[ %s ] %.2f%%" % (progress, percent * 100))
# print(np.count_nonzero(distances))

## NOTES
# multicore JIT with return object = 31.46 seconds
# single core JIT with return object = 44.05
# single core JIT with yield generator = 42.99
# single core python only = ~243

return distances, indexes, clust_points_vector.shape[0]

def _distances(self, progress_bar: bool = False, parallel: bool = False, num_threads: int = 1) -> None:
"""
Provides the distances between each observation and it's closest
neighbors. When input data is provided, calculates the euclidean
Expand All @@ -560,23 +722,39 @@ def _distances(self, progress_bar: bool = False) -> None:
indexes = np.full([self._n_observations(), self.n_neighbors], 9e10,
dtype=float)
self.points_vector = self.Validate._data(self.data)
compute = numba.jit(self._compute_distance_and_neighbor_matrix,
cache=True) if self.use_numba else \
self._compute_distance_and_neighbor_matrix

if self.use_numba:
numba.set_num_threads(num_threads)
compute = numba.jit(self._compute_distance_and_neighbor_matrix,
cache=False,
parallel=parallel,
nopython=parallel,
nogil=parallel,
)
else:
compute = self._compute_distance_and_neighbor_matrix
progress = "="
# TODO: check progress bar multiple cluster
for cluster_id in set(self._cluster_labels()):
indices = np.where(self._cluster_labels() == cluster_id)
clust_points_vector = np.array(
self.points_vector.take(indices, axis=0)[0],
dtype=np.float64
)

n = clust_points_vector.shape[0]
total = (n * (n + 1)) / np.max([num_threads, 2])
# total = (n * (n + 1)) / (num_threads + 2)

# a generator that yields an updated distance matrix on each loop
for c in compute(clust_points_vector, indices, distances, indexes):
distances, indexes, i = c
# update the progress bar
if progress_bar is True:
progress = Utils.emit_progress_bar(
progress, i+1, clust_points_vector.shape[0])
distances, indexes, i = compute(clust_points_vector, indices, distances, indexes, total)
# for c in compute(clust_points_vector, indices, distances, indexes):
# # distances, indexes, i = c
# i = c
# # update the progress bar
# if progress_bar is True:
# progress = Utils.emit_progress_bar(
# progress, i+1, clust_points_vector.shape[0])

self.distance_matrix = distances
self.neighbor_matrix = indexes
Expand Down Expand Up @@ -754,7 +932,7 @@ def fit(self) -> 'LocalOutlierProbability':

store = self._store()
if self.data is not None:
self._distances(progress_bar=self.progress_bar)
self._distances(progress_bar=self.progress_bar, parallel=self.parallel, num_threads=self.num_threads)
store = self._assign_distances(store)
store = self._ssd(store)
store = self._standard_distances(store)
Expand Down
7 changes: 6 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ All notable changes to PyNomaly will be documented in this Changelog.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## 0.3.4
## 0.4.0
### Added
- Parallel processing capability through numba just-in-time
compilation was added as an option for computing the Local Outlier
Probability.

### Changed
- Unit tests from using the `sklearn.utils.testing` submodule
to standard Python assertions, as the submodule will be changed
Expand Down
31 changes: 31 additions & 0 deletions examples/numba_parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import multiprocessing as mp
import numpy as np
from PyNomaly import loop
import time

# generate a large set of data
data = np.ones(shape=(25000, 4))

for i in range(1, mp.cpu_count() + 1, 1):

t5 = time.time()
scores_numba_parallel = loop.LocalOutlierProbability(
data,
n_neighbors=3,
use_numba=True,
progress_bar=True,
parallel=True,
# TODO: user warning, correct behavior and continue of too many threads are passed
# TODO: user warning, ignore num_threads if numba and/or parallel is false
# TODO: add num threads to readme
num_threads=i
).fit().local_outlier_probabilities
t6 = time.time()
seconds_numba_parallel = t6 - t5
print("\nComputation took " + str(seconds_numba_parallel) +
" seconds with Numba JIT with parallel processing, using " + str(i) + " thread(s).")





Loading