Refactoring gather_dep / Remove missing data message #6544

Closed · wants to merge 9 commits
35 changes: 0 additions & 35 deletions distributed/scheduler.py
@@ -3023,7 +3023,6 @@ def __init__(
"task-erred": self.handle_task_erred,
"release-worker-data": self.release_worker_data,
"add-keys": self.add_keys,
"missing-data": self.handle_missing_data,
"long-running": self.handle_long_running,
"reschedule": self.reschedule,
"keep-alive": lambda *args, **kwargs: None,
@@ -4667,40 +4666,6 @@ def handle_task_erred(self, key: str, stimulus_id: str, **msg) -> None:
self._transitions(recommendations, client_msgs, worker_msgs, stimulus_id)
self.send_all(client_msgs, worker_msgs)

def handle_missing_data(
self, key: str, worker: str, errant_worker: str, stimulus_id: str
) -> None:
"""Signal that `errant_worker` does not hold `key`.

This may either indicate that `errant_worker` is dead or that we may be working
with stale data and need to remove `key` from the worker's `has_what`. If no
replica of a task is available anymore, the task is transitioned back to
released and rescheduled, if possible.

Parameters
----------
key : str
Task key that could not be found
worker : str
Address of the worker informing the scheduler
errant_worker : str
Address of the worker supposed to hold a replica
"""
logger.debug(f"handle missing data {key=} {worker=} {errant_worker=}")
self.log_event(errant_worker, {"action": "missing-data", "key": key})

ts = self.tasks.get(key)
ws = self.workers.get(errant_worker)
if not ts or not ws or ws not in ts.who_has:
return

self.remove_replica(ts, ws)
if ts.state == "memory" and not ts.who_has:
if ts.run_spec:
self.transitions({key: "released"}, stimulus_id)
else:
self.transitions({key: "forgotten"}, stimulus_id)

def release_worker_data(self, key: str, worker: str, stimulus_id: str) -> None:
ts = self.tasks.get(key)
ws = self.workers.get(worker)
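For context, the deleted handler was the scheduler-side endpoint of the `missing-data` stream message registered in the handler table above. A hedged sketch of the payload it consumed, reconstructed from the handler's signature and docstring — the literal values and the exact worker-side sending code are assumptions, not the real implementation:

```python
# Illustrative payload for the removed "missing-data" stream handler.
# Field names follow handle_missing_data's signature; the concrete values
# are made up for illustration.
msg = {
    "op": "missing-data",                     # dispatched to handle_missing_data
    "key": "inc-123",                         # task key that could not be found
    "errant_worker": "tcp://10.0.0.2:34567",  # worker supposed to hold a replica
    "stimulus_id": "gather-dep-failed-1654000000.0",
}
```

After this PR, the same situation is surfaced through `gather_dep`'s return events (see `GatherDepNetworkFailureEvent` in the new test below) rather than a dedicated scheduler message.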
13 changes: 0 additions & 13 deletions distributed/tests/test_cancelled_state.py
@@ -101,19 +101,6 @@ def f(ev):
)


@gen_cluster(client=True)
async def test_worker_find_missing(c, s, a, b):
fut = c.submit(inc, 1, workers=[a.address])
await fut
# We do not want to use the proper API since it would ensure that the cluster
# is informed properly
del a.data[fut.key]
del a.tasks[fut.key]

# Actually no worker has the data; the scheduler is supposed to reschedule
assert await c.submit(inc, fut, workers=[b.address]) == 3
Comment on lines -104 to -114 (Member, Author):
This test is a bit nonsensical: I don't think we should be allowed to simply mess with the state and expect the system to recover.
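For contrast, a hedged sketch of the "proper API" route the deleted test deliberately avoided — a scheduler-initiated replica removal, mirroring the `request_acquire_replicas` call used in the new test further below (treat the exact signature as an assumption):

```python
# Sketch only: asking the scheduler to remove the replica keeps its
# bookkeeping (has_what / who_has) consistent, unlike deleting from
# a.data / a.tasks behind the scheduler's back as this test did.
s.request_remove_replicas(a.address, [fut.key], stimulus_id="test")
```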



@gen_cluster(client=True)
async def test_worker_stream_died_during_comm(c, s, a, b):
write_queue = asyncio.Queue()
5 changes: 2 additions & 3 deletions distributed/tests/test_stories.py
@@ -138,8 +138,7 @@ async def test_worker_story_with_deps(c, s, a, b):
# Story now includes randomized stimulus_ids and timestamps.
story = b.story("res")
stimulus_ids = {ev[-2].rsplit("-", 1)[0] for ev in story}
assert stimulus_ids == {"compute-task", "task-finished"}

assert stimulus_ids == {"compute-task", "gather-dep-success", "task-finished"}
# This is a simple transition log
expected = [
("res", "compute-task", "released"),
@@ -153,7 +152,7 @@

story = b.story("dep")
stimulus_ids = {ev[-2].rsplit("-", 1)[0] for ev in story}
assert stimulus_ids == {"compute-task"}
assert stimulus_ids == {"compute-task", "gather-dep-success"}
expected = [
("dep", "ensure-task-exists", "released"),
("dep", "released", "fetch", "fetch", {}),
1 change: 0 additions & 1 deletion distributed/tests/test_worker.py
@@ -2946,7 +2946,6 @@ async def test_who_has_consistent_remove_replicas(c, s, *workers):
coming_from.handle_stimulus(RemoveReplicasEvent(keys=[f1.key], stimulus_id="test"))
await f2

assert_story(a.story(f1.key), [(f1.key, "missing-dep")])
assert a.tasks[f1.key].suspicious_count == 0
assert s.tasks[f1.key].suspicious == 0

36 changes: 36 additions & 0 deletions distributed/tests/test_worker_state_machine.py
@@ -647,3 +647,39 @@ async def test_fetch_to_missing_on_refresh_who_has(c, s, w1, w2, w3):
assert w3.tasks["x"].state == "missing"
assert w3.tasks["y"].state == "flight"
assert w3.tasks["y"].who_has == {w2.address}


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_fetch_to_missing_on_network_failure(c, s, a):
"""
1. Two tasks, x and y, are respectively in flight and fetch state from the same
worker, which holds the only replica of both.
2. gather_dep for x returns GatherDepNetworkFailureEvent
3. The event empties has_what, x.who_has, and y.who_has; it recommends a transition
to missing for both x and y.
5. Before the recommendation can be implemented, the same event invokes
_ensure_communicating, which pops y from data_needed - but y has an empty
who_has, which is an exceptional situation.
6. The fetch->missing transition is executed, but y is no longer in data_needed -
another exceptional situation.
"""
block_get_data = asyncio.Event()

class BlockedBreakingWorker(Worker):
async def get_data(self, comm, *args, **kwargs):
await block_get_data.wait()
raise OSError("fake error")

async with BlockedBreakingWorker(s.address) as b:
x = c.submit(inc, 1, key="x", workers=[b.address])
y = c.submit(inc, 2, key="y", workers=[b.address])
await wait([x, y])
s.request_acquire_replicas(a.address, ["x"], stimulus_id="test_x")
await wait_for_state("x", "flight", a)
s.request_acquire_replicas(a.address, ["y"], stimulus_id="test_y")
await wait_for_state("y", "fetch", a)

block_get_data.set()

await wait_for_state("x", "missing", a)
# await wait_for_state("y", "missing", a)
Comment (Member, Author):
y is not reliably transitioned to missing, since it may be stuck in fetch depending on how fast the RefreshWhoHas comes in.
This could be restored if we transitioned to missing immediately, but as I've pointed out a couple of times already, I don't think that is a requirement, and by not doing it we get much simpler recommendation code.
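If the commented-out assertion were ever restored, one hypothetical way to avoid the race — assuming the RefreshWhoHasEvent stimulus exercised by test_fetch_to_missing_on_refresh_who_has above — would be to inject the refresh deterministically instead of waiting for the periodic one:

```python
# Hypothetical: hand the worker an empty who_has for y so the
# fetch -> missing transition no longer depends on the timing of the
# periodically issued RefreshWhoHas.
from distributed.worker_state_machine import RefreshWhoHasEvent

a.handle_stimulus(RefreshWhoHasEvent(who_has={"y": []}, stimulus_id="test"))
await wait_for_state("y", "missing", a)
```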
