Skip to content

Commit

Permalink
Refactor busy workers reinsertion (#6379)
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky authored Jun 1, 2022
1 parent 6b6c0ed commit 62effdf
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 6 deletions.
27 changes: 21 additions & 6 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@
RequestRefreshWhoHasMsg,
RescheduleEvent,
RescheduleMsg,
RetryBusyWorkerEvent,
RetryBusyWorkerLater,
SendMessageToScheduler,
SerializedTask,
StateMachineEvent,
Expand Down Expand Up @@ -2856,6 +2858,7 @@ def transitions(self, recommendations: Recs, *, stimulus_id: str) -> None:
else:
self._handle_instructions(instructions)

@fail_hard
@log_errors
def handle_stimulus(self, stim: StateMachineEvent) -> None:
if not isinstance(stim, FindMissingEvent):
Expand Down Expand Up @@ -2921,6 +2924,12 @@ def _handle_instructions(self, instructions: Instructions) -> None:
name=f"execute({inst.key})",
)

elif isinstance(inst, RetryBusyWorkerLater):
task = asyncio.create_task(
self.retry_busy_worker_later(inst.worker),
name=f"retry_busy_worker_later({inst.worker})",
)

else:
raise TypeError(inst) # pragma: nocover

Expand Down Expand Up @@ -3353,7 +3362,9 @@ def done_event():
# Avoid hammering the worker. If there are multiple replicas
# available, immediately try fetching from a different worker.
self.busy_workers.add(worker)
self.io_loop.call_later(0.15, self._readd_busy_worker, worker)
instructions.append(
RetryBusyWorkerLater(worker=worker, stimulus_id=stimulus_id)
)

refresh_who_has = []

Expand Down Expand Up @@ -3392,11 +3403,10 @@ def done_event():
self.transitions(recommendations, stimulus_id=stimulus_id)
self._handle_instructions(instructions)

@log_errors
def _readd_busy_worker(self, worker: str) -> None:
self.busy_workers.remove(worker)
self.handle_stimulus(
GatherDepDoneEvent(stimulus_id=f"readd-busy-worker-{time()}")
async def retry_busy_worker_later(self, worker: str) -> StateMachineEvent | None:
    """Pause briefly, then build the event that retries a busy worker.

    Parameters
    ----------
    worker
        The worker to retry; it is passed through unchanged into the
        returned event.

    Returns
    -------
    A ``RetryBusyWorkerEvent`` for ``worker`` whose ``stimulus_id`` is
    ``retry-busy-worker-`` followed by the value of ``time()`` taken after
    the pause.
    """
    # Fixed back-off before the retry event is produced.
    await asyncio.sleep(0.15)
    stimulus_id = f"retry-busy-worker-{time()}"
    return RetryBusyWorkerEvent(worker=worker, stimulus_id=stimulus_id)

@log_errors
Expand Down Expand Up @@ -3884,6 +3894,11 @@ def _(self, ev: GatherDepDoneEvent) -> RecsInstrs:
"""Temporary hack - to be removed"""
return self._ensure_communicating(stimulus_id=ev.stimulus_id)

@handle_event.register
def _(self, ev: RetryBusyWorkerEvent) -> RecsInstrs:
    """Handle a ``RetryBusyWorkerEvent``.

    Drops ``ev.worker`` from ``self.busy_workers`` (a no-op if it is not
    there) and returns the result of ``_ensure_communicating`` called with
    the event's stimulus id.
    """
    retried_worker = ev.worker
    # discard() rather than remove(): the worker may already be gone.
    self.busy_workers.discard(retried_worker)
    return self._ensure_communicating(stimulus_id=ev.stimulus_id)

@handle_event.register
def _(self, ev: CancelComputeEvent) -> RecsInstrs:
"""Scheduler requested to cancel a task"""
Expand Down
12 changes: 12 additions & 0 deletions distributed/worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,12 @@ class Execute(Instruction):
key: str


@dataclass
class RetryBusyWorkerLater(Instruction):
    """Instruction asking for a given worker to be retried later.

    Carries the ``worker`` that should be retried.
    """

    __slots__ = ("worker",)

    # The worker to retry.
    worker: str


@dataclass
class EnsureCommunicatingAfterTransitions(Instruction):
__slots__ = ()
Expand Down Expand Up @@ -464,6 +470,12 @@ class UnpauseEvent(StateMachineEvent):
__slots__ = ()


@dataclass
class RetryBusyWorkerEvent(StateMachineEvent):
    """Event signalling that a given worker should be retried.

    Carries the ``worker`` the event refers to.
    """

    __slots__ = ("worker",)

    # The worker this event refers to.
    worker: str


@dataclass
class GatherDepDoneEvent(StateMachineEvent):
"""Temporary hack - to be removed"""
Expand Down

0 comments on commit 62effdf

Please sign in to comment.