diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 8743d9e65f4..480e5b9148d 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -3023,7 +3023,6 @@ def __init__(
             "task-erred": self.handle_task_erred,
             "release-worker-data": self.release_worker_data,
             "add-keys": self.add_keys,
-            "missing-data": self.handle_missing_data,
             "long-running": self.handle_long_running,
             "reschedule": self.reschedule,
             "keep-alive": lambda *args, **kwargs: None,
@@ -4667,40 +4666,6 @@ def handle_task_erred(self, key: str, stimulus_id: str, **msg) -> None:
         self._transitions(recommendations, client_msgs, worker_msgs, stimulus_id)
         self.send_all(client_msgs, worker_msgs)

-    def handle_missing_data(
-        self, key: str, worker: str, errant_worker: str, stimulus_id: str
-    ) -> None:
-        """Signal that `errant_worker` does not hold `key`.
-
-        This may either indicate that `errant_worker` is dead or that we may be working
-        with stale data and need to remove `key` from the workers `has_what`. If no
-        replica of a task is available anymore, the task is transitioned back to
-        released and rescheduled, if possible.
-
-        Parameters
-        ----------
-        key : str
-            Task key that could not be found
-        worker : str
-            Address of the worker informing the scheduler
-        errant_worker : str
-            Address of the worker supposed to hold a replica
-        """
-        logger.debug(f"handle missing data {key=} {worker=} {errant_worker=}")
-        self.log_event(errant_worker, {"action": "missing-data", "key": key})
-
-        ts = self.tasks.get(key)
-        ws = self.workers.get(errant_worker)
-        if not ts or not ws or ws not in ts.who_has:
-            return
-
-        self.remove_replica(ts, ws)
-        if ts.state == "memory" and not ts.who_has:
-            if ts.run_spec:
-                self.transitions({key: "released"}, stimulus_id)
-            else:
-                self.transitions({key: "forgotten"}, stimulus_id)
-
     def release_worker_data(self, key: str, worker: str, stimulus_id: str) -> None:
         ts = self.tasks.get(key)
         ws = self.workers.get(worker)
diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py
index 67fac061031..94de10ac950 100644
--- a/distributed/tests/test_cancelled_state.py
+++ b/distributed/tests/test_cancelled_state.py
@@ -101,19 +101,6 @@ def f(ev):
     )


-@gen_cluster(client=True)
-async def test_worker_find_missing(c, s, a, b):
-    fut = c.submit(inc, 1, workers=[a.address])
-    await fut
-    # We do not want to use proper API since it would ensure that the cluster is
-    # informed properly
-    del a.data[fut.key]
-    del a.tasks[fut.key]
-
-    # Actually no worker has the data; the scheduler is supposed to reschedule
-    assert await c.submit(inc, fut, workers=[b.address]) == 3
-
-
 @gen_cluster(client=True)
 async def test_worker_stream_died_during_comm(c, s, a, b):
     write_queue = asyncio.Queue()
diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index 1ace95dc605..4e61544214d 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -2946,7 +2946,6 @@ async def test_who_has_consistent_remove_replicas(c, s, *workers):
     coming_from.handle_stimulus(RemoveReplicasEvent(keys=[f1.key], stimulus_id="test"))
     await f2

-    assert_story(a.story(f1.key), [(f1.key, "missing-dep")])
     assert a.tasks[f1.key].suspicious_count == 0
     assert s.tasks[f1.key].suspicious == 0

diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py
index d95ecd4ad8c..4ba182530e4 100644
--- a/distributed/tests/test_worker_state_machine.py
+++ b/distributed/tests/test_worker_state_machine.py
@@ -682,4 +682,3 @@ async def get_data(self, comm, *args, **kwargs):
             block_get_data.set()

     await wait_for_state("x", "missing", a)
-    await wait_for_state("y", "missing", a)
diff --git a/distributed/worker.py b/distributed/worker.py
index ec31a59a95f..4c199193247 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -132,7 +132,6 @@
     InvalidTaskState,
     InvalidTransition,
     LongRunningMsg,
-    MissingDataMsg,
     RecommendationsConflict,
     Recs,
     RecsInstrs,
@@ -3023,6 +3022,9 @@ def _ensure_communicating(self, *, stimulus_id: str) -> RecsInstrs:
                 # transition hasn't been enacted yet, so the task is still in fetch
                 # state and in data_needed.
                 # See matching code in transition_fetch_missing.
