Skip to content

Commit df91ebb

Browse files
authored
Merge pull request #263 from python-adaptive/support-loky
add support for loky
2 parents f2b5620 + db051be commit df91ebb

File tree

6 files changed

+70
-7
lines changed

6 files changed

+70
-7
lines changed

adaptive/runner.py

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,13 @@
1212
from adaptive.notebook_integration import in_ipynb, live_info, live_plot
1313

1414
try:
15-
import ipyparallel
15+
if sys.version_info < (3, 8):
16+
# XXX: remove when ipyparallel 6.2.5 is released
17+
import ipyparallel
1618

17-
with_ipyparallel = True
19+
with_ipyparallel = True
20+
else:
21+
with_ipyparallel = False
1822
except ModuleNotFoundError:
1923
with_ipyparallel = False
2024

@@ -32,6 +36,13 @@
3236
except ModuleNotFoundError:
3337
with_mpi4py = False
3438

39+
try:
40+
import loky
41+
42+
with_loky = True
43+
except ModuleNotFoundError:
44+
with_loky = False
45+
3546
with suppress(ModuleNotFoundError):
3647
import uvloop
3748

@@ -232,10 +243,13 @@ def _remove_unfinished(self):
232243

233244
def _cleanup(self):
234245
if self.shutdown_executor:
235-
# XXX: temporary set wait=True for Python 3.7
246+
# XXX: temporary set wait=True because of a bug with Python ≥3.7
247+
# and loky in any Python version.
236248
# see https://github.com/python-adaptive/adaptive/issues/156
237249
# and https://github.com/python-adaptive/adaptive/pull/164
238-
self.executor.shutdown(wait=True if sys.version_info >= (3, 7) else False)
250+
# and https://bugs.python.org/issue36281
251+
# and https://github.com/joblib/loky/issues/241
252+
self.executor.shutdown(wait=True)
239253
self.end_time = time.time()
240254

241255
@property
@@ -269,7 +283,8 @@ class BlockingRunner(BaseRunner):
269283
the learner as its sole argument, and return True when we should
270284
stop requesting more points.
271285
executor : `concurrent.futures.Executor`, `distributed.Client`,\
272-
`mpi4py.futures.MPIPoolExecutor`, or `ipyparallel.Client`, optional
286+
`mpi4py.futures.MPIPoolExecutor`, `ipyparallel.Client` or\
287+
`loky.get_reusable_executor`, optional
273288
The executor in which to evaluate the function to be learned.
274289
If not provided, a new `~concurrent.futures.ProcessPoolExecutor`.
275290
ntasks : int, optional
@@ -386,7 +401,8 @@ class AsyncRunner(BaseRunner):
386401
stop requesting more points. If not provided, the runner will run
387402
forever, or until ``self.task.cancel()`` is called.
388403
executor : `concurrent.futures.Executor`, `distributed.Client`,\
389-
`mpi4py.futures.MPIPoolExecutor`, or `ipyparallel.Client`, optional
404+
`mpi4py.futures.MPIPoolExecutor`, `ipyparallel.Client` or\
405+
`loky.get_reusable_executor`, optional
390406
The executor in which to evaluate the function to be learned.
391407
If not provided, a new `~concurrent.futures.ProcessPoolExecutor`.
392408
ntasks : int, optional
@@ -740,9 +756,16 @@ def shutdown(self, wait=True):
740756
pass
741757

742758

759+
def _default_executor():
760+
if with_loky:
761+
return loky.get_reusable_executor()
762+
else:
763+
return concurrent.ProcessPoolExecutor()
764+
765+
743766
def _ensure_executor(executor):
744767
if executor is None:
745-
executor = concurrent.ProcessPoolExecutor()
768+
executor = _default_executor()
746769

747770
if isinstance(executor, concurrent.Executor):
748771
return executor
@@ -765,6 +788,8 @@ def _get_ncores(ex):
765788
ex, (concurrent.ProcessPoolExecutor, concurrent.ThreadPoolExecutor)
766789
):
767790
return ex._max_workers # not public API!
791+
elif with_loky and isinstance(ex, loky.reusable_executor._ReusablePoolExecutor):
792+
return ex._max_workers # not public API!
768793
elif isinstance(ex, SequentialExecutor):
769794
return 1
770795
elif with_distributed and isinstance(ex, distributed.cfexecutor.ClientExecutor):

adaptive/tests/test_runner.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
stop_after,
1616
with_distributed,
1717
with_ipyparallel,
18+
with_loky,
1819
)
1920

2021

@@ -91,6 +92,13 @@ def ipyparallel_executor():
9192
raise RuntimeError("Could not stop ipcluster")
9293

9394

95+
@pytest.fixture(scope="session")
96+
def loky_executor():
97+
import loky
98+
99+
return loky.get_reusable_executor()
100+
101+
94102
def linear(x):
95103
return x
96104

@@ -134,3 +142,12 @@ def test_distributed_executor():
134142
BlockingRunner(learner, trivial_goal, executor=client)
135143
client.shutdown()
136144
assert learner.npoints > 0
145+
146+
147+
@pytest.mark.skipif(not with_loky, reason="loky not installed")
148+
def test_loky_executor(loky_executor):
149+
learner = Learner1D(lambda x: x, (-1, 1))
150+
BlockingRunner(
151+
learner, trivial_goal, executor=loky_executor, shutdown_executor=True
152+
)
153+
assert learner.npoints > 0

docs/source/conf.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@
139139
"holoviews": ("https://holoviews.org/", None),
140140
"ipyparallel": ("https://ipyparallel.readthedocs.io/en/stable/", None),
141141
"scipy": ("https://docs.scipy.org/doc/scipy/reference", None),
142+
"loky": ("https://loky.readthedocs.io/en/stable/", None),
142143
}
143144

144145

docs/source/tutorial/tutorial.parallelism.rst

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,3 +116,21 @@ How you call MPI might depend on your specific queuing system, with SLURM for ex
116116
#SBATCH --ntasks 100
117117
118118
srun -n $SLURM_NTASKS --mpi=pmi2 ~/miniconda3/envs/py37_min/bin/python -m mpi4py.futures run_learner.py
119+
120+
`loky.get_reusable_executor`
121+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
122+
123+
This executor is basically a powered-up version of `~concurrent.futures.ProcessPoolExecutor`, check its `documentation <https://loky.readthedocs.io/>`_.
124+
Among other things, it allows to *reuse* the executor and uses ``cloudpickle`` for serialization.
125+
This means you can even learn closures, lambdas, or other functions that are not picklable with `pickle`.
126+
127+
.. code:: python
128+
129+
from loky import get_reusable_executor
130+
ex = get_reusable_executor()
131+
132+
f = lambda x: x
133+
learner = adaptive.Learner1D(f, bounds=(-1, 1))
134+
135+
runner = adaptive.Runner(learner, goal=lambda l: l.loss() < 0.01, executor=ex)
136+
runner.live_info()

environment.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ dependencies:
1212
- ipyparallel
1313
- distributed
1414
- ipykernel>=4.8*
15+
- loky
1516
- jupyter_client>=5.2.2
1617
- ipywidgets
1718
- scikit-optimize

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ def get_version_and_cmdclass(package_name):
5353
"other": [
5454
"ipyparallel",
5555
"distributed",
56+
"loky",
5657
"scikit-optimize",
5758
"wexpect" if os.name == "nt" else "pexpect",
5859
],

0 commit comments

Comments
 (0)