Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove leaking reference to workers from gen_cluster #6337

Merged
merged 3 commits into from
May 13, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1003,14 +1003,15 @@ def _(func):
@functools.wraps(func)
def test_func(*outer_args, **kwargs):
result = None
workers = []
with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:

async def coro():
with tempfile.TemporaryDirectory() as tmpdir:
config2 = merge({"temporary-directory": tmpdir}, config)
with dask.config.set(config2):
workers = []
s = False

for _ in range(60):
try:
s, ws = await start_cluster(
Expand Down Expand Up @@ -1139,21 +1140,21 @@ def get_unclosed():
Comm._instances.clear()
_global_clients.clear()

for w in workers:
if getattr(w, "data", None):
try:
w.data.clear()
except OSError:
# zict backends can fail if their storage directory
# was already removed
pass

return result

result = loop.run_sync(
coro, timeout=timeout * 2 if timeout else timeout
)

for w in workers:
if getattr(w, "data", None):
try:
w.data.clear()
except OSError:
# zict backends can fail if their storage directory
# was already removed
pass

return result

# Patch the signature so pytest can inject fixtures
Expand Down