use asyncio.run to run gen_cluster, gen_test and cluster #6231
Conversation
Unit Test Results: 15 files ±0, 15 suites ±0, 6h 41m 14s ⏱️ +20m 25s. For more details on these failures, see this check. Results for commit 86cef41. ± Comparison against base commit 046ab17. ♻️ This comment has been updated with latest results.
Force-pushed ad01bd9 to 9b89f8a (Compare)
Force-pushed b159135 to b3d72d8 (Compare)
Force-pushed b817c5d to a84e1b5 (Compare)
CI is very sad
@clean(**clean_kwargs)
def test_func(*outer_args, **kwargs):
    result = None
    with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:

        async def coro():
            with tempfile.TemporaryDirectory() as tmpdir:
                config2 = merge({"temporary-directory": tmpdir}, config)
                with dask.config.set(config2):
                    workers = []
                    s = False

                    for _ in range(60):
                        try:
                            s, ws = await start_cluster(
                                nthreads,
                                scheduler,
                                loop,
                                security=security,
                                Worker=Worker,
                                scheduler_kwargs=scheduler_kwargs,
                                worker_kwargs=worker_kwargs,
                            )
                        except Exception as e:
                            logger.error(
                                "Failed to start gen_cluster: "
                                f"{e.__class__.__name__}: {e}; retrying",
                                exc_info=True,
                            )
                            await asyncio.sleep(1)
                        else:
                            workers[:] = ws
                            args = [s] + workers
                            break
                    if s is False:
                        raise Exception("Could not start cluster")
                    if client:
                        c = await Client(
                            s.address,
                            loop=loop,

async def async_fn():
    result = None
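The 60-iteration startup loop above boils down to a start-or-sleep retry pattern. A minimal standalone sketch, where the hypothetical `start_flaky` stands in for `start_cluster` (names and failure counts are illustrative, not from the PR):

```python
import asyncio


async def start_flaky(attempts, fail_first=2):
    # Hypothetical starter: fails the first `fail_first` calls, then succeeds.
    attempts.append(1)
    if len(attempts) <= fail_first:
        raise RuntimeError("not ready")
    return "scheduler", ["w1", "w2"]


async def start_with_retries(retries=60):
    attempts = []
    s = False
    for _ in range(retries):
        try:
            s, ws = await start_flaky(attempts)
        except Exception:
            # Back off before retrying (the real code sleeps 1 second).
            await asyncio.sleep(0)
        else:
            break
    if s is False:
        # The `False` sentinel distinguishes "never started" from a scheduler.
        raise Exception("Could not start cluster")
    return s, ws
```

Running `asyncio.run(start_with_retries())` returns the scheduler and workers once a start attempt succeeds.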
I assume the entire block that follows is just an indentation change?
There are actually a few changes, e.g. I don't pass `loop` to `await start_cluster(` or `await Client(`.
To make review easier, you could split the indentation change and the other changes into two commits.
OR you could leave comments pointing reviewers to the code you actually modified (if it's just about the loop, that's OK).
Yep, it's just avoiding passing a `loop` kwarg, moving `clean` to a decorator, and using `asyncio.run` with `asyncio.wait_for` instead of `run_sync`.
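The `asyncio.run` + `asyncio.wait_for` combination mentioned here can be sketched in isolation. This is a minimal illustration of the pattern, not the PR's actual wrapper; `async_fn` stands in for a test body:

```python
import asyncio


async def async_fn():
    # Stand-in for the body of an async test.
    await asyncio.sleep(0.01)
    return "ok"


def test_func(timeout=30):
    # asyncio.run creates a fresh event loop, runs the coroutine to
    # completion, and closes the loop afterwards. wait_for enforces the
    # overall timeout, replacing tornado's IOLoop.run_sync(..., timeout=...).
    return asyncio.run(asyncio.wait_for(async_fn(), timeout=timeout))
```

Because each call gets its own loop, no loop state leaks between tests.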
Force-pushed 1f1853b to 77eae29 (Compare)
CI should be happier now!
Force-pushed 77eae29 to cabab46 (Compare)
I'm OK with the changes proposed. If test failures turn out to be unrelated I'm good to go.
I left a few review comments where I think more inline commentary would help but this is not a requirement.
distributed/utils_test.py
Outdated
loop_started = concurrent.futures.Future()
with concurrent.futures.ThreadPoolExecutor(
    1, thread_name_prefix="test IOLoop"
) as tpe:

    async def run():
        io_loop = IOLoop.current()
        stop_event = asyncio.Event()
        loop_started.set_result((io_loop, stop_event))
        await stop_event.wait()

    ran = tpe.submit(_run_and_close_tornado, run)
    for f in concurrent.futures.as_completed((loop_started, ran)):
        if f is loop_started:
            io_loop, stop_event = loop_started.result()
            try:
                yield io_loop
            finally:
                io_loop.add_callback(stop_event.set)

        elif f is ran:
            ran.result()
            return
This is more complicated than before but also much more robust.
I think it makes sense, but a comment justifying this complexity would be nice. Very naively, I would have implemented something more similar to the previous version than this.
@contextmanager
def clean(threads=True, instances=True, processes=True):
    asyncio.set_event_loop(None)
What exactly is this doing? The documentation doesn't state the behavior if None is passed.
This causes `asyncio.get_event_loop()` to fail, which in turn causes `IOLoop.current()` to fail. It's equivalent to the state left behind after running `asyncio.run(noop())`.
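A quick way to see this effect (a minimal sketch; `noop` is just an empty coroutine, not a name from the PR):

```python
import asyncio


async def noop():
    pass


def current_loop_is_unset():
    # After set_event_loop(None) has been called, get_event_loop()
    # raises RuntimeError instead of auto-creating a loop.
    try:
        asyncio.get_event_loop()
    except RuntimeError:
        return True
    return False


# asyncio.run() creates a fresh loop, runs the coroutine, closes the
# loop, and finally calls asyncio.set_event_loop(None) -- leaving the
# thread with no current event loop, just like an explicit
# asyncio.set_event_loop(None).
asyncio.run(noop())
```

After the `asyncio.run` call, `current_loop_is_unset()` returns `True`, which is why `IOLoop.current()` (which looks up the current asyncio loop) fails too.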
It's the side effect of calling `IOLoop.clear_current()`, where `asyncio.get_event_loop()` raises a RuntimeError:

try:
    self.old_asyncio = asyncio.get_event_loop()
except (RuntimeError, AssertionError):
    # get_event_loop() failed, so _clear_current_hook calls
    # asyncio.set_event_loop(None)
    self.old_asyncio = None  # type: ignore
self.is_current = True
asyncio.set_event_loop(self.asyncio_loop)

def _clear_current_hook(self) -> None:
    if self.is_current:
        asyncio.set_event_loop(self.old_asyncio)
Rather than making a pristine loop itself, this forces functions wrapped by `clean` to make their own pristine loop.
Force-pushed 0ed4344 to 86cef41 (Compare)
Looks like the test suite ending with exit code 1 while all tests pass is already happening: https://github.com/dask/distributed/runs/6604868888?check_suite_focus=true#step:11:1852 (from #6451)
I'm curious about this. Is there some useful signal that we can give to future developers about why a test is flaky in this way that would help them? I ask this question without having thought deeply about it.
Or, more broadly, do you have thoughts on how to address the increase in flaky tests that we're likely to see? I like that we're flushing these issues out into the open, but I'd also like to get some ideas on how we can address them. Is there any systemic approach that you see?
This is mostly about edge cases involving implicitly closed rpc objects and implicitly closed tasks. Implicitly closed tasks are now explicitly cancelled, so `await` calls will start raising `CancelledError` during the test rather than `GeneratorExit` when the task is garbage collected. In addition, some …
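The behavioral difference described here can be demonstrated in isolation: once a task is explicitly cancelled, `CancelledError` is raised at the task's next `await` point, where the task can observe it (and must re-raise). A minimal sketch, with illustrative names:

```python
import asyncio


async def worker(started, observed):
    started.set()
    try:
        # An explicit task.cancel() raises CancelledError right here,
        # at the await point -- unlike garbage collection of a pending
        # task, which would throw GeneratorExit into the coroutine.
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        observed.append(True)
        raise  # cancellation must be propagated, not swallowed


async def main():
    observed = []
    started = asyncio.Event()
    task = asyncio.create_task(worker(started, observed))
    await started.wait()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return observed
```

`asyncio.run(main())` returns `[True]`: the worker saw the cancellation during the test run, which is exactly what lets tests (and cleanup code) handle it deterministically.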
With changes from dask/distributed#6231, the `loop` fixture now depends on the `cleanup` fixture, which must be imported explicitly.

Authors:
- Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
- GALI PREM SAGAR (https://github.com/galipremsagar)
- Mads R. B. Kristensen (https://github.com/madsbk)

URL: #924
pre-commit run --all-files