Skip to content

Commit

Permalink
Add transfer_outgoing_bytes_total metric
Browse files Browse the repository at this point in the history
Closes dask#7123
  • Loading branch information
gjoseph92 committed Dec 9, 2022
1 parent 9647c9e commit 928f91f
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 0 deletions.
9 changes: 9 additions & 0 deletions distributed/http/worker/prometheus/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,15 @@ def collect(self):
value=self.server.transfer_outgoing_count,
)

yield CounterMetricFamily(
self.build_name("transfer_outgoing_bytes_total"),
(
"Total size of data transferred to other workers "
"since the worker was started"
),
value=self.server.transfer_outgoing_bytes_total,
)

yield CounterMetricFamily(
self.build_name("transfer_outgoing_count_total"),
(
Expand Down
1 change: 1 addition & 0 deletions distributed/http/worker/tests/test_worker_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ async def test_prometheus(c, s, a):
"dask_worker_transfer_outgoing_bytes",
"dask_worker_transfer_outgoing_count",
"dask_worker_transfer_outgoing_count_total",
"dask_worker_transfer_outgoing_bytes_total",
"dask_worker_tick_count_total",
"dask_worker_tick_duration_maximum_seconds",
}
Expand Down
4 changes: 4 additions & 0 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,8 @@ class Worker(BaseWorker, ServerNode):
transfer_outgoing_log: deque[dict[str, Any]]
#: Total number of data transfers to other workers since the worker was started
transfer_outgoing_count_total: int
#: Total size of data transferred to other workers since the worker was started
#: (including in-progress transfers)
transfer_outgoing_bytes_total: int
#: Current total size of open data transfers to other workers
transfer_outgoing_bytes: int
#: Current number of open data transfers to other workers
Expand Down Expand Up @@ -556,6 +558,7 @@ def __init__(
self.transfer_incoming_log = deque(maxlen=100000)
self.transfer_outgoing_log = deque(maxlen=100000)
self.transfer_outgoing_count_total = 0
self.transfer_outgoing_bytes_total = 0
self.transfer_outgoing_bytes = 0
self.transfer_outgoing_count = 0
self.bandwidth = parse_bytes(dask.config.get("distributed.scheduler.bandwidth"))
Expand Down Expand Up @@ -1746,6 +1749,7 @@ async def get_data(
bytes_per_task = {k: self.state.tasks[k].nbytes or 0 for k in data}
total_bytes = sum(bytes_per_task.values())
self.transfer_outgoing_bytes += total_bytes
self.transfer_outgoing_bytes_total += total_bytes
stop = time()
if self.digests is not None:
self.digests["get-data-load-duration"].add(stop - start)
Expand Down

0 comments on commit 928f91f

Please sign in to comment.