run cluster widget periodic callbacks on the correct event loop #6444
Conversation
    self.periodic_callbacks["cluster-repr"] = pc
    pc.start()

self.loop.add_callback(install)
The PeriodicCallbacks in self.periodic_callbacks need to be started and stopped on the same loop as self.loop, because they bind to IOLoop.current() when PeriodicCallback.start is called.

When running a synchronous Cluster, the loop runs in a different thread from the one where cluster._ipython_display_ is called.
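A minimal sketch of that pattern (not the actual distributed code; the update callback, the module-level loop, and the periodic_callbacks dict are stand-ins): the PeriodicCallback is created and started inside a function that is scheduled onto the cluster's loop with add_callback, so IOLoop.current() at start() time is the right loop.

import threading
import time

from tornado.ioloop import IOLoop, PeriodicCallback

loop = IOLoop()  # stands in for cluster.loop; it runs in its own thread
threading.Thread(target=loop.start, daemon=True).start()

periodic_callbacks = {}

def update():
    print("refresh widget")  # stand-in for the cluster-repr refresh

def install():
    # Runs on `loop`'s thread, so IOLoop.current() is `loop` and the
    # PeriodicCallback binds to the correct event loop when started.
    pc = PeriodicCallback(update, 500)  # callback interval in milliseconds
    periodic_callbacks["cluster-repr"] = pc
    pc.start()

# Calling pc.start() from this (caller's) thread would bind the callback to
# whatever IOLoop.current() happens to be here; schedule it on the cluster's
# loop instead. add_callback is safe to call from any thread.
loop.add_callback(install)

time.sleep(1.2)               # let the periodic callback fire a couple of times
loop.add_callback(loop.stop)  # shut the background loop down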
Unit Test Results: 15 files ±0, 15 suites ±0, 6h 17m 2s ⏱️ -3m 47s. For more details on these failures, see this check. Results for commit 7dc400f. ± Comparison against base commit 046ab17. ♻️ This comment has been updated with latest results.
Nice -- thanks @graingert. Since the cluster widget is a high-visibility portion of the codebase, could you add a test for this?
EDIT: To be clear, I trust that the changes you've made are correct. I just want to avoid a future regression
Force-pushed from 2714ca2 to 7dc400f
    processes=False,
) as cluster:
    cluster._ipython_display_()
    assert cluster.sync(get_ioloop, cluster) is loop
here's the test failing on the main branch:
distributed/deploy/tests/test_local.py::test_ipywidgets_loop FAILED [100%]
======================================================================================================================================== FAILURES ========================================================================================================================================
__________________________________________________________________________________________________________________________________ test_ipywidgets_loop __________________________________________________________________________________________________________________________________
loop = <tornado.platform.asyncio.AsyncIOLoop object at 0x7ff32fa29900>
    def test_ipywidgets_loop(loop):
        """
        Previously cluster._ipython_display_ attached the PeriodicCallback to the
        currently running loop, See https://github.com/dask/distributed/pull/6444
        """
        ipywidgets = pytest.importorskip("ipywidgets")

        async def get_ioloop(cluster):
            return cluster.periodic_callbacks["cluster-repr"].io_loop

        async def amain():
            # running synchronous code in an async context to setup a
            # IOLoop.current() that's different from cluster.loop
            with LocalCluster(
                n_workers=0,
                silence_logs=False,
                loop=loop,
                dashboard_address=":0",
                processes=False,
            ) as cluster:
                cluster._ipython_display_()
                assert cluster.sync(get_ioloop, cluster) is loop
                box = cluster._cached_widget
                assert isinstance(box, ipywidgets.Widget)

>       asyncio.run(amain())
distributed/deploy/tests/test_local.py:609:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../miniconda3/envs/dask-distributed/lib/python3.10/asyncio/runners.py:44: in run
return loop.run_until_complete(main)
../../miniconda3/envs/dask-distributed/lib/python3.10/asyncio/base_events.py:646: in run_until_complete
return future.result()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
    async def amain():
        # running synchronous code in an async context to setup a
        # IOLoop.current() that's different from cluster.loop
        with LocalCluster(
            n_workers=0,
            silence_logs=False,
            loop=loop,
            dashboard_address=":0",
            processes=False,
        ) as cluster:
            cluster._ipython_display_()
>           assert cluster.sync(get_ioloop, cluster) is loop
E AssertionError: assert <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7ff2d943ee00> is <tornado.platform.asyncio.AsyncIOLoop object at 0x7ff32fa29900>
E + where <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7ff2d943ee00> = <bound method SyncMethodMixin.sync of LocalCluster(f8c7cda6, 'inproc://192.168.50.210/64690/1', workers=0, threads=0, memory=0 B)>(<function test_ipywidgets_loop.<locals>.get_ioloop at 0x7ff32f9df250>, LocalCluster(f8c7cda6, 'inproc://192.168.50.210/64690/1', workers=0, threads=0, memory=0 B))
E + where <bound method SyncMethodMixin.sync of LocalCluster(f8c7cda6, 'inproc://192.168.50.210/64690/1', workers=0, threads=0, memory=0 B)> = LocalCluster(f8c7cda6, 'inproc://192.168.50.210/64690/1', workers=0, threads=0, memory=0 B).sync
distributed/deploy/tests/test_local.py:605: AssertionError
Heads up @hendrikmakait, your recent test may intermittently fail:

_______________________ test_watch_requires_lock_to_run ________________________
    def test_watch_requires_lock_to_run():
        start = time()

        def stop_lock():
            return time() > start + 0.600

        def stop_profile():
            return time() > start + 0.500

        def hold_lock(stop):
            with lock:
                while not stop():
                    sleep(0.1)

        start_threads = threading.active_count()

        # Hog the lock over the entire duration of watch
        thread = threading.Thread(
            target=hold_lock, name="Hold Lock", kwargs={"stop": stop_lock}
        )
        thread.daemon = True
        thread.start()

        log = watch(interval="10ms", cycle="50ms", stop=stop_profile)

        start = time()  # wait until thread starts up
        while threading.active_count() < start_threads + 2:
            assert time() < start + 2
            sleep(0.01)

        sleep(0.5)
>       assert len(log) == 0
E AssertionError: assert 1 == 0
E + where 1 = len(deque([(1653559454.046232, {'children': {'<module>;/Users/runner/miniconda3/envs/dask-distributed/bin/pytest;4': {'children': {'console_main;/Users/runner/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/_pytest/config/__init__.py;180': {...}}, 'count': 1, 'description': {'filename': '/Users/runner/miniconda3/envs/dask-distributed/bin/pytest', 'line': 'sys.exit(console_main())\n', 'line_number': 11, 'name': '<module>'}, 'identifier': '<module>;/Users/runner/miniconda3/envs/dask-distributed/bin/pytest;4'}}, 'count': 1, 'description': {'filename': '', 'line': '', 'line_number': 0, 'name': ''}, 'identifier': 'root'})]))
distributed/tests/test_profile.py:242: AssertionError
Thank you @graingert. This looks good to me. Merging.
distributed/tests/test_profile.py fails occasionally (e.g., see #6444 (comment)). This PR restructures the test to avoid timing-based flakes.
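One way to remove the timing dependence (a sketch only, assuming the same lock and watch objects the failing test imports from distributed.profile; not necessarily the restructuring made in this PR) is to hold the profile lock for the profiler's entire lifetime and coordinate with threading.Event instead of wall-clock deadlines:

import threading
from time import sleep

from distributed.profile import lock, watch

def test_watch_requires_lock_to_run():
    lock_held = threading.Event()
    release_lock = threading.Event()
    stop_profiling = threading.Event()

    def hold_lock():
        with lock:
            lock_held.set()
            release_lock.wait()  # keep the profile lock until explicitly released

    thread = threading.Thread(target=hold_lock, name="Hold Lock", daemon=True)
    thread.start()
    assert lock_held.wait(timeout=5)  # the lock is definitely held from here on

    # The profiler needs the lock, which we hold for its whole lifetime, so the
    # log must stay empty regardless of how long any individual step takes.
    log = watch(interval="10ms", cycle="50ms", stop=stop_profiling.is_set)
    try:
        sleep(0.5)  # give the watch thread time to attempt (and fail) to sample
        assert len(log) == 0
    finally:
        stop_profiling.set()  # stop the profiler before the lock is released
        release_lock.set()
        thread.join(timeout=5)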
Closes #xxxx
pre-commit run --all-files