From b09570b43ac219d003ecd836d61142f2905aea88 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 23 Aug 2022 17:25:10 +0200 Subject: [PATCH 1/8] Expose comm_nbytes in get_metrics and prometheus --- distributed/http/worker/prometheus/core.py | 6 ++++++ distributed/worker.py | 1 + 2 files changed, 7 insertions(+) diff --git a/distributed/http/worker/prometheus/core.py b/distributed/http/worker/prometheus/core.py index 8e40957bf83..c5406bd44ee 100644 --- a/distributed/http/worker/prometheus/core.py +++ b/distributed/http/worker/prometheus/core.py @@ -53,6 +53,12 @@ def collect(self): value=self.server.latency, ) + yield GaugeMetricFamily( + self.build_name("comm_reserved_bytes"), + "Number of bytes currently reserved for incoming/outgoing data transfers.", + value=self.server.state.comm_nbytes, + ) + # all metrics using digests require crick to be installed # the following metrics will export NaN, if the corresponding digests are None if self.crick_available: diff --git a/distributed/worker.py b/distributed/worker.py index b863f319d7f..0b49aa21ca0 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -983,6 +983,7 @@ async def get_metrics(self) -> dict: "memory": spilled_memory, "disk": spilled_disk, }, + comm_reserved_bytes=self.state.comm_nbytes, event_loop_interval=self._tick_interval_observed, ) out.update(self.monitor.recent()) From aba58f04ac5998db26aec6f3929b3719e285979b Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 31 Aug 2022 10:26:30 +0200 Subject: [PATCH 2/8] Adjust total count logic and calculate transfer_outgoing_bytes --- distributed/http/worker/prometheus/core.py | 47 ++++++++++++++++++++-- distributed/worker.py | 27 +++++++++---- distributed/worker_state_machine.py | 12 +++--- 3 files changed, 69 insertions(+), 17 deletions(-) diff --git a/distributed/http/worker/prometheus/core.py b/distributed/http/worker/prometheus/core.py index c5406bd44ee..3268f64857d 100644 --- a/distributed/http/worker/prometheus/core.py +++ 
b/distributed/http/worker/prometheus/core.py @@ -37,7 +37,10 @@ def collect(self): yield GaugeMetricFamily( self.build_name("concurrent_fetch_requests"), - "Number of open fetch requests to other workers.", + ( + "[Deprecated: This metric has been renamed to transfer_incoming_count.] " + "Number of open fetch requests to other workers." + ), value=self.server.state.transfer_incoming_count, ) @@ -54,9 +57,45 @@ def collect(self): ) yield GaugeMetricFamily( - self.build_name("comm_reserved_bytes"), - "Number of bytes currently reserved for incoming/outgoing data transfers.", - value=self.server.state.comm_nbytes, + self.build_name("transfer_incoming_bytes"), + "Total size of open data transfers from other workers.", + value=self.server.state.transfer_incoming_bytes, + ) + + yield GaugeMetricFamily( + self.build_name("transfer_incoming_count"), + "Number of open data transfers from other workers.", + value=self.server.state.transfer_incoming_count, + ) + + yield GaugeMetricFamily( + self.build_name("transfer_incoming_count_total"), + ( + "Total number of data transfers from other workers " + "since the worker was started." + ), + value=self.server.state.transfer_incoming_count_total, + ) + + yield GaugeMetricFamily( + self.build_name("transfer_outgoing_bytes"), + "Total size of open data transfers to other workers.", + value=self.server.transfer_outgoing_bytes, + ) + + yield GaugeMetricFamily( + self.build_name("transfer_outgoing_count"), + "Number of open data transfers to other workers.", + value=self.server.transfer_outgoing_count, + ) + + yield GaugeMetricFamily( + self.build_name("transfer_outgoing_count_total"), + ( + "Total number of data transfers to other workers " + "since the worker was started." 
+ ), + value=self.server.transfer_outgoing_count_total, ) # all metrics using digests require crick to be installed diff --git a/distributed/worker.py b/distributed/worker.py index 0b49aa21ca0..0d777cfc82f 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -385,8 +385,10 @@ class Worker(BaseWorker, ServerNode): profile_history: deque[tuple[float, dict[str, Any]]] transfer_incoming_log: deque[dict[str, Any]] transfer_outgoing_log: deque[dict[str, Any]] - #: Number of total data transfers to other workers. + #: Total number of data transfers to other workers since the worker was started. transfer_outgoing_count_total: int + #: Total size of open data transfers to other worker. + transfer_outgoing_bytes: int #: Number of open data transfers to other workers. transfer_outgoing_count: int bandwidth: float @@ -543,6 +545,7 @@ def __init__( self.transfer_incoming_log = deque(maxlen=100000) self.transfer_outgoing_log = deque(maxlen=100000) self.transfer_outgoing_count_total = 0 + self.transfer_outgoing_bytes = 0 self.transfer_outgoing_count = 0 self.bandwidth = parse_bytes(dask.config.get("distributed.scheduler.bandwidth")) self.bandwidth_workers = defaultdict( @@ -983,7 +986,14 @@ async def get_metrics(self) -> dict: "memory": spilled_memory, "disk": spilled_disk, }, - comm_reserved_bytes=self.state.comm_nbytes, + transfer={ + "incoming_bytes": self.state.transfer_incoming_bytes, + "incoming_count": self.state.transfer_incoming_count, + "incoming_count_total": self.state.transfer_incoming_count_total, + "outgoing_bytes": self.transfer_outgoing_bytes, + "outgoing_count": self.transfer_outgoing_count, + "outgoing_count_total": self.transfer_outgoing_count_total, + }, event_loop_interval=self._tick_interval_observed, ) out.update(self.monitor.recent()) @@ -1685,6 +1695,7 @@ async def get_data( return {"status": "busy"} self.transfer_outgoing_count += 1 + self.transfer_outgoing_count_total += 1 data = {k: self.data[k] for k in keys if k in self.data} if 
len(data) < len(keys): @@ -1697,7 +1708,11 @@ async def get_data( ) msg = {"status": "OK", "data": {k: to_serialize(v) for k, v in data.items()}} - nbytes = {k: self.state.tasks[k].nbytes for k in data if k in self.state.tasks} + bytes_per_task = { + k: self.state.tasks[k].nbytes for k in data if k in self.state.tasks + } + total_bytes = sum(filter(None, bytes_per_task.values())) + self.transfer_outgoing_bytes += total_bytes stop = time() if self.digests is not None: self.digests["get-data-load-duration"].add(stop - start) @@ -1716,14 +1731,12 @@ async def get_data( comm.abort() raise finally: + self.transfer_outgoing_bytes -= total_bytes self.transfer_outgoing_count -= 1 stop = time() if self.digests is not None: self.digests["get-data-send-duration"].add(stop - start) - total_bytes = sum(filter(None, nbytes.values())) - - self.transfer_outgoing_count_total += 1 duration = (stop - start) or 0.5 # windows self.transfer_outgoing_log.append( { @@ -1732,7 +1745,7 @@ async def get_data( "middle": (start + stop) / 2, "duration": duration, "who": who, - "keys": nbytes, + "keys": bytes_per_task, "total": total_bytes, "compressed": compressed, "bandwidth": total_bytes / duration, diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index cbaefa7e72a..57d0dad3cf3 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -1149,14 +1149,14 @@ class WorkerState: #: dependencies until the current query returns. in_flight_workers: dict[str, set[str]] - #: The total size of incoming data transfers for in-flight tasks. + #: Total size of open data transfers from other workers. transfer_incoming_bytes: int - #: The maximum number of concurrent incoming data transfers from other workers. + #: Maximum number of concurrent incoming data transfers from other workers. #: See also :attr:`distributed.worker.Worker.transfer_outgoing_count_limit`. 
transfer_incoming_count_limit: int - #: Number of total data transfers from other workers since the worker was started. + #: Total number of data transfers from other workers since the worker was started. transfer_incoming_count_total: int #: Ignore :attr:`transfer_incoming_count_limit` as long as :attr:`transfer_incoming_bytes` is @@ -1354,7 +1354,7 @@ def all_running_tasks(self) -> set[TaskState]: @property def in_flight_tasks_count(self) -> int: - """Count of tasks currently being replicated from other workers to this one. + """Number of tasks currently being replicated from other workers to this one. See also -------- @@ -1364,7 +1364,7 @@ def in_flight_tasks_count(self) -> int: @property def transfer_incoming_count(self) -> int: - """Count of open data transfers from other workers. + """Number of open data transfers from other workers. See also -------- @@ -1529,6 +1529,7 @@ def _ensure_communicating(self, *, stimulus_id: str) -> RecsInstrs: ) self.in_flight_workers[worker] = to_gather_keys + self.transfer_incoming_count_total += 1 self.transfer_incoming_bytes += total_nbytes if ( self.transfer_incoming_count >= self.transfer_incoming_count_limit @@ -2845,7 +2846,6 @@ def _gather_dep_done_common(self, ev: GatherDepDoneEvent) -> Iterator[TaskState] _execute_done_common """ self.transfer_incoming_bytes -= ev.total_nbytes - self.transfer_incoming_count_total += 1 keys = self.in_flight_workers.pop(ev.worker) for key in keys: ts = self.tasks[key] From 1dce893117261e63583dea329edef86db9cbbe0a Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 31 Aug 2022 13:45:08 +0200 Subject: [PATCH 3/8] Fix test --- distributed/http/worker/tests/test_worker_http.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/distributed/http/worker/tests/test_worker_http.py b/distributed/http/worker/tests/test_worker_http.py index 02d3d4dba59..6d0c424b8c8 100644 --- a/distributed/http/worker/tests/test_worker_http.py +++ b/distributed/http/worker/tests/test_worker_http.py @@ 
-21,6 +21,12 @@ async def test_prometheus(c, s, a): "dask_worker_concurrent_fetch_requests", "dask_worker_threads", "dask_worker_latency_seconds", + "dask_worker_transfer_incoming_bytes", + "dask_worker_transfer_incoming_count", + "dask_worker_transfer_incoming_count_total", + "dask_worker_transfer_outgoing_bytes", + "dask_worker_transfer_outgoing_count", + "dask_worker_transfer_outgoing_count_total", } try: From bf4c97f351a24757df5290aa0e35cb0ef79ccbf7 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 31 Aug 2022 14:49:04 +0200 Subject: [PATCH 4/8] Test transfer_incoming metrics --- .../tests/test_worker_state_machine.py | 91 ++++++++++++++++++- distributed/worker_state_machine.py | 2 +- 2 files changed, 90 insertions(+), 3 deletions(-) diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py index a1db60cc5e7..d7ec62ea902 100644 --- a/distributed/tests/test_worker_state_machine.py +++ b/distributed/tests/test_worker_state_machine.py @@ -410,7 +410,7 @@ def test_computetask_dummy(): # nbytes is generated from who_has if omitted ev2 = ComputeTaskEvent.dummy("x", who_has={"y": "127.0.0.1:2"}, stimulus_id="s") - assert ev2.nbytes == {"y": 1} + assert ev2.nbytes == {"y": 7} def test_updatedata_to_dict(): @@ -1030,6 +1030,9 @@ def test_aggregate_gather_deps(ws, nbytes, n_in_flight): assert instructions == [GatherDep.match(worker=ws2, stimulus_id="s1")] assert len(instructions[0].to_gather) == n_in_flight assert len(ws.in_flight_tasks) == n_in_flight + assert ws.transfer_incoming_bytes == nbytes * n_in_flight + assert ws.transfer_incoming_count == 1 + assert ws.transfer_incoming_count_total == 1 def test_gather_priority(ws): @@ -1083,7 +1086,7 @@ def test_gather_priority(ws): stimulus_id="unpause", worker="127.0.0.7:1", to_gather={"y", "x8"}, - total_nbytes=1 + 4 * 2**20, + total_nbytes=7 + 4 * 2**20, ), # Followed by local workers GatherDep( @@ -1111,6 +1114,10 @@ def test_gather_priority(ws): total_nbytes=4 * 
2**20, ), ] + expected_bytes = 7 + 4 * 2**20 + 4 * 2**20 + 8 * 2**20 + 4 * 2**20 + assert ws.transfer_incoming_bytes == expected_bytes + assert ws.transfer_incoming_count == 4 + assert ws.transfer_incoming_count_total == 4 @pytest.mark.parametrize("state", ["executing", "long-running"]) @@ -1264,3 +1271,83 @@ def test_gather_dep_failure(ws): ] assert ws.tasks["x"].state == "error" assert ws.tasks["y"].state == "waiting" # Not ready + assert ws.transfer_incoming_bytes == 0 + assert ws.transfer_incoming_count == 0 + assert ws.transfer_incoming_count_total == 1 + + +def test_transfer_incoming_metrics(ws): + assert ws.transfer_incoming_bytes == 0 + assert ws.transfer_incoming_count == 0 + assert ws.transfer_incoming_count_total == 0 + + ws2 = "127.0.0.1:2" + ws.handle_stimulus( + ComputeTaskEvent.dummy( + "b", who_has={"a": [ws2]}, nbytes={"a": 7}, stimulus_id="s1" + ) + ) + assert ws.transfer_incoming_bytes == 7 + assert ws.transfer_incoming_count == 1 + assert ws.transfer_incoming_count_total == 1 + + ws.handle_stimulus( + GatherDepSuccessEvent( + worker=ws2, data={"a": 123}, total_nbytes=7, stimulus_id="s2" + ) + ) + assert ws.transfer_incoming_bytes == 0 + assert ws.transfer_incoming_count == 0 + assert ws.transfer_incoming_count_total == 1 + + ws.handle_stimulus( + ComputeTaskEvent.dummy( + "e", + who_has={"c": [ws2], "d": [ws2]}, + nbytes={"c": 11, "d": 13}, + stimulus_id="s2", + ) + ) + assert ws.transfer_incoming_bytes == 24 + assert ws.transfer_incoming_count == 1 + assert ws.transfer_incoming_count_total == 2 + + ws.handle_stimulus( + GatherDepSuccessEvent( + worker=ws2, data={"c": 123, "d": 234}, total_nbytes=24, stimulus_id="s3" + ) + ) + assert ws.transfer_incoming_bytes == 0 + assert ws.transfer_incoming_count == 0 + assert ws.transfer_incoming_count_total == 2 + + ws3 = "127.0.0.1:3" + ws.handle_stimulus( + ComputeTaskEvent.dummy( + "h", + who_has={"f": [ws2], "g": [ws3]}, + nbytes={"f": 17, "g": 19}, + stimulus_id="s4", + ) + ) + assert 
ws.transfer_incoming_bytes == 36 + assert ws.transfer_incoming_count == 2 + assert ws.transfer_incoming_count_total == 4 + + ws.handle_stimulus( + GatherDepSuccessEvent( + worker=ws3, data={"g": 345}, total_nbytes=19, stimulus_id="s5" + ) + ) + assert ws.transfer_incoming_bytes == 17 + assert ws.transfer_incoming_count == 1 + assert ws.transfer_incoming_count_total == 4 + + ws.handle_stimulus( + GatherDepSuccessEvent( + worker=ws2, data={"f": 456}, total_nbytes=17, stimulus_id="s6" + ) + ) + assert ws.transfer_incoming_bytes == 0 + assert ws.transfer_incoming_count == 0 + assert ws.transfer_incoming_count_total == 4 diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 57d0dad3cf3..3403cc2bb98 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -794,7 +794,7 @@ def dummy( return ComputeTaskEvent( key=key, who_has=who_has or {}, - nbytes=nbytes or {k: 1 for k in who_has or ()}, + nbytes=nbytes or {k: 7 for k in who_has or ()}, priority=priority, duration=duration, run_spec=None, From efe0a621e2ccfcfff9fb32d6be2b3c967c3efdeb Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 31 Aug 2022 15:33:04 +0200 Subject: [PATCH 5/8] Revert nbytes default --- distributed/tests/test_worker_state_machine.py | 6 +++--- distributed/worker_state_machine.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py index d7ec62ea902..f2c8372c7f8 100644 --- a/distributed/tests/test_worker_state_machine.py +++ b/distributed/tests/test_worker_state_machine.py @@ -410,7 +410,7 @@ def test_computetask_dummy(): # nbytes is generated from who_has if omitted ev2 = ComputeTaskEvent.dummy("x", who_has={"y": "127.0.0.1:2"}, stimulus_id="s") - assert ev2.nbytes == {"y": 7} + assert ev2.nbytes == {"y": 1} def test_updatedata_to_dict(): @@ -1086,7 +1086,7 @@ def test_gather_priority(ws): 
stimulus_id="unpause", worker="127.0.0.7:1", to_gather={"y", "x8"}, - total_nbytes=7 + 4 * 2**20, + total_nbytes=1 + 4 * 2**20, ), # Followed by local workers GatherDep( @@ -1114,7 +1114,7 @@ def test_gather_priority(ws): total_nbytes=4 * 2**20, ), ] - expected_bytes = 7 + 4 * 2**20 + 4 * 2**20 + 8 * 2**20 + 4 * 2**20 + expected_bytes = 1 + 4 * 2**20 + 4 * 2**20 + 8 * 2**20 + 4 * 2**20 assert ws.transfer_incoming_bytes == expected_bytes assert ws.transfer_incoming_count == 4 assert ws.transfer_incoming_count_total == 4 diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 3403cc2bb98..57d0dad3cf3 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -794,7 +794,7 @@ def dummy( return ComputeTaskEvent( key=key, who_has=who_has or {}, - nbytes=nbytes or {k: 7 for k in who_has or ()}, + nbytes=nbytes or {k: 1 for k in who_has or ()}, priority=priority, duration=duration, run_spec=None, From 8805a49d1fb9a5d6ee577e1f4ab55a621da4bea6 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 31 Aug 2022 18:52:28 +0200 Subject: [PATCH 6/8] Apply suggestions from code review Co-authored-by: crusaderky --- distributed/worker.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 0d777cfc82f..f53ab940cb5 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -387,9 +387,9 @@ class Worker(BaseWorker, ServerNode): transfer_outgoing_log: deque[dict[str, Any]] #: Total number of data transfers to other workers since the worker was started. transfer_outgoing_count_total: int - #: Total size of open data transfers to other worker. + #: Current total size of open data transfers to other workers transfer_outgoing_bytes: int - #: Number of open data transfers to other workers. 
+ #: Current number of open data transfers to other workers transfer_outgoing_count: int bandwidth: float latency: float @@ -1708,10 +1708,10 @@ async def get_data( ) msg = {"status": "OK", "data": {k: to_serialize(v) for k, v in data.items()}} - bytes_per_task = { - k: self.state.tasks[k].nbytes for k in data if k in self.state.tasks - } - total_bytes = sum(filter(None, bytes_per_task.values())) + # Note: `if k in self.data` above guarantees that + # k is in self.state.tasks too and that nbytes is non-None + bytes_per_task = {k: self.state.tasks[k].nbytes for k in data} + total_bytes = sum(bytes_per_task.values()) self.transfer_outgoing_bytes += total_bytes stop = time() if self.digests is not None: From 67a01ce7db0ea5916c8659e3ff77312753b8f926 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 31 Aug 2022 19:01:14 +0200 Subject: [PATCH 7/8] Align docstrings --- distributed/worker.py | 8 +++++--- distributed/worker_state_machine.py | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index f53ab940cb5..ca1d1ffa218 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -385,7 +385,7 @@ class Worker(BaseWorker, ServerNode): profile_history: deque[tuple[float, dict[str, Any]]] transfer_incoming_log: deque[dict[str, Any]] transfer_outgoing_log: deque[dict[str, Any]] - #: Total number of data transfers to other workers since the worker was started. 
+ #: Total number of data transfers to other workers since the worker was started transfer_outgoing_count_total: int #: Current total size of open data transfers to other workers transfer_outgoing_bytes: int @@ -1708,9 +1708,11 @@ async def get_data( ) msg = {"status": "OK", "data": {k: to_serialize(v) for k, v in data.items()}} - # Note: `if k in self.data` above guarantees that + # Note: `if k in self.data` above guarantees that # k is in self.state.tasks too and that nbytes is non-None - bytes_per_task = {k: self.state.tasks[k].nbytes for k in data} + bytes_per_task: dict[str, int] = { + k: self.state.tasks[k].nbytes or 0 for k in data + } total_bytes = sum(bytes_per_task.values()) self.transfer_outgoing_bytes += total_bytes stop = time() diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 57d0dad3cf3..fec249c5abb 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -1149,7 +1149,7 @@ class WorkerState: #: dependencies until the current query returns. in_flight_workers: dict[str, set[str]] - #: Total size of open data transfers from other workers. + #: Current total size of open data transfers from other workers transfer_incoming_bytes: int #: Maximum number of concurrent incoming data transfers from other workers. @@ -1364,7 +1364,7 @@ def in_flight_tasks_count(self) -> int: @property def transfer_incoming_count(self) -> int: - """Number of open data transfers from other workers. + """Current number of open data transfers from other workers. 
See also -------- From d6b3fd7800b1e3638efa61865b83f6ddd27668ef Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 31 Aug 2022 19:10:49 +0200 Subject: [PATCH 8/8] Minor --- distributed/worker.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index ca1d1ffa218..3d4b98da019 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1710,9 +1710,7 @@ async def get_data( msg = {"status": "OK", "data": {k: to_serialize(v) for k, v in data.items()}} # Note: `if k in self.data` above guarantees that # k is in self.state.tasks too and that nbytes is non-None - bytes_per_task: dict[str, int] = { - k: self.state.tasks[k].nbytes or 0 for k in data - } + bytes_per_task = {k: self.state.tasks[k].nbytes or 0 for k in data} total_bytes = sum(bytes_per_task.values()) self.transfer_outgoing_bytes += total_bytes stop = time()