-
-
Notifications
You must be signed in to change notification settings - Fork 727
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
always bind the Scheduler to the IOLoop.current() - and deprecate the Scheduler(loop= kwarg #6443
Conversation
fc42dbd
to
e5a8ca3
Compare
Besides this currently being broken, this implies that users who actually want the scheduler to be using a different event loop will need to make sure that the scheduler is initialized while running in a coro/task on that loop, correct? IIUC, this is how stdlib asyncio also suggest for their objects (e.g. Lock) to be used. |
I'm fine with this in principle. cc'ing @jacobtomlinson to keep him informed. |
This seems reasonable to me. Thanks for keeping me up to date. |
this needs a fix from #6231 - converting back into draft for now |
Unit Test Results 15 files + 15 15 suites +15 6h 20m 40s ⏱️ + 6h 20m 40s For more details on these failures, see this check. Results for commit 7f5f8ff. ± Comparison against base commit f20f776. ♻️ This comment has been updated with latest results. |
this also applies to subclasses of the distributed/distributed/core.py Line 254 in 1346671
import asyncio
import concurrent.futures
import contextlib
import sys
from tornado.ioloop import IOLoop
from distributed import Worker
from distributed.utils import sync
def _run_and_close_tornado(async_fn, /, *args, **kwargs):
loop = None
async def coro():
nonlocal loop
loop = IOLoop.current()
return await async_fn(*args, **kwargs)
try:
asyncio.run(coro())
finally:
loop.close(close_fds=True)
@contextlib.contextmanager
def loop_in_thread():
loop_started = concurrent.futures.Future()
with concurrent.futures.ThreadPoolExecutor(
1, thread_name_prefix="test IOLoop"
) as tpe:
async def run():
io_loop = IOLoop.current()
stop_event = asyncio.Event()
loop_started.set_result((io_loop, stop_event))
await stop_event.wait()
# run asyncio.run in a thread and collect exceptions from *either*
# the loop failing to start, or failing to close
ran = tpe.submit(_run_and_close_tornado, run)
for f in concurrent.futures.as_completed((loop_started, ran)):
if f is loop_started:
io_loop, stop_event = loop_started.result()
try:
yield io_loop
finally:
io_loop.add_callback(stop_event.set)
elif f is ran:
# if this is the first iteration the loop failed to start
# if it's the second iteration the loop has finished or
# the loop failed to close and we need to raise the exception
ran.result()
return
def test_io_loop_in_thread():
async def with_worker_contention(worker):
async def with_worker():
async with worker:
pass
return await asyncio.gather(with_worker(), with_worker())
async def main(loop):
worker = Worker(f"tcp://127.0.0.1:1234", loop=loop)
sync(loop, with_worker_contention, worker=worker)
with loop_in_thread() as loop:
asyncio.run(main(loop))
if __name__ == "__main__":
sys.exit(test_io_loop_in_thread())
|
Merged in the other PR. I've marked as ready for review and restarted CI. |
This is in. Thanks @graingert |
passing anything other than IOLoop.current() is already unsupported: see dask#6443 (comment)
refs #6163
Passing anything other than
IOLoop.current()
will fail when you try to use the Scheduler anyway:self._lock will be bound to
IOLoop.current().asyncio_loop
akaasyncio.get_running_loop()
:distributed/distributed/scheduler.py
Line 2923 in fc42dbd
and start_http_server runs on the IOLoop.current() also:
distributed/distributed/scheduler.py
Line 2963 in fc42dbd
pre-commit run --all-files