Skip to content

Commit

Permalink
Remove missing-data message (#6546)
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky authored Jun 10, 2022
1 parent cff6095 commit 059798a
Show file tree
Hide file tree
Showing 5 changed files with 3 additions and 65 deletions.
35 changes: 0 additions & 35 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3023,7 +3023,6 @@ def __init__(
"task-erred": self.handle_task_erred,
"release-worker-data": self.release_worker_data,
"add-keys": self.add_keys,
"missing-data": self.handle_missing_data,
"long-running": self.handle_long_running,
"reschedule": self.reschedule,
"keep-alive": lambda *args, **kwargs: None,
Expand Down Expand Up @@ -4670,40 +4669,6 @@ def handle_task_erred(self, key: str, stimulus_id: str, **msg) -> None:
self._transitions(recommendations, client_msgs, worker_msgs, stimulus_id)
self.send_all(client_msgs, worker_msgs)

def handle_missing_data(
    self, key: str, worker: str, errant_worker: str, stimulus_id: str
) -> None:
    """Record that ``errant_worker`` turned out not to hold ``key``.

    The report is logged and recorded as a ``missing-data`` event against
    ``errant_worker``. If the scheduler still lists that worker as a holder of
    the task, the replica is dropped. When that was the last replica of a task
    in ``memory`` state, the task is transitioned to ``released`` if it has a
    ``run_spec``, otherwise to ``forgotten``.

    Parameters
    ----------
    key : str
        Task key that could not be found
    worker : str
        Address of the worker informing the scheduler
    errant_worker : str
        Address of the worker supposed to hold a replica
    stimulus_id : str
        Identifier forwarded to the resulting transitions
    """
    logger.debug(f"handle missing data {key=} {worker=} {errant_worker=}")
    self.log_event(errant_worker, {"action": "missing-data", "key": key})

    task = self.tasks.get(key)
    holder = self.workers.get(errant_worker)
    # Nothing to do for an unknown task or an unknown worker ...
    if not task or not holder:
        return
    # ... nor when the scheduler does not list this worker as a holder.
    if holder not in task.who_has:
        return

    self.remove_replica(task, holder)

    # Only act further when the task is in memory and has no replica left.
    if task.state != "memory" or task.who_has:
        return

    next_state = "released" if task.run_spec else "forgotten"
    self.transitions({key: next_state}, stimulus_id)

def release_worker_data(self, key: str, worker: str, stimulus_id: str) -> None:
ts = self.tasks.get(key)
ws = self.workers.get(worker)
Expand Down
13 changes: 0 additions & 13 deletions distributed/tests/test_cancelled_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,19 +101,6 @@ def f(ev):
)


@gen_cluster(client=True)
async def test_worker_find_missing(c, s, a, b):
    """A dependent task still computes after its input silently vanishes
    from the only worker that held it.
    """
    future = c.submit(inc, 1, workers=[a.address])
    await future
    lost_key = future.key

    # Bypass the regular API on purpose: going through it would notify the
    # rest of the cluster that the data is gone.
    del a.data[lost_key]
    del a.tasks[lost_key]

    # No worker holds the data any more, so the scheduler is expected to
    # reschedule the lost task for the dependent computation to succeed.
    dependent = c.submit(inc, future, workers=[b.address])
    assert await dependent == 3


@gen_cluster(client=True)
async def test_worker_stream_died_during_comm(c, s, a, b):
write_queue = asyncio.Queue()
Expand Down
3 changes: 3 additions & 0 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3129,6 +3129,7 @@ async def test_gather_dep_cancelled_rescheduled(c, s):
for in-flight state. The response parser, however, did not distinguish
resulting in unwanted missing-data signals to the scheduler, causing
potential rescheduling or data leaks.
(Note: missing-data was removed in #6445).
If a cancelled key is rescheduled for fetching while gather_dep waits
internally for get_data, the response parser would misclassify this key and
Expand Down Expand Up @@ -3177,6 +3178,8 @@ async def test_gather_dep_do_not_handle_response_of_not_requested_tasks(c, s, a)
for in-flight state. The response parser, however, did not distinguish
resulting in unwanted missing-data signals to the scheduler, causing
potential rescheduling or data leaks.
(Note: missing-data was removed in #6445).
This test may become obsolete if the implementation changes significantly.
"""
async with BlockedGatherDep(s.address) as b:
Expand Down
8 changes: 0 additions & 8 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@
InvalidTaskState,
InvalidTransition,
LongRunningMsg,
MissingDataMsg,
RecommendationsConflict,
Recs,
RecsInstrs,
Expand Down Expand Up @@ -3390,13 +3389,6 @@ def done_event():
self.has_what[worker].discard(ts.key)
self.data_needed_per_worker[worker].discard(ts)
self.log.append((d, "missing-dep", stimulus_id, time()))
instructions.append(
MissingDataMsg(
key=d,
errant_worker=worker,
stimulus_id=stimulus_id,
)
)
recommendations[ts] = "fetch"

if refresh_who_has:
Expand Down
9 changes: 0 additions & 9 deletions distributed/worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,15 +389,6 @@ class ReleaseWorkerDataMsg(SendMessageToScheduler):
key: str


@dataclass
class MissingDataMsg(SendMessageToScheduler):
    """Message carrying the ``missing-data`` operation.

    Identifies a task key together with the worker that was expected to hold
    a replica of it (``errant_worker``).
    """

    __slots__ = ("key", "errant_worker")

    # Operation name carried by this message type
    op = "missing-data"

    # Key of the task whose data could not be found
    key: str
    # Address of the worker that was supposed to hold the data
    errant_worker: str


# Not to be confused with RescheduleEvent below or the distributed.Reschedule Exception
@dataclass
class RescheduleMsg(SendMessageToScheduler):
Expand Down

0 comments on commit 059798a

Please sign in to comment.