diff --git a/.gitignore b/.gitignore
index cc65586..bd1967d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,4 +14,5 @@ htmlcov/
 .coverage
 *.coverage.*
 .coveragerc
-venv/
\ No newline at end of file
+venv/
+*.cache
diff --git a/PyNomaly/loop.py b/PyNomaly/loop.py
index a0c341e..b951986 100644
--- a/PyNomaly/loop.py
+++ b/PyNomaly/loop.py
@@ -1,4 +1,7 @@
-from math import erf, sqrt
+import os
+
+from math import erf, factorial, sqrt
+import multiprocessing as mp
 import numpy as np
 from python_utils.terminal import get_terminal_size
 import sys
@@ -7,14 +10,46 @@
 try:
     import numba
+    dynamic_range = numba.prange
+    # dynamic_range = range
 except ImportError:
+    dynamic_range = range
     pass
 
 __author__ = 'Valentino Constantinou'
-__version__ = '0.3.4'
+__version__ = '0.4.0'
 __license__ = 'Apache License, Version 2.0'
 
 
+def factorial_lil(n):
+    if n == 0:
+        return 1
+    else:
+        return n * factorial_lil(n - 1)
+
+
+# TODO: update docstring and move to proper location and add type hints
+def _progress(iteration, total, prefix='', suffix='', decimals=1, length=100,
+              fill='█', printEnd="\r") -> None:
+    """
+    Call in a loop to create terminal progress bar
+    @params:
+        iteration   - Required  : current iteration (Int)
+        total       - Required  : total iterations (Int)
+        prefix      - Optional  : prefix string (Str)
+        suffix      - Optional  : suffix string (Str)
+        decimals    - Optional  : positive number of decimals in percent complete (Int)
+        length      - Optional  : character length of bar (Int)
+        fill        - Optional  : bar fill character (Str)
+        printEnd    - Optional  : end character (e.g. "\r", "\r\n") (Str)
+    """
+    percent = ("{0:." + str(decimals) + "f}").format(
+        100 * (iteration / float(total)))
+    filledLength = int(length * iteration // total)
+    bar = fill * filledLength + '-' * (length - filledLength)
+    print(f'\r{prefix} |{bar}| {percent}% {suffix}', end=printEnd)
+    # Print New Line on Complete
+    if iteration == total:
+        print()
+
+
 class Utils:
 
     @staticmethod
@@ -321,6 +356,12 @@ def new_f(*args, **kwds):
                 },
                 'progress_bar': {
                     'type': types[8]
+                },
+                'parallel': {
+                    'type': types[9]
+                },
+                'num_threads': {
+                    'type': types[10]
                 }
             }
             for x in kwds:
@@ -341,10 +382,11 @@ def new_f(*args, **kwds):
         return decorator
 
     @accepts(object, np.ndarray, np.ndarray, np.ndarray, (int, np.integer),
-             (int, np.integer), list, bool, bool)
+             (int, np.integer), list, bool, bool, bool, int)
     def __init__(self, data=None, distance_matrix=None, neighbor_matrix=None,
                  extent=3, n_neighbors=10, cluster_labels=None,
-                 use_numba=False, progress_bar=False) -> None:
+                 use_numba=False, progress_bar=False, parallel=False,
+                 num_threads=mp.cpu_count()) -> None:
         self.data = data
         self.distance_matrix = distance_matrix
         self.neighbor_matrix = neighbor_matrix
@@ -359,6 +401,8 @@ def __init__(self, data=None, distance_matrix=None, neighbor_matrix=None,
         self.local_outlier_probabilities = None
         self._objects = {}
         self.progress_bar = progress_bar
+        self.parallel = parallel
+        self.num_threads = num_threads
         self.is_fit = False
 
         if self.use_numba is True and 'numba' not in sys.modules:
@@ -514,7 +558,8 @@ def _compute_distance_and_neighbor_matrix(
             clust_points_vector: np.ndarray,
             indices: np.ndarray,
             distances: np.ndarray,
-            indexes: np.ndarray
+            indexes: np.ndarray,
+            total: int
     ) -> Tuple[np.ndarray, np.ndarray, int]:
         """
         This helper method provides the heavy lifting for the _distances
@@ -523,7 +568,17 @@
         desired.
""" - for i in range(clust_points_vector.shape[0]): + counter = 0 + # n = clust_points_vector.shape[0] + # total = (n * (n + 1)) / 16 + + progress = "█" + # togo = '░' * 20 + w = 0 + blocks = 0 + + for i in dynamic_range(clust_points_vector.shape[0]): + # for i in range(clust_points_vector.shape[0]): for j in range(i + 1, clust_points_vector.shape[0]): p = ((i,), (j,)) @@ -544,9 +599,116 @@ def _compute_distance_and_neighbor_matrix( distances[idx][idx_max] = d indexes[idx][idx_max] = p[0][0] - yield distances, indexes, i - - def _distances(self, progress_bar: bool = False) -> None: + counter += 1 + + percent = round(100 * (counter / total), 2) + + if percent - w >= 5: + w += 5 + blocks += 1 + progress += "█" + + # os.system("cls") + + # print(percent, bars) + # print("\e[1;1H\e[2J" + progress) + # print("\x1b[1K\r" + progress) + # print(chr(27) + "[2J", w) + + # print("\033[H") + # print("\033[2J") + # print("\033c") + # print('Test\033[K') + # print("meep" + chr(27) + "[2J") + + # progress = progress + (20 - blocks) * "░" + bar = progress + (20 - blocks) * "░" + print(bar, percent) + + bar = "█" * 21 + print(bar, "100.") + + # print(bars * progress) + + # print("\x1b[2J\x1b[H") + + # if total < w: + # block_size = int(w / total) + # else: + # block_size = int(total / w) + + # print(block_size) + + # if i % block_size in [0, 1, 2]: + # progress += "=" + # + # print(progress + " %") + # print(progress + " " + str(percent) + "%") + + # print(counter) + + # 200010000 + # 24218125 + + + + # if i % (total/50) == 0: + # + # iteration = counter + # decimals = 1 + # length = 100 + # prefix = '|' + # suffix = '|' + # fill = "=" + # printEnd = "\r" + # + # print(iteration / float(total)) + + # # percent = str(100 * (iteration / float(total))) + # filledLength = int(length * iteration // total) + # bar = fill * filledLength + '-' * (length - filledLength) + # print(prefix + " | " + bar + " | " + "" + "% " + suffix) + # # Print New Line on Complete + # if iteration == total: + # print() + + + # print(counter) + # # update the progress bar + # # if progress_bar is True: + # progress = Utils.emit_progress_bar( + # progress, i+1, clust_points_vector.shape[0]) + + # yield distances, indexes, i + # # yield i + # # print( + # # progress, i + 1, clust_points_vector.shape[0]) + # # w, h = get_terminal_size() + # w = 200 + # h = 50 + # print("\r") + # total = clust_points_vector.shape[0] + # if total < w: + # block_size = int(w / total) + # else: + # block_size = int(total / w) + # if completed % block_size == 0: + # progress += "=" + # percent = completed / total + # print(percent) + # # print("[ " + progress + " ]") + # # print("[ %s ] %.2f%%" % (progress, percent * 100)) + # print(np.count_nonzero(distances)) + + ## NOTES + # multicore JIT with return object = 31.46 seconds + # single core JIT with return object = 44.05 + # single core JIT with yield generator = 42.99 + # single core python only = ~243 + + return distances, indexes, clust_points_vector.shape[0] + + def _distances(self, progress_bar: bool = False, parallel: bool = False, num_threads: int = 1) -> None: """ Provides the distances between each observation and it's closest neighbors. 
         neighbors. When input data is provided, calculates the euclidean
@@ -560,23 +722,39 @@ def _distances(self, progress_bar: bool = False) -> None:
         indexes = np.full([self._n_observations(), self.n_neighbors], 9e10,
                           dtype=float)
         self.points_vector = self.Validate._data(self.data)
-        compute = numba.jit(self._compute_distance_and_neighbor_matrix,
-                            cache=True) if self.use_numba else \
-            self._compute_distance_and_neighbor_matrix
+
+        if self.use_numba:
+            numba.set_num_threads(num_threads)
+            compute = numba.jit(self._compute_distance_and_neighbor_matrix,
+                                cache=False,
+                                parallel=parallel,
+                                nopython=parallel,
+                                nogil=parallel,
+                                )
+        else:
+            compute = self._compute_distance_and_neighbor_matrix
         progress = "="
+        # TODO: check the progress bar with multiple clusters
         for cluster_id in set(self._cluster_labels()):
            indices = np.where(self._cluster_labels() == cluster_id)
            clust_points_vector = np.array(
                self.points_vector.take(indices, axis=0)[0],
                dtype=np.float64
            )
-            # a generator that yields an updated distance matrix on each loop
-            for c in compute(clust_points_vector, indices, distances, indexes):
-                distances, indexes, i = c
-                # update the progress bar
-                if progress_bar is True:
-                    progress = Utils.emit_progress_bar(
-                        progress, i+1, clust_points_vector.shape[0])
+
+            n = clust_points_vector.shape[0]
+            total = (n * (n + 1)) / np.max([num_threads, 2])
+
+            # compute the distance and neighbor matrices for this cluster
+            distances, indexes, i = compute(clust_points_vector, indices,
+                                            distances, indexes, total)
 
         self.distance_matrix = distances
         self.neighbor_matrix = indexes
@@ -754,7 +932,7 @@ def fit(self) -> 'LocalOutlierProbability':
         store = self._store()
 
         if self.data is not None:
-            self._distances(progress_bar=self.progress_bar)
+            self._distances(progress_bar=self.progress_bar, parallel=self.parallel, num_threads=self.num_threads)
         store = self._assign_distances(store)
         store = self._ssd(store)
         store = self._standard_distances(store)
diff --git a/changelog.md b/changelog.md
index ec6bef6..80f67eb 100644
--- a/changelog.md
+++ b/changelog.md
@@ -4,7 +4,12 @@
 All notable changes to PyNomaly will be documented in this Changelog.
 
 The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
 and adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
 
-## 0.3.4
+## 0.4.0
+### Added
+- Parallel processing capability through Numba just-in-time compilation,
+as an option for computing the Local Outlier Probability.
+
 ### Changed
 - Unit tests from using the `sklearn.utils.testing` submodule to
 standard Python assertions, as the submodule will be changed
diff --git a/examples/numba_parallel.py b/examples/numba_parallel.py
new file mode 100644
index 0000000..7bb21ab
--- /dev/null
+++ b/examples/numba_parallel.py
@@ -0,0 +1,31 @@
+import multiprocessing as mp
+import numpy as np
+from PyNomaly import loop
+import time
+
+# generate a large set of data
+data = np.ones(shape=(25000, 4))
+
+# time the parallel computation once for each available thread count
+for i in range(1, mp.cpu_count() + 1, 1):
+
+    t5 = time.time()
+    scores_numba_parallel = loop.LocalOutlierProbability(
+        data,
+        n_neighbors=3,
+        use_numba=True,
+        progress_bar=True,
+        parallel=True,
+        # TODO: user warning, correct the behavior and continue if too many threads are passed
+        # TODO: user warning, ignore num_threads if use_numba and/or parallel is False
+        # TODO: add num_threads to the readme
+        num_threads=i
+    ).fit().local_outlier_probabilities
+    t6 = time.time()
+    seconds_numba_parallel = t6 - t5
+    print("\nComputation took " + str(seconds_numba_parallel) +
+          " seconds with Numba JIT and parallel processing, using " +
+          str(i) + " thread(s).")
diff --git a/examples/numba_speed_diff.py b/examples/numba_speed_diff.py
index 2d4f622..bc5618c 100644
--- a/examples/numba_speed_diff.py
+++ b/examples/numba_speed_diff.py
@@ -1,4 +1,5 @@
 import numpy as np
+import os
 from PyNomaly import loop
 import time
 
@@ -8,24 +9,47 @@
 # first time the process without Numba
 # use the progress bar to track progress
-t1 = time.time()
-scores_numpy = loop.LocalOutlierProbability(
-    data,
-    n_neighbors=3,
-    use_numba=False,
-    progress_bar=True
-).fit().local_outlier_probabilities
-t2 = time.time()
-seconds_no_numba = t2 - t1
-print("\nComputation took " + str(seconds_no_numba) + " seconds without Numba JIT.")
+# t1 = time.time()
+# scores_numpy = loop.LocalOutlierProbability(
+#     data,
+#     n_neighbors=3,
+#     use_numba=False,
+#     progress_bar=True
+# ).fit().local_outlier_probabilities
+# t2 = time.time()
+# seconds_no_numba = t2 - t1
+# print("\nComputation took " + str(seconds_no_numba) + " seconds without Numba JIT.")
+
+# t3 = time.time()
+# scores_numba = loop.LocalOutlierProbability(
+#     data,
+#     n_neighbors=3,
+#     use_numba=True,
+#     progress_bar=False,
+#     parallel=False
+# ).fit().local_outlier_probabilities
+# t4 = time.time()
+# seconds_numba = t4 - t3
+# print("\nComputation took " + str(seconds_numba) + " seconds with Numba JIT.")
 
-t3 = time.time()
-scores_numba = loop.LocalOutlierProbability(
+t5 = time.time()
+scores_numba_parallel = loop.LocalOutlierProbability(
     data,
     n_neighbors=3,
     use_numba=True,
-    progress_bar=True
+    progress_bar=True,
+    parallel=True,
+    # TODO: fix the sigenv kill when using 3 or more cores
+    # TODO: user warning, correct the behavior and continue if too many threads are passed
+    # TODO: user warning, ignore num_threads if use_numba and/or parallel is False
+    # TODO: add num_threads to the readme
+    # num_threads=os.cpu_count()
+    # TODO: num_threads or num_cores?
+    # TODO: flush stdout
+    # TODO: changelog, readme
+    # TODO: tests?
+    num_threads=3  # TODO: if use_numba is True and parallel is False, set num_threads to 1
 ).fit().local_outlier_probabilities
-t4 = time.time()
-seconds_numba = t4 - t3
-print("\nComputation took " + str(seconds_numba) + " seconds with Numba JIT.")
+t6 = time.time()
+seconds_numba_parallel = t6 - t5
+print("\nComputation took " + str(seconds_numba_parallel) + " seconds with Numba JIT and parallel processing.")
diff --git a/readme.md b/readme.md
index a161255..73d1f62 100644
--- a/readme.md
+++ b/readme.md
@@ -5,7 +5,7 @@ LoOP is a local density based outlier detection method by Kriegel, Kröger, Schu
 scores in the range of [0,1] that are directly interpretable as the probability of a sample being an outlier.
 
 [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
-[![PyPi](https://img.shields.io/badge/pypi-0.3.4-blue.svg)](https://pypi.python.org/pypi/PyNomaly/0.3.4)
+[![PyPi](https://img.shields.io/badge/pypi-0.4.0-blue.svg)](https://pypi.python.org/pypi/PyNomaly/0.4.0)
 ![](https://img.shields.io/pypi/dm/PyNomaly.svg?logoColor=blue)
 [![Build Status](https://travis-ci.org/vc1492a/PyNomaly.svg?branch=master)](https://travis-ci.org/vc1492a/PyNomaly)
 [![Coverage Status](https://coveralls.io/repos/github/vc1492a/PyNomaly/badge.svg?branch=master)](https://coveralls.io/github/vc1492a/PyNomaly?branch=master)
@@ -39,13 +39,15 @@ to calculate the Local Outlier Probability of each sample.
 
 - Python 3.5 - 3.8
 - numpy >= 1.16.3
 - python-utils >= 2.3.0
-- (optional) numba >= 0.45.1
+- (optional) numba >= 0.51.2
+- (optional) SciPy >= 1.5.2
 
 Numba just-in-time (JIT) compiles the function that calculates the Euclidean
 distance between observations, providing a reduction in computation time
 (significantly so when a large number of observations are scored). Numba is
 not a requirement and PyNomaly may still be used solely with numpy if desired
-(details below).
+(details below). When using Numba, [SciPy](https://www.scipy.org/) should
+also be installed within the environment.
 
 ## Quick Start
 
@@ -110,6 +112,23 @@ speed of multiple calls to `LocalOutlierProbability()`, and PyNomaly has been
 tested with Numba version 0.45.1. An example of the speed difference that can
 be realized by using Numba is available in `examples/numba_speed_diff.py`.
 
+Parallel processing is available when using PyNomaly with Numba;
+simply set `parallel=True`:
+
+```python
+from PyNomaly import loop
+m = loop.LocalOutlierProbability(data, use_numba=True, progress_bar=True, parallel=True).fit()
+scores = m.local_outlier_probabilities
+print(scores)
+```
+
+The benefits of using parallelism will vary depending on the CPU architecture
+(number of cores, clock speed, etc.) and the shape of the data processed
+(number of observations and features). In some cases, it may be best to use
+Numba to compile LoOP without parallelization. Additionally, parallelization
+only applies to the calculation of distances between observations and thus
+will not be applied when supplying a distance matrix from outside PyNomaly
+(more details below).
+
 You may also choose to print progress bars _with or without_ the use of numba
 by passing `progress_bar=True` to the `LocalOutlierProbability()` method as
 above.
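One of the TODO notes in the examples above is to document `num_threads` in the readme. Here is a minimal usage sketch of that parameter, based on the constructor signature added in this diff (`num_threads` defaults to `mp.cpu_count()` in `__init__`); the dataset is a placeholder and numba must be installed for the parallel path to take effect:

```python
import multiprocessing as mp
import numpy as np
from PyNomaly import loop

# placeholder dataset; substitute your own observations
data = np.random.rand(5000, 4)

# use_numba and parallel enable the prange-based distance computation;
# num_threads is passed to numba.set_num_threads() before compilation
m = loop.LocalOutlierProbability(
    data,
    n_neighbors=10,
    use_numba=True,
    parallel=True,
    num_threads=max(1, mp.cpu_count() - 1)
).fit()
print(m.local_outlier_probabilities)
```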
diff --git a/setup.py b/setup.py
index 9b9efc6..4468f4c 100644
--- a/setup.py
+++ b/setup.py
@@ -3,14 +3,14 @@
 setup(
     name='PyNomaly',
     packages=['PyNomaly'],
-    version='0.3.4',
+    version='0.4.0',
     description='A Python 3 implementation of LoOP: Local Outlier '
                 'Probabilities, a local density based outlier detection '
                 'method providing an outlier score in the range of [0,1].',
     author='Valentino Constantinou',
     author_email='vc@valentino.io',
     url='https://github.com/vc1492a/PyNomaly',
-    download_url='https://github.com/vc1492a/PyNomaly/archive/0.3.4.tar.gz',
+    download_url='https://github.com/vc1492a/PyNomaly/archive/0.4.0.tar.gz',
     keywords=['outlier', 'anomaly', 'detection', 'machine', 'learning',
               'probability'],
     classifiers=[],
diff --git a/tests/test_loop.py b/tests/test_loop.py
index 9e5850f..4f3e0be 100644
--- a/tests/test_loop.py
+++ b/tests/test_loop.py
@@ -486,8 +486,8 @@ def test_data_format() -> None:
     with pytest.warns(UserWarning) as record:
         clf.fit()
 
-    # check that only one warning was raised
-    assert len(record) == 1
+    # check that at least one warning was raised
+    assert len(record) >= 1
 
 
 def test_missing_values() -> None:
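The example scripts above leave `# TODO: tests?` open. Below is a sketch of one possible regression test for the new parallel path, assuming numba is installed in the test environment; the test name and tolerance are hypothetical, and the check simply compares the parallel results against the plain numpy path:

```python
import numpy as np
from PyNomaly import loop


def test_parallel_scores_match_serial() -> None:
    # a small random dataset keeps the test fast
    rng = np.random.RandomState(0)
    data = rng.rand(200, 3)

    serial = loop.LocalOutlierProbability(
        data, n_neighbors=5, use_numba=False
    ).fit().local_outlier_probabilities

    parallel = loop.LocalOutlierProbability(
        data, n_neighbors=5, use_numba=True, parallel=True, num_threads=2
    ).fit().local_outlier_probabilities

    # both code paths should produce numerically equivalent probabilities
    assert np.allclose(serial.astype(float), parallel.astype(float),
                       atol=1e-10)
```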