Commit: Remove missing data message

fjetter committed Jun 9, 2022
1 parent d020453 commit 43fc518
Showing 6 changed files with 24 additions and 104 deletions.
35 changes: 0 additions & 35 deletions distributed/scheduler.py
@@ -3023,7 +3023,6 @@ def __init__(
             "task-erred": self.handle_task_erred,
             "release-worker-data": self.release_worker_data,
             "add-keys": self.add_keys,
-            "missing-data": self.handle_missing_data,
             "long-running": self.handle_long_running,
             "reschedule": self.reschedule,
             "keep-alive": lambda *args, **kwargs: None,
@@ -4667,40 +4666,6 @@ def handle_task_erred(self, key: str, stimulus_id: str, **msg) -> None:
         self._transitions(recommendations, client_msgs, worker_msgs, stimulus_id)
         self.send_all(client_msgs, worker_msgs)

-    def handle_missing_data(
-        self, key: str, worker: str, errant_worker: str, stimulus_id: str
-    ) -> None:
-        """Signal that `errant_worker` does not hold `key`.
-
-        This may either indicate that `errant_worker` is dead or that we may be working
-        with stale data and need to remove `key` from the workers `has_what`. If no
-        replica of a task is available anymore, the task is transitioned back to
-        released and rescheduled, if possible.
-
-        Parameters
-        ----------
-        key : str
-            Task key that could not be found
-        worker : str
-            Address of the worker informing the scheduler
-        errant_worker : str
-            Address of the worker supposed to hold a replica
-        """
-        logger.debug(f"handle missing data {key=} {worker=} {errant_worker=}")
-        self.log_event(errant_worker, {"action": "missing-data", "key": key})
-
-        ts = self.tasks.get(key)
-        ws = self.workers.get(errant_worker)
-        if not ts or not ws or ws not in ts.who_has:
-            return
-
-        self.remove_replica(ts, ws)
-        if ts.state == "memory" and not ts.who_has:
-            if ts.run_spec:
-                self.transitions({key: "released"}, stimulus_id)
-            else:
-                self.transitions({key: "forgotten"}, stimulus_id)
-
     def release_worker_data(self, key: str, worker: str, stimulus_id: str) -> None:
         ts = self.tasks.get(key)
         ws = self.workers.get(worker)
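
For reference, the deleted handler boils down to: drop the errant worker as a replica holder, and if an in-memory task has no replicas left, release it (so it can be recomputed) or forget it. A self-contained toy sketch of that decision logic, with simplified names that are not the real scheduler API:

    from __future__ import annotations

    from dataclasses import dataclass, field

    @dataclass
    class ToyTask:
        key: str
        state: str = "memory"
        run_spec: object = None               # recipe to recompute the task, if any
        who_has: set = field(default_factory=set)

    def on_missing_replica(ts: ToyTask, errant_worker: str) -> str | None:
        """Mimic the deleted handle_missing_data: drop the replica, then
        recommend a transition if no replica of an in-memory task survives."""
        ts.who_has.discard(errant_worker)
        if ts.state == "memory" and not ts.who_has:
            # With a run_spec the task can be recomputed; without one
            # (e.g. scattered data) it can only be forgotten.
            return "released" if ts.run_spec else "forgotten"
        return None

    ts = ToyTask("x", run_spec=lambda: 42, who_has={"tcp://w1:1234"})
    assert on_missing_replica(ts, "tcp://w1:1234") == "released"
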
13 changes: 0 additions & 13 deletions distributed/tests/test_cancelled_state.py
@@ -101,19 +101,6 @@ def f(ev):
     )


-@gen_cluster(client=True)
-async def test_worker_find_missing(c, s, a, b):
-    fut = c.submit(inc, 1, workers=[a.address])
-    await fut
-    # We do not want to use proper API since it would ensure that the cluster is
-    # informed properly
-    del a.data[fut.key]
-    del a.tasks[fut.key]
-
-    # Actually no worker has the data; the scheduler is supposed to reschedule
-    assert await c.submit(inc, fut, workers=[b.address]) == 3
-
-
 @gen_cluster(client=True)
 async def test_worker_stream_died_during_comm(c, s, a, b):
     write_queue = asyncio.Queue()
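
The deleted test reached into worker internals (`a.data`, `a.tasks`) and leaned on the worker emitting missing-data. A hypothetical variant (not part of this commit) that exercises the same guarantee, recomputation after the only replica is lost, through public behaviour; `gen_cluster` and `inc` are the stock distributed test utilities:

    from distributed.utils_test import gen_cluster, inc

    @gen_cluster(client=True)
    async def test_recompute_after_replica_loss(c, s, a, b):
        # Hypothetical test sketch, not part of this commit.
        fut = c.submit(inc, 1, workers=[a.address])
        await fut

        # Close the only worker holding the result; the scheduler must
        # recompute the task from its run_spec rather than hang.
        await a.close()
        assert await c.submit(inc, fut, workers=[b.address]) == 3
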
1 change: 0 additions & 1 deletion distributed/tests/test_worker.py
@@ -2946,7 +2946,6 @@ async def test_who_has_consistent_remove_replicas(c, s, *workers):
     coming_from.handle_stimulus(RemoveReplicasEvent(keys=[f1.key], stimulus_id="test"))
     await f2

-    assert_story(a.story(f1.key), [(f1.key, "missing-dep")])
     assert a.tasks[f1.key].suspicious_count == 0
     assert s.tasks[f1.key].suspicious == 0

2 changes: 1 addition & 1 deletion distributed/tests/test_worker_state_machine.py
@@ -682,4 +682,4 @@ async def get_data(self, comm, *args, **kwargs):
     block_get_data.set()

     await wait_for_state("x", "missing", a)
-    await wait_for_state("y", "missing", a)
+    # await wait_for_state("y", "missing", a)
68 changes: 23 additions & 45 deletions distributed/worker.py
@@ -132,7 +132,6 @@
     InvalidTaskState,
     InvalidTransition,
     LongRunningMsg,
-    MissingDataMsg,
     RecommendationsConflict,
     Recs,
     RecsInstrs,
@@ -3023,6 +3022,9 @@ def _ensure_communicating(self, *, stimulus_id: str) -> RecsInstrs:
                 # transition hasn't been enacted yet, so the task is still in fetch
                 # state and in data_needed.
                 # See matching code in transition_fetch_missing.
+                # Just to be sure, transition to missing again. If the above
+                # described assumption is correct, this is a no-op
+                recommendations[ts] = "missing"
                 continue

             workers = [
@@ -3368,55 +3370,22 @@ def _gather_dep_done_common(self, ev: GatherDepDoneEvent) -> Iterator[TaskState]
             ts.done = True
             yield ts

-    def _refetch_missing_data(
-        self, worker: str, tasks: Iterable[TaskState], stimulus_id: str
-    ) -> RecsInstrs:
-        """Helper of GatherDepSuccessEvent and GatherDepNetworkFailureEvent handlers.
-
-        Remove tasks that were not returned from the peer worker from has_what and
-        inform the scheduler with a missing-data message. Then, transition them back to
-        'fetch' so that they can be fetched from another worker.
-        """
-        recommendations: Recs = {}
-        instructions: Instructions = []
-
-        for ts in tasks:
-            ts.who_has.discard(worker)
-            self.has_what[worker].discard(ts.key)
-            self.log.append((ts.key, "missing-dep", stimulus_id, time()))
-            instructions.append(
-                MissingDataMsg(
-                    key=ts.key,
-                    errant_worker=worker,
-                    stimulus_id=stimulus_id,
-                )
-            )
-            if ts.state in ("flight", "resumed", "cancelled"):
-                # This will actually transition to missing if who_has is empty
-                recommendations[ts] = "fetch"
-            elif ts.state == "fetch":
-                self.data_needed_per_worker[worker].discard(ts)
-                if not ts.who_has:
-                    recommendations[ts] = "missing"
-
-        return recommendations, instructions
-
     @_handle_event.register
     def _handle_gather_dep_success(self, ev: GatherDepSuccessEvent) -> RecsInstrs:
         """gather_dep terminated successfully.

         The response may contain less keys than the request.
         """
         recommendations: Recs = {}
-        refetch = set()
         for ts in self._gather_dep_done_common(ev):
             if ts.key in ev.data:
                 recommendations[ts] = ("memory", ev.data[ts.key])
             else:
-                refetch.add(ts)
+                ts.who_has.discard(ev.worker)
+                self.has_what[ev.worker].discard(ts.key)
+                recommendations[ts] = "fetch"

         return merge_recs_instructions(
             (recommendations, []),
-            self._refetch_missing_data(ev.worker, refetch, ev.stimulus_id),
             self._ensure_communicating(stimulus_id=ev.stimulus_id),
         )

@@ -3467,16 +3436,27 @@ def _handle_gather_dep_network_failure(
         either retry a different worker, or ask the scheduler to inform us of a new
         worker if no other worker is available.
         """
-        refetch = set(self._gather_dep_done_common(ev))
-        refetch |= {self.tasks[key] for key in self.has_what[ev.worker]}
+        recommendations: Recs = {}
+        instructions: Instructions = []
+        worker = ev.worker
+        for key in self.has_what[worker]:
+            ts = self.tasks[key]
+            ts.who_has.discard(worker)
+
+        del ts
+
+        for ts in self._gather_dep_done_common(ev):
+            ts.who_has.discard(worker)
+            self.log.append((ts.key, "missing-dep", ev.stimulus_id, time()))
+            recommendations[ts] = "fetch"

-        recs, instrs = merge_recs_instructions(
-            self._refetch_missing_data(ev.worker, refetch, ev.stimulus_id),
-            self._ensure_communicating(stimulus_id=ev.stimulus_id),
-        )
-        # This cleanup must happen after _refetch_missing_data
         del self.has_what[ev.worker]
         del self.data_needed_per_worker[ev.worker]
+        recs, instrs = merge_recs_instructions(
+            (recommendations, instructions),
+            self._ensure_communicating(stimulus_id=ev.stimulus_id),
+        )
         return recs, instrs

     @_handle_event.register
@@ -4271,8 +4251,6 @@ def validate_task_fetch(self, ts):
         assert ts.key not in self.data
         assert self.address not in ts.who_has
         assert not ts.done
-        assert ts in self.data_needed
-        assert ts.who_has

         for w in ts.who_has:
             assert ts.key in self.has_what[w]
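
A toy model of the worker-side flow after this change (plain dicts and sets, not the real WorkerState): when a peer dies mid-gather, it is forgotten as a replica holder everywhere, and the tasks being gathered are steered back towards fetch, falling through to missing when no replica remains, instead of sending the scheduler a missing-data message:

    from collections import defaultdict

    def on_gather_network_failure(who_has, has_what, in_flight_keys, dead_worker):
        """Return {key: recommendation}. who_has maps key -> set of workers;
        has_what maps worker -> set of keys. Both are toy stand-ins."""
        recommendations = {}
        # Forget every replica the dead worker was believed to hold.
        for key in has_what.pop(dead_worker, set()):
            who_has[key].discard(dead_worker)
        # Tasks we were gathering from it go back to fetch; in the real
        # state machine the fetch transition itself demotes a task to
        # missing when who_has is empty, which is collapsed here.
        for key in in_flight_keys:
            recommendations[key] = "fetch" if who_has[key] else "missing"
        return recommendations

    who_has = defaultdict(set, {"x": {"tcp://dead:1"}, "y": {"tcp://dead:1", "tcp://ok:2"}})
    has_what = {"tcp://dead:1": {"x", "y"}}
    assert on_gather_network_failure(who_has, has_what, ["x", "y"], "tcp://dead:1") == {
        "x": "missing",
        "y": "fetch",
    }
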
9 changes: 0 additions & 9 deletions distributed/worker_state_machine.py
@@ -389,15 +389,6 @@ class ReleaseWorkerDataMsg(SendMessageToScheduler):
     key: str


-@dataclass
-class MissingDataMsg(SendMessageToScheduler):
-    op = "missing-data"
-
-    __slots__ = ("key", "errant_worker")
-    key: str
-    errant_worker: str
-
-
 # Not to be confused with RescheduleEvent below or the distributed.Reschedule Exception
 @dataclass
 class RescheduleMsg(SendMessageToScheduler):
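
The deleted class followed the module's SendMessageToScheduler pattern: a slotted dataclass whose op names the scheduler stream handler and whose fields form the payload. A minimal reconstruction of that shape, for illustration only; the real base class carries more machinery, including a stimulus_id field:

    from dataclasses import dataclass

    @dataclass
    class SendMessageToScheduler:
        op = ""  # class attribute: name of the scheduler stream handler

        def to_msg(self) -> dict:
            # Assumed serialization; the real class does more than this.
            return {"op": self.op, **{k: getattr(self, k) for k in self.__annotations__}}

    @dataclass
    class MissingDataMsg(SendMessageToScheduler):
        op = "missing-data"

        __slots__ = ("key", "errant_worker")
        key: str
        errant_worker: str

    msg = MissingDataMsg(key="x", errant_worker="tcp://w1:1234")
    assert msg.to_msg() == {"op": "missing-data", "key": "x", "errant_worker": "tcp://w1:1234"}
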
