Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose transfer-related metrics in Worker.get_metrics and WorkerMetricCollector #6936

Merged
merged 8 commits into from
Aug 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 46 additions & 1 deletion distributed/http/worker/prometheus/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ def collect(self):

yield GaugeMetricFamily(
self.build_name("concurrent_fetch_requests"),
"Number of open fetch requests to other workers.",
(
"[Deprecated: This metric has been renamed to transfer_incoming_count.] "
"Number of open fetch requests to other workers."
),
value=self.server.state.transfer_incoming_count,
)

Expand All @@ -53,6 +56,48 @@ def collect(self):
value=self.server.latency,
)

yield GaugeMetricFamily(
self.build_name("transfer_incoming_bytes"),
"Total size of open data transfers from other workers.",
value=self.server.state.transfer_incoming_bytes,
)

yield GaugeMetricFamily(
self.build_name("transfer_incoming_count"),
"Number of open data transfers from other workers.",
value=self.server.state.transfer_incoming_count,
)

yield GaugeMetricFamily(
self.build_name("transfer_incoming_count_total"),
(
"Total number of data transfers from other workers "
"since the worker was started."
),
value=self.server.state.transfer_incoming_count_total,
)

yield GaugeMetricFamily(
self.build_name("transfer_outgoing_bytes"),
"Total size of open data transfers to other workers.",
value=self.server.transfer_outgoing_bytes,
)

yield GaugeMetricFamily(
self.build_name("transfer_outgoing_count"),
"Number of open data transfers to other workers.",
value=self.server.transfer_outgoing_count,
)

yield GaugeMetricFamily(
self.build_name("transfer_outgoing_count_total"),
(
"Total number of data transfers to other workers "
"since the worker was started."
),
value=self.server.transfer_outgoing_count_total,
)

# all metrics using digests require crick to be installed
# the following metrics will export NaN, if the corresponding digests are None
if self.crick_available:
Expand Down
6 changes: 6 additions & 0 deletions distributed/http/worker/tests/test_worker_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ async def test_prometheus(c, s, a):
"dask_worker_concurrent_fetch_requests",
"dask_worker_threads",
"dask_worker_latency_seconds",
"dask_worker_transfer_incoming_bytes",
"dask_worker_transfer_incoming_count",
"dask_worker_transfer_incoming_count_total",
"dask_worker_transfer_outgoing_bytes",
"dask_worker_transfer_outgoing_count",
"dask_worker_transfer_outgoing_count_total",
}

try:
Expand Down
87 changes: 87 additions & 0 deletions distributed/tests/test_worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,9 @@ def test_aggregate_gather_deps(ws, nbytes, n_in_flight):
assert instructions == [GatherDep.match(worker=ws2, stimulus_id="s1")]
assert len(instructions[0].to_gather) == n_in_flight
assert len(ws.in_flight_tasks) == n_in_flight
assert ws.transfer_incoming_bytes == nbytes * n_in_flight
assert ws.transfer_incoming_count == 1
assert ws.transfer_incoming_count_total == 1


def test_gather_priority(ws):
Expand Down Expand Up @@ -1111,6 +1114,10 @@ def test_gather_priority(ws):
total_nbytes=4 * 2**20,
),
]
expected_bytes = 1 + 4 * 2**20 + 4 * 2**20 + 8 * 2**20 + 4 * 2**20
assert ws.transfer_incoming_bytes == expected_bytes
assert ws.transfer_incoming_count == 4
assert ws.transfer_incoming_count_total == 4


@pytest.mark.parametrize("state", ["executing", "long-running"])
Expand Down Expand Up @@ -1264,3 +1271,83 @@ def test_gather_dep_failure(ws):
]
assert ws.tasks["x"].state == "error"
assert ws.tasks["y"].state == "waiting" # Not ready
assert ws.transfer_incoming_bytes == 0
assert ws.transfer_incoming_count == 0
assert ws.transfer_incoming_count_total == 1