+                # Just to be sure, transition to missing again. If the assumption
+                # described above is correct, this is a no-op.
+                recommendations[ts] = "missing"
                 continue

             workers = [
@@ -3368,55 +3370,22 @@ def _gather_dep_done_common(self, ev: GatherDepDoneEvent) -> Iterator[TaskState
             ts.done = True
             yield ts

-    def _refetch_missing_data(
-        self, worker: str, tasks: Iterable[TaskState], stimulus_id: str
-    ) -> RecsInstrs:
-        """Helper of GatherDepSuccessEvent and GatherDepNetworkFailureEvent handlers.
-
-        Remove tasks that were not returned from the peer worker from has_what and
-        inform the scheduler with a missing-data message. Then, transition them back to
-        'fetch' so that they can be fetched from another worker.
-        """
-        recommendations: Recs = {}
-        instructions: Instructions = []
-
-        for ts in tasks:
-            ts.who_has.discard(worker)
-            self.has_what[worker].discard(ts.key)
-            self.log.append((ts.key, "missing-dep", stimulus_id, time()))
-            instructions.append(
-                MissingDataMsg(
-                    key=ts.key,
-                    errant_worker=worker,
-                    stimulus_id=stimulus_id,
-                )
-            )
-            if ts.state in ("flight", "resumed", "cancelled"):
-                # This will actually transition to missing if who_has is empty
-                recommendations[ts] = "fetch"
-            elif ts.state == "fetch":
-                self.data_needed_per_worker[worker].discard(ts)
-                if not ts.who_has:
-                    recommendations[ts] = "missing"
-
-        return recommendations, instructions
-
     @_handle_event.register
     def _handle_gather_dep_success(self, ev: GatherDepSuccessEvent) -> RecsInstrs:
         """gather_dep terminated successfully.
         The response may contain less keys than the request.
         """
         recommendations: Recs = {}
-        refetch = set()
         for ts in self._gather_dep_done_common(ev):
             if ts.key in ev.data:
                 recommendations[ts] = ("memory", ev.data[ts.key])
             else:
-                refetch.add(ts)
+                ts.who_has.discard(ev.worker)
+                self.has_what[ev.worker].discard(ts.key)
+                recommendations[ts] = "fetch"

         return merge_recs_instructions(
             (recommendations, []),
-            self._refetch_missing_data(ev.worker, refetch, ev.stimulus_id),
             self._ensure_communicating(stimulus_id=ev.stimulus_id),
         )

@@ -3467,16 +3436,27 @@ def _handle_gather_dep_network_failure(
         either retry a different worker, or ask the scheduler to inform us of a new
         worker if no other worker is available.
""" - refetch = set(self._gather_dep_done_common(ev)) - refetch |= {self.tasks[key] for key in self.has_what[ev.worker]} + recommendations: Recs = {} + instructions: Instructions = [] + worker = ev.worker + for key in self.has_what[worker]: + ts = self.tasks[key] + ts.who_has.discard(worker) + + del ts + + for ts in self._gather_dep_done_common(ev): + ts.who_has.discard(worker) + self.log.append((ts.key, "missing-dep", ev.stimulus_id, time())) + recommendations[ts] = "fetch" - recs, instrs = merge_recs_instructions( - self._refetch_missing_data(ev.worker, refetch, ev.stimulus_id), - self._ensure_communicating(stimulus_id=ev.stimulus_id), - ) # This cleanup must happen after _refetch_missing_data del self.has_what[ev.worker] del self.data_needed_per_worker[ev.worker] + recs, instrs = merge_recs_instructions( + (recommendations, instructions), + self._ensure_communicating(stimulus_id=ev.stimulus_id), + ) return recs, instrs @_handle_event.register @@ -4271,8 +4251,6 @@ def validate_task_fetch(self, ts): assert ts.key not in self.data assert self.address not in ts.who_has assert not ts.done - assert ts in self.data_needed - assert ts.who_has for w in ts.who_has: assert ts.key in self.has_what[w] diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 5d2513de708..b6fa61f580c 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -389,15 +389,6 @@ class ReleaseWorkerDataMsg(SendMessageToScheduler): key: str -@dataclass -class MissingDataMsg(SendMessageToScheduler): - op = "missing-data" - - __slots__ = ("key", "errant_worker") - key: str - errant_worker: str - - # Not to be confused with RescheduleEvent below or the distributed.Reschedule Exception @dataclass class RescheduleMsg(SendMessageToScheduler):