From 9b78992e3f4897a3d63a18ecb37020509b8c7ecb Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 13 May 2022 16:04:01 +0200 Subject: [PATCH 1/2] Assert that no weak references remain and fix gen_cluster leak --- distributed/utils_test.py | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 88f3c4e82b..bb1cb70d40 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -56,6 +56,7 @@ from distributed.nanny import Nanny from distributed.node import ServerNode from distributed.proctitle import enable_proctitle_on_children +from distributed.profile import wait_profiler from distributed.protocol import deserialize from distributed.security import Security from distributed.utils import ( @@ -1003,10 +1004,10 @@ def _(func): @functools.wraps(func) def test_func(*outer_args, **kwargs): result = None - workers = [] with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop: async def coro(): + workers = [] with dask.config.set(config): s = False for _ in range(60): @@ -1131,22 +1132,20 @@ def get_unclosed(): finally: Comm._instances.clear() _global_clients.clear() - + for w in workers: + if getattr(w, "data", None): + try: + w.data.clear() + except OSError: + # zict backends can fail if their storage directory + # was already removed + pass return result result = loop.run_sync( coro, timeout=timeout * 2 if timeout else timeout ) - for w in workers: - if getattr(w, "data", None): - try: - w.data.clear() - except OSError: - # zict backends can fail if their storage directory - # was already removed - pass - return result # Patch the signature so pytest can inject fixtures @@ -1776,6 +1775,10 @@ def check_instances(): _global_clients.clear() + wait_profiler() + gc.collect() + assert not Nanny._instances + for w in Worker._instances: with suppress(RuntimeError): # closed IOLoop w.loop.add_callback(w.close, report=False, executor_wait=False) @@ -1799,11 +1802,6 @@ def check_instances(): Comm._instances.clear() raise ValueError("Unclosed Comms", L) - assert all( - n.status in {Status.closed, Status.init, Status.failed} - for n in Nanny._instances - ), {n: n.status for n in Nanny._instances} - # assert not list(SpecCluster._instances) # TODO assert all(c.status == Status.closed for c in SpecCluster._instances), list( SpecCluster._instances From 86714500009467636348dc5c9f25b9894068f7cb Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 13 May 2022 16:10:30 +0200 Subject: [PATCH 2/2] Cleanup --- distributed/utils_test.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index bb1cb70d40..7ac9f17f21 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1773,8 +1773,6 @@ def check_instances(): sleep(0.1) assert time() < start + 10 - _global_clients.clear() - wait_profiler() gc.collect() assert not Nanny._instances @@ -1808,7 +1806,6 @@ def check_instances(): ) SpecCluster._instances.clear() - Nanny._instances.clear() DequeHandler.clear_all_instances()