Expose transfer-related metrics in Worker.get_metrics and `WorkerMetricCollector` (#6936)
hendrikmakait authored Aug 31, 2022
1 parent d72fd21 commit bfc5cfe
Showing 5 changed files with 166 additions and 14 deletions.
47 changes: 46 additions & 1 deletion distributed/http/worker/prometheus/core.py
@@ -37,7 +37,10 @@ def collect(self):

yield GaugeMetricFamily(
self.build_name("concurrent_fetch_requests"),
"Number of open fetch requests to other workers.",
(
"[Deprecated: This metric has been renamed to transfer_incoming_count.] "
"Number of open fetch requests to other workers."
),
value=self.server.state.transfer_incoming_count,
)

@@ -53,6 +56,48 @@ def collect(self):
value=self.server.latency,
)

yield GaugeMetricFamily(
self.build_name("transfer_incoming_bytes"),
"Total size of open data transfers from other workers.",
value=self.server.state.transfer_incoming_bytes,
)

yield GaugeMetricFamily(
self.build_name("transfer_incoming_count"),
"Number of open data transfers from other workers.",
value=self.server.state.transfer_incoming_count,
)

yield GaugeMetricFamily(
self.build_name("transfer_incoming_count_total"),
(
"Total number of data transfers from other workers "
"since the worker was started."
),
value=self.server.state.transfer_incoming_count_total,
)

yield GaugeMetricFamily(
self.build_name("transfer_outgoing_bytes"),
"Total size of open data transfers to other workers.",
value=self.server.transfer_outgoing_bytes,
)

yield GaugeMetricFamily(
self.build_name("transfer_outgoing_count"),
"Number of open data transfers to other workers.",
value=self.server.transfer_outgoing_count,
)

yield GaugeMetricFamily(
self.build_name("transfer_outgoing_count_total"),
(
"Total number of data transfers to other workers "
"since the worker was started."
),
value=self.server.transfer_outgoing_count_total,
)

# all metrics using digests require crick to be installed
# the following metrics will export NaN if the corresponding digests are None
if self.crick_available:
6 changes: 6 additions & 0 deletions distributed/http/worker/tests/test_worker_http.py
@@ -21,6 +21,12 @@ async def test_prometheus(c, s, a):
"dask_worker_concurrent_fetch_requests",
"dask_worker_threads",
"dask_worker_latency_seconds",
"dask_worker_transfer_incoming_bytes",
"dask_worker_transfer_incoming_count",
"dask_worker_transfer_incoming_count_total",
"dask_worker_transfer_outgoing_bytes",
"dask_worker_transfer_outgoing_count",
"dask_worker_transfer_outgoing_count_total",
}

try:
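For context, the new gauge names can be verified against a live scrape of the worker's Prometheus endpoint. A minimal sketch, assuming the worker's HTTP address is available as `addr` (a placeholder, not part of this commit) and that `prometheus_client` is installed:

import urllib.request

from prometheus_client.parser import text_string_to_metric_families

# Fetch the Prometheus exposition text from the worker's /metrics route.
text = urllib.request.urlopen(f"http://{addr}/metrics").read().decode("utf-8")

# Collect the metric family names and check the new transfer gauges.
names = {family.name for family in text_string_to_metric_families(text)}
assert "dask_worker_transfer_incoming_bytes" in names
assert "dask_worker_transfer_outgoing_count_total" in names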
87 changes: 87 additions & 0 deletions distributed/tests/test_worker_state_machine.py
@@ -1030,6 +1030,9 @@ def test_aggregate_gather_deps(ws, nbytes, n_in_flight):
assert instructions == [GatherDep.match(worker=ws2, stimulus_id="s1")]
assert len(instructions[0].to_gather) == n_in_flight
assert len(ws.in_flight_tasks) == n_in_flight
assert ws.transfer_incoming_bytes == nbytes * n_in_flight
assert ws.transfer_incoming_count == 1
assert ws.transfer_incoming_count_total == 1


def test_gather_priority(ws):
@@ -1111,6 +1114,10 @@ def test_gather_priority(ws):
total_nbytes=4 * 2**20,
),
]
expected_bytes = 1 + 4 * 2**20 + 4 * 2**20 + 8 * 2**20 + 4 * 2**20
assert ws.transfer_incoming_bytes == expected_bytes
assert ws.transfer_incoming_count == 4
assert ws.transfer_incoming_count_total == 4


@pytest.mark.parametrize("state", ["executing", "long-running"])
@@ -1264,3 +1271,83 @@ def test_gather_dep_failure(ws):
]
assert ws.tasks["x"].state == "error"
assert ws.tasks["y"].state == "waiting" # Not ready
assert ws.transfer_incoming_bytes == 0
assert ws.transfer_incoming_count == 0
assert ws.transfer_incoming_count_total == 1


def test_transfer_incoming_metrics(ws):
assert ws.transfer_incoming_bytes == 0
assert ws.transfer_incoming_count == 0
assert ws.transfer_incoming_count_total == 0

ws2 = "127.0.0.1:2"
ws.handle_stimulus(
ComputeTaskEvent.dummy(
"b", who_has={"a": [ws2]}, nbytes={"a": 7}, stimulus_id="s1"
)
)
assert ws.transfer_incoming_bytes == 7
assert ws.transfer_incoming_count == 1
assert ws.transfer_incoming_count_total == 1

ws.handle_stimulus(
GatherDepSuccessEvent(
worker=ws2, data={"a": 123}, total_nbytes=7, stimulus_id="s2"
)
)
assert ws.transfer_incoming_bytes == 0
assert ws.transfer_incoming_count == 0
assert ws.transfer_incoming_count_total == 1

ws.handle_stimulus(
ComputeTaskEvent.dummy(
"e",
who_has={"c": [ws2], "d": [ws2]},
nbytes={"c": 11, "d": 13},
stimulus_id="s2",
)
)
assert ws.transfer_incoming_bytes == 24
assert ws.transfer_incoming_count == 1
assert ws.transfer_incoming_count_total == 2

ws.handle_stimulus(
GatherDepSuccessEvent(
worker=ws2, data={"c": 123, "d": 234}, total_nbytes=24, stimulus_id="s3"
)
)
assert ws.transfer_incoming_bytes == 0
assert ws.transfer_incoming_count == 0
assert ws.transfer_incoming_count_total == 2

ws3 = "127.0.0.1:3"
ws.handle_stimulus(
ComputeTaskEvent.dummy(
"h",
who_has={"f": [ws2], "g": [ws3]},
nbytes={"f": 17, "g": 19},
stimulus_id="s4",
)
)
assert ws.transfer_incoming_bytes == 36
assert ws.transfer_incoming_count == 2
assert ws.transfer_incoming_count_total == 4

ws.handle_stimulus(
GatherDepSuccessEvent(
worker=ws3, data={"g": 345}, total_nbytes=19, stimulus_id="s5"
)
)
assert ws.transfer_incoming_bytes == 17
assert ws.transfer_incoming_count == 1
assert ws.transfer_incoming_count_total == 4

ws.handle_stimulus(
GatherDepSuccessEvent(
worker=ws2, data={"g": 456}, total_nbytes=17, stimulus_id="s6"
)
)
assert ws.transfer_incoming_bytes == 0
assert ws.transfer_incoming_count == 0
assert ws.transfer_incoming_count_total == 4
28 changes: 21 additions & 7 deletions distributed/worker.py
@@ -385,9 +385,11 @@ class Worker(BaseWorker, ServerNode):
profile_history: deque[tuple[float, dict[str, Any]]]
transfer_incoming_log: deque[dict[str, Any]]
transfer_outgoing_log: deque[dict[str, Any]]
#: Number of total data transfers to other workers.
#: Total number of data transfers to other workers since the worker was started
transfer_outgoing_count_total: int
#: Number of open data transfers to other workers.
#: Current total size of open data transfers to other workers
transfer_outgoing_bytes: int
#: Current number of open data transfers to other workers
transfer_outgoing_count: int
bandwidth: float
latency: float
@@ -543,6 +545,7 @@ def __init__(
self.transfer_incoming_log = deque(maxlen=100000)
self.transfer_outgoing_log = deque(maxlen=100000)
self.transfer_outgoing_count_total = 0
self.transfer_outgoing_bytes = 0
self.transfer_outgoing_count = 0
self.bandwidth = parse_bytes(dask.config.get("distributed.scheduler.bandwidth"))
self.bandwidth_workers = defaultdict(
@@ -983,6 +986,14 @@ async def get_metrics(self) -> dict:
"memory": spilled_memory,
"disk": spilled_disk,
},
transfer={
"incoming_bytes": self.state.transfer_incoming_bytes,
"incoming_count": self.state.transfer_incoming_count,
"incoming_count_total": self.state.transfer_incoming_count_total,
"outgoing_bytes": self.transfer_outgoing_bytes,
"outgoing_count": self.transfer_outgoing_count,
"outgoing_count_total": self.transfer_outgoing_count_total,
},
event_loop_interval=self._tick_interval_observed,
)
out.update(self.monitor.recent())
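The new `transfer` block also makes these counters easy to read off a live cluster. A minimal sketch using `Client.run`, assuming a client connected to a running scheduler (the address is a placeholder):

from distributed import Client

client = Client("tcp://127.0.0.1:8786")  # assumed scheduler address

def transfer_metrics(dask_worker):
    # Client.run injects the local Worker instance as `dask_worker`.
    return {
        "incoming_bytes": dask_worker.state.transfer_incoming_bytes,
        "incoming_count_total": dask_worker.state.transfer_incoming_count_total,
        "outgoing_bytes": dask_worker.transfer_outgoing_bytes,
        "outgoing_count_total": dask_worker.transfer_outgoing_count_total,
    }

print(client.run(transfer_metrics))  # {worker_address: {...}, ...}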
@@ -1684,6 +1695,7 @@ async def get_data(
return {"status": "busy"}

self.transfer_outgoing_count += 1
self.transfer_outgoing_count_total += 1
data = {k: self.data[k] for k in keys if k in self.data}

if len(data) < len(keys):
@@ -1696,7 +1708,11 @@
)

msg = {"status": "OK", "data": {k: to_serialize(v) for k, v in data.items()}}
nbytes = {k: self.state.tasks[k].nbytes for k in data if k in self.state.tasks}
# Note: `if k in self.data` above guarantees that
# k is in self.state.tasks too and that nbytes is non-None
bytes_per_task = {k: self.state.tasks[k].nbytes or 0 for k in data}
total_bytes = sum(bytes_per_task.values())
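# transfer_outgoing_bytes tracks only transfers that are still in flight:
# the increment below is matched by a decrement in the finally block.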
self.transfer_outgoing_bytes += total_bytes
stop = time()
if self.digests is not None:
self.digests["get-data-load-duration"].add(stop - start)
@@ -1715,14 +1731,12 @@
comm.abort()
raise
finally:
self.transfer_outgoing_bytes -= total_bytes
self.transfer_outgoing_count -= 1
stop = time()
if self.digests is not None:
self.digests["get-data-send-duration"].add(stop - start)

total_bytes = sum(filter(None, nbytes.values()))

self.transfer_outgoing_count_total += 1
duration = (stop - start) or 0.5 # windows
self.transfer_outgoing_log.append(
{
Expand All @@ -1731,7 +1745,7 @@ async def get_data(
"middle": (start + stop) / 2,
"duration": duration,
"who": who,
"keys": nbytes,
"keys": bytes_per_task,
"total": total_bytes,
"compressed": compressed,
"bandwidth": total_bytes / duration,
12 changes: 6 additions & 6 deletions distributed/worker_state_machine.py
@@ -1149,14 +1149,14 @@ class WorkerState:
#: dependencies until the current query returns.
in_flight_workers: dict[str, set[str]]

#: The total size of incoming data transfers for in-flight tasks.
#: Current total size of open data transfers from other workers
transfer_incoming_bytes: int

#: The maximum number of concurrent incoming data transfers from other workers.
#: Maximum number of concurrent incoming data transfers from other workers.
#: See also :attr:`distributed.worker.Worker.transfer_outgoing_count_limit`.
transfer_incoming_count_limit: int

#: Number of total data transfers from other workers since the worker was started.
#: Total number of data transfers from other workers since the worker was started.
transfer_incoming_count_total: int

#: Ignore :attr:`transfer_incoming_count_limit` as long as :attr:`transfer_incoming_bytes` is
@@ -1354,7 +1354,7 @@ def all_running_tasks(self) -> set[TaskState]:

@property
def in_flight_tasks_count(self) -> int:
"""Count of tasks currently being replicated from other workers to this one.
"""Number of tasks currently being replicated from other workers to this one.
See also
--------
@@ -1364,7 +1364,7 @@ def in_flight_tasks_count(self) -> int:

@property
def transfer_incoming_count(self) -> int:
"""Count of open data transfers from other workers.
"""Current number of open data transfers from other workers.
See also
--------
@@ -1529,6 +1529,7 @@ def _ensure_communicating(self, *, stimulus_id: str) -> RecsInstrs:
)

self.in_flight_workers[worker] = to_gather_keys
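# A transfer is counted as soon as it starts; _gather_dep_done_common
# only releases transfer_incoming_bytes once the gather finishes.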
self.transfer_incoming_count_total += 1
self.transfer_incoming_bytes += total_nbytes
if (
self.transfer_incoming_count >= self.transfer_incoming_count_limit
@@ -2845,7 +2846,6 @@ def _gather_dep_done_common(self, ev: GatherDepDoneEvent) -> Iterator[TaskState]
_execute_done_common
"""
self.transfer_incoming_bytes -= ev.total_nbytes
self.transfer_incoming_count_total += 1
keys = self.in_flight_workers.pop(ev.worker)
for key in keys:
ts = self.tasks[key]
