From 059798a85fb75ab0b7f914da16ce6c5ce8dab1d6 Mon Sep 17 00:00:00 2001
From: crusaderky
Date: Fri, 10 Jun 2022 15:46:33 +0100
Subject: [PATCH] Remove missing-data message (#6546)

---
 distributed/scheduler.py                  | 35 -----------------------
 distributed/tests/test_cancelled_state.py | 13 ---------
 distributed/tests/test_worker.py          |  3 ++
 distributed/worker.py                     |  8 ------
 distributed/worker_state_machine.py       |  9 ------
 5 files changed, 3 insertions(+), 65 deletions(-)

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 57c35d4a8f0..9bb199e7b8b 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -3023,7 +3023,6 @@ def __init__(
             "task-erred": self.handle_task_erred,
             "release-worker-data": self.release_worker_data,
             "add-keys": self.add_keys,
-            "missing-data": self.handle_missing_data,
             "long-running": self.handle_long_running,
             "reschedule": self.reschedule,
             "keep-alive": lambda *args, **kwargs: None,
@@ -4670,40 +4669,6 @@ def handle_task_erred(self, key: str, stimulus_id: str, **msg) -> None:
         self._transitions(recommendations, client_msgs, worker_msgs, stimulus_id)
         self.send_all(client_msgs, worker_msgs)
 
-    def handle_missing_data(
-        self, key: str, worker: str, errant_worker: str, stimulus_id: str
-    ) -> None:
-        """Signal that `errant_worker` does not hold `key`.
-
-        This may either indicate that `errant_worker` is dead or that we may be working
-        with stale data and need to remove `key` from the workers `has_what`. If no
-        replica of a task is available anymore, the task is transitioned back to
-        released and rescheduled, if possible.
-
-        Parameters
-        ----------
-        key : str
-            Task key that could not be found
-        worker : str
-            Address of the worker informing the scheduler
-        errant_worker : str
-            Address of the worker supposed to hold a replica
-        """
-        logger.debug(f"handle missing data {key=} {worker=} {errant_worker=}")
-        self.log_event(errant_worker, {"action": "missing-data", "key": key})
-
-        ts = self.tasks.get(key)
-        ws = self.workers.get(errant_worker)
-        if not ts or not ws or ws not in ts.who_has:
-            return
-
-        self.remove_replica(ts, ws)
-        if ts.state == "memory" and not ts.who_has:
-            if ts.run_spec:
-                self.transitions({key: "released"}, stimulus_id)
-            else:
-                self.transitions({key: "forgotten"}, stimulus_id)
-
     def release_worker_data(self, key: str, worker: str, stimulus_id: str) -> None:
         ts = self.tasks.get(key)
         ws = self.workers.get(worker)
diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py
index 67fac061031..94de10ac950 100644
--- a/distributed/tests/test_cancelled_state.py
+++ b/distributed/tests/test_cancelled_state.py
@@ -101,19 +101,6 @@ def f(ev):
     )
 
 
-@gen_cluster(client=True)
-async def test_worker_find_missing(c, s, a, b):
-    fut = c.submit(inc, 1, workers=[a.address])
-    await fut
-    # We do not want to use proper API since it would ensure that the cluster is
-    # informed properly
-    del a.data[fut.key]
-    del a.tasks[fut.key]
-
-    # Actually no worker has the data; the scheduler is supposed to reschedule
-    assert await c.submit(inc, fut, workers=[b.address]) == 3
-
-
 @gen_cluster(client=True)
 async def test_worker_stream_died_during_comm(c, s, a, b):
     write_queue = asyncio.Queue()
diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index e4c1cab17d4..11949abd7d2 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -3129,6 +3129,7 @@ async def test_gather_dep_cancelled_rescheduled(c, s):
     for in-flight state. The response parser, however, did not distinguish
     resulting in unwanted missing-data signals to the scheduler, causing
     potential rescheduling or data leaks.
+    (Note: missing-data was removed in #6445).
 
     If a cancelled key is rescheduled for fetching while gather_dep waits
     internally for get_data, the response parser would misclassify this key and
@@ -3177,6 +3178,8 @@ async def test_gather_dep_do_not_handle_response_of_not_requested_tasks(c, s, a)
     for in-flight state. The response parser, however, did not distinguish
     resulting in unwanted missing-data signals to the scheduler, causing
     potential rescheduling or data leaks.
+    (Note: missing-data was removed in #6445).
+
     This test may become obsolete if the implementation changes significantly.
     """
     async with BlockedGatherDep(s.address) as b:
diff --git a/distributed/worker.py b/distributed/worker.py
index 7de78c56bb6..ca3dff93969 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -127,7 +127,6 @@
     InvalidTaskState,
     InvalidTransition,
     LongRunningMsg,
-    MissingDataMsg,
     RecommendationsConflict,
     Recs,
     RecsInstrs,
@@ -3390,13 +3389,6 @@ def done_event():
                 self.has_what[worker].discard(ts.key)
                 self.data_needed_per_worker[worker].discard(ts)
                 self.log.append((d, "missing-dep", stimulus_id, time()))
-                instructions.append(
-                    MissingDataMsg(
-                        key=d,
-                        errant_worker=worker,
-                        stimulus_id=stimulus_id,
-                    )
-                )
                 recommendations[ts] = "fetch"
 
         if refresh_who_has:
diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py
index de8fcde0cb7..b63f7318437 100644
--- a/distributed/worker_state_machine.py
+++ b/distributed/worker_state_machine.py
@@ -389,15 +389,6 @@ class ReleaseWorkerDataMsg(SendMessageToScheduler):
     key: str
 
 
-@dataclass
-class MissingDataMsg(SendMessageToScheduler):
-    op = "missing-data"
-
-    __slots__ = ("key", "errant_worker")
-    key: str
-    errant_worker: str
-
-
 # Not to be confused with RescheduleEvent below or the distributed.Reschedule Exception
 @dataclass
 class RescheduleMsg(SendMessageToScheduler):
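
For reference, the worker-side path that remains after this patch: when a peer fails to
return a requested key, gather_dep only records a "missing-dep" log entry, drops the
replica from its local bookkeeping, and re-recommends the task for "fetch"; no message is
sent to the scheduler anymore. The sketch below illustrates that flow with simplified
stand-ins (plain dicts, sets, and a generic ``state`` object instead of distributed's real
WorkerState attributes and instruction classes); it is an assumption-laden illustration,
not the library's actual API.

    # Illustrative sketch only; simplified stand-ins for distributed's internals.
    from time import time


    def classify_gather_response(state, worker, requested, response, stimulus_id):
        """Split one completed gather request into state-machine recommendations.

        ``state`` is any object with ``has_what`` (worker -> set of keys),
        ``data_needed_per_worker`` (worker -> set of keys), and ``log`` (list).
        Keys the peer returned become "memory" recommendations; keys it did not
        return are logged as "missing-dep", forgotten locally, and re-recommended
        for "fetch". After this patch, nothing scheduler-bound is emitted here.
        """
        recommendations = {}
        for key in requested:
            if key in response:
                # The peer returned the data: recommend transitioning to memory.
                recommendations[key] = ("memory", response[key])
            else:
                # The peer did not hold the key: forget the replica locally and
                # ask the state machine to fetch it from somewhere else.
                state.has_what[worker].discard(key)
                state.data_needed_per_worker[worker].discard(key)
                state.log.append((key, "missing-dep", stimulus_id, time()))
                recommendations[key] = "fetch"
        return recommendations


    # Example usage with throwaway state:
    # import types
    # st = types.SimpleNamespace(
    #     has_what={"w1": {"x"}}, data_needed_per_worker={"w1": {"x"}}, log=[]
    # )
    # classify_gather_response(st, "w1", ["x"], {}, "s1")  # -> {"x": "fetch"}

Previously, the missing branch would also append a
MissingDataMsg(key=..., errant_worker=worker, stimulus_id=...) instruction for the
scheduler, which is exactly the round-trip this patch deletes from worker.py,
worker_state_machine.py, and scheduler.py.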