Skip to content

Commit

Permalink
Validate constrained tasks (#6698)
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky authored Jul 11, 2022
1 parent a05cc38 commit 30e86ef
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 12 deletions.
1 change: 1 addition & 0 deletions distributed/tests/test_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ async def test_set_resources(c, s, a):
assert s.workers[a.address].resources == {"A": 3}


@pytest.mark.repeat(100) # DO NOT MERGE
@gen_cluster(
client=True,
nthreads=[
Expand Down
39 changes: 27 additions & 12 deletions distributed/worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1850,7 +1850,9 @@ def _transition_waiting_constrained(
for dep in ts.dependencies
)
assert all(dep.state == "memory" for dep in ts.dependencies)
assert ts.key not in self.ready
# FIXME https://github.com/dask/distributed/issues/6710
# assert ts.key not in pluck(1, self.ready)
# assert ts.key not in self.constrained
ts.state = "constrained"
self.constrained.append(ts.key)
return self._ensure_computing()
Expand Down Expand Up @@ -1885,7 +1887,9 @@ def _transition_waiting_ready(
) -> RecsInstrs:
if self.validate:
assert ts.state == "waiting"
assert ts.key not in self.ready
# FIXME https://github.com/dask/distributed/issues/6710
# assert ts.key not in pluck(1, self.ready)
# assert ts.key not in self.constrained
assert not ts.waiting_for_data
for dep in ts.dependencies:
assert dep.key in self.data or dep.key in self.actors
Expand Down Expand Up @@ -2165,7 +2169,6 @@ def _transition_executing_memory(
if self.validate:
assert ts.state in ("executing", "long-running")
assert not ts.waiting_for_data
assert ts.key not in self.ready

self.executing.discard(ts)
self.long_running.discard(ts)
Expand All @@ -2182,7 +2185,9 @@ def _transition_constrained_executing(
assert not ts.waiting_for_data
assert ts.key not in self.data
assert ts.state in READY
assert ts.key not in self.ready
# FIXME https://github.com/dask/distributed/issues/6710
# assert ts.key not in pluck(1, self.ready)
# assert ts.key not in self.constrained
for dep in ts.dependencies:
assert dep.key in self.data or dep.key in self.actors

Expand All @@ -2197,7 +2202,9 @@ def _transition_ready_executing(
assert not ts.waiting_for_data
assert ts.key not in self.data
assert ts.state in READY
assert ts.key not in self.ready
# FIXME https://github.com/dask/distributed/issues/6710
# assert ts.key not in pluck(1, self.ready)
# assert ts.key not in self.constrained
assert all(
dep.key in self.data or dep.key in self.actors
for dep in ts.dependencies
Expand Down Expand Up @@ -3082,8 +3089,6 @@ def _validate_task_memory(self, ts: TaskState) -> None:
assert ts.key in self.data or ts.key in self.actors
assert isinstance(ts.nbytes, int)
assert not ts.waiting_for_data
assert ts.key not in self.ready
assert ts.state == "memory"

def _validate_task_executing(self, ts: TaskState) -> None:
if ts.state == "executing":
Expand All @@ -3102,9 +3107,19 @@ def _validate_task_executing(self, ts: TaskState) -> None:
assert dep.key in self.data or dep.key in self.actors

def _validate_task_ready(self, ts: TaskState) -> None:
assert ts.key in pluck(1, self.ready)
if ts.state == "ready":
assert not ts.resource_restrictions
assert ts.key in pluck(1, self.ready)
# FIXME https://github.com/dask/distributed/issues/6710
# assert ts.key not in self.constrained
else:
assert ts.resource_restrictions
assert ts.state == "constrained"
# FIXME https://github.com/dask/distributed/issues/6710
# assert ts.key not in pluck(1, self.ready)
assert ts.key in self.constrained

assert ts.key not in self.data
assert ts.state != "executing"
assert not ts.done
assert not ts.waiting_for_data
assert all(
Expand All @@ -3113,15 +3128,15 @@ def _validate_task_ready(self, ts: TaskState) -> None:

def _validate_task_waiting(self, ts: TaskState) -> None:
assert ts.key not in self.data
assert ts.state == "waiting"
assert not ts.done
if ts.dependencies and ts.run_spec:
assert not all(dep.key in self.data for dep in ts.dependencies)

def _validate_task_flight(self, ts: TaskState) -> None:
assert ts.key not in self.data
assert ts in self.in_flight_tasks
assert not any(dep.key in self.ready for dep in ts.dependents)
# FIXME https://github.com/dask/distributed/issues/6710
# assert not any(dep.key in self.ready for dep in ts.dependents)
assert ts.coming_from
assert ts.coming_from in self.in_flight_workers
assert ts.key in self.in_flight_workers[ts.coming_from]
Expand Down Expand Up @@ -3189,7 +3204,7 @@ def validate_task(self, ts: TaskState) -> None:
self._validate_task_cancelled(ts)
elif ts.state == "resumed":
self._validate_task_resumed(ts)
elif ts.state == "ready":
elif ts.state in ("ready", "constrained"):
self._validate_task_ready(ts)
elif ts.state in ("executing", "long-running"):
self._validate_task_executing(ts)
Expand Down

0 comments on commit 30e86ef

Please sign in to comment.