Worker raises CommClosedError on client shutdown #94

Open
lgarrison opened this issue Oct 5, 2022 · 20 comments

@lgarrison
Contributor

Describe the issue:

I'm trying out a simple hello-world-style dask-mpi example. The computation returns the right result, but I'm getting exceptions when the client finishes. I'm running the script below under Slurm as srun -n3 python repro.py, and the error is:

2022-10-05 14:05:41,510 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-a2bfbff7-5dcc-49da-868a-08c403ba78f9 Address tcp://10.250.145.105:38331 Status: Status.closing
2022-10-05 14:05:41,510 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) Worker->Scheduler local=tcp://10.250.145.105:42110 remote=tcp://10.250.145.105:45101>
Traceback (most recent call last):
  File "/mnt/home/lgarrison/scc/daskdistrib/venv8/lib/python3.8/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
  File "/mnt/sw/nix/store/db63z7j5w4n84c625pv5b57m699bnbws-python-3.8.12-view/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/mnt/home/lgarrison/scc/daskdistrib/venv8/lib/python3.8/site-packages/distributed/comm/tcp.py", line 269, in write
    raise CommClosedError()
distributed.comm.core.CommClosedError

I thought this might be related to #87, but I'm running on Python 3.8 and there's just an exception, no hang.

Am I doing something wrong? It looks to me like the worker is complaining because the scheduler shuts down before the worker does. Is this expected? If I manually force the workers to shut down before the client and scheduler do, with:

def closeall(dask_scheduler):
    for w in dask_scheduler.workers:
        dask_scheduler.close_worker(w)
[...]
client.run_on_scheduler(closeall)

then everything exits with no exceptions. But this feels like a hack... am I missing something?
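
For context, the full workaround looks roughly like this when folded into the example script below (a sketch of what I'm running; the list() copy is just to avoid mutating the scheduler's worker mapping while iterating):

import dask_mpi
from distributed import Client

def f(a):
    return a + 1

def closeall(dask_scheduler):
    # dask_scheduler is injected by Client.run_on_scheduler; close each
    # worker before the scheduler itself shuts down
    for w in list(dask_scheduler.workers):
        dask_scheduler.close_worker(w)

def main():
    dask_mpi.initialize()

    with Client() as client:
        future = client.submit(f, 1)
        print(f'future returned {future.result()}')
        # workaround: ask the scheduler to close its workers first
        client.run_on_scheduler(closeall)

if __name__ == '__main__':
    main()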

Minimal Complete Verifiable Example:

import dask_mpi
from distributed import Client

def f(a):
    return a + 1

def main():
    dask_mpi.initialize()

    with Client() as client:
        future = client.submit(f, 1)
        res = future.result()
        print(f'future returned {res}')

if __name__ == '__main__':
    main()

Full log:

(venv8) lgarrison@scclin021:~/scc/daskdistrib$ srun -n3 python ./repro_commclosed.py 
2022-10-05 14:05:40,460 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2022-10-05 14:05:40,527 - distributed.scheduler - INFO - State start
2022-10-05 14:05:40,533 - distributed.scheduler - INFO -   Scheduler at: tcp://10.250.145.105:45101
2022-10-05 14:05:40,533 - distributed.scheduler - INFO -   dashboard at:                     :8787
2022-10-05 14:05:40,566 - distributed.worker - INFO -       Start worker at: tcp://10.250.145.105:38331
2022-10-05 14:05:40,566 - distributed.worker - INFO -          Listening to: tcp://10.250.145.105:38331
2022-10-05 14:05:40,566 - distributed.worker - INFO -           Worker name:                          2
2022-10-05 14:05:40,566 - distributed.worker - INFO -          dashboard at:       10.250.145.105:34979
2022-10-05 14:05:40,566 - distributed.worker - INFO - Waiting to connect to: tcp://10.250.145.105:45101
2022-10-05 14:05:40,566 - distributed.worker - INFO - -------------------------------------------------
2022-10-05 14:05:40,566 - distributed.worker - INFO -               Threads:                          1
2022-10-05 14:05:40,566 - distributed.worker - INFO -                Memory:                   7.81 GiB
2022-10-05 14:05:40,566 - distributed.worker - INFO -       Local Directory: /tmp/dask-worker-space/worker-az8e_3tm
2022-10-05 14:05:40,566 - distributed.worker - INFO - -------------------------------------------------
2022-10-05 14:05:41,354 - distributed.scheduler - INFO - Receive client connection: Client-5274d531-44d8-11ed-94ba-4cd98f221a38
2022-10-05 14:05:41,356 - distributed.core - INFO - Starting established connection
2022-10-05 14:05:41,385 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://10.250.145.105:38331', name: 2, status: init, memory: 0, processing: 0>
2022-10-05 14:05:41,386 - distributed.scheduler - INFO - Starting worker compute stream, tcp://10.250.145.105:38331
2022-10-05 14:05:41,386 - distributed.core - INFO - Starting established connection
2022-10-05 14:05:41,386 - distributed.worker - INFO -         Registered to: tcp://10.250.145.105:45101
2022-10-05 14:05:41,386 - distributed.worker - INFO - -------------------------------------------------
2022-10-05 14:05:41,387 - distributed.core - INFO - Starting established connection
2022-10-05 14:05:41,500 - distributed.scheduler - INFO - Remove client Client-5274d531-44d8-11ed-94ba-4cd98f221a38
2022-10-05 14:05:41,500 - distributed.scheduler - INFO - Remove client Client-5274d531-44d8-11ed-94ba-4cd98f221a38
2022-10-05 14:05:41,500 - distributed.scheduler - INFO - Close client connection: Client-5274d531-44d8-11ed-94ba-4cd98f221a38
future returned 2
2022-10-05 14:05:41,506 - distributed.scheduler - INFO - Receive client connection: Client-53080c1f-44d8-11ed-94ba-4cd98f221a38
2022-10-05 14:05:41,507 - distributed.core - INFO - Starting established connection
2022-10-05 14:05:41,508 - distributed.worker - INFO - Run out-of-band function 'stop'
2022-10-05 14:05:41,509 - distributed.scheduler - INFO - Scheduler closing...
2022-10-05 14:05:41,509 - distributed.scheduler - INFO - Scheduler closing all comms
2022-10-05 14:05:41,509 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://10.250.145.105:38331', name: 2, status: running, memory: 0, processing: 0>
2022-10-05 14:05:41,509 - distributed.worker - INFO - Stopping worker at tcp://10.250.145.105:38331
2022-10-05 14:05:41,510 - distributed.core - INFO - Removing comms to tcp://10.250.145.105:38331
2022-10-05 14:05:41,510 - distributed.scheduler - INFO - Lost all workers
2022-10-05 14:05:41,510 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-a2bfbff7-5dcc-49da-868a-08c403ba78f9 Address tcp://10.250.145.105:38331 Status: Status.closing
2022-10-05 14:05:41,510 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) Worker->Scheduler local=tcp://10.250.145.105:42110 remote=tcp://10.250.145.105:45101>
Traceback (most recent call last):
  File "/mnt/home/lgarrison/scc/daskdistrib/venv8/lib/python3.8/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
  File "/mnt/sw/nix/store/db63z7j5w4n84c625pv5b57m699bnbws-python-3.8.12-view/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/mnt/home/lgarrison/scc/daskdistrib/venv8/lib/python3.8/site-packages/distributed/comm/tcp.py", line 269, in write
    raise CommClosedError()
distributed.comm.core.CommClosedError

Environment:

  • Dask version: 2022.9.2
  • Python version: 3.8.12
  • Operating System: Rocky 8
  • Install method (conda, pip, source): pip
@lgarrison
Contributor Author

Actually, if I submit enough tasks to the worker (1000+), the run_on_scheduler(closeall) workaround doesn't seem to stop the exception from happening.

@jacobtomlinson
Member

Does calling client.shutdown() before leaving the client context manager resolve this?
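
That is, something along these lines in the example above (untested sketch):

    with Client() as client:
        future = client.submit(f, 1)
        res = future.result()
        print(f'future returned {res}')
        # explicitly tear down the scheduler and workers before the
        # context manager exits
        client.shutdown()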

@lgarrison
Contributor Author

No, same error. The job also hangs for 30s when leaving the context, then fails with a ConnectionRefusedError to the scheduler. I figure this is because the shutdown is triggered twice: once with the explicit call to client.shutdown(), then again when leaving the context.

@kmpaul
Collaborator

kmpaul commented Oct 8, 2022

These are errors I've seen before. I can't remember what is going on right now, but I'll look into it when I'm back in the office in a week. Sorry for the wait.

@lgarrison
Contributor Author

Thanks! Let me know if you need me to run more tests.

@lgarrison
Contributor Author

Hi @kmpaul, have you had a chance to look at this?

@kmpaul
Collaborator

kmpaul commented Oct 21, 2022

@lgarrison: I have not, yet. I'm sorry. This week got very messy. I absolutely will look into this on Monday. I'm sorry for the delay.

@kmpaul
Collaborator

kmpaul commented Oct 24, 2022

@lgarrison: I've verified the error on both Ubuntu and Rocky Linux. This is actually a resurgence of #88, which describes exactly the issue you are seeing. I had thought this was fixed by #89, but it has clearly come back.

@kmpaul
Collaborator

kmpaul commented Oct 24, 2022

(Note that #89 actually changed the atexit-called send_close_signal function to be exactly what @jacobtomlinson suggested you try, namely client.shutdown().)

@kmpaul
Collaborator

kmpaul commented Oct 24, 2022

...And I can further verify that these CommClosedErrors occur even for Python 3.10.6, on Linux and Windows.

@kmpaul
Collaborator

kmpaul commented Oct 24, 2022

Because of the problem noted in #88, where errors that occur during shutdown do not produce non-zero exit codes, the pytest suite is not catching these errors. I think this is a significant problem and one that I will need some time to investigate and fix. In the meantime, @lgarrison, it doesn't look like the exit-time errors are resulting in incorrect results. Are you okay continuing "as-is" and just ignoring the CommClosedErrors until I can sort them out?

@lgarrison
Contributor Author

lgarrison commented Oct 24, 2022

Yes, I can ignore the errors for now. I'm working on instructions for scientists using our local HPC resources to farm out work to the cluster using dask-mpi, and I think these errors would be a source of confusion, even if the code executes correctly. So I can continue my experimentation, but I'll probably wait until this is fixed to start teaching users about it.

(Alternatively, if there's a workflow, dask-based or otherwise, that you like to use for dynamically dispatching independent, Python-based tasks to a Slurm allocation, I'd be interested to hear about it!)

@kmpaul
Collaborator

kmpaul commented Oct 24, 2022

Yes, I can ignore the errors for now. I'm working on instructions for scientists using our local HPC resources to farm out work to the cluster using dask-mpi, and I think these errors would be a source of confusion, even if the code executes correctly. So I can continue my experimentation, but I'll probably wait until this is fixed to start teaching users about it.

Ok. I'll keep plugging away to try to get a solution as soon as possible.

@kmpaul
Collaborator

kmpaul commented Oct 25, 2022

Sigh. @lgarrison: I've tracked the errors down to something beyond Dask-MPI. You can check whether you are seeing the same thing, but I'm seeing CommClosedErrors during client shutdown even without Dask-MPI!

Minimal reproduction:

With the latest versions of Dask and Distributed (on Linux), do the following.

In Terminal 1:

$ dask-scheduler

Note the ADDRESS in the Scheduler at: ADDRESS:8786 log message.

In Terminal 2:

$ dask-worker ADDRESS:8786

where ADDRESS is the scheduler IP address (without the port number).

In Terminal 3:

$ python
Python 3.10.6 | packaged by conda-forge | (main, Aug 22 2022, 20:36:39) [GCC 10.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from distributed import Client
>>> client = Client('ADDRESS:8786')
>>> client.shutdown()

where ADDRESS is the scheduler IP address noted above.
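
(For convenience, the same steps can also be scripted. This is only a rough sketch: the localhost address, fixed port, and sleep are arbitrary choices, and it assumes the dask-scheduler/dask-worker entry points are on your PATH.)

import subprocess
import time

from distributed import Client

# start the scheduler and a single worker as separate processes
scheduler = subprocess.Popen(['dask-scheduler', '--port', '8786'])
worker = subprocess.Popen(['dask-worker', 'tcp://127.0.0.1:8786'])
time.sleep(5)  # crude wait for both processes to come up

client = Client('tcp://127.0.0.1:8786')
client.shutdown()  # the CommClosedError shows up in the worker's output

worker.wait(timeout=30)
scheduler.wait(timeout=30)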

Results

In Terminal 1, the scheduler shuts down appropriately without errors.

In Terminal 2, the worker shuts down, but not without error. The logs of the worker after client.shutdown() is called are:

2022-10-25 15:54:14,502 - distributed.worker - INFO - Stopping worker at ADDRESS:40141
2022-10-25 15:54:14,503 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-ce265b58-eecf-4335-bd1e-d0fc3b07e93d Address ADDRESS:40141 Status: Status.closing
2022-10-25 15:54:14,503 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) Worker->Scheduler local=ADDRESS:33682 remote=ADDRESS:8786>
Traceback (most recent call last):
  File ".../lib/python3.10/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
  File ".../lib/python3.10/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File ".../lib/python3.10/site-packages/distributed/comm/tcp.py", line 269, in write
    raise CommClosedError()
distributed.comm.core.CommClosedError
2022-10-25 15:54:14,507 - distributed.nanny - INFO - Worker closed
2022-10-25 15:54:14,508 - distributed.nanny - ERROR - Worker process died unexpectedly
2022-10-25 15:54:14,699 - distributed.nanny - INFO - Closing Nanny at 'ADDRESS:46611'.
2022-10-25 15:54:14,699 - distributed.dask_worker - INFO - End worker

In Terminal 3, where the client is running, a CommClosedError appears every time the client heartbeat is called (about once every 5 seconds):

2022-10-25 15:54:15,825 - tornado.application - ERROR - Exception in callback <bound method Client._heartbeat of <Client: 'ADDRESS:8786' processes=1 threads=8, memory=7.63 GiB>>
Traceback (most recent call last):
  File ".../lib/python3.10/site-packages/tornado/ioloop.py", line 905, in _run
    return self.callback()
  File ".../lib/python3.10/site-packages/distributed/client.py", line 1390, in _heartbeat
    self.scheduler_comm.send({"op": "heartbeat-client"})
  File ".../lib/python3.10/site-packages/distributed/batched.py", line 156, in send
    raise CommClosedError(f"Comm {self.comm!r} already closed.")
distributed.comm.core.CommClosedError: Comm <TCP (closed) Client->Scheduler local=ADDRESS:33802 remote=ADDRESS:8786> already closed.

And this does not stop repeating until the Python process is exited (e.g., exit()).

Interestingly, you can avoid the error that appears in the worker logs in Terminal 2 if you call client.retire_workers() before calling client.shutdown(), but the CommClosedErrors in the client application (Terminal 3) are still present.

This happens with the latest versions of Dask and Distributed (from Conda-Forge) on Windows, too.
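
For concreteness, the workaround in Terminal 3 is just (sketch):

>>> from distributed import Client
>>> client = Client('ADDRESS:8786')
>>> client.retire_workers()  # retiring the workers first avoids the worker-side traceback
>>> client.shutdown()        # the client-side heartbeat CommClosedErrors still repeat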

@lgarrison
Contributor Author

Interesting, thanks! I was also able to reproduce this without dask-mpi following your instructions. I confirm that client.retire_workers() avoids the error, but when I add it to my reproduction script before client shutdown, it doesn't help. Sleeping between the retirement and shutdown doesn't help either.
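
For reference, the modified repro looks roughly like this (a sketch; the 1-second pause is an arbitrary choice):

import time

import dask_mpi
from distributed import Client

def f(a):
    return a + 1

dask_mpi.initialize()

with Client() as client:
    future = client.submit(f, 1)
    print(f'future returned {future.result()}')
    client.retire_workers()  # retire the workers before the client goes away
    time.sleep(1)            # arbitrary pause between retirement and shutdown; doesn't help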

So, should I open an issue in an upstream repo? Would that be dask/distributed?

@kmpaul
Collaborator

kmpaul commented Oct 25, 2022

I'm opening an upstream issue right now. I'm seeing if I can figure out which Dask version introduced the regression. Then I'll submit the issue and report it here.

@kmpaul
Collaborator

kmpaul commented Oct 25, 2022

There is definitely some strangeness produced by Python's async functions here. When tests are run in dask-mpi, for example, and an async error occurs at shutdown, the process still ends with an exit code of 0. That really obscures problems introduced by upstream changes.

@kmpaul
Collaborator

kmpaul commented Oct 25, 2022

Ok. The Dask Distributed issue has been created (dask/distributed#7192). I'm not sure how much more I want to work on Dask-MPI until I hear back about that issue, lest I spend too much time trying to design around an upstream bug. So, I'll return to Dask-MPI if/when I hear about a solution to dask/distributed#7192.

@kmpaul
Collaborator

kmpaul commented Oct 26, 2022

Interesting, thanks! I was also able to reproduce this without dask-mpi following your instructions. I confirm that client.retire_workers() avoids the error, but when I add it to my reproduction script before client shutdown, it doesn't help. Sleeping between the retirement and shutdown doesn't help either.

@lgarrison: If you are following what is happening in dask/distributed#7192, then you probably know that I tried the sleep trick in my test and it actually worked. That was not your experience. How long did you sleep for between retire_workers and shutdown?

@kmpaul
Collaborator

kmpaul commented Oct 26, 2022

(My thinking is that with a large number of workers, the retire_workers call could take quite a while to complete.)
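
(One way to take the timing out of the equation would be to wait until the scheduler reports zero workers before shutting down; a hypothetical sketch I have not tested here:)

import time

# assumes `client` is an already-connected distributed.Client
client.retire_workers()
while client.scheduler_info().get('workers'):
    time.sleep(0.5)  # poll until the scheduler reports no remaining workers
client.shutdown()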
