
Creating multiprocessing.Queues operates strangely with Process's created in classes #123208

Open · Moosems opened this issue Aug 21, 2024 · 11 comments
Labels: topic-multiprocessing, type-bug (An unexpected behavior, bug, or error)

Moosems commented Aug 21, 2024

Bug report

Bug description:

The following from the docs works:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

The following also works:

from multiprocessing import Process, Queue
from time import sleep

def f(q):
    pass

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    sleep(1)

This works just fine:

from multiprocessing import Process, Queue
from time import sleep

class Foo:
    def __init__(self, q) -> None:
        pass

if __name__ == '__main__':
    q = Queue()
    p = Process(target=Foo, args=(q,))
    p.start()
    sleep(1)

But the following does NOT work:

from multiprocessing import Process, Queue
from time import sleep

class Foo:
    def __init__(self, q) -> None:
        pass

class Bar:
    def __init__(self) -> None:
        q = Queue()
        p = Process(target=Foo, args=(q,))
        p.start()

if __name__ == '__main__':
    Bar()
    sleep(1)

Which now gives this traceback:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/local/Cellar/python@3.12/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/spawn.py", line 122, in spawn_main
    exitcode = _main(fd, parent_sentinel)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/Cellar/python@3.12/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/spawn.py", line 132, in _main
    self = reduction.pickle.load(from_parent)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/Cellar/python@3.12/3.12.4/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/synchronize.py", line 115, in __setstate__
    self._semlock = _multiprocessing.SemLock._rebuild(*state)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
FileNotFoundError: [Errno 2] No such file or directory

The following does work, though:

from multiprocessing import Process, Queue
from time import sleep

class Foo:
    def __init__(self, q) -> None:
        q.put("INIT COMPLETE")

class Bar:
    def __init__(self) -> None:
        q = Queue()
        p = Process(target=Foo, args=(q,))
        p.start()
        q.get()

if __name__ == '__main__':
    Bar()
    sleep(1)

This should either be documented or fixed, preferably the latter. This behavior is strange and, quite frankly, unexpected. It took me quite a while to figure this out and fix my code, and adding an init queue item is not an ideal workaround. Note that the following also breaks with the same error:

from multiprocessing import Process, Queue
from time import sleep

def foo(q) -> None:
    pass

class Bar:
    def __init__(self) -> None:
        q = Queue()
        p = Process(target=foo, args=(q,))
        p.start()

if __name__ == '__main__':
    Bar()
    sleep(1)

Moving the Queue outside of the class (when calling it) also doesn't fix the issue:

from multiprocessing import Process, Queue
from time import sleep

def foo(q) -> None:
    pass

class Bar:
    def __init__(self, q) -> None:
        p = Process(target=foo, args=(q,))
        p.start()

if __name__ == '__main__':
    Bar(Queue())
    sleep(1)

But the following works just fine:

from multiprocessing import Process, Queue
from time import sleep

def foo(q) -> None:
    pass

class Bar:
    def __init__(self, q) -> None:
        p = Process(target=foo, args=(q,))
        p.start()

if __name__ == '__main__':
    q = Queue()
    Bar(q)
    sleep(1)

This may be related to #116526, but I don't personally think it should be classified as the same issue. It may be related to #23376 as well, but that may still be a separate issue. Neither addresses this kind of behavior, however.

CPython versions tested on:

3.12

Operating systems tested on:

macOS

Moosems commented Aug 22, 2024

Note that if I pass two queues (one of them for the put/get init handshake) and only do the put/get on one of them, the second queue works just fine.
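A sketch of what I mean (the q1/q2 names and the "INIT COMPLETE" handshake are illustrative, not from my real code): blocking on a put/get handshake with the first queue keeps everything alive long enough that the second queue transfers fine, even though nothing is ever put on it.

from multiprocessing import Process, Queue
from time import sleep

def foo(q1, q2) -> None:
    q1.put("INIT COMPLETE")  # handshake on the first queue only; q2 is never touched

class Bar:
    def __init__(self) -> None:
        q1 = Queue()
        q2 = Queue()
        p = Process(target=foo, args=(q1, q2))
        p.start()
        q1.get()  # blocking here keeps both queues referenced until the child is up

if __name__ == '__main__':
    Bar()
    sleep(1)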

Zheaoli commented Aug 22, 2024

This problem exists; I can reproduce it on macOS. I think this is a spawn-mode-only problem. You can try multiprocessing.set_start_method('spawn') on Linux to reproduce it.
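A minimal sketch of that repro, assuming a Linux box where fork is the default start method:

from multiprocessing import Process, Queue, set_start_method
from time import sleep

def foo(q) -> None:
    pass

def main():
    q = Queue()
    p = Process(target=foo, args=(q,))
    p.start()

if __name__ == '__main__':
    set_start_method('spawn')  # force spawn so the failure reproduces on Linux too
    main()  # q goes out of scope here, before the child has unpickled its copy
    sleep(1)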

Would you mind assigning this issue to me? cc @picnixz

picnixz commented Aug 22, 2024

I can reproduce it in spawn mode. You can work on it if you want; just ping me if you need a review.

YvesDup commented Aug 22, 2024

I am wondering whether the __init__ class method is truly a callable object, as the target parameter expects?
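For what it's worth, in the examples above the class itself is the target, not __init__ directly, and a class is callable; a quick check:

class Foo:
    def __init__(self, q) -> None:
        pass

print(callable(Foo))  # True: calling Foo(q) constructs an instance, which invokes __init__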

Moosems commented Aug 22, 2024

@YvesDup The last two of the original examples show that it has to do with the Queue being created inside a class that creates a Process. All three of the following break:

from multiprocessing import Process, Queue
from time import sleep

def foo(q) -> None:
    pass

class Bar:
    def __init__(self) -> None:
        pass

    def method(self):
        self.q = Queue()
        p = Process(target=foo, args=(self.q,))
        p.start()

if __name__ == '__main__':
    Bar().method()
    sleep(1)

from multiprocessing import Process, Queue
from time import sleep

def foo(q) -> None:
    pass

class Bar:
    def __init__(self) -> None:
        self.q = Queue()
        self.method()

    def method(self):
        p = Process(target=foo, args=(self.q,))
        p.start()

if __name__ == '__main__':
    Bar()
    sleep(1)

from multiprocessing import Process, Queue
from time import sleep

def foo(q) -> None:
    pass

class Bar:
    def __init__(self) -> None:
        self.method()

    def method(self):
        self.q = Queue()
        p = Process(target=foo, args=(self.q,))
        p.start()

if __name__ == '__main__':
    Bar()
    sleep(1)

Zheaoli commented Aug 23, 2024

After diving into this problem, I think it's not a bug.

The issue is caused by spawn-mode startup speed.

TL;DR:

For now, the multiprocessing module has three start methods:

  1. fork
  2. spawn
  3. forkserver

The default mode on macOS is spawn, which is much slower than fork. So by the time the subprocess is ready to use the fd we passed in, the parent process has already exited, and the fd has been recycled by the system.

I think maybe we need to update the documentation to add more tips?
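For anyone checking which start method their platform uses, a quick sketch:

import multiprocessing as mp

print(mp.get_start_method())       # 'spawn' on macOS (since Python 3.8) and Windows
print(mp.get_all_start_methods())  # e.g. ['fork', 'spawn', 'forkserver'] on Linux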

Moosems commented Aug 24, 2024

@Zheaoli I have found some interesting behavior and I think I understand what is happening: garbage collection. Let's keep Foo constant for all examples as follows:

class Foo:
    def __init__(self, q) -> None:
        pass

Now, let's take a look at Bar:

In this example, the script has no issues:

class Bar:
    def __init__(self) -> None:
        self.q = Queue()
        self.p = Process(target=Foo, args=(self.q,), daemon=True)
        self.p.start()

if __name__ == "__main__":
    x = Bar()
    sleep(1)

But if we make q a local variable (q = Queue()) or pass it inline (Process(target=Foo, args=(Queue(),), daemon=True)), it breaks. In both cases, the variable is thrown away before the new Process has finished setting up: the Queue is garbage collected because nothing references it afterwards. The same happens if we change the if statement to simply contain

if __name__ == "__main__":
    Bar()
    sleep(1)

As such, I believe this has to do with the Queue being garbage collected before the second process is created, and that is what causes the issue. This conclusion is further evidenced by the following working without issue:

class Bar:
    def __init__(self) -> None:
        q = Queue()
        self.p = Process(target=Foo, args=(q,), daemon=True)
        self.p.start()
        sleep(1)

if __name__ == "__main__":
    Bar()

Would you agree this seems to be the source of the issue?
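One way to test the garbage-collection theory directly is a weakref callback (a sketch; the on_collect name is just for illustration):

import weakref
from multiprocessing import Process, Queue
from time import sleep

def foo(q) -> None:
    pass

def on_collect(ref):
    # fires the moment the Queue object is garbage collected
    print("Queue was garbage collected")

if __name__ == '__main__':
    q = Queue()
    r = weakref.ref(q, on_collect)
    p = Process(target=foo, args=(q,))
    p.start()
    del q     # drop the last strong reference (CPython's start() gives up its own copy of args)
    sleep(1)  # the message prints before the child has unpickled its copy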

Moosems commented Aug 24, 2024

This appears to be a viable workaround for the moment; it works whether or not the user binds the Bar instance to a variable:

from multiprocessing import Process, Queue
from time import sleep

class Foo:
    def __init__(self, q) -> None:
        pass

class Bar:
    def __init__(self) -> None:
        self.create_another()

    def create_another(self):
        if hasattr(self, "q"):
            self.p.terminate()
            del self.q # Just to prove this works
        self.q = Queue()
        self.p = Process(target=Foo, args=(self.q,), daemon=True)
        self.p.start()

    def __del__(self):
        self.p.terminate()

if __name__ == '__main__':
    # Mini tests
    x = Bar()
    sleep(1)
    x.create_another()
    Bar()
    Bar().create_another()

YvesDup commented Sep 1, 2024

FYI, the example below fails no matter what foo is: a function, a class (with __init__ as the callable), or an existing callable object.

from multiprocessing import Process, Queue
from time import sleep

def main():
    q = Queue()
    p = Process(target=foo, args=(q,))
    p.start()

if __name__ == '__main__':
    main()
    sleep(1)

Moosems commented Sep 1, 2024

If you return q and keep it alive while the process is starting, it works normally, @YvesDup.
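A sketch of that fix applied to the example above, with foo as a no-op function for illustration:

from multiprocessing import Process, Queue
from time import sleep

def foo(q) -> None:
    pass

def main():
    q = Queue()
    p = Process(target=foo, args=(q,))
    p.start()
    return q  # hand the Queue back so the caller can keep it referenced

if __name__ == '__main__':
    q = main()  # holding the returned Queue keeps it alive while the child starts
    sleep(1)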
