
Update SSHCluster usage in benchmarks with new CUDAWorker #326

Merged

Conversation


@pentschev pentschev commented Jun 26, 2020

Updates usage of SSHCluster according to changes in dask/distributed#5191.
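
For reference, a minimal sketch of the new-style invocation the benchmarks move to (host names and argument values here are illustrative; the actual wiring lives in dask_cuda/benchmarks/utils.py and is built from command-line arguments):

```python
# Illustrative sketch only -- not the exact benchmark code.
from dask.distributed import Client, SSHCluster

cluster = SSHCluster(
    ["dgx13", "dgx13", "dgx14"],                # first host runs the scheduler
    connect_options={"known_hosts": None},
    scheduler_options={"protocol": "tcp", "port": 8786},
    worker_class="dask_cuda.CUDAWorker",        # replaces worker_module="dask_cuda.dask_cuda_worker"
    worker_options={"protocol": "tcp"},
)
client = Client(cluster)
```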

@pentschev
Member Author

This PR is still waiting on dask/distributed#3907 to be merged.

@pentschev
Member Author

Still waiting on dask/distributed#3907.

@pentschev pentschev added 0 - Blocked Cannot progress due to external reasons improvement Improvement / enhancement to an existing function non-breaking Non-breaking change labels Jan 8, 2021
@github-actions

This PR has been marked stale due to no recent activity in the past 30d. Please close this PR if it is no longer required. Otherwise, please respond with a comment indicating any updates. This PR will be marked rotten if there is no activity in the next 60d.

@pentschev pentschev changed the base branch from branch-0.15 to branch-21.10 August 12, 2021 16:22
@pentschev
Member Author

I updated the PR and the name condition here, @charlesbluca. Could you take a look as well?


codecov-commenter commented Aug 12, 2021

Codecov Report

❗ No coverage uploaded for pull request base (branch-21.12@78326ee).
The diff coverage is n/a.

❗ Current head fd9cb0d differs from pull request most recent head f615441. Consider uploading reports for the commit f615441 to get more accurate results

@@               Coverage Diff               @@
##             branch-21.12     #326   +/-   ##
===============================================
  Coverage                ?   70.01%           
===============================================
  Files                   ?       15           
  Lines                   ?     1914           
  Branches                ?        0           
===============================================
  Hits                    ?     1340           
  Misses                  ?      574           
  Partials                ?        0           

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 78326ee...f615441.

@charlesbluca charlesbluca left a comment

Looks good overall, thanks for adding in the name fix 😄 Just a couple of questions:

"scheduler_options": {"protocol": args.protocol},
"worker_module": "dask_cuda.dask_cuda_worker",
"worker_options": worker_options,
"scheduler_options": {"protocol": args.protocol, "port": 8786},
@charlesbluca
Member

Just so I understand better - why do we want to explicitly set the scheduler port to 8786?

@pentschev
Member Author

Honestly, I can't remember the reason for that. I think it may have been because we need a scheduler_address, which is defined in https://github.com/rapidsai/dask-cuda/blob/branch-21.10/dask_cuda/benchmarks/utils.py#L207, and pinning the port was to ensure the two match.

@charlesbluca
Member

That makes sense - thanks for the clarification!
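
For context, a hypothetical sketch of why the port is pinned: the benchmark builds a scheduler address string separately from the SSHCluster options, so both places must agree on the port (the helper name below is made up, not the actual utils.py code):

```python
# Hypothetical helper, only to illustrate why the port must match in both places.
def scheduler_address_for(hosts, port=8786):
    # the first entry of --hosts runs the scheduler
    return f"tcp://{hosts[0]}:{port}"

cluster_options = {
    "scheduler_options": {"protocol": "tcp", "port": 8786},  # must match the address above
}
```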

"scheduler_options": {"protocol": args.protocol, "port": 8786},
"worker_class": "dask_cuda.CUDAWorker",
"worker_options": {
"protocol": args.protocol,
@charlesbluca
Member

Is protocol being handled by Distributed when creating the workers? I don't see anything in CUDAWorker that suggests we need to pass this argument to the constructor.

@pentschev
Member Author

Yes, it's passed via CUDAWorker's kwargs.
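
Roughly, the forwarding looks like the sketch below; this is a simplified illustration of keyword-argument pass-through, not the actual dask_cuda.CUDAWorker implementation:

```python
# Simplified illustration of kwargs pass-through; not the real CUDAWorker code.
from distributed import Nanny

class MinimalCUDAWorker:
    def __init__(self, scheduler=None, **kwargs):
        # whatever SSHCluster puts in worker_options (e.g. protocol="tcp")
        # arrives here in kwargs and is handed straight to the Nanny
        self.nanny = Nanny(scheduler, **kwargs)
```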

@charlesbluca charlesbluca self-requested a review August 12, 2021 20:51
@pentschev pentschev changed the base branch from branch-21.10 to branch-21.12 September 30, 2021 21:06
@github-actions github-actions bot added the python python code needed label Sep 30, 2021
@pentschev
Member Author

pentschev commented Sep 30, 2021

This is now ready for review. I've verified it works:

Sample result
$ python dask_cuda/benchmarks/local_cupy.py -d 0,1,2,3,4,5,6,7 --multi-node --hosts dgx13,dgx13,dgx14
distributed.deploy.ssh - INFO - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.deploy.ssh - INFO - distributed.scheduler - INFO - Clear task state
distributed.deploy.ssh - INFO - distributed.scheduler - INFO -   Scheduler at:  tcp://10.33.227.163:8786
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.33.227.163:34611'
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.33.227.163:46231'
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.33.227.163:40839'
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.33.227.163:39261'
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.33.227.163:44699'
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.33.227.163:43081'
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.33.227.163:41553'
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.33.227.163:42265'
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.33.227.164:36231'
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.33.227.164:38385'
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.33.227.164:34751'
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.33.227.164:46277'
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.33.227.164:43871'
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.33.227.164:39925'
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.33.227.164:45527'
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.33.227.164:39429'
distributed.deploy.ssh - INFO - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.deploy.ssh - INFO - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.deploy.ssh - INFO - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.deploy.ssh - INFO - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.deploy.ssh - INFO - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.deploy.ssh - INFO - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.deploy.ssh - INFO - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.deploy.ssh - INFO - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.deploy.ssh - INFO - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.deploy.ssh - INFO - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.deploy.ssh - INFO - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.deploy.ssh - INFO - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.deploy.ssh - INFO - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.deploy.ssh - INFO - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.deploy.ssh - INFO - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.deploy.ssh - INFO - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.deploy.ssh - INFO - distributed.preloading - INFO - Run preload setup click command: dask_cuda.initialize
distributed.deploy.ssh - INFO - distributed.worker - INFO -       Start worker at:  tcp://10.33.227.163:42593
distributed.deploy.ssh - INFO - distributed.preloading - INFO - Run preload setup click command: dask_cuda.initialize
distributed.deploy.ssh - INFO - distributed.worker - INFO -       Start worker at:  tcp://10.33.227.164:33719
Roundtrip benchmark
--------------------------
Operation          | transpose_sum
User size          | 10000
User second size   | 1000
User chunk-size    | 2500
Compute shape      | (10000, 10000)
Compute chunk-size | (2500, 2500)
Ignore-size        | 1.00 MiB
Protocol           | tcp
Device(s)          | 0,1,2,3,4,5,6,7
Worker Thread(s)   | 1
==========================
Wall-clock         | npartitions
--------------------------
741.70 ms          | 16
401.54 ms          | 16
474.88 ms          | 16
==========================
(w1,w2)            | 25% 50% 75% (total nbytes)
--------------------------
(0-0,0-4)            | 503.84 MiB/s 503.84 MiB/s 503.84 MiB/s (47.68 MiB)
(0-0,1-2)            | 307.30 MiB/s 307.30 MiB/s 307.30 MiB/s (47.68 MiB)
(0-0,1-5)            | 352.49 MiB/s 352.49 MiB/s 352.49 MiB/s (47.68 MiB)
(0-1,0-4)            | 412.72 MiB/s 450.57 MiB/s 488.43 MiB/s (95.37 MiB)
(0-2,0-3)            | 477.32 MiB/s 477.32 MiB/s 477.32 MiB/s (47.68 MiB)
(0-2,0-7)            | 229.05 MiB/s 229.05 MiB/s 229.05 MiB/s (47.68 MiB)
(0-2,1-6)            | 201.33 MiB/s 201.33 MiB/s 201.33 MiB/s (47.68 MiB)
(0-3,0-2)            | 440.86 MiB/s 440.86 MiB/s 440.86 MiB/s (47.68 MiB)
(0-3,1-4)            | 240.14 MiB/s 240.14 MiB/s 240.14 MiB/s (47.68 MiB)
(0-4,0-0)            | 547.29 MiB/s 547.29 MiB/s 547.29 MiB/s (47.68 MiB)
(0-4,0-1)            | 332.11 MiB/s 412.50 MiB/s 492.89 MiB/s (95.37 MiB)
(0-5,1-0)            | 258.74 MiB/s 258.74 MiB/s 258.74 MiB/s (47.68 MiB)
(0-5,1-2)            | 209.05 MiB/s 209.05 MiB/s 209.05 MiB/s (47.68 MiB)
(0-6,1-4)            | 216.67 MiB/s 216.67 MiB/s 216.67 MiB/s (47.68 MiB)
(0-7,0-2)            | 235.83 MiB/s 235.83 MiB/s 235.83 MiB/s (47.68 MiB)
(0-7,1-2)            | 203.69 MiB/s 203.69 MiB/s 203.69 MiB/s (47.68 MiB)
(0-7,1-4)            | 300.24 MiB/s 300.24 MiB/s 300.24 MiB/s (47.68 MiB)
(1-0,0-5)            | 233.64 MiB/s 233.64 MiB/s 233.64 MiB/s (47.68 MiB)
(1-0,1-3)            | 448.04 MiB/s 448.04 MiB/s 448.04 MiB/s (47.68 MiB)
(1-0,1-6)            | 454.55 MiB/s 454.55 MiB/s 454.55 MiB/s (47.68 MiB)
(1-1,1-7)            | 327.01 MiB/s 327.01 MiB/s 327.01 MiB/s (47.68 MiB)
(1-2,0-0)            | 248.36 MiB/s 248.36 MiB/s 248.36 MiB/s (47.68 MiB)
(1-2,0-5)            | 240.94 MiB/s 240.94 MiB/s 240.94 MiB/s (47.68 MiB)
(1-2,0-7)            | 221.89 MiB/s 221.89 MiB/s 221.89 MiB/s (47.68 MiB)
(1-3,1-0)            | 484.33 MiB/s 484.33 MiB/s 484.33 MiB/s (47.68 MiB)
(1-3,1-5)            | 322.91 MiB/s 322.91 MiB/s 322.91 MiB/s (47.68 MiB)
(1-4,0-3)            | 227.95 MiB/s 227.95 MiB/s 227.95 MiB/s (47.68 MiB)
(1-4,0-6)            | 202.38 MiB/s 202.38 MiB/s 202.38 MiB/s (47.68 MiB)
(1-4,0-7)            | 341.02 MiB/s 341.02 MiB/s 341.02 MiB/s (47.68 MiB)
(1-5,0-0)            | 165.27 MiB/s 165.27 MiB/s 165.27 MiB/s (47.68 MiB)
(1-5,1-3)            | 307.15 MiB/s 307.15 MiB/s 307.15 MiB/s (47.68 MiB)
(1-6,0-2)            | 222.80 MiB/s 222.80 MiB/s 222.80 MiB/s (47.68 MiB)
(1-6,1-0)            | 490.94 MiB/s 490.94 MiB/s 490.94 MiB/s (47.68 MiB)
(1-7,1-1)            | 306.49 MiB/s 306.49 MiB/s 306.49 MiB/s (47.68 MiB)

However, after the above finishes, the cluster doesn't exit cleanly and dies 30 seconds later:

Close timeout traceback
tornado.application - ERROR - Exception in callback <bound method Client._heartbeat of <Client: 'tcp://10.33.227.163:8786' processes=16 threads=16, memory=1.97 TiB>>
Traceback (most recent call last):
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/tornado/ioloop.py", line 905, in _run
    return self.callback()
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/distributed/client.py", line 1172, in _heartbeat
    self.scheduler_comm.send({"op": "heartbeat-client"})
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/distributed/batched.py", line 136, in send
    raise CommClosedError(f"Comm {self.comm!r} already closed.")
distributed.comm.core.CommClosedError: Comm <TCP (closed) Client->Scheduler local=tcp://127.0.0.1:57628 remote=tcp://dgx13:8786> already closed.
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7f8e2597d6d0>>, <Task finished name='Task-126' coro=<SpecCluster._correct_state_internal() done, defined at /datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/distributed/deploy/spec.py:325> exception=OSError('Timed out trying to connect to tcp://10.33.227.163:8786 after 30 s')>)
ConnectionRefusedError: [Errno 111] Connection refused

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/distributed/comm/core.py", line 284, in connect
    comm = await asyncio.wait_for(
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/asyncio/tasks.py", line 494, in wait_for
    return fut.result()
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/distributed/comm/tcp.py", line 410, in connect
    convert_stream_closed_error(self, e)
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/distributed/comm/tcp.py", line 126, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
distributed.comm.core.CommClosedError: in <distributed.comm.tcp.TCPConnector object at 0x7f8dea2a65b0>: ConnectionRefusedError: [Errno 111] Connection refused

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "dask_cuda/benchmarks/local_cupy.py", line 315, in run
    await client.shutdown()
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/distributed/deploy/cluster.py", line 439, in __aexit__
    await f
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/distributed/deploy/spec.py", line 417, in _close
    await self._correct_state()
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/distributed/deploy/spec.py", line 332, in _correct_state_internal
    await self.scheduler_comm.retire_workers(workers=list(to_close))
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/distributed/core.py", line 785, in send_recv_from_rpc
    comm = await self.live_comm()
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/distributed/core.py", line 742, in live_comm
    comm = await connect(
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/distributed/comm/core.py", line 308, in connect
    raise OSError(
OSError: Timed out trying to connect to tcp://10.33.227.163:8786 after 30 s
ConnectionRefusedError: [Errno 111] Connection refused

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/distributed/comm/core.py", line 284, in connect
    comm = await asyncio.wait_for(
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/asyncio/tasks.py", line 494, in wait_for
    return fut.result()
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/distributed/comm/tcp.py", line 410, in connect
    convert_stream_closed_error(self, e)
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/distributed/comm/tcp.py", line 126, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
distributed.comm.core.CommClosedError: in <distributed.comm.tcp.TCPConnector object at 0x7f8dea2a65b0>: ConnectionRefusedError: [Errno 111] Connection refused

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "dask_cuda/benchmarks/local_cupy.py", line 380, in <module>
    main()
  File "dask_cuda/benchmarks/local_cupy.py", line 376, in main
    asyncio.get_event_loop().run_until_complete(run(args))
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "dask_cuda/benchmarks/local_cupy.py", line 315, in run
    await client.shutdown()
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/distributed/deploy/cluster.py", line 439, in __aexit__
    await f
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/distributed/deploy/spec.py", line 417, in _close
    await self._correct_state()
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/distributed/deploy/spec.py", line 332, in _correct_state_internal
    await self.scheduler_comm.retire_workers(workers=list(to_close))
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/distributed/core.py", line 785, in send_recv_from_rpc
    comm = await self.live_comm()
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/distributed/core.py", line 742, in live_comm
    comm = await connect(
  File "/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/distributed/comm/core.py", line 308, in connect
    raise OSError(
OSError: Timed out trying to connect to tcp://10.33.227.163:8786 after 30 s
/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/distributed/deploy/spec.py:663: RuntimeWarning: coroutine 'wait_for' was never awaited
  cluster.close(timeout=10)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
/datasets/pentschev/miniconda3/envs/rn-112-21.12.210930/lib/python3.8/site-packages/distributed/deploy/spec.py:663: RuntimeWarning: coroutine 'SpecCluster._close' was never awaited
  cluster.close(timeout=10)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

Given that this works despite the unclean exit, the low priority of this task, and prior knowledge of what a rabbit hole Distributed's closing/finalizers can be, I won't go down that path now. If anyone feels like doing it, have fun!
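
For anyone who does pick it up later, a purely hypothetical, untested sketch of where the failure could be swallowed in the benchmark's shutdown path (not a proposed fix, just to mark the spot; the benchmark currently just awaits client.shutdown()):

```python
# Hypothetical sketch: wrapping the shutdown like this would only hide the
# unclean exit, it does not address the underlying close/finalizer issue.
import asyncio
from distributed.comm.core import CommClosedError

async def best_effort_shutdown(client, timeout=5):
    try:
        await asyncio.wait_for(client.shutdown(), timeout)
    except (OSError, CommClosedError, asyncio.TimeoutError):
        pass  # scheduler is already gone; nothing left to clean up
```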

@pentschev pentschev marked this pull request as ready for review September 30, 2021 21:11
@pentschev pentschev requested a review from a team as a code owner September 30, 2021 21:11
@charlesbluca charlesbluca added 3 - Ready for Review Ready for review by team and removed 0 - Blocked Cannot progress due to external reasons labels Sep 30, 2021
@charlesbluca charlesbluca left a comment

Yeah, that is strange behavior. Considering there are certainly other areas of the benchmarks that have clean-up issues, and this doesn't impact the actual times, I'm in agreement to merge this and potentially dig down that rabbit hole later on with a follow-up PR in Distributed.

Thanks @pentschev 🙂

@pentschev
Member Author

rerun tests

@pentschev
Member Author

OK, this is now passing, and since there are no objections to ignoring the unclean exit for now, I'm going to go ahead and merge it. Thanks @charlesbluca for the review here!

@pentschev
Member Author

@gpucibot merge

@madsbk madsbk left a comment

LGTM, thanks @pentschev

@rapids-bot rapids-bot bot merged commit 3ed9cc8 into rapidsai:branch-21.12 Oct 11, 2021
@pentschev
Member Author

Thanks @madsbk !