def test_transfer_incoming_metrics(ws):
    """Verify the lifecycle of the incoming-transfer metrics on WorkerState:

    - ``transfer_incoming_bytes``: total size of transfers currently in flight
    - ``transfer_incoming_count``: number of transfers currently in flight
    - ``transfer_incoming_count_total``: cumulative number of transfers started
      since the worker was created (never decremented)
    """
    assert ws.transfer_incoming_bytes == 0
    assert ws.transfer_incoming_count == 0
    assert ws.transfer_incoming_count_total == 0

    ws2 = "127.0.0.1:2"
    # Single dependency from a single peer -> one open transfer of 7 bytes
    ws.handle_stimulus(
        ComputeTaskEvent.dummy(
            "b", who_has={"a": [ws2]}, nbytes={"a": 7}, stimulus_id="s1"
        )
    )
    assert ws.transfer_incoming_bytes == 7
    assert ws.transfer_incoming_count == 1
    assert ws.transfer_incoming_count_total == 1

    ws.handle_stimulus(
        GatherDepSuccessEvent(
            worker=ws2, data={"a": 123}, total_nbytes=7, stimulus_id="s2"
        )
    )
    # Transfer finished: in-flight metrics drop to zero, cumulative total stays
    assert ws.transfer_incoming_bytes == 0
    assert ws.transfer_incoming_count == 0
    assert ws.transfer_incoming_count_total == 1

    # Two dependencies held by the same peer are aggregated into ONE transfer.
    # Note: stimulus_id fixed from a duplicate "s2" — ids must be unique so the
    # stimulus log stays unambiguous; subsequent ids renumbered accordingly.
    ws.handle_stimulus(
        ComputeTaskEvent.dummy(
            "e",
            who_has={"c": [ws2], "d": [ws2]},
            nbytes={"c": 11, "d": 13},
            stimulus_id="s3",
        )
    )
    assert ws.transfer_incoming_bytes == 24  # 11 + 13
    assert ws.transfer_incoming_count == 1
    assert ws.transfer_incoming_count_total == 2

    ws.handle_stimulus(
        GatherDepSuccessEvent(
            worker=ws2, data={"c": 123, "d": 234}, total_nbytes=24, stimulus_id="s4"
        )
    )
    assert ws.transfer_incoming_bytes == 0
    assert ws.transfer_incoming_count == 0
    assert ws.transfer_incoming_count_total == 2

    # Dependencies spread over two peers -> two concurrent transfers
    ws3 = "127.0.0.1:3"
    ws.handle_stimulus(
        ComputeTaskEvent.dummy(
            "h",
            who_has={"f": [ws2], "g": [ws3]},
            nbytes={"f": 17, "g": 19},
            stimulus_id="s5",
        )
    )
    assert ws.transfer_incoming_bytes == 36  # 17 + 19
    assert ws.transfer_incoming_count == 2
    assert ws.transfer_incoming_count_total == 4

    # First peer responds; the other transfer remains in flight
    ws.handle_stimulus(
        GatherDepSuccessEvent(
            worker=ws3, data={"g": 345}, total_nbytes=19, stimulus_id="s6"
        )
    )
    assert ws.transfer_incoming_bytes == 17
    assert ws.transfer_incoming_count == 1
    assert ws.transfer_incoming_count_total == 4

    # Second peer responds with key "f" (fixed: original said {"g": 456}, but
    # the key in flight from ws2 is "f" with nbytes=17; "g" was already fetched)
    ws.handle_stimulus(
        GatherDepSuccessEvent(
            worker=ws2, data={"f": 456}, total_nbytes=17, stimulus_id="s7"
        )
    )
    assert ws.transfer_incoming_bytes == 0
    assert ws.transfer_incoming_count == 0
    assert ws.transfer_incoming_count_total == 4
