From 047b082978f7c97e5f31dd3991be7de0935e628f Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Fri, 9 Dec 2022 14:33:43 -0700 Subject: [PATCH] Add `transfer_outgoing_bytes_total` metric (#7388) --- distributed/http/worker/prometheus/core.py | 9 +++++++++ distributed/http/worker/tests/test_worker_http.py | 1 + distributed/worker.py | 4 ++++ 3 files changed, 14 insertions(+) diff --git a/distributed/http/worker/prometheus/core.py b/distributed/http/worker/prometheus/core.py index f590340effb..767f9d0985b 100644 --- a/distributed/http/worker/prometheus/core.py +++ b/distributed/http/worker/prometheus/core.py @@ -112,6 +112,15 @@ def collect(self): value=self.server.transfer_outgoing_count, ) + yield CounterMetricFamily( + self.build_name("transfer_outgoing_bytes_total"), + ( + "Total size of data transfers to other workers " + "since the worker was started (including in-progress and failed transfers)" + ), + value=self.server.transfer_outgoing_bytes_total, + ) + yield CounterMetricFamily( self.build_name("transfer_outgoing_count_total"), ( diff --git a/distributed/http/worker/tests/test_worker_http.py b/distributed/http/worker/tests/test_worker_http.py index f698c9a31ff..645cc91d120 100644 --- a/distributed/http/worker/tests/test_worker_http.py +++ b/distributed/http/worker/tests/test_worker_http.py @@ -36,6 +36,7 @@ async def test_prometheus(c, s, a): "dask_worker_transfer_outgoing_bytes", "dask_worker_transfer_outgoing_count", "dask_worker_transfer_outgoing_count_total", + "dask_worker_transfer_outgoing_bytes_total", "dask_worker_tick_count_total", "dask_worker_tick_duration_maximum_seconds", } diff --git a/distributed/worker.py b/distributed/worker.py index 4242648d950..37e2630e50b 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -394,6 +394,8 @@ class Worker(BaseWorker, ServerNode): transfer_outgoing_log: deque[dict[str, Any]] #: Total number of data transfers to other workers since the worker was started transfer_outgoing_count_total: int + #: Total size of data transfers to other workers (including in-progress and failed transfers) + transfer_outgoing_bytes_total: int #: Current total size of open data transfers to other workers transfer_outgoing_bytes: int #: Current number of open data transfers to other workers @@ -556,6 +558,7 @@ def __init__( self.transfer_incoming_log = deque(maxlen=100000) self.transfer_outgoing_log = deque(maxlen=100000) self.transfer_outgoing_count_total = 0 + self.transfer_outgoing_bytes_total = 0 self.transfer_outgoing_bytes = 0 self.transfer_outgoing_count = 0 self.bandwidth = parse_bytes(dask.config.get("distributed.scheduler.bandwidth")) @@ -1746,6 +1749,7 @@ async def get_data( bytes_per_task = {k: self.state.tasks[k].nbytes or 0 for k in data} total_bytes = sum(bytes_per_task.values()) self.transfer_outgoing_bytes += total_bytes + self.transfer_outgoing_bytes_total += total_bytes stop = time() if self.digests is not None: self.digests["get-data-load-duration"].add(stop - start)