Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hackathon: dask backend #111

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion navis/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
pint.Quantity([])


def Neuron(x: Union[nx.DiGraph, str, pd.DataFrame, 'TreeNeuron', 'MeshNeuron'],
def Neuron(x: Union[nx.DiGraph, str, pd.DataFrame, 'core.TreeNeuron', 'core.MeshNeuron'],
**metadata):
"""Constructor for Neuron objects. Depending on the input, either a
``TreeNeuron`` or a ``MeshNeuron`` is returned.
Expand Down
118 changes: 40 additions & 78 deletions navis/core/core_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# GNU General Public License for more details.

import functools
import itertools
import numbers
import os
import pint
Expand All @@ -26,16 +27,6 @@

from .. import config, graph, utils, core


try:
#from pathos.multiprocessing import ProcessingPool
# pathos' ProcessingPool apparently ignores chunksize
# (see https://stackoverflow.com/questions/55611806/how-to-set-chunk-size-when-using-pathos-processingpools-map)
import pathos
ProcessingPool = pathos.pools._ProcessPool
except ImportError:
ProcessingPool = None

__all__ = ['make_dotprops', 'to_neuron_space']

# Set up logging
Expand Down Expand Up @@ -319,7 +310,8 @@ def __init__(self,
warn_inplace: bool = True,
omit_failures: bool = False,
exclude_zip: list = [],
desc: Optional[str] = None):
desc: Optional[str] = None,
executor: Optional['core.executors.Executor'] = None):
if utils.is_iterable(function):
if len(function) != len(nl):
raise ValueError('Number of functions must match neurons.')
Expand All @@ -341,6 +333,7 @@ def __init__(self,
self.warn_inplace = warn_inplace
self.exclude_zip = exclude_zip
self.omit_failures = omit_failures
self.executor = executor

# This makes sure that help and name match the functions being called
functools.update_wrapper(self, self.function)
Expand All @@ -349,84 +342,53 @@ def __call__(self, *args, **kwargs):
# Explicitly providing these parameters overwrites defaults
parallel = kwargs.pop('parallel', self.parallel)
n_cores = kwargs.pop('n_cores', self.n_cores)
chunksize = kwargs.pop('chunksize', self.chunksize)

# We will check, for each argument, if it matches the number of
# functions to run. If it does, we will zip the values
# with the neurons
# functions to run. If it does, we will pass it directly as an iterable.
# Otherwise, repeat the value for each neuron.

parsed_args = []
parsed_kwargs = []

for i, n in enumerate(self.nl):
parsed_args.append([])
parsed_kwargs.append({})
for k, a in enumerate(args):
if k in self.exclude_zip:
parsed_args[i].append(a)
elif not utils.is_iterable(a) or len(a) != len(self.nl):
parsed_args[i].append(a)
else:
parsed_args[i].append(a[i])

for k, v in kwargs.items():
if k in self.exclude_zip:
parsed_kwargs[i][k] = v
elif not utils.is_iterable(v) or len(v) != len(self.nl):
parsed_kwargs[i][k] = v
else:
parsed_kwargs[i][k] = v[i]
parsed_kwargs = {}

for k, a in enumerate(args):
if k in self.exclude_zip:
parsed_args.append(itertools.repeat(a))
elif not utils.is_iterable(a) or len(a) != len(self.nl):
parsed_args.append(itertools.repeat(a))
else:
parsed_args.append(a)

for k, v in kwargs.items():
if k in self.exclude_zip:
parsed_kwargs[k] = itertools.repeat(v)
elif not utils.is_iterable(v) or len(v) != len(self.nl):
parsed_kwargs[k] = itertools.repeat(v)
else:
parsed_kwargs[k] = v

# Silence loggers (except Errors)
level = logger.getEffectiveLevel()

if level < 30:
logger.setLevel('WARNING')

# Apply function
if parallel:
if not ProcessingPool:
raise ImportError('navis relies on pathos for multiprocessing!'
'Please install pathos and try again:\n'
' pip3 install pathos -U')

if self.warn_inplace and kwargs.get('inplace', False):
logger.warning('`inplace=True` does not work with '
'multiprocessing ')

with ProcessingPool(n_cores) as pool:
combinations = list(zip(self.funcs,
parsed_args,
parsed_kwargs))
chunksize = kwargs.pop('chunksize', self.chunksize) # max(int(len(combinations) / 100), 1)

if not self.omit_failures:
wrapper = _call
else:
wrapper = _try_call

res = list(config.tqdm(pool.imap(wrapper,
combinations,
chunksize=chunksize),
total=len(combinations),
desc=self.desc,
disable=config.pbar_hide or not self.progress,
leave=config.pbar_leave))
if self.executor:
executor = self.executor
else:
res = []
for i, n in enumerate(config.tqdm(self.nl, desc=self.desc,
disable=(config.pbar_hide
or not self.progress
or len(self.nl) <= 1),
leave=config.pbar_leave)):
try:
res.append(self.funcs[i](*parsed_args[i], **parsed_kwargs[i]))
except BaseException as e:
if self.omit_failures:
res.append(FailedRun(func=self.funcs[i],
args=parsed_args[i],
kwargs=parsed_kwargs[i],
exception=e))
else:
raise
if parallel:
executor = core.executors.PathosProcessPoolExecutor(progress=self.progress, n_cores=n_cores)
else:
executor = core.executors.Executor(progress=self.progress)

if self.warn_inplace and kwargs.get('inplace', False) and not executor.supports_inplace():
logger.warning('`inplace=True` does not work with '
'multiprocessing ')

with executor as executor:
res = list(executor.map(
self.funcs, *parsed_args, **parsed_kwargs, chunksize=chunksize, desc=self.desc
))

# Reset logger level to previous state
logger.setLevel(level)
Expand Down
14 changes: 14 additions & 0 deletions navis/core/executors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# This script is part of navis (http://www.github.com/navis-org/navis).
# Copyright (C) 2018 Philipp Schlegel
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

from .base import Executor, PathosProcessPoolExecutor
122 changes: 122 additions & 0 deletions navis/core/executors/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# This script is part of navis (http://www.github.com/navis-org/navis).
# Copyright (C) 2018 Philipp Schlegel
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

import itertools
import os

from typing import Optional

from ... import config


try:
#from pathos.multiprocessing import ProcessingPool
# pathos' ProcessingPool apparently ignores chunksize
# (see https://stackoverflow.com/questions/55611806/how-to-set-chunk-size-when-using-pathos-processingpools-map)
import pathos
ProcessingPool = pathos.pools._ProcessPool
except ImportError:
ProcessingPool = None


def kwargs_of_iter_to_iter_of_kwargs(**kwargs):
    """Convert a dict with iterable values to an iterator of dicts.

    The iterables are advanced in lockstep; at every step one dict is
    produced that maps each keyword to the current item of its iterable.

    Parameters
    ----------
    **kwargs :  iterables
                One iterable per keyword. Iterables may be infinite
                (e.g. ``itertools.repeat``) as long as the consumer stops
                pulling at some point.

    Returns
    -------
    iterator of dict
                Ends when the shortest iterable is exhausted. If no
                keywords are given, an endless stream of empty dicts is
                returned so that it can be zipped against a finite
                iterable of calls.

    Examples
    --------
    >>> list(kwargs_of_iter_to_iter_of_kwargs(a=[1, 2], b='xy'))
    [{'a': 1, 'b': 'x'}, {'a': 2, 'b': 'y'}]

    """
    if not kwargs:
        # A new dict per step: sharing a single mutable `{}` between all
        # calls would let one consumer's mutation leak into the next.
        return ({} for _ in itertools.repeat(None))

    names = tuple(kwargs)
    # zip() transposes "keyword -> column of values" into rows of values
    return (dict(zip(names, row)) for row in zip(*kwargs.values()))


class Executor:
    """Serial analogue of `concurrent.futures.Executor` with a progress bar.

    Runs every call in the current process, one after the other. Subclasses
    override `map` (and the context-manager hooks) to dispatch the same
    calls to other backends.

    Parameters
    ----------
    progress :  bool
                Whether to show a progress bar while mapping. The bar is
                also hidden if ``config.pbar_hide`` is set.

    """
    progress: bool

    def __init__(
        self,
        progress: bool = True,
    ):
        self.progress = progress

    def submit(self, fn, *args, **kwargs):
        """Call ``fn(*args, **kwargs)`` right away and return its result."""
        return fn(*args, **kwargs)

    def map(self, fn, *iterables, desc: Optional[str] = None, chunksize=1, **kwargs):
        """Run one call per function in `fn` and collect the results.

        Parameters
        ----------
        fn :        iterable of callables
                    One callable per call.
        *iterables
                    Each iterable provides one positional argument per call.
        desc :      str, optional
                    Label for the progress bar.
        chunksize : int
                    Not used by the serial executor; accepted so that all
                    executors share the same signature.
        **kwargs
                    Keyword -> iterable providing that keyword's value for
                    each call.

        Returns
        -------
        iterator
                    Iterator over the results. All calls have already been
                    executed by the time this method returns.

        """
        kwargs_iter = kwargs_of_iter_to_iter_of_kwargs(**kwargs)

        # zip() has no length, so the bar cannot show a total on its own.
        # `fn` holds one entry per call; use its length if it has one.
        try:
            total = len(fn)
        except TypeError:
            total = None

        calls = config.tqdm(
            zip(fn, *iterables),
            total=total,
            desc=desc,
            disable=(config.pbar_hide
                     or not self.progress),
            leave=config.pbar_leave)

        # Each `call` is (function, arg1, arg2, ...)
        res = [self.submit(*call, **kw) for call, kw in zip(calls, kwargs_iter)]

        return iter(res)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Nothing to clean up; never suppress exceptions
        return False

    @classmethod
    def supports_inplace(cls):
        """Return True: calls run in this process, so in-place edits stick."""
        return True


class PathosProcessPoolExecutor(Executor):
    """Executor that dispatches calls to a pathos process pool.

    Parameters
    ----------
    n_cores :   int
                Number of worker processes. Defaults to half the CPUs
                reported by ``os.cpu_count()`` but never less than 1.
    **kwargs
                Passed on to `Executor` (e.g. ``progress``).

    Raises
    ------
    ImportError
                If pathos is not installed.

    """

    def __init__(
        self,
        # os.cpu_count() may return None and `1 // 2` is 0 - neither is a
        # valid pool size, hence the fallback and the lower bound.
        n_cores: int = max(1, (os.cpu_count() or 1) // 2),
        **kwargs
    ):
        super().__init__(**kwargs)
        if not ProcessingPool:
            raise ImportError('navis relies on pathos for multiprocessing! '
                              'Please install pathos and try again:\n'
                              ' pip3 install pathos -U')
        self.pool = ProcessingPool(n_cores)

    def __enter__(self):
        self.pool.__enter__()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self.pool.__exit__(exc_type, exc_val, exc_tb)

    def map(self, fn, *iterables, desc: Optional[str] = None, chunksize=1, **kwargs):
        """Run one call per function in `fn` on the pool and collect results.

        Parameters
        ----------
        fn :        iterable of callables
                    One callable per call.
        *iterables
                    Each iterable provides one positional argument per call.
        desc :      str, optional
                    Label for the progress bar.
        chunksize : int
                    Passed on to the pool's ``imap``.
        **kwargs
                    Keyword -> iterable providing that keyword's value for
                    each call.

        Returns
        -------
        iterator
                    Iterator over the results, in the order of `fn`. All
                    results have been collected when this method returns.

        """
        kwargs_iter = kwargs_of_iter_to_iter_of_kwargs(**kwargs)

        # zip() has no length, so hand the bar a total if `fn` is sized
        try:
            total = len(fn)
        except TypeError:
            total = None

        # Build jobs as ((function, arg1, arg2, ...), kwargs) - same layout
        # as the base class. `zip(fn, zip(*iterables), ...)` would be empty,
        # and hence run nothing, if no positional iterables are passed.
        jobs = zip(zip(fn, *iterables), kwargs_iter)

        res = list(config.tqdm(self.pool.imap(
                                   lambda job: job[0][0](*job[0][1:], **job[1]),
                                   jobs,
                                   chunksize=chunksize,
                               ),
                               total=total,
                               desc=desc,
                               disable=(config.pbar_hide
                                        or not self.progress),
                               leave=config.pbar_leave),
                   )

        return iter(res)

    @classmethod
    def supports_inplace(cls):
        """Return False: workers operate on copies in other processes."""
        return False
57 changes: 57 additions & 0 deletions navis/core/executors/dask.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# This script is part of navis (http://www.github.com/navis-org/navis).
# Copyright (C) 2018 Philipp Schlegel
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

import contextlib

import dask
import tqdm.dask

from typing import Optional

from ... import config
from . import Executor
from .base import kwargs_of_iter_to_iter_of_kwargs


class SimpleDaskExecutor(Executor):
    """Executor that wraps every call in ``dask.delayed`` and computes them.

    Takes the same parameters as `Executor`; whichever dask scheduler is
    active when ``dask.compute`` is called runs the tasks.

    """

    def map(self, fn, *iterables, desc: Optional[str] = None, chunksize=1, **kwargs):
        """Run one call per function in `fn` via dask and collect results.

        Parameters
        ----------
        fn :        iterable of callables
                    One callable per call.
        *iterables
                    Each iterable provides one positional argument per call.
        desc :      str, optional
                    Label for the progress bar.
        chunksize : int
                    Not used by this executor; accepted so that all
                    executors share the same signature.
        **kwargs
                    Keyword -> iterable providing that keyword's value for
                    each call.

        Returns
        -------
        iterator
                    Iterator over the computed results, in the order of
                    `fn`.

        """
        kwargs_iter = kwargs_of_iter_to_iter_of_kwargs(**kwargs)

        if self.progress:
            pbar = tqdm.dask.TqdmCallback(
                tqdm_class=config.tqdm,
                desc=desc,
                disable=config.pbar_hide,
                leave=config.pbar_leave)
        else:
            pbar = contextlib.nullcontext()

        # Build calls as ((function, arg1, arg2, ...), kwargs) - same layout
        # as the base class. `zip(fn, zip(*iterables), ...)` would be empty,
        # and hence create no tasks, if no positional iterables are passed.
        tasks = [dask.delayed(call[0])(*call[1:], **kw)
                 for call, kw in zip(zip(fn, *iterables), kwargs_iter)]

        with pbar:
            # compute() returns a tuple with one entry per argument
            res = dask.compute(tasks)[0]

        return iter(res)

    @classmethod
    def supports_inplace(cls):
        return True
4 changes: 2 additions & 2 deletions navis/io/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ def read_from_zip(
self, files: Union[str, List[str]],
zippath: os.PathLike,
attrs: Optional[Dict[str, Any]] = None,
on_error: Union[Literal['ignore', Literal['raise']]] = 'ignore'
on_error: Union[Literal['ignore'], Literal['raise']] = 'ignore'
) -> 'core.NeuronList':
"""Read given files from a zip into a NeuronList.

Expand Down Expand Up @@ -378,7 +378,7 @@ def read_zip(
parallel="auto",
limit: Optional[int] = None,
attrs: Optional[Dict[str, Any]] = None,
on_error: Union[Literal['ignore', Literal['raise']]] = 'ignore'
on_error: Union[Literal['ignore'], Literal['raise']] = 'ignore'
) -> 'core.NeuronList':
"""Read files from a zip into a NeuronList.

Expand Down
2 changes: 1 addition & 1 deletion navis/morpho/manipulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -937,7 +937,7 @@ def combine_neurons(*x: Union[Sequence[NeuronObject], 'core.NeuronList']
elif isinstance(nl[0], core.VoxelNeuron):
raise TypeError('Combining VoxelNeuron not (yet) supported')
else:
raise TypeError(f'Unable to combine {ty}')
raise TypeError(f'Unable to combine {type(nl[0])}')

return x

Expand Down
Loading