28 changes: 21 additions & 7 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,9 +385,11 @@ class Worker(BaseWorker, ServerNode):
profile_history: deque[tuple[float, dict[str, Any]]]
transfer_incoming_log: deque[dict[str, Any]]
transfer_outgoing_log: deque[dict[str, Any]]
#: Number of total data transfers to other workers.
#: Total number of data transfers to other workers since the worker was started
transfer_outgoing_count_total: int
#: Number of open data transfers to other workers.
#: Current total size of open data transfers to other workers
transfer_outgoing_bytes: int
#: Current number of open data transfers to other workers
transfer_outgoing_count: int
bandwidth: float
latency: float
Expand Down Expand Up @@ -543,6 +545,7 @@ def __init__(
self.transfer_incoming_log = deque(maxlen=100000)
self.transfer_outgoing_log = deque(maxlen=100000)
self.transfer_outgoing_count_total = 0
self.transfer_outgoing_bytes = 0
self.transfer_outgoing_count = 0
self.bandwidth = parse_bytes(dask.config.get("distributed.scheduler.bandwidth"))
self.bandwidth_workers = defaultdict(
Expand Down Expand Up @@ -983,6 +986,14 @@ async def get_metrics(self) -> dict:
"memory": spilled_memory,
"disk": spilled_disk,
},
transfer={
"incoming_bytes": self.state.transfer_incoming_bytes,
"incoming_count": self.state.transfer_incoming_count,
"incoming_count_total": self.state.transfer_incoming_count_total,
"outgoing_bytes": self.transfer_outgoing_bytes,
"outgoing_count": self.transfer_outgoing_count,
"outgoing_count_total": self.transfer_outgoing_count_total,
},
event_loop_interval=self._tick_interval_observed,
)
out.update(self.monitor.recent())
Expand Down Expand Up @@ -1684,6 +1695,7 @@ async def get_data(
return {"status": "busy"}

self.transfer_outgoing_count += 1
self.transfer_outgoing_count_total += 1
data = {k: self.data[k] for k in keys if k in self.data}

if len(data) < len(keys):
Expand All @@ -1696,7 +1708,11 @@ async def get_data(
)

msg = {"status": "OK", "data": {k: to_serialize(v) for k, v in data.items()}}
nbytes = {k: self.state.tasks[k].nbytes for k in data if k in self.state.tasks}
# Note: `if k in self.data` above guarantees that
# k is in self.state.tasks too and that nbytes is non-None
bytes_per_task = {k: self.state.tasks[k].nbytes or 0 for k in data}
total_bytes = sum(bytes_per_task.values())
self.transfer_outgoing_bytes += total_bytes
stop = time()
if self.digests is not None:
self.digests["get-data-load-duration"].add(stop - start)
Expand All @@ -1715,14 +1731,12 @@ async def get_data(
comm.abort()
raise
finally:
self.transfer_outgoing_bytes -= total_bytes
self.transfer_outgoing_count -= 1
stop = time()
if self.digests is not None:
self.digests["get-data-send-duration"].add(stop - start)

total_bytes = sum(filter(None, nbytes.values()))

self.transfer_outgoing_count_total += 1
duration = (stop - start) or 0.5 # windows
self.transfer_outgoing_log.append(
{
Expand All @@ -1731,7 +1745,7 @@ async def get_data(
"middle": (start + stop) / 2,
"duration": duration,
"who": who,
"keys": nbytes,
"keys": bytes_per_task,
"total": total_bytes,
"compressed": compressed,
"bandwidth": total_bytes / duration,
Expand Down
12 changes: 6 additions & 6 deletions distributed/worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1149,14 +1149,14 @@ class WorkerState:
#: dependencies until the current query returns.
in_flight_workers: dict[str, set[str]]

#: The total size of incoming data transfers for in-flight tasks.
#: Current total size of open data transfers from other workers
transfer_incoming_bytes: int

#: The maximum number of concurrent incoming data transfers from other workers.
#: Maximum number of concurrent incoming data transfers from other workers.
#: See also :attr:`distributed.worker.Worker.transfer_outgoing_count_limit`.
transfer_incoming_count_limit: int

#: Number of total data transfers from other workers since the worker was started.
#: Total number of data transfers from other workers since the worker was started.
transfer_incoming_count_total: int

#: Ignore :attr:`transfer_incoming_count_limit` as long as :attr:`transfer_incoming_bytes` is
Expand Down Expand Up @@ -1354,7 +1354,7 @@ def all_running_tasks(self) -> set[TaskState]:

@property
def in_flight_tasks_count(self) -> int:
"""Count of tasks currently being replicated from other workers to this one.
"""Number of tasks currently being replicated from other workers to this one.

See also
--------
Expand All @@ -1364,7 +1364,7 @@ def in_flight_tasks_count(self) -> int:

@property
def transfer_incoming_count(self) -> int:
"""Count of open data transfers from other workers.
"""Current number of open data transfers from other workers.

See also
--------
Expand Down Expand Up @@ -1529,6 +1529,7 @@ def _ensure_communicating(self, *, stimulus_id: str) -> RecsInstrs:
)

self.in_flight_workers[worker] = to_gather_keys
self.transfer_incoming_count_total += 1
self.transfer_incoming_bytes += total_nbytes
if (
self.transfer_incoming_count >= self.transfer_incoming_count_limit
Expand Down Expand Up @@ -2845,7 +2846,6 @@ def _gather_dep_done_common(self, ev: GatherDepDoneEvent) -> Iterator[TaskState]
_execute_done_common
"""
self.transfer_incoming_bytes -= ev.total_nbytes
self.transfer_incoming_count_total += 1
keys = self.in_flight_workers.pop(ev.worker)
for key in keys:
ts = self.tasks[key]
Expand Down