
Error closing a local cluster when client still running #6087

Closed
gjoseph92 opened this issue Apr 7, 2022 · 6 comments · Fixed by #6120
Labels
bug (Something is broken) · p1 (Affects a large population and inhibits work) · regression

Comments

@gjoseph92
Collaborator

What happened:

When a local cluster (using processes) shuts down, a ton of errors are now spewed about scheduling new futures after shutdown.

I can't replicate it in my distributed dev environment, but in a different environment (which is quite similar, also running dask & distributed from main, just py3.9.1 instead of py3.9.5?) the process hangs and never terminates until a ctrl-C. In my distributed dev environment, the same errors are spewed, but it exits (with code 0, no less).

git bisect implicates #6031 @graingert.

Minimal Complete Verifiable Example:

# repro.py
import distributed
from distributed.deploy.local import LocalCluster


if __name__ == "__main__":
    cluster = LocalCluster(n_workers=1, threads_per_worker=1, processes=True)
    client = distributed.Client(cluster)

(dask-distributed) gabe dev/distributed ‹f4c52e9a› » python repro.py
2022-04-07 15:47:02,414 - distributed.utils - ERROR - cannot schedule new futures after shutdown
Traceback (most recent call last):
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 226, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/gabe/dev/distributed/distributed/client.py", line 1395, in _handle_report
    msgs = await self.scheduler_comm.comm.read()
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 242, in read
    convert_stream_closed_error(self, e)
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 150, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Client->Scheduler local=tcp://127.0.0.1:58800 remote=tcp://127.0.0.1:58791>: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/gabe/dev/distributed/distributed/utils.py", line 693, in log_errors
    yield
  File "/Users/gabe/dev/distributed/distributed/client.py", line 1225, in _reconnect
    await self._ensure_connected(timeout=timeout)
  File "/Users/gabe/dev/distributed/distributed/client.py", line 1254, in _ensure_connected
    comm = await connect(
  File "/Users/gabe/dev/distributed/distributed/comm/core.py", line 289, in connect
    comm = await asyncio.wait_for(
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/tasks.py", line 481, in wait_for
    return fut.result()
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 439, in connect
    stream = await self.client.connect(
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/tcpclient.py", line 265, in connect
    addrinfo = await self.resolver.resolve(host, port, af)
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 424, in resolve
    for fam, _, _, _, address in await asyncio.get_running_loop().getaddrinfo(
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/base_events.py", line 856, in getaddrinfo
    return await self.run_in_executor(
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/base_events.py", line 814, in run_in_executor
    executor.submit(func, *args), loop=self)
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py", line 161, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
2022-04-07 15:47:02,417 - distributed.utils - ERROR - cannot schedule new futures after shutdown
Traceback (most recent call last):
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 226, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/gabe/dev/distributed/distributed/client.py", line 1395, in _handle_report
    msgs = await self.scheduler_comm.comm.read()
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 242, in read
    convert_stream_closed_error(self, e)
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 150, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Client->Scheduler local=tcp://127.0.0.1:58800 remote=tcp://127.0.0.1:58791>: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/gabe/dev/distributed/distributed/utils.py", line 693, in log_errors
    yield
  File "/Users/gabe/dev/distributed/distributed/client.py", line 1401, in _handle_report
    await self._reconnect()
  File "/Users/gabe/dev/distributed/distributed/client.py", line 1225, in _reconnect
    await self._ensure_connected(timeout=timeout)
  File "/Users/gabe/dev/distributed/distributed/client.py", line 1254, in _ensure_connected
    comm = await connect(
  File "/Users/gabe/dev/distributed/distributed/comm/core.py", line 289, in connect
    comm = await asyncio.wait_for(
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/tasks.py", line 481, in wait_for
    return fut.result()
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 439, in connect
    stream = await self.client.connect(
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/tcpclient.py", line 265, in connect
    addrinfo = await self.resolver.resolve(host, port, af)
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 424, in resolve
    for fam, _, _, _, address in await asyncio.get_running_loop().getaddrinfo(
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/base_events.py", line 856, in getaddrinfo
    return await self.run_in_executor(
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/base_events.py", line 814, in run_in_executor
    executor.submit(func, *args), loop=self)
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py", line 161, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
2022-04-07 15:47:02,471 - distributed.utils - ERROR - cannot schedule new futures after shutdown
Traceback (most recent call last):
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 226, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/gabe/dev/distributed/distributed/client.py", line 1395, in _handle_report
    msgs = await self.scheduler_comm.comm.read()
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 242, in read
    convert_stream_closed_error(self, e)
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 150, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Client->Scheduler local=tcp://127.0.0.1:58800 remote=tcp://127.0.0.1:58791>: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/gabe/dev/distributed/distributed/utils.py", line 693, in log_errors
    yield
  File "/Users/gabe/dev/distributed/distributed/client.py", line 1521, in _close
    await asyncio.wait_for(asyncio.shield(handle_report_task), 0.1)
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/tasks.py", line 481, in wait_for
    return fut.result()
  File "/Users/gabe/dev/distributed/distributed/client.py", line 1401, in _handle_report
    await self._reconnect()
  File "/Users/gabe/dev/distributed/distributed/client.py", line 1225, in _reconnect
    await self._ensure_connected(timeout=timeout)
  File "/Users/gabe/dev/distributed/distributed/client.py", line 1254, in _ensure_connected
    comm = await connect(
  File "/Users/gabe/dev/distributed/distributed/comm/core.py", line 289, in connect
    comm = await asyncio.wait_for(
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/tasks.py", line 481, in wait_for
    return fut.result()
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 439, in connect
    stream = await self.client.connect(
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/tcpclient.py", line 265, in connect
    addrinfo = await self.resolver.resolve(host, port, af)
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 424, in resolve
    for fam, _, _, _, address in await asyncio.get_running_loop().getaddrinfo(
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/base_events.py", line 856, in getaddrinfo
    return await self.run_in_executor(
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/base_events.py", line 814, in run_in_executor
    executor.submit(func, *args), loop=self)
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py", line 161, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')

Environment:

  • Dask version: 6e30766
  • Python version: 3.9.5
  • Operating System: macOS
  • Install method (conda, pip, source): source
@gjoseph92 added the bug and regression labels on Apr 7, 2022
@graingert
Member

graingert commented Apr 8, 2022

This happens at Python interpreter finalization.

The reason this worked before is that our ExecutorResolver used our legacy copy of ThreadPoolExecutor which uses atexit instead of threading._register_atexit:

def _python_exit():
    global _shutdown
    _shutdown = True
    items = list(_threads_queues.items())
    for t, q in items:
        q.put(None)
    for t, q in items:
        t.join()

atexit.register(_python_exit)

A fix for this would be to ensure that clients, then workers, then clusters get shut down atexit.
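
To illustrate the ordering involved: on Python 3.9+ the stdlib ThreadPoolExecutor registers its shutdown hook via threading._register_atexit, which runs during threading shutdown and therefore before atexit callbacks, so work submitted to a stdlib pool from an atexit hook is refused. A minimal standalone sketch (stdlib only, not distributed code):

import atexit
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)


@atexit.register
def late_cleanup():
    # By the time atexit hooks run on 3.9+, the stdlib's _python_exit has
    # already executed via threading._register_atexit and set its shutdown
    # flag, so this submit raises a RuntimeError much like the one above.
    try:
        executor.submit(print, "cleanup work")
    except RuntimeError as exc:
        print(f"late submit failed: {exc}")


# Submitting while the interpreter is still running works fine.
print(executor.submit(lambda: "ok").result())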

@fjetter
Member

fjetter commented Apr 8, 2022

I've been running into all sorts of problems lately regarding future/task scheduling, cancellation, and blocking thread pools. Here are a couple of references, but I can't tell you whether any are actually related.

@fjetter added the p1 (Affects a large population and inhibits work) label on Apr 8, 2022
@mrocklin
Member

We should probably block release on this, yes? cc @jrbourbeau

@graingert is this something that you have time to own? (this seems like the sort of thing that you know best among the team)

@mrocklin
Member

Here is a possible solution: #6120

@vepadulano
Contributor

Hi, I'm still having the same issue with the following configuration:

  • Python 3.10.5
  • dask==2022.7.0, distributed==2022.7.0

Reproducer

from dask.distributed import LocalCluster, Client

if __name__ == '__main__':
    cluster = LocalCluster(n_workers=1, threads_per_worker=1, processes=True)
    cluster.close()

Admittedly, the original reproducer didn't manually close the cluster object, but I'm not sure how much it matters in this case. I would like to be able to do so, in general.

@jtlz2

jtlz2 commented Jul 5, 2023

Is the context manager the only way round here?

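For reference, the context-manager pattern being asked about is one workaround (not necessarily the only one): the with blocks close the client first, then the cluster, before the interpreter starts shutting down, so nothing is left to interpreter-exit cleanup. A sketch based on the original reproducer (the submit call is just illustrative):

import distributed
from distributed.deploy.local import LocalCluster

if __name__ == "__main__":
    with LocalCluster(n_workers=1, threads_per_worker=1, processes=True) as cluster:
        with distributed.Client(cluster) as client:
            # Exiting the inner block closes the client, then the outer block
            # closes the cluster (scheduler and workers).
            print(client.submit(sum, [1, 2, 3]).result())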