Add Cluster.get_client() method (#6745)

Conversation
Unit Test Results

15 files ±0  15 suites ±0  6h 39m 10s ⏱️ +34m 16s

See the test report for an extended history of previous test failures; this is useful for diagnosing flaky tests. For more details on these failures, see this check.

Results for commit e1cc294. ± Comparison against base commit a53858a.

♻️ This comment has been updated with latest results.
This approach seems good to me.

Zooming out a little: if we are going down this road, maybe we should think more about the early user experience we are trying to hit with this. We are trying to avoid importing and instantiating a Client because the concept of a client is a little nuanced, but this method is called get_client and only actually saves a few characters without simplifying the concept. Perhaps we could name this connect() instead, as that may be more intuitive to new users?
Hmm, I like the way you are thinking. I do think the improvement in simplicity is mostly in saving the import:

```python
from dask_* import *Cluster

cluster = *Cluster()
client = cluster.get_client()
```

vs

```python
from dask_* import *Cluster
from distributed import Client

cluster = *Cluster()
client = Client(cluster)
```

But I see your point. It was making me wonder if there is actually an interesting difference between cluster and client at all. Like, why doesn't the cluster just have all the client methods on it? In that world, instantiating a cluster would implicitly set the client to point to that cluster. That way you could do something like:

```python
from dask_* import *Cluster

cluster = *Cluster()
cluster.submit(...)
```
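To make the comparison concrete, here is a minimal, self-contained sketch of the pattern being discussed. The classes below are toy stand-ins, not the actual distributed implementation: a `get_client()` that lazily creates and caches a client, plus a `submit()` that forwards from the cluster to its client.

```python
class Client:
    """Toy stand-in for distributed.Client."""

    def __init__(self, cluster):
        self.cluster = cluster

    def submit(self, func, *args, **kwargs):
        # In real distributed this would schedule the call on a worker;
        # here it just runs inline to keep the sketch self-contained.
        return func(*args, **kwargs)


class Cluster:
    """Toy stand-in for a cluster class."""

    def __init__(self):
        self._client = None

    def get_client(self):
        # Create the client on first use and reuse it afterwards.
        if self._client is None:
            self._client = Client(self)
        return self._client

    def submit(self, func, *args, **kwargs):
        # The "cluster has client methods" idea: forward to the client.
        return self.get_client().submit(func, *args, **kwargs)


cluster = Cluster()
client = cluster.get_client()
assert client is cluster.get_client()  # cached, not recreated
print(cluster.submit(sum, [1, 2, 3]))  # → 6
```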
That's really interesting! @ian-r-rose mentioned in another PR that it might be nice to try and move away from the

When adding the deployment chapter to the tutorial, it was a real reminder that explaining what the

I especially like
Yeah, I took a quick look to see what kind of overlap there is:

```python
>>> from distributed import Client, LocalCluster
>>> set(dir(Client)).intersection(set(dir(LocalCluster)))
{
    ...
    'asynchronous',
    'close',
    'dashboard_link',
    'sync'
}
```
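A variant of that check which filters out dunder and private names can make the shared public surface easier to see. Since distributed may not be installed everywhere, the sketch below uses two stdlib executor classes as stand-ins; `public_overlap` is a hypothetical helper, and the same function applies to `Client` and `LocalCluster` when distributed is available.

```python
def public_overlap(a, b):
    """Public attribute names shared by two classes
    (dunder and private names filtered out)."""
    return {name for name in set(dir(a)) & set(dir(b))
            if not name.startswith("_")}


# Stand-ins so the sketch runs without distributed installed; with
# distributed available, try public_overlap(Client, LocalCluster).
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

print(sorted(public_overlap(ThreadPoolExecutor, ProcessPoolExecutor)))
```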
I think I've sidetracked this PR, and we should maybe move this discussion to a design doc if we want to go down this road. Given that this PR improves consistency with
I still need some help with the error output in tests. When I run them locally, I see:

```
--- Logging error ---
Traceback (most recent call last):
  File "/home/julia/distributed/distributed/comm/tcp.py", line 223, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/julia/distributed/distributed/client.py", line 1392, in _handle_report
    msgs = await self.scheduler_comm.comm.read()
  File "/home/julia/distributed/distributed/comm/tcp.py", line 239, in read
    convert_stream_closed_error(self, e)
  File "/home/julia/distributed/distributed/comm/tcp.py", line 144, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Client->Scheduler local=tcp://127.0.0.1:43050 remote=tcp://127.0.0.1:44163>: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/julia/distributed/distributed/utils.py", line 778, in wrapper
    return await func(*args, **kwargs)
  File "/home/julia/distributed/distributed/client.py", line 1211, in _reconnect
    await self._ensure_connected(timeout=timeout)
  File "/home/julia/distributed/distributed/client.py", line 1241, in _ensure_connected
    comm = await connect(
  File "/home/julia/distributed/distributed/comm/core.py", line 291, in connect
    comm = await asyncio.wait_for(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/tasks.py", line 479, in wait_for
    return fut.result()
  File "/home/julia/distributed/distributed/comm/tcp.py", line 449, in connect
    stream = await self.client.connect(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/site-packages/tornado/tcpclient.py", line 265, in connect
    addrinfo = await self.resolver.resolve(host, port, af)
  File "/home/julia/distributed/distributed/comm/tcp.py", line 434, in resolve
    for fam, _, _, _, address in await asyncio.get_running_loop().getaddrinfo(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/base_events.py", line 856, in getaddrinfo
    return await self.run_in_executor(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/base_events.py", line 814, in run_in_executor
    executor.submit(func, *args), loop=self)
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/concurrent/futures/thread.py", line 167, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/logging/__init__.py", line 1086, in emit
    stream.write(msg + self.terminator)
ValueError: I/O operation on closed file.
Call stack:
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/threading.py", line 930, in _bootstrap
    self._bootstrap_inner()
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
  File "/home/julia/distributed/distributed/utils.py", line 485, in run_loop
    loop.start()
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/site-packages/tornado/platform/asyncio.py", line 199, in start
    self.asyncio_loop.run_forever()
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/base_events.py", line 596, in run_forever
    self._run_once()
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/base_events.py", line 1890, in _run_once
    handle._run()
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/home/julia/distributed/distributed/utils.py", line 778, in wrapper
    return await func(*args, **kwargs)
  File "/home/julia/distributed/distributed/client.py", line 1400, in _handle_report
    await self._reconnect()
  File "/home/julia/distributed/distributed/utils.py", line 778, in wrapper
    return await func(*args, **kwargs)
  File "/home/julia/distributed/distributed/utils.py", line 804, in __exit__
    logger.exception(exc_value)
Message: RuntimeError('cannot schedule new futures after shutdown')
Arguments: ()
cannot schedule new futures after shutdown
Traceback (most recent call last):
  File "/home/julia/distributed/distributed/comm/tcp.py", line 223, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/julia/distributed/distributed/client.py", line 1392, in _handle_report
    msgs = await self.scheduler_comm.comm.read()
  File "/home/julia/distributed/distributed/comm/tcp.py", line 239, in read
    convert_stream_closed_error(self, e)
  File "/home/julia/distributed/distributed/comm/tcp.py", line 144, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Client->Scheduler local=tcp://127.0.0.1:43050 remote=tcp://127.0.0.1:44163>: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/julia/distributed/distributed/utils.py", line 778, in wrapper
    return await func(*args, **kwargs)
  File "/home/julia/distributed/distributed/client.py", line 1400, in _handle_report
    await self._reconnect()
  File "/home/julia/distributed/distributed/utils.py", line 778, in wrapper
    return await func(*args, **kwargs)
  File "/home/julia/distributed/distributed/client.py", line 1211, in _reconnect
    await self._ensure_connected(timeout=timeout)
  File "/home/julia/distributed/distributed/client.py", line 1241, in _ensure_connected
    comm = await connect(
  File "/home/julia/distributed/distributed/comm/core.py", line 291, in connect
    comm = await asyncio.wait_for(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/tasks.py", line 479, in wait_for
    return fut.result()
  File "/home/julia/distributed/distributed/comm/tcp.py", line 449, in connect
    stream = await self.client.connect(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/site-packages/tornado/tcpclient.py", line 265, in connect
    addrinfo = await self.resolver.resolve(host, port, af)
  File "/home/julia/distributed/distributed/comm/tcp.py", line 434, in resolve
    for fam, _, _, _, address in await asyncio.get_running_loop().getaddrinfo(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/base_events.py", line 856, in getaddrinfo
    return await self.run_in_executor(
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/asyncio/base_events.py", line 814, in run_in_executor
    executor.submit(func, *args), loop=self)
  File "/home/julia/conda/envs/dask-dev/lib/python3.9/concurrent/futures/thread.py", line 167, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
```
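The innermost RuntimeError in that log is easy to reproduce in isolation: it is what concurrent.futures raises when work is submitted to an executor that has already been shut down, which here happens when the client's reconnect logic resolves an address (via run_in_executor) during teardown. A minimal sketch:

```python
# Minimal reproduction of the RuntimeError from the log above:
# submitting to an executor after shutdown() has been called.
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)
executor.shutdown()

try:
    executor.submit(print, "too late")
except RuntimeError as exc:
    # prints: cannot schedule new futures after shutdown
    print(exc)
```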
I feel like I saw some discussion around this recently, @fjetter, @gjoseph92, @hendrikmakait.

@graingert hadn't you fixed that a couple months ago? ^^

Ah no, this is probably a task getting left over after the event loop is closed, because we're not using asyncio.run in synchronous local clusters?

#6680 might fix it?

Ah ok, my understanding is that this PR isn't doing anything wrong. So is it ok to merge, @graingert and @gjoseph92?

I think this is good to merge. Opened issues to track:

Thanks for confirming, @graingert and @gjoseph92!
- Closes #6732
- Passes pre-commit run --all-files

I am not quite sure why this doesn't work, but I figured I'd just put up the work I've done so far. Anyone is welcome to take this PR over and push to the branch.