Skip to content

Commit

Permalink
Test abort retirement
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Aug 11, 2022
1 parent 99a2db1 commit 5f579e3
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 12 deletions.
32 changes: 32 additions & 0 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3500,3 +3500,35 @@ def teardown(self, worker):
async with Worker(s.address) as worker:
assert await c.submit(inc, 1) == 2
assert worker.plugins[InitWorkerNewThread.name].setup_status is Status.running


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_execute_preamble_abort_retirement(c, s, a):
"""Test race condition in the preamble of Worker.execute(), which used to cause a
task to remain permanently in running state in case of very tight timings when
exiting the closing_gracefully status.
See https://github.com/dask/distributed/issues/6867
"""
x = c.submit(inc, 1, key="x")
await wait_for_state("x", "executing", a, interval=0)
# The Worker.execute asyncio task has just been created and is scheduled to
# first run somewhere after this test function in the event loop

# Simulate a request from Scheduler.retire_workers to switch the state to
# closing_gracefully to let any in-memory task to be copied elsewhere. As it would
# be exceedingly hard to get the right timings with the real thing, as the
# worker-status-change op would need to arrive from the scheduler exactly after the
# compute-task op, within the same loop (e.g. within the same batched-send message),
# we're instead tampering directly with the Worker endpoint.
a.handle_worker_status_change("closing_gracefully", stimulus_id="test")

# Run Worker.execute
await asyncio.sleep(0)
# The Worker.execute should have detected the closing_gracefully status and
# performed an early exit.

a.handle_worker_status_change("running", stimulus_id="test")

# Test that x does not get stuck.
assert await x == 2
25 changes: 16 additions & 9 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1968,15 +1968,22 @@ async def gather_dep(
total_nbytes: int,
*,
stimulus_id: str,
) -> StateMachineEvent | None:
) -> StateMachineEvent:
"""Implements BaseWorker abstract method
See also
--------
distributed.worker_state_machine.BaseWorker.gather_dep
"""
if self.status not in WORKER_ANY_RUNNING:
return None
# This is for the sake of coherence only;
# it should never actually reach the scheduler.
return GatherDepFailureEvent.from_exception(
RuntimeError("Worker is shutting down"),
worker=worker,
total_nbytes=total_nbytes,
stimulus_id=f"worker-closing-{time()}",
)

try:
self.state.log.append(
Expand Down Expand Up @@ -2044,7 +2051,7 @@ async def gather_dep(
stimulus_id=f"gather-dep-failure-{time()}",
)

async def retry_busy_worker_later(self, worker: str) -> StateMachineEvent | None:
async def retry_busy_worker_later(self, worker: str) -> StateMachineEvent:
"""Wait some time, then take a peer worker out of busy state.
Implements BaseWorker abstract method.
Expand Down Expand Up @@ -2137,18 +2144,18 @@ async def _maybe_deserialize_task(
return function, args, kwargs

@fail_hard
async def execute(self, key: str, *, stimulus_id: str) -> StateMachineEvent | None:
async def execute(self, key: str, *, stimulus_id: str) -> StateMachineEvent:
"""Execute a task. Implements BaseWorker abstract method.
See also
--------
distributed.worker_state_machine.BaseWorker.execute
"""
if self.status in {Status.closing, Status.closed, Status.closing_gracefully}:
return None
ts = self.state.tasks.get(key)
if not ts:
return None
if self.status not in WORKER_ANY_RUNNING:
return RescheduleEvent(key=key, stimulus_id=f"worker-closing-{time()}")
# The key *must* be in the state
ts = self.state.tasks[key]

if ts.state == "cancelled":
logger.debug(
"Trying to execute task %s which is not in executing state anymore",
Expand Down
6 changes: 3 additions & 3 deletions distributed/worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -3432,7 +3432,7 @@ async def gather_dep(
total_nbytes: int,
*,
stimulus_id: str,
) -> StateMachineEvent | None:
) -> StateMachineEvent:
"""Gather dependencies for a task from a worker who has them
Parameters
Expand All @@ -3449,12 +3449,12 @@ async def gather_dep(
...

@abc.abstractmethod
async def execute(self, key: str, *, stimulus_id: str) -> StateMachineEvent | None:
async def execute(self, key: str, *, stimulus_id: str) -> StateMachineEvent:
"""Execute a task"""
...

@abc.abstractmethod
async def retry_busy_worker_later(self, worker: str) -> StateMachineEvent | None:
async def retry_busy_worker_later(self, worker: str) -> StateMachineEvent:
"""Wait some time, then take a peer worker out of busy state"""
...

Expand Down

0 comments on commit 5f579e3

Please sign in to comment.