diff --git a/navis/core/base.py b/navis/core/base.py index 62d58037..1432c525 100644 --- a/navis/core/base.py +++ b/navis/core/base.py @@ -45,7 +45,7 @@ pint.Quantity([]) -def Neuron(x: Union[nx.DiGraph, str, pd.DataFrame, 'TreeNeuron', 'MeshNeuron'], +def Neuron(x: Union[nx.DiGraph, str, pd.DataFrame, 'core.TreeNeuron', 'core.MeshNeuron'], **metadata): """Constructor for Neuron objects. Depending on the input, either a ``TreeNeuron`` or a ``MeshNeuron`` is returned. diff --git a/navis/core/core_utils.py b/navis/core/core_utils.py index 59dd99ab..a1a0a014 100644 --- a/navis/core/core_utils.py +++ b/navis/core/core_utils.py @@ -12,6 +12,7 @@ # GNU General Public License for more details. import functools +import itertools import numbers import os import pint @@ -26,16 +27,6 @@ from .. import config, graph, utils, core - -try: - #from pathos.multiprocessing import ProcessingPool - # pathos' ProcessingPool apparently ignores chunksize - # (see https://stackoverflow.com/questions/55611806/how-to-set-chunk-size-when-using-pathos-processingpools-map) - import pathos - ProcessingPool = pathos.pools._ProcessPool -except ImportError: - ProcessingPool = None - __all__ = ['make_dotprops', 'to_neuron_space'] # Set up logging @@ -319,7 +310,8 @@ def __init__(self, warn_inplace: bool = True, omit_failures: bool = False, exclude_zip: list = [], - desc: Optional[str] = None): + desc: Optional[str] = None, + executor: Optional['core.executors.Executor'] = None): if utils.is_iterable(function): if len(function) != len(nl): raise ValueError('Number of functions must match neurons.') @@ -341,6 +333,7 @@ def __init__(self, self.warn_inplace = warn_inplace self.exclude_zip = exclude_zip self.omit_failures = omit_failures + self.executor = executor # This makes sure that help and name match the functions being called functools.update_wrapper(self, self.function) @@ -349,31 +342,30 @@ def __call__(self, *args, **kwargs): # Explicitly providing these parameters overwrites defaults parallel = kwargs.pop('parallel', self.parallel) n_cores = kwargs.pop('n_cores', self.n_cores) + chunksize = kwargs.pop('chunksize', self.chunksize) # We will check, for each argument, if it matches the number of - # functions to run. If they it does, we will zip the values - # with the neurons + # functions to run. If they it does, we will pass directly as iterables. + # Otherwise, repeat the value for each neuron. + parsed_args = [] - parsed_kwargs = [] - - for i, n in enumerate(self.nl): - parsed_args.append([]) - parsed_kwargs.append({}) - for k, a in enumerate(args): - if k in self.exclude_zip: - parsed_args[i].append(a) - elif not utils.is_iterable(a) or len(a) != len(self.nl): - parsed_args[i].append(a) - else: - parsed_args[i].append(a[i]) - - for k, v in kwargs.items(): - if k in self.exclude_zip: - parsed_kwargs[i][k] = v - elif not utils.is_iterable(v) or len(v) != len(self.nl): - parsed_kwargs[i][k] = v - else: - parsed_kwargs[i][k] = v[i] + parsed_kwargs = {} + + for k, a in enumerate(args): + if k in self.exclude_zip: + parsed_args.append(itertools.repeat(a)) + elif not utils.is_iterable(a) or len(a) != len(self.nl): + parsed_args.append(itertools.repeat(a)) + else: + parsed_args.append(a) + + for k, v in kwargs.items(): + if k in self.exclude_zip: + parsed_kwargs[k] = itertools.repeat(v) + elif not utils.is_iterable(v) or len(v) != len(self.nl): + parsed_kwargs[k] = itertools.repeat(v) + else: + parsed_kwargs[k] = v # Silence loggers (except Errors) level = logger.getEffectiveLevel() @@ -381,52 +373,22 @@ def __call__(self, *args, **kwargs): if level < 30: logger.setLevel('WARNING') - # Apply function - if parallel: - if not ProcessingPool: - raise ImportError('navis relies on pathos for multiprocessing!' - 'Please install pathos and try again:\n' - ' pip3 install pathos -U') - - if self.warn_inplace and kwargs.get('inplace', False): - logger.warning('`inplace=True` does not work with ' - 'multiprocessing ') - - with ProcessingPool(n_cores) as pool: - combinations = list(zip(self.funcs, - parsed_args, - parsed_kwargs)) - chunksize = kwargs.pop('chunksize', self.chunksize) # max(int(len(combinations) / 100), 1) - - if not self.omit_failures: - wrapper = _call - else: - wrapper = _try_call - - res = list(config.tqdm(pool.imap(wrapper, - combinations, - chunksize=chunksize), - total=len(combinations), - desc=self.desc, - disable=config.pbar_hide or not self.progress, - leave=config.pbar_leave)) + if self.executor: + executor = self.executor else: - res = [] - for i, n in enumerate(config.tqdm(self.nl, desc=self.desc, - disable=(config.pbar_hide - or not self.progress - or len(self.nl) <= 1), - leave=config.pbar_leave)): - try: - res.append(self.funcs[i](*parsed_args[i], **parsed_kwargs[i])) - except BaseException as e: - if self.omit_failures: - res.append(FailedRun(func=self.funcs[i], - args=parsed_args[i], - kwargs=parsed_kwargs[i], - exception=e)) - else: - raise + if parallel: + executor = core.executors.PathosProcessPoolExecutor(progress=self.progress, n_cores=n_cores) + else: + executor = core.executors.Executor(progress=self.progress) + + if self.warn_inplace and kwargs.get('inplace', False) and not executor.supports_inplace(): + logger.warning('`inplace=True` does not work with ' + 'multiprocessing ') + + with executor as executor: + res = list(executor.map( + self.funcs, *parsed_args, **parsed_kwargs, chunksize=chunksize, desc=self.desc + )) # Reset logger level to previous state logger.setLevel(level) diff --git a/navis/core/executors/__init__.py b/navis/core/executors/__init__.py new file mode 100644 index 00000000..7ce27d46 --- /dev/null +++ b/navis/core/executors/__init__.py @@ -0,0 +1,14 @@ +# This script is part of navis (http://www.github.com/navis-org/navis). +# Copyright (C) 2018 Philipp Schlegel +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +from .base import Executor, PathosProcessPoolExecutor diff --git a/navis/core/executors/base.py b/navis/core/executors/base.py new file mode 100644 index 00000000..63fe999b --- /dev/null +++ b/navis/core/executors/base.py @@ -0,0 +1,122 @@ +# This script is part of navis (http://www.github.com/navis-org/navis). +# Copyright (C) 2018 Philipp Schlegel +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +import itertools +import os + +from typing import Optional + +from ... import config + + +try: + #from pathos.multiprocessing import ProcessingPool + # pathos' ProcessingPool apparently ignores chunksize + # (see https://stackoverflow.com/questions/55611806/how-to-set-chunk-size-when-using-pathos-processingpools-map) + import pathos + ProcessingPool = pathos.pools._ProcessPool +except ImportError: + ProcessingPool = None + + +def kwargs_of_iter_to_iter_of_kwargs(**kwargs): + """Convert a dict with iterable values to an iterator of dicts.""" + if kwargs: + kwargs_iters = zip(*kwargs.values()) + kwargs_iter = (dict(zip(kwargs.keys(), vs)) for vs in kwargs_iters) + else: + kwargs_iter = itertools.repeat({}) + + return kwargs_iter + + +class Executor: + """Simple analogue to `concurrent.futures.Executor` with progress and error handling.""" + progress: bool + + def __init__( + self, + progress: bool = True, + ): + self.progress = progress + + def submit(self, fn, *args, **kwargs): + return fn(*args, **kwargs) + + def map(self, fn, *iterables, desc: Optional[str]=None, chunksize=1, **kwargs): + kwargs_iter = kwargs_of_iter_to_iter_of_kwargs(**kwargs) + + res = list(map( + lambda x, kw: self.submit(*x, **kw), + config.tqdm( + zip(fn, *iterables), + desc=desc, + disable=(config.pbar_hide + or not self.progress), + leave=config.pbar_leave), + kwargs_iter, + ) + ) + + return iter(res) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + return False + + @classmethod + def supports_inplace(cls): + return True + + +class PathosProcessPoolExecutor(Executor): + def __init__( + self, + n_cores: int = os.cpu_count() // 2, + **kwargs + ): + super().__init__(**kwargs) + if not ProcessingPool: + raise ImportError('navis relies on pathos for multiprocessing!' + 'Please install pathos and try again:\n' + ' pip3 install pathos -U') + self.pool = ProcessingPool(n_cores) + + def __enter__(self): + self.pool.__enter__() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + return self.pool.__exit__(exc_type, exc_val, exc_tb) + + def map(self, fn, *iterables, desc: Optional[str]=None, chunksize=1, **kwargs): + kwargs_iter = kwargs_of_iter_to_iter_of_kwargs(**kwargs) + + res = list(config.tqdm(self.pool.imap( + lambda x: x[0](*x[1], **x[2]), + zip(fn, zip(*iterables), kwargs_iter), + chunksize=chunksize, + ), + desc=desc, + disable=(config.pbar_hide + or not self.progress), + leave=config.pbar_leave), + ) + + return iter(res) + + @classmethod + def supports_inplace(cls): + return False diff --git a/navis/core/executors/dask.py b/navis/core/executors/dask.py new file mode 100644 index 00000000..d878f244 --- /dev/null +++ b/navis/core/executors/dask.py @@ -0,0 +1,57 @@ +# This script is part of navis (http://www.github.com/navis-org/navis). +# Copyright (C) 2018 Philipp Schlegel +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +import contextlib + +import dask +import tqdm.dask + +from typing import Optional + +from ... import config +from . import Executor +from .base import kwargs_of_iter_to_iter_of_kwargs + + +class SimpleDaskExecutor(Executor): + def __init__( + self, + **kwargs + ): + super().__init__(**kwargs) + + def map(self, fn, *iterables, desc: Optional[str]=None, chunksize=1, **kwargs): + kwargs_iter = kwargs_of_iter_to_iter_of_kwargs(**kwargs) + + if self.progress: + pbar = tqdm.dask.TqdmCallback( + tqdm_class=config.tqdm, + desc=desc, + disable=(config.pbar_hide + or not self.progress), + leave=config.pbar_leave) + else: + pbar = contextlib.nullcontext() + + res = [dask.delayed(f)(*a, **kw) for f, a, kw in + zip(fn, zip(*iterables), kwargs_iter) + ] + + with pbar: + res = dask.compute(res)[0] + + return iter(res) + + @classmethod + def supports_inplace(cls): + return True diff --git a/navis/io/base.py b/navis/io/base.py index 5a36ed3a..55c78de5 100644 --- a/navis/io/base.py +++ b/navis/io/base.py @@ -330,7 +330,7 @@ def read_from_zip( self, files: Union[str, List[str]], zippath: os.PathLike, attrs: Optional[Dict[str, Any]] = None, - on_error: Union[Literal['ignore', Literal['raise']]] = 'ignore' + on_error: Union[Literal['ignore'], Literal['raise']] = 'ignore' ) -> 'core.NeuronList': """Read given files from a zip into a NeuronList. @@ -378,7 +378,7 @@ def read_zip( parallel="auto", limit: Optional[int] = None, attrs: Optional[Dict[str, Any]] = None, - on_error: Union[Literal['ignore', Literal['raise']]] = 'ignore' + on_error: Union[Literal['ignore'], Literal['raise']] = 'ignore' ) -> 'core.NeuronList': """Read files from a zip into a NeuronList. diff --git a/navis/morpho/manipulation.py b/navis/morpho/manipulation.py index fa02d4a7..a7255fe1 100644 --- a/navis/morpho/manipulation.py +++ b/navis/morpho/manipulation.py @@ -937,7 +937,7 @@ def combine_neurons(*x: Union[Sequence[NeuronObject], 'core.NeuronList'] elif isinstance(nl[0], core.VoxelNeuron): raise TypeError('Combining VoxelNeuron not (yet) supported') else: - raise TypeError(f'Unable to combine {ty}') + raise TypeError(f'Unable to combine {type(nl[0])}') return x diff --git a/navis/utils/decorators.py b/navis/utils/decorators.py index 646b6825..76b89aa6 100644 --- a/navis/utils/decorators.py +++ b/navis/utils/decorators.py @@ -159,7 +159,7 @@ def wrapper(*args, **kwargs): # All things failing assume it's not inplace inplace = False - if parallel and 'inplace' in sig.parameters: + if parallel and 'inplace' not in kwargs and 'inplace' in sig.parameters: kwargs['inplace'] = True # Prepare processor @@ -174,7 +174,8 @@ def wrapper(*args, **kwargs): omit_failures=kwargs.pop('omit_failures', False), chunksize=chunksize, exclude_zip=excl, - n_cores=n_cores) + n_cores=n_cores, + executor=kwargs.pop("executor", None)) # Apply function res = proc(nl, *args, **kwargs) diff --git a/requirements.txt b/requirements.txt index b5af845e..147b5bec 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,7 +14,7 @@ seaborn>=0.10 setuptools>=50.6 scipy>=1.5 six>=1.11 -tqdm>=4.45 +tqdm>=4.59 typing-extensions>=3.7.4 trimesh>=3.8 # without backend, as we use some utility functions @@ -40,6 +40,8 @@ flybrains #extra: flybrains cloud-volume>=5.2.0 #extra: cloudvolume +dask #extra: dask + # this should inherit the vispy version constraint from above vispy[pyside6] #extra: vispy-default @@ -60,6 +62,7 @@ flake8 wheel mypy pytest +pytest-benchmark pytest-env pytest-xvfb gitpython diff --git a/tests/test_parallel.py b/tests/test_parallel.py index 7b1fbfcd..e58f0eb1 100644 --- a/tests/test_parallel.py +++ b/tests/test_parallel.py @@ -3,16 +3,20 @@ import numpy as np -def test_parallel(): +def test_parallel(benchmark): # Load example neurons nl = navis.example_neurons(kind='skeleton') + # Use serial reference for comparison + ref = navis.prune_by_strahler(nl, 1, parallel=False, inplace=False) + # Test decorator - pr = navis.prune_by_strahler(nl, 1, parallel=True, inplace=False) + pr = benchmark(navis.prune_by_strahler, nl, 1, parallel=True, inplace=False) assert isinstance(pr, navis.NeuronList) assert len(pr) == len(nl) assert pr[0].n_nodes < nl[0].n_nodes + assert pr == ref # Test apply pr = nl.apply(navis.prune_by_strahler, to_prune=1, inplace=False, @@ -20,6 +24,7 @@ def test_parallel(): assert isinstance(pr, navis.NeuronList) assert len(pr) == len(nl) assert all(pr.n_nodes < nl.n_nodes) + assert pr == ref def test_parallel_inplace():