
TypeError: an integer is required (got type NoneType) in pebble/pool/process.py, line 178, in message_manager_loop #58

Closed
marxin opened this issue Apr 14, 2020 · 19 comments

@marxin
Contributor

marxin commented Apr 14, 2020

First, I would really like to thank you for the library. I need to create a process pool in which I can immediately terminate running tasks, and I can't understand why the official concurrent.futures lacks this ability.
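For context on the concurrent.futures limitation: according to the standard library documentation, Future.cancel() only succeeds for a call that has not started yet; once the call is running, it returns False and the call keeps going. A minimal sketch of that behaviour, using ThreadPoolExecutor only because it makes the example deterministic (the same Future.cancel() rule applies to ProcessPoolExecutor futures); blocking_task and try_cancel_running_task are illustrative names:

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def blocking_task(started, release):
    started.set()   # signal that the task is now running
    release.wait()  # block until the caller lets it finish
    return "finished"


def try_cancel_running_task():
    started, release = threading.Event(), threading.Event()
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(blocking_task, started, release)
        started.wait()               # the task is definitely running now
        cancelled = future.cancel()  # has no effect on a running call
        release.set()                # let the task complete so shutdown can finish
        return cancelled, future.result()


print(try_cancel_running_task())  # (False, 'finished')
```

The running call cannot be interrupted through the Future API, which is the gap a pool with a stop() operation is meant to fill.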

However, I see various exceptions when I call pool.stop():
https://github.com/marxin/creduce/blob/threadpool/creduce/utils/testing.py#L317-L319

00:00:01 INFO ===< ClangBinarySearchPass::replace-function-def-with-decl >===
Exception in thread Thread-18:
Traceback (most recent call last):
  File "/usr/lib64/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/lib64/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 178, in message_manager_loop
    pool_manager.process_next_message(SLEEP_UNIT)
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 210, in process_next_message
    message = self.worker_manager.receive(timeout)
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 342, in receive
    return self.pool_channel.recv()
  File "/usr/lib/python3.8/site-packages/pebble/pool/channel.py", line 63, in recv
    return self.reader.recv()
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 421, in _recv_bytes
    return self._recv(size)
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
TypeError: an integer is required (got type NoneType)
...
Traceback (most recent call last):
  File "/tmp/bin/creduce/usr/local/bin/creduce", line 204, in <module>
    reducer.reduce(pass_group, skip_initial=args.skip_initial_passes)
  File "/tmp/bin/creduce/usr/local/bin/../share/creduce/creduce.py", line 118, in reduce
    self._run_additional_passes(pass_group["first"])
  File "/tmp/bin/creduce/usr/local/bin/../share/creduce/creduce.py", line 145, in _run_additional_passes
    self.test_manager.run_pass(p)
  File "/tmp/bin/creduce/usr/local/bin/../share/creduce/utils/testing.py", line 384, in run_pass
    parallel_tests = self.run_parallel_tests()
  File "/tmp/bin/creduce/usr/local/bin/../share/creduce/utils/testing.py", line 319, in run_parallel_tests
    pool.join()
  File "/usr/lib/python3.8/site-packages/pebble/pool/base_pool.py", line 77, in join
    self._stop_pool()
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 75, in _stop_pool
    loop.join()
  File "/usr/lib64/python3.8/threading.py", line 1011, in join
    self._wait_for_tstate_lock()
  File "/usr/lib64/python3.8/threading.py", line 1027, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

Can you please help me understand what I might be doing wrong?

@marxin
Contributor Author

marxin commented Apr 14, 2020

Simple reproducer:

#!/usr/bin/env python3

from pebble import ProcessPool
from concurrent.futures import wait, FIRST_COMPLETED

def run():
    pass

while True:
    with ProcessPool(max_workers=16) as pool:
        print('new ProcessPool')
        futures = []
        for i in range(10):
            futures.append(pool.schedule(run))
        wait(futures, return_when=FIRST_COMPLETED)
        pool.close()
        pool.stop()
        pool.join()
$ ./test.py
new ProcessPool
new ProcessPool
...
new ProcessPool
new ProcessPool
Exception in thread Thread-309:
Traceback (most recent call last):
  File "/usr/lib64/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/lib64/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 178, in message_manager_loop
    pool_manager.process_next_message(SLEEP_UNIT)
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 210, in process_next_message
    message = self.worker_manager.receive(timeout)
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 342, in receive
    return self.pool_channel.recv()
  File "/usr/lib/python3.8/site-packages/pebble/pool/channel.py", line 63, in recv
    return self.reader.recv()
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 421, in _recv_bytes
    return self._recv(size)
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
TypeError: an integer is required (got type NoneType)
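For readers decoding the last frame of this traceback: as far as one can tell from CPython's multiprocessing/connection.py, Connection.close() resets the connection's internal handle to None, while recv() checks for a closed handle only once, before it starts reading. A use-after-close from a single thread is therefore caught by that check and raises OSError("handle is closed"); a TypeError coming from read(None, ...) instead suggests the pipe was closed by another thread while recv() was already past the check. A small stdlib-only sketch of both cases (recv_after_close and read_from_none_handle are hypothetical helper names):

```python
import os
from multiprocessing import Pipe


def recv_after_close():
    """Use-after-close from one thread: recv()'s closed-check catches it."""
    reader, writer = Pipe(duplex=False)
    writer.send("hello")
    reader.close()
    try:
        reader.recv()
    except OSError as exc:
        return type(exc).__name__, str(exc)
    finally:
        writer.close()


def read_from_none_handle():
    """What the low-level read sees if the handle became None mid-call."""
    try:
        os.read(None, 1)  # a file descriptor must be an int, not None
    except TypeError as exc:
        # The exact message differs between Python versions.
        return type(exc).__name__


print(recv_after_close())
print(read_from_none_handle())
```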

@noxdafox
Owner

Hello,

pool.close and pool.terminate are meant to be used mutually exclusively, not together. Use close and join if you want all scheduled jobs to finish before the pool is garbage collected. Use terminate and join if, instead, you want to abruptly stop the ongoing work.

That said, it should not fail like this. Thanks for providing a minimal reproducible example; I will fix this ASAP.
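The same close/terminate/join convention exists in the standard library's multiprocessing.Pool, which makes it easy to see the two shutdown modes side by side (in pebble, the abrupt variant is the pool.stop() method that the reproducers call). A minimal sketch, assuming a Unix system where the "fork" start method is available; slow_square, graceful_shutdown and abrupt_shutdown are illustrative names, not pebble API:

```python
import multiprocessing
import time

# Assumption: Unix, where the "fork" start method exists.
CTX = multiprocessing.get_context("fork")


def slow_square(x):
    time.sleep(0.5)  # simulate a long-running task
    return x * x


def graceful_shutdown(values):
    """close() + join(): no new tasks are accepted, but queued ones all run."""
    pool = CTX.Pool(processes=2)
    result = pool.map_async(slow_square, values)
    pool.close()
    pool.join()
    return result.get()


def abrupt_shutdown(values):
    """terminate() + join(): workers are killed and pending work is dropped."""
    pool = CTX.Pool(processes=2)
    result = pool.map_async(slow_square, values)
    pool.terminate()
    pool.join()
    return result.ready()  # the map never completed
```

Calling graceful_shutdown([1, 2, 3]) returns [1, 4, 9] after roughly a second, while abrupt_shutdown([1, 2, 3]) returns False almost immediately; mixing close() with the abrupt call adds nothing, which is the point made above.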

@marxin
Contributor Author

marxin commented Apr 15, 2020

Thank you for the quick answer.

You probably mean pool.stop instead of pool.terminate, right? I can confirm that stop and join alone also suffer from the problem. I only added close while experimenting; here is the variant without it:

#!/usr/bin/env python3

from pebble import ProcessPool
from concurrent.futures import wait, FIRST_COMPLETED

import os

def run(foo):
    return foo

while True:
    with ProcessPool(max_workers=16) as pool:
        print('new ProcessPool')
        futures = []
        for i in range(10):
            futures.append(pool.schedule(run))
        wait(futures, return_when=FIRST_COMPLETED)
        pool.stop()
        pool.join()
$ ./pe.py 
new ProcessPool
Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/lib64/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 178, in message_manager_loop
    pool_manager.process_next_message(SLEEP_UNIT)
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 210, in process_next_message
    message = self.worker_manager.receive(timeout)
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 342, in receive
    return self.pool_channel.recv()
  File "/usr/lib/python3.8/site-packages/pebble/pool/channel.py", line 63, in recv
    return self.reader.recv()
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 421, in _recv_bytes
    return self._recv(size)
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
TypeError: an integer is required (got type NoneType)
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
...

Thanks.

@noxdafox
Owner

I just tried your examples with Python 3.8 and could not reproduce the issue.

Also, the Pool stopping routine is tested via Travis; if there were an issue, it would already have shown up. What OS are you using?

@marxin
Contributor Author

marxin commented Apr 15, 2020

I'm using openSUSE Tumbleweed with:

$ python3 --version
Python 3.8.2
$ uname -a
Linux kettlebell 5.6.2-1-default #1 SMP Thu Apr 2 06:31:32 UTC 2020 (c8170d6) x86_64 x86_64 x86_64 GNU/Linux

Note that I've just tested this on 3 different Tumbleweed machines, and also in a Fedora container (podman run -it fedora /bin/bash), with the same result.

And I also tested SLES 15:

$ uname -a
Linux kunlun 4.12.14-197.29-default #1 SMP Fri Dec 6 12:08:50 UTC 2019 (ca25711) x86_64 x86_64 x86_64 GNU/Linux
$ cat /etc/SUSE-brand 
SLE
VERSION = 15
$ python3 --version
Python 3.6.10
$ ./pe.py
...
new ProcessPool
Exception in thread Thread-45:
Traceback (most recent call last):
  File "/usr/lib64/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/lib64/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/home/mliska/.local/lib/python3.6/site-packages/pebble/pool/process.py", line 178, in message_manager_loop
    pool_manager.process_next_message(SLEEP_UNIT)
  File "/home/mliska/.local/lib/python3.6/site-packages/pebble/pool/process.py", line 210, in process_next_message
    message = self.worker_manager.receive(timeout)
  File "/home/mliska/.local/lib/python3.6/site-packages/pebble/pool/process.py", line 342, in receive
    return self.pool_channel.recv()
  File "/home/mliska/.local/lib/python3.6/site-packages/pebble/pool/channel.py", line 63, in recv
    return self.reader.recv()
  File "/usr/lib64/python3.6/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/usr/lib64/python3.6/multiprocessing/connection.py", line 411, in _recv_bytes
    return self._recv(size)
  File "/usr/lib64/python3.6/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
TypeError: an integer is required (got type NoneType)

new ProcessPool
...

marxin added a commit to marxin/pebble that referenced this issue Apr 16, 2020
Make pytest run with -s in order to print output.
@marxin
Contributor Author

marxin commented Apr 16, 2020

As explained in the pull request, pytest needs the -s option to show the exception; otherwise, the output is captured and never displayed.
The key set-up of the test case is to join a ProcessFuture before pool.stop is called.
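Because an exception in a background thread is only printed to stderr, and pytest captures that output unless -s is given, a test can pass while the crash goes unnoticed. One option, sketched here with the standard library only, is to install a temporary threading.excepthook (Python 3.8+) and assert on what it records. This is an alternative approach, not necessarily what the pull request does; run_and_collect_thread_errors and crashing_loop are hypothetical names:

```python
import threading


def run_and_collect_thread_errors(target):
    """Run `target` in a thread and return the exception types it left unhandled."""
    errors = []
    previous_hook = threading.excepthook

    def record(args):
        # args is the ExceptHookArgs object passed by threading
        errors.append(args.exc_type)

    threading.excepthook = record
    try:
        worker = threading.Thread(target=target)
        worker.start()
        worker.join()  # the hook runs inside the thread, before it finishes
    finally:
        threading.excepthook = previous_hook  # always restore the global hook
    return errors


def crashing_loop():
    # Stand-in for a message-manager loop that dies during shutdown.
    raise TypeError("an integer is required (got type NoneType)")


print(run_and_collect_thread_errors(crashing_loop))  # [<class 'TypeError'>]
```

A test can then assert that the returned list is empty, turning the noisy traceback into a real failure without relying on -s.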

@marxin
Contributor Author

marxin commented Apr 16, 2020

@hroncok Can you please test it on a bare-metal Fedora system?

@hroncok

hroncok commented Apr 16, 2020

> Can you please test it on a bare-metal Fedora system?

What test exactly? And with what pebble (from RPM, from PyPI or from git master)?

@marxin
Contributor Author

marxin commented Apr 16, 2020

> What test exactly? And with what pebble (from RPM, from PyPI or from git master)?

The test mentioned in #58 (comment).
I would test the latest release, installed either with pip or from an RPM (by the way, does Fedora have Pebble packaged?).
Thank you

@hroncok

hroncok commented Apr 16, 2020

> does Fedora have Pebble packaged?

I don't know.

[pebtest]$ python3.8 -m venv __venv__
[pebtest]$ . __venv__/bin/activate
(__venv__) [pebtest]$ pip install pebble
Collecting pebble
  Downloading https://files.pythonhosted.org/packages/9c/35/085d244bc261f720e98a62943d639161f2d69aa068168464494ce05a14a4/Pebble-4.5.1-py2.py3-none-any.whl
Installing collected packages: pebble
Successfully installed pebble-4.5.1
WARNING: You are using pip version 19.3.1; however, version 20.0.2 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
(__venv__) [pebtest]$ cat > pe.py
#!/usr/bin/env python3

from pebble import ProcessPool
from concurrent.futures import wait, FIRST_COMPLETED

import os

def run(foo):
    return foo

while True:
    with ProcessPool(max_workers=16) as pool:
        print('new ProcessPool')
        futures = []
        for i in range(10):
            futures.append(pool.schedule(run))
        wait(futures, return_when=FIRST_COMPLETED)
        pool.stop()
        pool.join()
(__venv__) [pebtest]$ chmod +x pe.py 
(__venv__) [pebtest]$ ./pe.py 
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
Exception in thread Thread-36:
Traceback (most recent call last):
  File "/usr/lib64/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/lib64/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/process.py", line 178, in message_manager_loop
    pool_manager.process_next_message(SLEEP_UNIT)
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/process.py", line 210, in process_next_message
    message = self.worker_manager.receive(timeout)
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/process.py", line 342, in receive
    return self.pool_channel.recv()
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/channel.py", line 63, in recv
    return self.reader.recv()
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 421, in _recv_bytes
    return self._recv(size)
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
TypeError: an integer is required (got type NoneType)
new ProcessPool
new ProcessPool
Exception in thread Thread-42:
Traceback (most recent call last):
  File "/usr/lib64/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/lib64/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/process.py", line 178, in message_manager_loop
    pool_manager.process_next_message(SLEEP_UNIT)
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/process.py", line 210, in process_next_message
    message = self.worker_manager.receive(timeout)
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/process.py", line 342, in receive
    return self.pool_channel.recv()
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/channel.py", line 63, in recv
    return self.reader.recv()
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 421, in _recv_bytes
    return self._recv(size)
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
TypeError: an integer is required (got type NoneType)
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
new ProcessPool
^CProcess Process-366:
Exception ignored in: <function _after_fork at 0x7f7d3cf5baf0>
Traceback (most recent call last):
  File "/usr/lib64/python3.8/threading.py", line 1442, in _after_fork
    thread._reset_internal_locks(True)
  File "/usr/lib64/python3.8/threading.py", line 811, in _reset_internal_locks
    self._started._reset_internal_locks()
  File "/usr/lib64/python3.8/threading.py", line 511, in _reset_internal_locks
    self._cond.__init__(Lock())
  File "/usr/lib64/python3.8/threading.py", line 225, in __init__
    self._lock = lock
KeyboardInterrupt: 
Traceback (most recent call last):
  File "/usr/lib64/python3.8/multiprocessing/process.py", line 299, in _bootstrap
    util._close_stdin()
  File "/usr/lib64/python3.8/multiprocessing/util.py", line 399, in _close_stdin
    sys.stdin = open(fd, closefd=False)
  File "/usr/lib64/python3.8/_bootlocale.py", line 33, in getpreferredencoding
    def getpreferredencoding(do_setlocale=True):
KeyboardInterrupt
Traceback (most recent call last):
  File "./pe.py", line 16, in <module>
    futures.append(pool.schedule(run))
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/process.py", line 90, in schedule
    self._check_pool_state()
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/base_pool.py", line 91, in _check_pool_state
    self._update_pool_state()
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/base_pool.py", line 100, in _update_pool_state
    self._start_pool()
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/process.py", line 63, in _start_pool
    self._pool_manager.start()
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/process.py", line 192, in start
    self.worker_manager.create_workers()
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/process.py", line 364, in create_workers
    self.new_worker()
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/pool/process.py", line 376, in new_worker
    worker = launch_process(
  File "/home/churchyard/tmp/pebtest/__venv__/lib64/python3.8/site-packages/pebble/common.py", line 141, in launch_process
    process.start()
  File "/usr/lib64/python3.8/multiprocessing/process.py", line 120, in start
    _cleanup()
  File "/usr/lib64/python3.8/multiprocessing/process.py", line 64, in _cleanup
    if p._popen.poll() is not None:
  File "/usr/lib64/python3.8/multiprocessing/popen_fork.py", line 27, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt

@marxin
Contributor Author

marxin commented Apr 16, 2020

Thank you @hroncok. You see exactly the same problem as I do.

@noxdafox noxdafox added the bug label Apr 19, 2020
@noxdafox
Owner

I managed to reproduce it in one of my environments. I will investigate further; thanks for reporting this.

@marxin
Contributor Author

marxin commented Apr 23, 2020

@noxdafox Any update on this please?

@swills

swills commented Apr 28, 2020

I'm seeing this on FreeBSD too. Any update?

@noxdafox
Owner

This issue is similar to #10 and #20. It is a race condition at pool shutdown that produces a noisy crash.

It does not affect the Pool functionality itself, so you can keep using it safely.

I have already submitted a fix and will make a new release over the coming weekend.

noxdafox added a commit that referenced this issue Apr 29, 2020
When terminating the Pool, there might be situations in which the
results polling thread still tries to receive data from a closed
pipe.

This leads to the thread crashing with a visible traceback in the
logs.

I could not reproduce this in older versions of Python. I could not
figure out what changed in Python 3.8, but as this is a race
condition it might be very difficult to debug.

Signed-off-by: Matteo Cafasso <noxdafox@gmail.com>
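The commit message describes the mechanism: the results-polling thread is still inside recv() when the pipe gets closed during shutdown. Below is a minimal, stdlib-only sketch of two common defences against that kind of race. It is not pebble's actual patch, and poll_results and collect are hypothetical names. First, shut down in order (signal the poller, join it, and only then close the pipe); second, treat pipe errors as expected only once shutdown has been requested, so genuine failures still surface.

```python
import threading
import time
from multiprocessing import Pipe


def poll_results(reader, stop_event, results, timeout=0.05):
    """Drain `reader` until `stop_event` is set (hypothetical polling loop)."""
    while not stop_event.is_set():
        try:
            if reader.poll(timeout):
                results.append(reader.recv())
        except (OSError, EOFError, TypeError):
            # Pipe closed or torn down underneath us. TypeError covers the case
            # where close() ran between recv()'s closed-check and the read.
            if stop_event.is_set():
                return  # expected during shutdown: exit quietly
            raise       # not shutting down: a real error, let it surface


def collect(messages, deadline_s=5.0):
    reader, writer = Pipe(duplex=False)
    stop_event = threading.Event()
    results = []
    poller = threading.Thread(target=poll_results, args=(reader, stop_event, results))
    poller.start()
    for message in messages:
        writer.send(message)
    deadline = time.monotonic() + deadline_s
    while len(results) < len(messages) and time.monotonic() < deadline:
        time.sleep(0.01)
    # Shutdown order matters: stop and join the poller *before* closing the
    # pipe, so it is never reading from a handle another thread just closed.
    stop_event.set()
    poller.join()
    reader.close()
    writer.close()
    return results
```

With the ordering in place, the except branch should rarely fire; it is kept as a safety net for code paths where the pipe can be closed from elsewhere.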
@marxin
Contributor Author

marxin commented Apr 29, 2020

Good job 👍

@marxin marxin closed this as completed Apr 29, 2020
@marxin
Contributor Author

marxin commented Apr 29, 2020

I've just rebuilt the openSUSE package and the issue is gone.
Thanks.

@marxin
Contributor Author

marxin commented Apr 30, 2020

Can you please merge my PR #61, which adds a test for this issue?

@noxdafox
Owner

noxdafox commented May 6, 2020

Issue fixed in release 4.5.2.
