
PicklingError when using custom ProcessPoolExecutor #6803

Open
pavithraes opened this issue Jul 27, 2022 · 2 comments
Labels
documentation Improve or add to documentation

Comments

@pavithraes (Member) commented Jul 27, 2022

What happened:

Using a custom executor via WorkerPlugin + ProcessPoolExecutor raises _pickle.PicklingError: Can't pickle <function my_process at 0x7f83f862ab00>: it's not the same object as __main__.my_process

(This issue was originally reported on Discourse.)

What you expected to happen:

I'd have expected this to work (see additional notes below). If this errors intentionally, it would be nice to raise a more informative message.

Minimal Complete Verifiable Example:

from concurrent.futures import ProcessPoolExecutor
from functools import partial

import dask
from dask.distributed import Client
from distributed.diagnostics.plugin import WorkerPlugin

def my_process(data, fixed_arg):
    pass


class AddProcessPool(WorkerPlugin):
    """Register a ProcessPoolExecutor on each worker under the name "processes"."""

    def setup(self, worker):
        executor = ProcessPoolExecutor(max_workers=worker.nthreads)
        worker.executors["processes"] = executor


def main():
    partial_process = partial(my_process, fixed_arg="fixed_data")
    with Client(processes=False) as client:
        client.register_worker_plugin(AddProcessPool())
        # Route the mapped tasks to the custom "processes" executor
        with dask.annotate(executor="processes"):
            client.gather(client.map(partial_process, list(range(1000))))


if __name__ == "__main__":
    main()
Error traceback:
/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/site-packages/distributed/worker_state_machine.py:3357: FutureWarning: The `Worker.nthreads` attribute has been moved to `Worker.state.nthreads`
  warnings.warn(
2022-07-21 08:37:14,053 - distributed.worker - ERROR - Exception during execution of task my_process-0514e9dc6d6631bf45d18e50c3312d9a.
concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function my_process at 0x7f83f862ab00>: it's not the same object as __main__.my_process
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/site-packages/distributed/worker.py", line 2208, in execute
    result = await self.loop.run_in_executor(
  File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function my_process at 0x7f83f862ab00>: it's not the same object as __main__.my_process
... (the same traceback repeats for each subsequent task)

Anything else we need to know?:

This code comes from a video by Matt (plus an accompanying notebook) created in July 2021, where it appears to work.

Environment:

  • Dask version: 2022.7.1 (latest), and several older versions back to 2021.6.2
  • Python version: 3.9/3.10
  • Operating System: macOS
  • Install method (conda, pip, source): conda

@mrocklin Do you have thoughts on why this isn't working anymore?

cc @ncclementi

@ian-r-rose (Collaborator)
Thanks for the writeup @pavithraes!

What is going on here is that the concurrent.futures ProcessPoolExecutor must have access to the __main__ module in which the function is defined in order to work correctly. That's because pickle serializes functions by reference rather than by value, so if __main__ isn't available in the subprocess, it doesn't know which function to run. In this case, the worker's __main__ is not the same as the one in which the my_process function is defined, so the error message is actually a helpful one!
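By-reference pickling can be demonstrated with the standard library alone. This stdlib-only sketch (the function name is illustrative) shows that a pickled function payload contains only a module path and a name, and then reproduces the exact error class from this report by rebinding the name:

```python
import pickle

def my_process(x):
    return x

# pickle stores functions by reference: just a module path plus a name,
# not the function's code
payload = pickle.dumps(my_process)
assert my_process.__module__.encode() in payload
assert b"my_process" in payload

# Rebind the name so the module-level lookup no longer matches the
# original object; pickling the stale reference now fails with the same
# "it's not the same object" PicklingError seen in this issue.
stale = my_process

def my_process(x):  # noqa: F811 (deliberate redefinition)
    return x * 2

try:
    pickle.dumps(stale)
    raised = False
except pickle.PicklingError:
    raised = True
assert raised
```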

The way distributed projects typically get around this restriction is to use cloudpickle, which can pickle things by value; this is particularly necessary for functions defined interactively or in __main__. This is what the dask serialization protocol does, for instance, when sending interactively defined functions to workers. But the error we are seeing here happens inside a single worker and does not go through the dask serialization protocol, so we run into the problem.
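For contrast, here is a minimal sketch of by-value serialization with cloudpickle (assuming cloudpickle is installed, as it is with any dask installation; the function name is illustrative):

```python
import pickle

import cloudpickle

def interactive_fn(x):
    # Defined at the top level of the running script; plain pickle would
    # store only a reference like "__main__.interactive_fn"
    return x + 1

# cloudpickle captures the function's code and closure by value...
payload = cloudpickle.dumps(interactive_fn)

# ...so a plain pickle.loads can rebuild it without importing the
# original module
restored = pickle.loads(payload)
assert restored(1) == 2
```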

Fortunately, this is a problem that is nicely solved by the loky project. It provides its own implementation of a ProcessPoolExecutor which handles __main__ functions. If you take another look at the linked example above, it actually uses the loky ProcessPoolExecutor (presumably to get around exactly this issue).

Ideally, we would be able to identify this situation and handle it appropriately in dask, or at least provide a better error message. There are a number of reasons why it would be nice to make it easier for users to use process pools (cf. #6325). For now, I would recommend the following workarounds to users:

  1. If you are using concurrent.futures.ProcessPoolExecutor, avoid referencing interactively defined functions, or those in __main__.
  2. If you do want to use such functions, use loky.ProcessPoolExecutor.

@hameer-spire
Thanks, Ian, I can confirm that using loky.ProcessPoolExecutor solves this particular issue. Please feel free to mark as documentation or close.

@ncclementi ncclementi added the documentation Improve or add to documentation label Sep 12, 2022