Can't execute multiprocessing tasks using wrapt.decorator functions - not pickle serializable #158

Open
marwan116 opened this issue Mar 25, 2020 · 14 comments

marwan116 commented Mar 25, 2020

I am posting this to share the issue I face when trying to use multiple processes with a function wrapped with a wrapt.decorator.

I am using the following libraries:
wrapt==1.12.1
joblib==0.14.1
python==3.7.6

Please see the dummy example below to reproduce the error:

from joblib import Parallel, delayed
import wrapt


@wrapt.decorator
def dummy_decorator(wrapped, instance, args, kwargs):
    print("before wrapped call")
    res = wrapped(*args, **kwargs)
    print("after wrapped call")
    return res


@dummy_decorator
def add(x, y):
    return x + y

So first I try the code using multithreading, and it works fine:

with Parallel(n_jobs=2, prefer="threads") as parallel:
    fs = parallel(delayed(add)(x=x, y=y) for x, y in zip([1, 2], [3, 4]))
    for f in fs:
        print(f)

I get the following output:

before wrapped call
before wrapped call
after wrapped call
after wrapped call
4
6

Then when I try to use processes instead of threads:

with Parallel(n_jobs=2, prefer="processes") as parallel:
    fs = parallel(delayed(add)(x=x, y=y) for x, y in zip([1, 2], [3, 4]))
    for f in fs:
        print(f)

I get an error, namely: NotImplementedError: object proxy must define __reduce_ex__()

Below is the full traceback:

joblib.externals.loky.process_executor._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/site-packages/joblib/externals/loky/backend/queues.py", line 150, in _feed
    obj_ = dumps(obj, reducers=reducers)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/site-packages/joblib/externals/loky/backend/reduction.py", line 247, in dumps
    dump(obj, buf, reducers=reducers, protocol=protocol)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/site-packages/joblib/externals/loky/backend/reduction.py", line 240, in dump
    _LokyPickler(file, reducers=reducers, protocol=protocol).dump(obj)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/site-packages/joblib/externals/cloudpickle/cloudpickle.py", line 482, in dump
    return Pickler.dump(self, obj)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 437, in dump
    self.save(obj)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 890, in _batch_setitems
    save(v)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 549, in save
    self.save_reduce(obj=obj, *rv)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 662, in save_reduce
    save(state)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 859, in save_dict
    self._batch_setitems(obj.items())
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 885, in _batch_setitems
    save(v)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 819, in save_list
    self._batch_appends(obj)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 846, in _batch_appends
    save(tmp[0])
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 774, in save_tuple
    save(element)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/pickle.py", line 524, in save
    rv = reduce(self.proto)
NotImplementedError: object proxy must define __reduce_ex__()
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "ifm_utils/logging/dummy.py", line 24, in <module>
    fs = parallel(delayed(add)(x=x, y=y) for x, y in zip([1, 2], [3, 4]))
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/site-packages/joblib/parallel.py", line 1017, in __call__
    self.retrieve()
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/site-packages/joblib/parallel.py", line 909, in retrieve
    self._output.extend(job.get(timeout=self.timeout))
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/site-packages/joblib/_parallel_backends.py", line 562, in wrap_future_result
    return future.result(timeout=timeout)
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/concurrent/futures/_base.py", line 435, in result
    return self.__get_result()
  File "/Users/marwansarieddine/anaconda3/envs/ifm_utils/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
_pickle.PicklingError: Could not pickle the task to send it to the workers.
marwan116 (Author) commented Mar 25, 2020

I thought a workaround to the problem, if it is too hard to fix, would be to disable the decorator. The following workaround works if the value passed to enabled is not a callable:

@wrapt.decorator(enabled=False)
def dummy_decorator(wrapped, instance, args, kwargs):
    print("before wrapped call")
    res = wrapped(*args, **kwargs)
    print("after wrapped call")
    return res

then the code below won't throw any error:

with Parallel(n_jobs=2, prefer="processes") as parallel:
    fs = parallel(delayed(add)(x=x, y=y) for x, y in zip([1, 2], [3, 4]))
    for f in fs:
        print(f)

However, to set enabled dynamically, I thought using a callable _enabled would be the way to go, and this sadly doesn't work:

def _enabled():
    return False

@wrapt.decorator(enabled=_enabled)
def dummy_decorator(wrapped, instance, args, kwargs):
    print("before wrapped call")
    res = wrapped(*args, **kwargs)
    print("after wrapped call")
    return res

I see in other issue threads (mainly #102) that perhaps an ObjectProxy is picklable/serializable if we explicitly define the __reduce__ and __reduce_ex__ methods. Would this be the workaround needed here, i.e. to implement the decorator as such a wrapper instead? A sketch of that idea follows below.
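
For reference, here is a minimal sketch of the idea from #102, assuming the wrapped object is itself picklable (PicklableProxy and _rebuild_proxy are illustrative names, not part of wrapt):

import wrapt

class PicklableProxy(wrapt.ObjectProxy):
    def __reduce__(self):
        # Serialize only the wrapped object, which must itself be picklable.
        return (_rebuild_proxy, (self.__wrapped__,))

    def __reduce_ex__(self, protocol):
        return self.__reduce__()

def _rebuild_proxy(wrapped):
    # Illustrative module-level helper: recreate the proxy around the
    # unpickled target, so it is picklable by reference itself.
    return PicklableProxy(wrapped)

Unpickling then calls _rebuild_proxy to recreate the proxy around the restored target, so this only helps when self.__wrapped__ can be pickled in the first place.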

GrahamDumpleton (Owner) commented

For a dynamic function to specify whether the decorator is enabled, it should be:

def _enabled():
    return False

@wrapt.decorator(enabled=_enabled)
def dummy_decorator(wrapped, instance, args, kwargs):
    print("before wrapped call")
    res = wrapped(*args, **kwargs)
    print("after wrapped call")
    return res

Not sure if you just cut and pasted the wrong thing.

But yes, it may not work, as the function wrapper is still present when disabled using a function call, since that is only evaluated at the time of each call. To not apply the decorator at all, you can only supply a literal value:

def _enabled():
    return False

am_i_enabled = _enabled()

@wrapt.decorator(enabled=am_i_enabled)
def dummy_decorator(wrapped, instance, args, kwargs):
    print("before wrapped call")
    res = wrapped(*args, **kwargs)
    print("after wrapped call")
    return res

So the call has to be evaluated at the time the code is imported.

Anyway, I will think about the pickle issue. For this narrow case of a function wrapper decorator (as opposed to the general case of an object proxy), there may be a way to get it to work. It will need some investigation though. The analysis was never done for dill since that was a third party package, and so not necessarily commonly used.

marwan116 (Author) commented

Sorry, yes, I had a typo there - just edited/corrected it.

For this narrow case of a function wrapper decorator (as opposed to general case of object proxy), there may be a way to get it to work

This would be extremely helpful - thank you for this - please let me know if I can help in the process in any way.

ludaavics commented

Hi @marwan116, have you given this any more thought?

twiddli commented Mar 5, 2024

The error I'm getting on Python 3.11 is that AdapterWrapper does not define a __reduce_ex__. For starters, it would be helpful to have a way to provide custom adapter wrappers, in the same way the adapter factory works. This would allow the end user to implement their own __reduce_ex__ to suit their needs. What do you think?

Or just add the following to it:

def __reduce_ex__(self, protocol):
    # Note: object.__getstate__() is only available on Python 3.11+.
    return (
        object.__new__,
        (type(self),),
        object.__getstate__(self),
    )

GrahamDumpleton (Owner) commented

@twiddli I would need to see an actual small code example of what you are trying to do before I can suggest anything, including any explanation of whether it can already be done, as I am not sure what I am being asked to suggest a modification to.

FWIW, trying to pickle code is not generally a great idea. And there is no single generic __reduce_ex__ function one could add to the wrapper which would be guaranteed to always work.

NellyWhads commented

Hello hello,

I've reproduced this error and it very much blocks current development work using wrapt decorators. Here's the simplest setup:

test.py

from functools import wraps
from typing import Any

import wrapt
from joblib import Parallel, delayed


@wrapt.decorator
def wrapt_decorator(wrapped, instance, args, kwargs):
    return wrapped(*args, **kwargs)

def functools_decorator(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper


@wrapt_decorator
def noop_func(x: Any) -> Any:
    return x

if __name__ == "__main__":
    with Parallel(n_jobs=2, prefer="processes") as parallel:
        fs = parallel(delayed(noop_func)(x=x) for x in [1, 2, 3])
        for f in fs:
            print(f)

Run this file - it will execute just fine.

test_2.py

from joblib import Parallel, delayed

from test import noop_func

if __name__ == "__main__":
    with Parallel(n_jobs=2, prefer="processes") as parallel:
        fs = parallel(delayed(noop_func)(x=x) for x in [1, 2, 3])
        for f in fs:
            print(f)

Run this file and it will fail with the same pickling error as above.

test_3.py

from typing import Any

from joblib import Parallel, delayed


def noop_func_2(x: Any) -> Any:
    from test import noop_func
    return noop_func(x)

if __name__ == "__main__":
    with Parallel(n_jobs=2, prefer="processes") as parallel:
        fs = parallel(delayed(noop_func_2)(x=x) for x in [1, 2, 3])
        for f in fs:
            print(f)

This, once again, runs fine.

Switching from wrapt_decorator to functools_decorator makes all 3 cases work.

How do I resolve this to make case 2 work? Is there a way to pass through the __reduce_ex__ method from the target object wrapped by the decorator?

Though this is a trivial case, it is preventing me from using a wrapt decorator in a much more complex distributed project which uses Ray and Daft.

NellyWhads commented

I also see that the error here makes it difficult to discern which property is raising it:

def __reduce__(self):

GrahamDumpleton (Owner) commented

Typo in exception strings fixed in 0da4ba5.

Thanks for highlighting that one.

Let me see if I can get a skeleton together for a custom function wrapper which would allow you to override the behaviour of __reduce_ex__, and perhaps then you can work out how you might implement that special method.

NellyWhads commented Dec 5, 2024

I think what's missing is simply a passthrough to the properties of the function. I believe that Python will throw the appropriate errors if the method is not valid/callable on the underlying wrapped object.

I.e. when using the functools wrapper, the multiprocessing call still runs as expected. The issue (I think) is that the NotImplementedError which is raised gets in the way of other logic in the call stack.

NellyWhads commented Dec 5, 2024

Never mind - I tried patching this on my installed module and setting WRAPT_DISABLE_EXTENSIONS=1, and I see a different error along the same lines.

Why does this work perfectly with functools.wraps, and only fail with wrapt when importing across files, outside of the context of the child process? Perhaps I'm missing something fundamental.

GrahamDumpleton (Owner) commented Dec 5, 2024

If you ignore all the extra special magic it does, for the simple case of a decorator applied to a normal function, @wrapt.decorator boils down to:

import wrapt

def decorator(wrapper):
    def _wrapper(wrapped):
        return wrapt.FunctionWrapper(wrapped, wrapper)
    return _wrapper

@decorator
def mydecorator(wrapped, instance, args, kwargs):
    print("decorator", wrapped, instance, args, kwargs)
    return wrapped(*args, **kwargs)

@mydecorator
def func():
  print("func")

func()

The FunctionWrapper derives from ObjectProxy, so you can write:

import wrapt

class OverrideFunctionWrapper(wrapt.FunctionWrapper):

    def __reduce__(self):
        raise NotImplementedError(
                'OVERRIDE: object proxy must define __reduce__()')

    def __reduce_ex__(self, protocol):
        raise NotImplementedError(
                'OVERRIDE: object proxy must define __reduce_ex__()')

def decorator(wrapper):
    def _wrapper(wrapped):
        return OverrideFunctionWrapper(wrapped, wrapper)
    return _wrapper

@decorator
def mydecorator(wrapped, instance, args, kwargs):
    print("decorator", wrapped, instance, args, kwargs)
    return wrapped(*args, **kwargs)

@mydecorator
def func():
  print("func")

func()

import pickle

pickle.dumps(func)

This will now raise the override error message.

With that as a base, you should now be able to play with how the special dunder methods might be implemented.

Note that this example doesn't cater for decorators of instance methods or anything else where binding would occur in accessing the decorated function. In fact binding is in part why having a generic implementation for a function decorator might be troublesome.

Now although you could defer the call of __reduce_ex__() to the wrapped object, that means the decorator wrapper itself does not get pickled. So any override for the function must somehow account for pickling your special implementation of the decorator wrapper function, and any context created for it if it were a decorator with optional arguments or other weird stuff. This is another reason why a generic implementation is so hard.

The reason it is a problem with wrapt is that decorators are implemented as a class, and more specifically a descriptor with binding. Using a class makes things much more complicated than decorators created using just nested functions.
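
To illustrate that last point with a minimal, self-contained example (this is just pickle's documented behaviour for plain functions, nothing wrapt-specific): pickle serializes a module-level function by reference, recording only its module and qualified name, so a closure created by a nested-function decorator never has to be serialized by value.

import functools
import pickle

def functools_decorator(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@functools_decorator
def add(x, y):
    return x + y

# "add" is still a plain function object, so pickle records only its
# module and qualified name and never looks inside the closure.
restored = pickle.loads(pickle.dumps(add))
print(restored(1, 2))  # 3

# A wrapt FunctionWrapper is a class instance rather than a function, so
# pickle instead falls back to __reduce_ex__(), which the object proxy
# deliberately leaves unimplemented.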

NellyWhads commented

This helps - I will need to do some more reading on pickling in Python as well.

From what I'm understanding, I'll need to find a way to write a pickling operation for the decorator itself, and then defer to the underlying decorated object's dunder methods for the rest, is that correct? So long as __reduce_ex__() for the decorator behaves well, Python should take care of the rest?

The method may need to be customized for each specific decorator based on its various state variables, yes?

GrahamDumpleton (Owner) commented

If you check the other open issues about pickling and wrapt you may actually find a partial example of how it may need to work, at least in the simplest common cases. I can't remember right now and would also need to look back at all the related issues.
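
For the simplest common case, a sketch along those lines might look something like this. It assumes the decorated function lives at module scope under its own name, so the unpickling process can recreate the wrapper just by re-importing it; _lookup_by_name and PicklableFunctionWrapper are illustrative names, not wrapt API:

import importlib

import wrapt

def _lookup_by_name(module, qualname):
    # Illustrative helper: re-import the already-decorated function in the
    # unpickling process, walking the qualified name for nested attributes.
    obj = importlib.import_module(module)
    for part in qualname.split("."):
        obj = getattr(obj, part)
    return obj

class PicklableFunctionWrapper(wrapt.FunctionWrapper):
    def __reduce_ex__(self, protocol):
        # Mimic pickle's by-reference handling of plain functions: record
        # where to find the wrapper rather than serializing it by value.
        return (
            _lookup_by_name,
            (self.__wrapped__.__module__, self.__wrapped__.__qualname__),
        )

On unpickling, _lookup_by_name returns whatever the module attribute holds, which is the decorated function complete with its wrapper, so the wrapper logic is recreated by import rather than transferred by value. As noted above, this breaks down as soon as binding, optional decorator arguments, or non-module-level functions enter the picture.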
