From ffd3a1bcc73a13b175e0e02590a433b74cf80b73 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 10 Aug 2023 19:53:13 +0200 Subject: [PATCH] Fix race --- distributed/scheduler.py | 2 +- distributed/shuffle/tests/test_shuffle.py | 10 +++------- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 1e64b326051..8eba59afca0 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4804,7 +4804,7 @@ def stimulus_task_finished(self, key, worker, stimulus_id, run_id, **kwargs): ws: WorkerState = self.workers[worker] ts: TaskState = self.tasks.get(key) - if ts is None or ts.state in ("released", "queued"): + if ts is None or ts.state in ("released", "queued", "no-worker"): logger.debug( "Received already computed task, worker: %s, state: %s" ", key: %s, who_has: %s", diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py index 3bbaa3c53f5..b5540892a0d 100644 --- a/distributed/shuffle/tests/test_shuffle.py +++ b/distributed/shuffle/tests/test_shuffle.py @@ -448,19 +448,15 @@ async def test_restarting_does_not_deadlock(c, s): "shuffle-transfer", b.worker_address, 1, s ) a.status = Status.paused - while len(s.running) > 1: - await asyncio.sleep(0.01) + await async_poll_for(lambda: len(s.running) == 1, timeout=5) b.close_gracefully() await b.process.process.kill() - while s.running: - await asyncio.sleep(0.01) + await async_poll_for(lambda: not s.running, timeout=5) a.status = Status.running - while not s.running: - await asyncio.sleep(0.01) - pass + await async_poll_for(lambda: s.running, timeout=5) await fut