diff --git a/distributed/tests/test_resources.py b/distributed/tests/test_resources.py index 3ee222a0027..a2e8768c6e5 100644 --- a/distributed/tests/test_resources.py +++ b/distributed/tests/test_resources.py @@ -286,6 +286,7 @@ async def test_set_resources(c, s, a): assert s.workers[a.address].resources == {"A": 3} +@pytest.mark.repeat(100) # DO NOT MERGE @gen_cluster( client=True, nthreads=[ diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 2bb8b640a4e..c021593192e 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -1850,7 +1850,9 @@ def _transition_waiting_constrained( for dep in ts.dependencies ) assert all(dep.state == "memory" for dep in ts.dependencies) - assert ts.key not in self.ready + # FIXME https://github.com/dask/distributed/issues/6710 + # assert ts.key not in pluck(1, self.ready) + # assert ts.key not in self.constrained ts.state = "constrained" self.constrained.append(ts.key) return self._ensure_computing() @@ -1885,7 +1887,9 @@ def _transition_waiting_ready( ) -> RecsInstrs: if self.validate: assert ts.state == "waiting" - assert ts.key not in self.ready + # FIXME https://github.com/dask/distributed/issues/6710 + # assert ts.key not in pluck(1, self.ready) + # assert ts.key not in self.constrained assert not ts.waiting_for_data for dep in ts.dependencies: assert dep.key in self.data or dep.key in self.actors @@ -2165,7 +2169,6 @@ def _transition_executing_memory( if self.validate: assert ts.state in ("executing", "long-running") assert not ts.waiting_for_data - assert ts.key not in self.ready self.executing.discard(ts) self.long_running.discard(ts) @@ -2182,7 +2185,9 @@ def _transition_constrained_executing( assert not ts.waiting_for_data assert ts.key not in self.data assert ts.state in READY - assert ts.key not in self.ready + # FIXME https://github.com/dask/distributed/issues/6710 + # assert ts.key not in pluck(1, self.ready) + # assert ts.key not in self.constrained for dep in ts.dependencies: assert dep.key in self.data or dep.key in self.actors @@ -2197,7 +2202,9 @@ def _transition_ready_executing( assert not ts.waiting_for_data assert ts.key not in self.data assert ts.state in READY - assert ts.key not in self.ready + # FIXME https://github.com/dask/distributed/issues/6710 + # assert ts.key not in pluck(1, self.ready) + # assert ts.key not in self.constrained assert all( dep.key in self.data or dep.key in self.actors for dep in ts.dependencies @@ -3082,8 +3089,6 @@ def _validate_task_memory(self, ts: TaskState) -> None: assert ts.key in self.data or ts.key in self.actors assert isinstance(ts.nbytes, int) assert not ts.waiting_for_data - assert ts.key not in self.ready - assert ts.state == "memory" def _validate_task_executing(self, ts: TaskState) -> None: if ts.state == "executing": @@ -3102,9 +3107,19 @@ def _validate_task_executing(self, ts: TaskState) -> None: assert dep.key in self.data or dep.key in self.actors def _validate_task_ready(self, ts: TaskState) -> None: - assert ts.key in pluck(1, self.ready) + if ts.state == "ready": + assert not ts.resource_restrictions + assert ts.key in pluck(1, self.ready) + # FIXME https://github.com/dask/distributed/issues/6710 + # assert ts.key not in self.constrained + else: + assert ts.resource_restrictions + assert ts.state == "constrained" + # FIXME https://github.com/dask/distributed/issues/6710 + # assert ts.key not in pluck(1, self.ready) + assert ts.key in self.constrained + assert ts.key not in self.data - assert ts.state != "executing" assert not ts.done assert not ts.waiting_for_data assert all( @@ -3113,7 +3128,6 @@ def _validate_task_ready(self, ts: TaskState) -> None: def _validate_task_waiting(self, ts: TaskState) -> None: assert ts.key not in self.data - assert ts.state == "waiting" assert not ts.done if ts.dependencies and ts.run_spec: assert not all(dep.key in self.data for dep in ts.dependencies) @@ -3121,7 +3135,8 @@ def _validate_task_waiting(self, ts: TaskState) -> None: def _validate_task_flight(self, ts: TaskState) -> None: assert ts.key not in self.data assert ts in self.in_flight_tasks - assert not any(dep.key in self.ready for dep in ts.dependents) + # FIXME https://github.com/dask/distributed/issues/6710 + # assert not any(dep.key in self.ready for dep in ts.dependents) assert ts.coming_from assert ts.coming_from in self.in_flight_workers assert ts.key in self.in_flight_workers[ts.coming_from] @@ -3189,7 +3204,7 @@ def validate_task(self, ts: TaskState) -> None: self._validate_task_cancelled(ts) elif ts.state == "resumed": self._validate_task_resumed(ts) - elif ts.state == "ready": + elif ts.state in ("ready", "constrained"): self._validate_task_ready(ts) elif ts.state in ("executing", "long-running"): self._validate_task_executing(ts)