Skip to content

Commit

Permalink
Fix dask#6869
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Aug 11, 2022
1 parent 5f579e3 commit 86382fe
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 11 deletions.
51 changes: 47 additions & 4 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
ComputeTaskEvent,
ExecuteFailureEvent,
ExecuteSuccessEvent,
FreeKeysEvent,
RemoveReplicasEvent,
SerializedTask,
StealRequestEvent,
Expand Down Expand Up @@ -3505,10 +3506,13 @@ def teardown(self, worker):
@gen_cluster(client=True, nthreads=[("", 1)])
async def test_execute_preamble_abort_retirement(c, s, a):
"""Test race condition in the preamble of Worker.execute(), which used to cause a
task to remain permanently in running state in case of very tight timings when
task to remain permanently in executing state in case of very tight timings when
exiting the closing_gracefully status.
See https://github.com/dask/distributed/issues/6867
See also
--------
https://github.com/dask/distributed/issues/6867
test_execute_preamble_early_resume
"""
x = c.submit(inc, 1, key="x")
await wait_for_state("x", "executing", a, interval=0)
Expand All @@ -3525,10 +3529,49 @@ async def test_execute_preamble_abort_retirement(c, s, a):

# Run Worker.execute
await asyncio.sleep(0)
# The Worker.execute should have detected the closing_gracefully status and
# performed an early exit.
# Worker.execute should have detected the closing_gracefully status and performed an
# early exit.

a.handle_worker_status_change("running", stimulus_id="test")

# Test that x does not get stuck.
assert await x == 2


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_execute_preamble_early_resume(c, s, a):
"""Test race condition in the preamble of Worker.execute(), which used to cause a
task to remain permanently in resumed state in case of very tight timings when
resuming a task.
See also
--------
https://github.com/dask/distributed/issues/6869
test_execute_preamble_abort_retirement
"""
x = c.submit(inc, 1, key="x")
await wait_for_state("x", "executing", a, interval=0)
# The Worker.execute asyncio task has just been created and is scheduled to
# first run somewhere after this test function in the event loop

# Simulate a cancellation request from the scheduler, which is almost immediately
# reversed. As it would be exceedingly hard to get the right timings with the real
# thing, we're instead tampering directly with the Worker endpoint.
a.handle_stimulus(FreeKeysEvent(keys=["x"], stimulus_id="free"))
assert a.state.tasks["x"].state == "cancelled"

# Run Worker.execute
await asyncio.sleep(0)
# Worker.execute should have detected the cancelled state and
# performed an early exit.

# The exit callback of Worker.execute hasn't been run yet
assert a.state.tasks["x"].state == "cancelled"

# Resume the task. No need for run_spec etc, as the WorkerState will detect that
# the previous asyncio task is still running and it will resume it.
a.handle_stimulus(ComputeTaskEvent.dummy(key="x", stimulus_id="resume"))
assert a.state.tasks["x"].state == "executing"

# Test that x does not get stuck.
assert await x == 2
10 changes: 3 additions & 7 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2153,15 +2153,11 @@ async def execute(self, key: str, *, stimulus_id: str) -> StateMachineEvent:
"""
if self.status not in WORKER_ANY_RUNNING:
return RescheduleEvent(key=key, stimulus_id=f"worker-closing-{time()}")
# The key *must* be in the state
ts = self.state.tasks[key]

# The key *must* be in the worker state
ts = self.state.tasks[key]
if ts.state == "cancelled":
logger.debug(
"Trying to execute task %s which is not in executing state anymore",
ts,
)
return AlreadyCancelledEvent(key=ts.key, stimulus_id=stimulus_id)
return RescheduleEvent(key=key, stimulus_id=f"already-cancelled-{time()}")

try:
function, args, kwargs = await self._maybe_deserialize_task(ts)
Expand Down

0 comments on commit 86382fe

Please sign in to comment.