From 01d1dc859077056cfc246576c8377f6338b60368 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 5 Jul 2022 12:12:58 +0200 Subject: [PATCH 01/11] Move total_resources from Worker to WorkerState and add check in validate_state --- distributed/worker.py | 16 +++++----------- distributed/worker_state_machine.py | 29 ++++++++++++++++++++++++++--- 2 files changed, 31 insertions(+), 14 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 577e75775ee..c38e45cdae6 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -406,7 +406,6 @@ class Worker(BaseWorker, ServerNode): _dashboard_address: str | None _dashboard: bool _http_prefix: str - total_resources: dict[str, float] death_timeout: float | None lifetime: float | None lifetime_stagger: float | None @@ -628,7 +627,6 @@ def __init__( if resources is None: resources = dask.config.get("distributed.worker.resources") assert isinstance(resources, dict) - self.total_resources = resources.copy() self.death_timeout = parse_timedelta(death_timeout) @@ -754,7 +752,7 @@ def __init__( data=self.memory_manager.data, threads=self.threads, plugins=self.plugins, - resources=self.total_resources, + resources=resources, total_out_connections=total_out_connections, validate=validate, transition_counter_max=transition_counter_max, @@ -877,6 +875,7 @@ def data(self) -> MutableMapping[str, Any]: tasks = DeprecatedWorkerStateAttribute() target_message_size = DeprecatedWorkerStateAttribute() total_out_connections = DeprecatedWorkerStateAttribute() + total_resources = DeprecatedWorkerStateAttribute() transition_counter = DeprecatedWorkerStateAttribute() transition_counter_max = DeprecatedWorkerStateAttribute() validate = DeprecatedWorkerStateAttribute() @@ -1100,7 +1099,7 @@ async def _register_with_scheduler(self) -> None: }, types={k: typename(v) for k, v in self.data.items()}, now=time(), - resources=self.total_resources, + resources=self.state.total_resources, memory_limit=self.memory_manager.memory_limit, local_directory=self.local_directory, services=self.service_ports, @@ -1753,16 +1752,11 @@ def update_data( return {"nbytes": {k: sizeof(v) for k, v in data.items()}, "status": "OK"} async def set_resources(self, **resources) -> None: - for r, quantity in resources.items(): - if r in self.total_resources: - self.state.available_resources[r] += quantity - self.total_resources[r] - else: - self.state.available_resources[r] = quantity - self.total_resources[r] = quantity + self.state.set_resources(resources) await retry_operation( self.scheduler.set_resources, - resources=self.total_resources, + resources=self.state.total_resources, worker=self.contact_address, ) diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 6759e50ff0f..d04d79b8e75 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -81,7 +81,7 @@ "resumed", } READY: set[TaskStateState] = {"ready", "constrained"} - +RUNNING: set[TaskStateState] = {"executing", "long-running", "cancelled", "resumed"} NO_VALUE = "--no-value-sentinel--" @@ -1027,8 +1027,12 @@ class WorkerState: #: determining a last-in-first-out order between them. generation: int + #: ``{resource name: amount}``. Total resources available for task execution. + #: See :doc: `resources`. + total_resources: dict[str, float] + #: ``{resource name: amount}``. Current resources that aren't being currently - #: consumed by task execution. Always less or equal to ``Worker.total_resources``. + #: consumed by task execution. Always less or equal to :attr:`total_resources`. #: See :doc:`resources`. available_resources: dict[str, float] @@ -1102,7 +1106,8 @@ def __init__( self.data = data if data is not None else {} self.threads = threads if threads is not None else {} self.plugins = plugins if plugins is not None else {} - self.available_resources = dict(resources) if resources is not None else {} + self.total_resources = dict(resources) if resources is not None else {} + self.available_resources = self.total_resources.copy() self.validate = validate self.tasks = {} @@ -3109,6 +3114,24 @@ def validate_state(self) -> None: for tss in self.data_needed.values(): assert len({ts.key for ts in tss}) == len(tss) + # Test that resources are consumed and released correctly + for resource, total in self.total_resources.items(): + available = self.available_resources[resource] + assert available >= 0 + allocated = 0.0 + for ts in self.tasks.values(): + if ts.resource_restrictions and ts.state in RUNNING: + allocated += ts.resource_restrictions.get(resource, 0) + assert available + allocated == total + + def set_resources(self, resources: dict[str, float]) -> None: + for r, quantity in resources.items(): + if r in self.total_resources: + self.available_resources[r] += quantity - self.total_resources[r] + else: + self.available_resources[r] = quantity + self.total_resources[r] = quantity + class BaseWorker(abc.ABC): """Wrapper around the :class:`WorkerState` that implements instructions handling. From 38ba29791f9e2816a199f672f816f4b1e30b2e5f Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 5 Jul 2022 15:14:16 +0200 Subject: [PATCH 02/11] Refactor resource allocation to set of methods --- distributed/worker_state_machine.py | 37 ++++++++++++++++------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index d04d79b8e75..a381e054dae 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -1450,15 +1450,11 @@ def _ensure_computing(self) -> RecsInstrs: if ts in recs: continue - if any( - self.available_resources[resource] < needed - for resource, needed in ts.resource_restrictions.items() - ): + if not self._resource_restrictions_satisfied(ts): break self.constrained.popleft() - for resource, needed in ts.resource_restrictions.items(): - self.available_resources[resource] -= needed + self._consume_resources(ts) recs[ts] = "executing" self.executing.add(ts) @@ -1739,8 +1735,7 @@ def _transition_executing_rescheduled( # Reschedule(), which is "cancelled" assert ts.state in ("executing", "long-running"), ts - for resource, quantity in ts.resource_restrictions.items(): - self.available_resources[resource] += quantity + self._release_resources(ts) self.executing.discard(ts) return merge_recs_instructions( @@ -1836,8 +1831,7 @@ def _transition_executing_error( *, stimulus_id: str, ) -> RecsInstrs: - for resource, quantity in ts.resource_restrictions.items(): - self.available_resources[resource] += quantity + self._release_resources(ts) self.executing.discard(ts) return merge_recs_instructions( @@ -1982,9 +1976,7 @@ def _transition_cancelled_released( self.executing.discard(ts) self.in_flight_tasks.discard(ts) - for resource, quantity in ts.resource_restrictions.items(): - self.available_resources[resource] += quantity - + self._release_resources(ts) return self._transition_generic_released(ts, stimulus_id=stimulus_id) def _transition_executing_released( @@ -2011,10 +2003,7 @@ def _transition_generic_memory( f"Tried to transition task {ts} to `memory` without data available" ) - if ts.resource_restrictions is not None: - for resource, quantity in ts.resource_restrictions.items(): - self.available_resources[resource] += quantity - + self._release_resources(ts) self.executing.discard(ts) self.in_flight_tasks.discard(ts) ts.coming_from = None @@ -2356,6 +2345,20 @@ def _transition( ) return recs, instructions + def _resource_restrictions_satisfied(self, ts: TaskState) -> bool: + return all( + self.available_resources[resource] < needed + for resource, needed in ts.resource_restrictions.items() + ) + + def _consume_resources(self, ts: TaskState) -> None: + for resource, needed in ts.resource_restrictions.items(): + self.available_resources[resource] -= needed + + def _release_resources(self, ts: TaskState) -> None: + for resource, quantity in ts.resource_restrictions.items(): + self.available_resources[resource] += quantity + def _transitions(self, recommendations: Recs, *, stimulus_id: str) -> Instructions: """Process transitions until none are left From bffc641ceb79a42a61dfe596bd246dab3b8b98bf Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 5 Jul 2022 16:44:36 +0200 Subject: [PATCH 03/11] Fix tests and adjust type annotations --- distributed/tests/test_resources.py | 10 +++++----- distributed/worker.py | 4 ++-- distributed/worker_state_machine.py | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/distributed/tests/test_resources.py b/distributed/tests/test_resources.py index 7f8b42e0da1..3ee222a0027 100644 --- a/distributed/tests/test_resources.py +++ b/distributed/tests/test_resources.py @@ -70,8 +70,8 @@ async def test_submit_many_non_overlapping_2(c, s, a, b): assert b.state.executing_count <= 1 await wait(futures) - assert a.total_resources == a.state.available_resources - assert b.total_resources == b.state.available_resources + assert a.state.total_resources == a.state.available_resources + assert b.state.total_resources == b.state.available_resources @gen_cluster( @@ -232,7 +232,7 @@ async def test_minimum_resource(c, s, a): assert a.state.executing_count <= 1 await wait(futures) - assert a.total_resources == a.state.available_resources + assert a.state.total_resources == a.state.available_resources @gen_cluster(client=True, nthreads=[("127.0.0.1", 2, {"resources": {"A": 1}})]) @@ -271,7 +271,7 @@ async def test_balance_resources(c, s, a, b): @gen_cluster(client=True, nthreads=[("127.0.0.1", 2)]) async def test_set_resources(c, s, a): await a.set_resources(A=2) - assert a.total_resources["A"] == 2 + assert a.state.total_resources["A"] == 2 assert a.state.available_resources["A"] == 2 assert s.workers[a.address].resources == {"A": 2} lock = Lock() @@ -281,7 +281,7 @@ async def test_set_resources(c, s, a): await asyncio.sleep(0.01) await a.set_resources(A=3) - assert a.total_resources["A"] == 3 + assert a.state.total_resources["A"] == 3 assert a.state.available_resources["A"] == 2 assert s.workers[a.address].resources == {"A": 3} diff --git a/distributed/worker.py b/distributed/worker.py index c38e45cdae6..d84d685e669 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1751,8 +1751,8 @@ def update_data( ) return {"nbytes": {k: sizeof(v) for k, v in data.items()}, "status": "OK"} - async def set_resources(self, **resources) -> None: - self.state.set_resources(resources) + async def set_resources(self, **resources: float) -> None: + self.state.set_resources(**resources) await retry_operation( self.scheduler.set_resources, diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index a381e054dae..f62e02acc73 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -3127,7 +3127,7 @@ def validate_state(self) -> None: allocated += ts.resource_restrictions.get(resource, 0) assert available + allocated == total - def set_resources(self, resources: dict[str, float]) -> None: + def set_resources(self, **resources: float) -> None: for r, quantity in resources.items(): if r in self.total_resources: self.available_resources[r] += quantity - self.total_resources[r] From e8b3a6081f9886dc956e5f4b6cd08f5188bc1f12 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 5 Jul 2022 16:46:56 +0200 Subject: [PATCH 04/11] Remove currently failing assertions --- distributed/worker_state_machine.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index f62e02acc73..288f34f2354 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -3117,16 +3117,6 @@ def validate_state(self) -> None: for tss in self.data_needed.values(): assert len({ts.key for ts in tss}) == len(tss) - # Test that resources are consumed and released correctly - for resource, total in self.total_resources.items(): - available = self.available_resources[resource] - assert available >= 0 - allocated = 0.0 - for ts in self.tasks.values(): - if ts.resource_restrictions and ts.state in RUNNING: - allocated += ts.resource_restrictions.get(resource, 0) - assert available + allocated == total - def set_resources(self, **resources: float) -> None: for r, quantity in resources.items(): if r in self.total_resources: From f77fb707f249c4693d95a6224066b4357907388f Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 5 Jul 2022 16:59:48 +0200 Subject: [PATCH 05/11] Minor --- distributed/worker_state_machine.py | 1 - 1 file changed, 1 deletion(-) diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 288f34f2354..0c002d2dcfc 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -81,7 +81,6 @@ "resumed", } READY: set[TaskStateState] = {"ready", "constrained"} -RUNNING: set[TaskStateState] = {"executing", "long-running", "cancelled", "resumed"} NO_VALUE = "--no-value-sentinel--" From 380c53fc08f35de8f609d1ab61e06e6199feb86c Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 5 Jul 2022 17:21:25 +0200 Subject: [PATCH 06/11] Fix _resources_restrictions_satisfied --- distributed/worker_state_machine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 0c002d2dcfc..5ff450095ba 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -2346,7 +2346,7 @@ def _transition( def _resource_restrictions_satisfied(self, ts: TaskState) -> bool: return all( - self.available_resources[resource] < needed + self.available_resources[resource] >= needed for resource, needed in ts.resource_restrictions.items() ) From 6ea2bdd3b17e626ac68cbc9dee046a977c8ab3bf Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 5 Jul 2022 18:12:27 +0200 Subject: [PATCH 07/11] Rename to acquire_resources --- distributed/worker_state_machine.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 5ff450095ba..35e55fd1f6c 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -1453,7 +1453,7 @@ def _ensure_computing(self) -> RecsInstrs: break self.constrained.popleft() - self._consume_resources(ts) + self._acquire_resources(ts) recs[ts] = "executing" self.executing.add(ts) @@ -2350,7 +2350,7 @@ def _resource_restrictions_satisfied(self, ts: TaskState) -> bool: for resource, needed in ts.resource_restrictions.items() ) - def _consume_resources(self, ts: TaskState) -> None: + def _acquire_resources(self, ts: TaskState) -> None: for resource, needed in ts.resource_restrictions.items(): self.available_resources[resource] -= needed From 0cdb79b830bf15be17ca836760e4e9fb8120e224 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 6 Jul 2022 10:54:14 +0200 Subject: [PATCH 08/11] Adjust fixture --- distributed/utils_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index b747c8437af..b8ae1d0ff93 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -2449,7 +2449,7 @@ def ws_with_running_task(ws, request): The task may or may not raise secede(); the tests using this fixture runs twice. """ - ws.available_resources = {"R": 1} + ws.set_resources(R=1) instructions = ws.handle_stimulus( ComputeTaskEvent.dummy( key="x", resource_restrictions={"R": 1}, stimulus_id="compute" From b566e21ed0cdbc85a29adfa020ae5acf47cc560d Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 6 Jul 2022 13:27:52 +0200 Subject: [PATCH 09/11] Update distributed/worker_state_machine.py Co-authored-by: crusaderky --- distributed/worker_state_machine.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 35e55fd1f6c..232c2c663ee 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -2355,8 +2355,8 @@ def _acquire_resources(self, ts: TaskState) -> None: self.available_resources[resource] -= needed def _release_resources(self, ts: TaskState) -> None: - for resource, quantity in ts.resource_restrictions.items(): - self.available_resources[resource] += quantity + for resource, needed in ts.resource_restrictions.items(): + self.available_resources[resource] += needed def _transitions(self, recommendations: Recs, *, stimulus_id: str) -> Instructions: """Process transitions until none are left From c837a02cd50cdab757cdff32d117b422b5835ff7 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 6 Jul 2022 16:08:04 +0200 Subject: [PATCH 10/11] Rename Scheduler.consume_resources to Scheduler.acquire_resources --- distributed/scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 9eadaf51288..9cee7a635ac 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1860,7 +1860,7 @@ def transition_waiting_processing(self, key, stimulus_id): self._set_duration_estimate(ts, ws) ts.processing_on = ws ts.state = "processing" - self.consume_resources(ts, ws) + self.acquire_resources(ts, ws) self.check_idle_saturated(ws) self.n_tasks += 1 @@ -2675,7 +2675,7 @@ def valid_workers(self, ts: TaskState) -> set[WorkerState] | None: return s - def consume_resources(self, ts: TaskState, ws: WorkerState): + def acquire_resources(self, ts: TaskState, ws: WorkerState): for r, required in ts.resource_restrictions.items(): ws.used_resources[r] += required From b1f25575f43af0e8a95cb8a6f3e53ea1e33dff00 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 6 Jul 2022 16:11:42 +0200 Subject: [PATCH 11/11] Rollback changes to set_resources and fix fixture --- distributed/utils_test.py | 3 ++- distributed/worker.py | 9 ++++++++- distributed/worker_state_machine.py | 8 -------- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index b8ae1d0ff93..966182a1559 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -2449,7 +2449,8 @@ def ws_with_running_task(ws, request): The task may or may not raise secede(); the tests using this fixture runs twice. """ - ws.set_resources(R=1) + ws.available_resources = {"R": 1} + ws.total_resources = {"R": 1} instructions = ws.handle_stimulus( ComputeTaskEvent.dummy( key="x", resource_restrictions={"R": 1}, stimulus_id="compute" diff --git a/distributed/worker.py b/distributed/worker.py index d84d685e669..19cc8eeaa5a 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1752,7 +1752,14 @@ def update_data( return {"nbytes": {k: sizeof(v) for k, v in data.items()}, "status": "OK"} async def set_resources(self, **resources: float) -> None: - self.state.set_resources(**resources) + for r, quantity in resources.items(): + if r in self.state.total_resources: + self.state.available_resources[r] += ( + quantity - self.state.total_resources[r] + ) + else: + self.state.available_resources[r] = quantity + self.state.total_resources[r] = quantity await retry_operation( self.scheduler.set_resources, diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 232c2c663ee..647f3cd331a 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -3116,14 +3116,6 @@ def validate_state(self) -> None: for tss in self.data_needed.values(): assert len({ts.key for ts in tss}) == len(tss) - def set_resources(self, **resources: float) -> None: - for r, quantity in resources.items(): - if r in self.total_resources: - self.available_resources[r] += quantity - self.total_resources[r] - else: - self.available_resources[r] = quantity - self.total_resources[r] = quantity - class BaseWorker(abc.ABC): """Wrapper around the :class:`WorkerState` that implements instructions handling.