From 6a8708c6821356674539127e59cd3bcccdb22f9c Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 31 Aug 2022 20:28:06 +0200 Subject: [PATCH] Expose transfer-related metrics in `Worker.get_metrics` and `WorkerMetricCollector` (#6936) --- distributed/http/worker/prometheus/core.py | 47 +++++++++- .../http/worker/tests/test_worker_http.py | 6 ++ .../tests/test_worker_state_machine.py | 87 +++++++++++++++++++ distributed/worker.py | 28 ++++-- distributed/worker_state_machine.py | 12 +-- 5 files changed, 166 insertions(+), 14 deletions(-) diff --git a/distributed/http/worker/prometheus/core.py b/distributed/http/worker/prometheus/core.py index 8e40957bf83..3268f64857d 100644 --- a/distributed/http/worker/prometheus/core.py +++ b/distributed/http/worker/prometheus/core.py @@ -37,7 +37,10 @@ def collect(self): yield GaugeMetricFamily( self.build_name("concurrent_fetch_requests"), - "Number of open fetch requests to other workers.", + ( + "[Deprecated: This metric has been renamed to transfer_incoming_count.] " + "Number of open fetch requests to other workers." + ), value=self.server.state.transfer_incoming_count, ) @@ -53,6 +56,48 @@ def collect(self): value=self.server.latency, ) + yield GaugeMetricFamily( + self.build_name("transfer_incoming_bytes"), + "Total size of open data transfers from other workers.", + value=self.server.state.transfer_incoming_bytes, + ) + + yield GaugeMetricFamily( + self.build_name("transfer_incoming_count"), + "Number of open data transfers from other workers.", + value=self.server.state.transfer_incoming_bytes, + ) + + yield GaugeMetricFamily( + self.build_name("transfer_incoming_count_total"), + ( + "Total number of data transfers from other workers " + "since the worker was started." + ), + value=self.server.state.transfer_incoming_count_total, + ) + + yield GaugeMetricFamily( + self.build_name("transfer_outgoing_bytes"), + "Total size of open data transfers to other workers.", + value=self.server.transfer_outgoing_bytes, + ) + + yield GaugeMetricFamily( + self.build_name("transfer_outgoing_count"), + "Number of open data transfers to other workers.", + value=self.server.transfer_outgoing_count, + ) + + yield GaugeMetricFamily( + self.build_name("transfer_outgoing_count_total"), + ( + "Total number of data transfers to other workers " + "since the worker was started." + ), + value=self.server.transfer_outgoing_count_total, + ) + # all metrics using digests require crick to be installed # the following metrics will export NaN, if the corresponding digests are None if self.crick_available: diff --git a/distributed/http/worker/tests/test_worker_http.py b/distributed/http/worker/tests/test_worker_http.py index 02d3d4dba59..6d0c424b8c8 100644 --- a/distributed/http/worker/tests/test_worker_http.py +++ b/distributed/http/worker/tests/test_worker_http.py @@ -21,6 +21,12 @@ async def test_prometheus(c, s, a): "dask_worker_concurrent_fetch_requests", "dask_worker_threads", "dask_worker_latency_seconds", + "dask_worker_transfer_incoming_bytes", + "dask_worker_transfer_incoming_count", + "dask_worker_transfer_incoming_count_total", + "dask_worker_transfer_outgoing_bytes", + "dask_worker_transfer_outgoing_count", + "dask_worker_transfer_outgoing_count_total", } try: diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py index a1db60cc5e7..f2c8372c7f8 100644 --- a/distributed/tests/test_worker_state_machine.py +++ b/distributed/tests/test_worker_state_machine.py @@ -1030,6 +1030,9 @@ def test_aggregate_gather_deps(ws, nbytes, n_in_flight): assert instructions == [GatherDep.match(worker=ws2, stimulus_id="s1")] assert len(instructions[0].to_gather) == n_in_flight assert len(ws.in_flight_tasks) == n_in_flight + assert ws.transfer_incoming_bytes == nbytes * n_in_flight + assert ws.transfer_incoming_count == 1 + assert ws.transfer_incoming_count_total == 1 def test_gather_priority(ws): @@ -1111,6 +1114,10 @@ def test_gather_priority(ws): total_nbytes=4 * 2**20, ), ] + expected_bytes = 1 + 4 * 2**20 + 4 * 2**20 + 8 * 2**20 + 4 * 2**20 + assert ws.transfer_incoming_bytes == expected_bytes + assert ws.transfer_incoming_count == 4 + assert ws.transfer_incoming_count_total == 4 @pytest.mark.parametrize("state", ["executing", "long-running"]) @@ -1264,3 +1271,83 @@ def test_gather_dep_failure(ws): ] assert ws.tasks["x"].state == "error" assert ws.tasks["y"].state == "waiting" # Not ready + assert ws.transfer_incoming_bytes == 0 + assert ws.transfer_incoming_count == 0 + assert ws.transfer_incoming_count_total == 1 + + +def test_transfer_incoming_metrics(ws): + assert ws.transfer_incoming_bytes == 0 + assert ws.transfer_incoming_count == 0 + assert ws.transfer_incoming_count_total == 0 + + ws2 = "127.0.0.1:2" + ws.handle_stimulus( + ComputeTaskEvent.dummy( + "b", who_has={"a": [ws2]}, nbytes={"a": 7}, stimulus_id="s1" + ) + ) + assert ws.transfer_incoming_bytes == 7 + assert ws.transfer_incoming_count == 1 + assert ws.transfer_incoming_count_total == 1 + + ws.handle_stimulus( + GatherDepSuccessEvent( + worker=ws2, data={"a": 123}, total_nbytes=7, stimulus_id="s2" + ) + ) + assert ws.transfer_incoming_bytes == 0 + assert ws.transfer_incoming_count == 0 + assert ws.transfer_incoming_count_total == 1 + + ws.handle_stimulus( + ComputeTaskEvent.dummy( + "e", + who_has={"c": [ws2], "d": [ws2]}, + nbytes={"c": 11, "d": 13}, + stimulus_id="s2", + ) + ) + assert ws.transfer_incoming_bytes == 24 + assert ws.transfer_incoming_count == 1 + assert ws.transfer_incoming_count_total == 2 + + ws.handle_stimulus( + GatherDepSuccessEvent( + worker=ws2, data={"c": 123, "d": 234}, total_nbytes=24, stimulus_id="s3" + ) + ) + assert ws.transfer_incoming_bytes == 0 + assert ws.transfer_incoming_count == 0 + assert ws.transfer_incoming_count_total == 2 + + ws3 = "127.0.0.1:3" + ws.handle_stimulus( + ComputeTaskEvent.dummy( + "h", + who_has={"f": [ws2], "g": [ws3]}, + nbytes={"f": 17, "g": 19}, + stimulus_id="s4", + ) + ) + assert ws.transfer_incoming_bytes == 36 + assert ws.transfer_incoming_count == 2 + assert ws.transfer_incoming_count_total == 4 + + ws.handle_stimulus( + GatherDepSuccessEvent( + worker=ws3, data={"g": 345}, total_nbytes=19, stimulus_id="s5" + ) + ) + assert ws.transfer_incoming_bytes == 17 + assert ws.transfer_incoming_count == 1 + assert ws.transfer_incoming_count_total == 4 + + ws.handle_stimulus( + GatherDepSuccessEvent( + worker=ws2, data={"g": 456}, total_nbytes=17, stimulus_id="s6" + ) + ) + assert ws.transfer_incoming_bytes == 0 + assert ws.transfer_incoming_count == 0 + assert ws.transfer_incoming_count_total == 4 diff --git a/distributed/worker.py b/distributed/worker.py index b863f319d7f..3d4b98da019 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -385,9 +385,11 @@ class Worker(BaseWorker, ServerNode): profile_history: deque[tuple[float, dict[str, Any]]] transfer_incoming_log: deque[dict[str, Any]] transfer_outgoing_log: deque[dict[str, Any]] - #: Number of total data transfers to other workers. + #: Total number of data transfers to other workers since the worker was started transfer_outgoing_count_total: int - #: Number of open data transfers to other workers. + #: Current total size of open data transfers to other workers + transfer_outgoing_bytes: int + #: Current number of open data transfers to other workers transfer_outgoing_count: int bandwidth: float latency: float @@ -543,6 +545,7 @@ def __init__( self.transfer_incoming_log = deque(maxlen=100000) self.transfer_outgoing_log = deque(maxlen=100000) self.transfer_outgoing_count_total = 0 + self.transfer_outgoing_bytes = 0 self.transfer_outgoing_count = 0 self.bandwidth = parse_bytes(dask.config.get("distributed.scheduler.bandwidth")) self.bandwidth_workers = defaultdict( @@ -983,6 +986,14 @@ async def get_metrics(self) -> dict: "memory": spilled_memory, "disk": spilled_disk, }, + transfer={ + "incoming_bytes": self.state.transfer_incoming_bytes, + "incoming_count": self.state.transfer_incoming_count, + "incoming_count_total": self.state.transfer_incoming_count_total, + "outgoing_bytes": self.transfer_outgoing_bytes, + "outgoing_count": self.transfer_outgoing_count, + "outgoing_count_total": self.transfer_outgoing_count_total, + }, event_loop_interval=self._tick_interval_observed, ) out.update(self.monitor.recent()) @@ -1684,6 +1695,7 @@ async def get_data( return {"status": "busy"} self.transfer_outgoing_count += 1 + self.transfer_outgoing_count_total += 1 data = {k: self.data[k] for k in keys if k in self.data} if len(data) < len(keys): @@ -1696,7 +1708,11 @@ async def get_data( ) msg = {"status": "OK", "data": {k: to_serialize(v) for k, v in data.items()}} - nbytes = {k: self.state.tasks[k].nbytes for k in data if k in self.state.tasks} + # Note: `if k in self.data` above guarantees that + # k is in self.state.tasks too and that nbytes is non-None + bytes_per_task = {k: self.state.tasks[k].nbytes or 0 for k in data} + total_bytes = sum(bytes_per_task.values()) + self.transfer_outgoing_bytes += total_bytes stop = time() if self.digests is not None: self.digests["get-data-load-duration"].add(stop - start) @@ -1715,14 +1731,12 @@ async def get_data( comm.abort() raise finally: + self.transfer_outgoing_bytes -= total_bytes self.transfer_outgoing_count -= 1 stop = time() if self.digests is not None: self.digests["get-data-send-duration"].add(stop - start) - total_bytes = sum(filter(None, nbytes.values())) - - self.transfer_outgoing_count_total += 1 duration = (stop - start) or 0.5 # windows self.transfer_outgoing_log.append( { @@ -1731,7 +1745,7 @@ async def get_data( "middle": (start + stop) / 2, "duration": duration, "who": who, - "keys": nbytes, + "keys": bytes_per_task, "total": total_bytes, "compressed": compressed, "bandwidth": total_bytes / duration, diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index cbaefa7e72a..fec249c5abb 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -1149,14 +1149,14 @@ class WorkerState: #: dependencies until the current query returns. in_flight_workers: dict[str, set[str]] - #: The total size of incoming data transfers for in-flight tasks. + #: Current total size of open data transfers from other workers transfer_incoming_bytes: int - #: The maximum number of concurrent incoming data transfers from other workers. + #: Maximum number of concurrent incoming data transfers from other workers. #: See also :attr:`distributed.worker.Worker.transfer_outgoing_count_limit`. transfer_incoming_count_limit: int - #: Number of total data transfers from other workers since the worker was started. + #: Total number of data transfers from other workers since the worker was started. transfer_incoming_count_total: int #: Ignore :attr:`transfer_incoming_count_limit` as long as :attr:`transfer_incoming_bytes` is @@ -1354,7 +1354,7 @@ def all_running_tasks(self) -> set[TaskState]: @property def in_flight_tasks_count(self) -> int: - """Count of tasks currently being replicated from other workers to this one. + """Number of tasks currently being replicated from other workers to this one. See also -------- @@ -1364,7 +1364,7 @@ def in_flight_tasks_count(self) -> int: @property def transfer_incoming_count(self) -> int: - """Count of open data transfers from other workers. + """Current number of open data transfers from other workers. See also -------- @@ -1529,6 +1529,7 @@ def _ensure_communicating(self, *, stimulus_id: str) -> RecsInstrs: ) self.in_flight_workers[worker] = to_gather_keys + self.transfer_incoming_count_total += 1 self.transfer_incoming_bytes += total_nbytes if ( self.transfer_incoming_count >= self.transfer_incoming_count_limit @@ -2845,7 +2846,6 @@ def _gather_dep_done_common(self, ev: GatherDepDoneEvent) -> Iterator[TaskState] _execute_done_common """ self.transfer_incoming_bytes -= ev.total_nbytes - self.transfer_incoming_count_total += 1 keys = self.in_flight_workers.pop(ev.worker) for key in keys: ts = self.tasks[key]