Skip to content

Commit

Permalink
transition flight to missing if no who_has (dask#5653)
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Jan 13, 2022
1 parent 3df9fcb commit 9565acd
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2114,9 +2114,11 @@ def transition_released_waiting(self, ts, *, stimulus_id):
def transition_fetch_flight(self, ts, worker, *, stimulus_id):
if self.validate:
assert ts.state == "fetch"
assert ts.who_has
assert ts.key not in self.data_needed

if not ts.who_has:
return {ts: "missing"}, []

ts.done = False
ts.state = "flight"
ts.coming_from = worker
Expand Down Expand Up @@ -3237,6 +3239,11 @@ def release_key(
ts.nbytes = None
ts._previous = None
ts._next = None
ts.exception = None
ts.exception_text = ""
ts.traceback = None
ts.traceback_text = ""
self._missing_dep_flight.discard(ts)
ts.done = False

self._executing.discard(ts)
Expand Down Expand Up @@ -3883,7 +3890,7 @@ def validate_task_missing(self, ts):
assert not ts.who_has
assert not ts.done
assert not any(ts.key in has_what for has_what in self.has_what.values())
assert ts.key in self._missing_dep_flight
assert ts in self._missing_dep_flight

def validate_task_cancelled(self, ts):
assert ts.key not in self.data
Expand Down Expand Up @@ -3973,7 +3980,7 @@ def validate_state(self):
assert (
ts_wait.state
in READY | {"executing", "flight", "fetch", "missing"}
or ts_wait.key in self._missing_dep_flight
or ts_wait in self._missing_dep_flight
or ts_wait.who_has.issubset(self.in_flight_workers)
), (ts, ts_wait, self.story(ts), self.story(ts_wait))
if ts.state == "memory":
Expand Down

0 comments on commit 9565acd

Please sign in to comment.