From 15dce7040bbb43f5fcb412b12caad4d8d5795837 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 31 Aug 2022 10:26:30 +0200 Subject: [PATCH] Adjust total count logic and calculate transfer_outgoing_bytes --- distributed/http/worker/prometheus/core.py | 47 ++++++++++++++++++++-- distributed/worker.py | 27 +++++++++---- distributed/worker_state_machine.py | 12 +++--- 3 files changed, 69 insertions(+), 17 deletions(-) diff --git a/distributed/http/worker/prometheus/core.py b/distributed/http/worker/prometheus/core.py index c5406bd44ee..3268f64857d 100644 --- a/distributed/http/worker/prometheus/core.py +++ b/distributed/http/worker/prometheus/core.py @@ -37,7 +37,10 @@ def collect(self): yield GaugeMetricFamily( self.build_name("concurrent_fetch_requests"), - "Number of open fetch requests to other workers.", + ( + "[Deprecated: This metric has been renamed to transfer_incoming_count.] " + "Number of open fetch requests to other workers." + ), value=self.server.state.transfer_incoming_count, ) @@ -54,9 +57,45 @@ def collect(self): ) yield GaugeMetricFamily( - self.build_name("comm_reserved_bytes"), - "Number of bytes currently reserved for incoming/outgoing data transfers.", - value=self.server.state.comm_nbytes, + self.build_name("transfer_incoming_bytes"), + "Total size of open data transfers from other workers.", + value=self.server.state.transfer_incoming_bytes, + ) + + yield GaugeMetricFamily( + self.build_name("transfer_incoming_count"), + "Number of open data transfers from other workers.", + value=self.server.state.transfer_incoming_bytes, + ) + + yield GaugeMetricFamily( + self.build_name("transfer_incoming_count_total"), + ( + "Total number of data transfers from other workers " + "since the worker was started." 
+ ), + value=self.server.state.transfer_incoming_count_total, + ) + + yield GaugeMetricFamily( + self.build_name("transfer_outgoing_bytes"), + "Total size of open data transfers to other workers.", + value=self.server.transfer_outgoing_bytes, + ) + + yield GaugeMetricFamily( + self.build_name("transfer_outgoing_count"), + "Number of open data transfers to other workers.", + value=self.server.transfer_outgoing_count, + ) + + yield GaugeMetricFamily( + self.build_name("transfer_outgoing_count_total"), + ( + "Total number of data transfers to other workers " + "since the worker was started." + ), + value=self.server.transfer_outgoing_count_total, ) # all metrics using digests require crick to be installed diff --git a/distributed/worker.py b/distributed/worker.py index 0b49aa21ca0..0d777cfc82f 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -385,8 +385,10 @@ class Worker(BaseWorker, ServerNode): profile_history: deque[tuple[float, dict[str, Any]]] transfer_incoming_log: deque[dict[str, Any]] transfer_outgoing_log: deque[dict[str, Any]] - #: Number of total data transfers to other workers. + #: Total number of data transfers to other workers since the worker was started. transfer_outgoing_count_total: int + #: Total size of open data transfers to other workers. + transfer_outgoing_bytes: int #: Number of open data transfers to other workers. 
transfer_outgoing_count: int bandwidth: float @@ -543,6 +545,7 @@ def __init__( self.transfer_incoming_log = deque(maxlen=100000) self.transfer_outgoing_log = deque(maxlen=100000) self.transfer_outgoing_count_total = 0 + self.transfer_outgoing_bytes = 0 self.transfer_outgoing_count = 0 self.bandwidth = parse_bytes(dask.config.get("distributed.scheduler.bandwidth")) self.bandwidth_workers = defaultdict( @@ -983,7 +986,14 @@ async def get_metrics(self) -> dict: "memory": spilled_memory, "disk": spilled_disk, }, - comm_reserved_bytes=self.state.comm_nbytes, + transfer={ + "incoming_bytes": self.state.transfer_incoming_bytes, + "incoming_count": self.state.transfer_incoming_count, + "incoming_count_total": self.state.transfer_incoming_count_total, + "outgoing_bytes": self.transfer_outgoing_bytes, + "outgoing_count": self.transfer_outgoing_count, + "outgoing_count_total": self.transfer_outgoing_count_total, + }, event_loop_interval=self._tick_interval_observed, ) out.update(self.monitor.recent()) @@ -1685,6 +1695,7 @@ async def get_data( return {"status": "busy"} self.transfer_outgoing_count += 1 + self.transfer_outgoing_count_total += 1 data = {k: self.data[k] for k in keys if k in self.data} if len(data) < len(keys): @@ -1697,7 +1708,11 @@ async def get_data( ) msg = {"status": "OK", "data": {k: to_serialize(v) for k, v in data.items()}} - nbytes = {k: self.state.tasks[k].nbytes for k in data if k in self.state.tasks} + bytes_per_task = { + k: self.state.tasks[k].nbytes for k in data if k in self.state.tasks + } + total_bytes = sum(filter(None, bytes_per_task.values())) + self.transfer_outgoing_bytes += total_bytes stop = time() if self.digests is not None: self.digests["get-data-load-duration"].add(stop - start) @@ -1716,14 +1731,12 @@ async def get_data( comm.abort() raise finally: + self.transfer_outgoing_bytes -= total_bytes self.transfer_outgoing_count -= 1 stop = time() if self.digests is not None: self.digests["get-data-send-duration"].add(stop - start) - 
total_bytes = sum(filter(None, nbytes.values())) - - self.transfer_outgoing_count_total += 1 duration = (stop - start) or 0.5 # windows self.transfer_outgoing_log.append( { @@ -1732,7 +1745,7 @@ async def get_data( "middle": (start + stop) / 2, "duration": duration, "who": who, - "keys": nbytes, + "keys": bytes_per_task, "total": total_bytes, "compressed": compressed, "bandwidth": total_bytes / duration, diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 431c8d9a8c9..26b213c0b1b 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -1149,14 +1149,14 @@ class WorkerState: #: dependencies until the current query returns. in_flight_workers: dict[str, set[str]] - #: The total size of incoming data transfers for in-flight tasks. + #: Total size of open data transfers from other workers. transfer_incoming_bytes: int - #: The maximum number of concurrent incoming data transfers from other workers. + #: Maximum number of concurrent incoming data transfers from other workers. #: See also :attr:`distributed.worker.Worker.transfer_outgoing_count_limit`. transfer_incoming_count_limit: int - #: Number of total data transfers from other workers since the worker was started. + #: Total number of data transfers from other workers since the worker was started. transfer_incoming_count_total: int #: Ignore :attr:`transfer_incoming_count_limit` as long as :attr:`transfer_incoming_bytes` is @@ -1354,7 +1354,7 @@ def all_running_tasks(self) -> set[TaskState]: @property def in_flight_tasks_count(self) -> int: - """Count of tasks currently being replicated from other workers to this one. + """Number of tasks currently being replicated from other workers to this one. See also -------- @@ -1364,7 +1364,7 @@ def in_flight_tasks_count(self) -> int: @property def transfer_incoming_count(self) -> int: - """Count of open data transfers from other workers. + """Number of open data transfers from other workers. 
See also -------- @@ -1529,6 +1529,7 @@ def _ensure_communicating(self, *, stimulus_id: str) -> RecsInstrs: ) self.in_flight_workers[worker] = to_gather_keys + self.transfer_incoming_count_total += 1 self.transfer_incoming_bytes += total_nbytes if ( self.transfer_incoming_count >= self.transfer_incoming_count_limit @@ -2784,7 +2785,6 @@ def _gather_dep_done_common(self, ev: GatherDepDoneEvent) -> Iterator[TaskState] _execute_done_common """ self.transfer_incoming_bytes -= ev.total_nbytes - self.transfer_incoming_count_total += 1 keys = self.in_flight_workers.pop(ev.worker) for key in keys: ts = self.tasks[key]