Skip to content

Commit

Permalink
Merge branch 'AMM/test_close_gracefully' into AMM/staging
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Jan 24, 2022
2 parents ee68b02 + b0b8e95 commit 5835ce3
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1635,7 +1635,7 @@ async def test_close_gracefully(c, s, a, b):
# be replicated by retire_workers().
while True:
mem = {k for k, ts in s.tasks.items() if ts.state == "memory"}
if len(mem) >= 8:
if len(mem) >= 8 and any(ts.state == "executing" for ts in b.tasks.values()):
break
await asyncio.sleep(0.01)

Expand Down Expand Up @@ -1663,16 +1663,22 @@ async def test_close_gracefully(c, s, a, b):
@pytest.mark.slow
@gen_cluster(client=True, nthreads=[("", 1)], timeout=10)
async def test_lifetime(c, s, a):
# Note: test was occasionally failing with lifetime="1 seconds"
async with Worker(s.address, lifetime="2 seconds") as b:
futures = c.map(slowinc, range(200), delay=0.1, workers=[b.address])
await asyncio.sleep(1)
assert not a.data

# Note: keys will appear in b.data several milliseconds before they switch to
# status=memory in s.tasks. It's important to sample the in-memory keys from the
# scheduler side, because those that the scheduler thinks are still processing
# won't be replicated by retire_workers().
mem = {k for k, ts in s.tasks.items() if ts.state == "memory"}
assert mem
while True:
mem = {k for k, ts in s.tasks.items() if ts.state == "memory"}
if len(mem) >= 8:
break
await asyncio.sleep(0.01)

assert b.status == Status.running
assert not a.data

while b.status != Status.closed:
await asyncio.sleep(0.01)
Expand Down

0 comments on commit 5835ce3

Please sign in to comment.