diff --git a/distributed/worker.py b/distributed/worker.py index 2ad5d9c53a5..c306c5af53b 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -129,6 +129,8 @@ RequestRefreshWhoHasMsg, RescheduleEvent, RescheduleMsg, + RetryBusyWorkerEvent, + RetryBusyWorkerLater, SendMessageToScheduler, SerializedTask, StateMachineEvent, @@ -2856,6 +2858,7 @@ def transitions(self, recommendations: Recs, *, stimulus_id: str) -> None: else: self._handle_instructions(instructions) + @fail_hard @log_errors def handle_stimulus(self, stim: StateMachineEvent) -> None: if not isinstance(stim, FindMissingEvent): @@ -2921,6 +2924,12 @@ def _handle_instructions(self, instructions: Instructions) -> None: name=f"execute({inst.key})", ) + elif isinstance(inst, RetryBusyWorkerLater): + task = asyncio.create_task( + self.retry_busy_worker_later(inst.worker), + name=f"retry_busy_worker_later({inst.worker})", + ) + else: raise TypeError(inst) # pragma: nocover @@ -3353,7 +3362,9 @@ def done_event(): # Avoid hammering the worker. If there are multiple replicas # available, immediately try fetching from a different worker. self.busy_workers.add(worker) - self.io_loop.call_later(0.15, self._readd_busy_worker, worker) + instructions.append( + RetryBusyWorkerLater(worker=worker, stimulus_id=stimulus_id) + ) refresh_who_has = [] @@ -3392,11 +3403,10 @@ def done_event(): self.transitions(recommendations, stimulus_id=stimulus_id) self._handle_instructions(instructions) - @log_errors - def _readd_busy_worker(self, worker: str) -> None: - self.busy_workers.remove(worker) - self.handle_stimulus( - GatherDepDoneEvent(stimulus_id=f"readd-busy-worker-{time()}") + async def retry_busy_worker_later(self, worker: str) -> StateMachineEvent | None: + await asyncio.sleep(0.15) + return RetryBusyWorkerEvent( + worker=worker, stimulus_id=f"retry-busy-worker-{time()}" ) @log_errors @@ -3884,6 +3894,11 @@ def _(self, ev: GatherDepDoneEvent) -> RecsInstrs: """Temporary hack - to be removed""" return self._ensure_communicating(stimulus_id=ev.stimulus_id) + @handle_event.register + def _(self, ev: RetryBusyWorkerEvent) -> RecsInstrs: + self.busy_workers.discard(ev.worker) + return self._ensure_communicating(stimulus_id=ev.stimulus_id) + @handle_event.register def _(self, ev: CancelComputeEvent) -> RecsInstrs: """Scheduler requested to cancel a task""" diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 2f311cdb482..c672211aa86 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -280,6 +280,12 @@ class Execute(Instruction): key: str +@dataclass +class RetryBusyWorkerLater(Instruction): + __slots__ = ("worker",) + worker: str + + @dataclass class EnsureCommunicatingAfterTransitions(Instruction): __slots__ = () @@ -464,6 +470,12 @@ class UnpauseEvent(StateMachineEvent): __slots__ = () +@dataclass +class RetryBusyWorkerEvent(StateMachineEvent): + __slots__ = ("worker",) + worker: str + + @dataclass class GatherDepDoneEvent(StateMachineEvent): """Temporary hack - to be removed"""