Skip to content

Commit

Permalink
Adjust total count logic and calculate transfer_outgoing_bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait committed Aug 31, 2022
1 parent 8ae02e1 commit 15dce70
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 17 deletions.
47 changes: 43 additions & 4 deletions distributed/http/worker/prometheus/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ def collect(self):

yield GaugeMetricFamily(
self.build_name("concurrent_fetch_requests"),
"Number of open fetch requests to other workers.",
(
"[Deprecated: This metric has been renamed to transfer_incoming_count.] "
"Number of open fetch requests to other workers."
),
value=self.server.state.transfer_incoming_count,
)

Expand All @@ -54,9 +57,45 @@ def collect(self):
)

yield GaugeMetricFamily(
self.build_name("comm_reserved_bytes"),
"Number of bytes currently reserved for incoming/outgoing data transfers.",
value=self.server.state.comm_nbytes,
self.build_name("transfer_incoming_bytes"),
"Total size of open data transfers from other workers.",
value=self.server.state.transfer_incoming_bytes,
)

yield GaugeMetricFamily(
self.build_name("transfer_incoming_count"),
"Number of open data transfers from other workers.",
value=self.server.state.transfer_incoming_bytes,
)

yield GaugeMetricFamily(
self.build_name("transfer_incoming_count_total"),
(
"Total number of data transfers from other workers "
"since the worker was started."
),
value=self.server.state.transfer_incoming_count_total,
)

yield GaugeMetricFamily(
self.build_name("transfer_outgoing_bytes"),
"Total size of open data transfers to other workers.",
value=self.server.transfer_outgoing_bytes,
)

yield GaugeMetricFamily(
self.build_name("transfer_outgoing_count"),
"Number of open data transfers to other workers.",
value=self.server.transfer_outgoing_count,
)

yield GaugeMetricFamily(
self.build_name("transfer_outgoing_count_total"),
(
"Total number of data transfers to other workers "
"since the worker was started."
),
value=self.server.transfer_outgoing_count_total,
)

# all metrics using digests require crick to be installed
Expand Down
27 changes: 20 additions & 7 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,10 @@ class Worker(BaseWorker, ServerNode):
profile_history: deque[tuple[float, dict[str, Any]]]
transfer_incoming_log: deque[dict[str, Any]]
transfer_outgoing_log: deque[dict[str, Any]]
#: Number of total data transfers to other workers.
#: Total number of data transfers to other workers since the worker was started.
transfer_outgoing_count_total: int
#: Total size of open data transfers to other workers.
transfer_outgoing_bytes: int
#: Number of open data transfers to other workers.
transfer_outgoing_count: int
bandwidth: float
Expand Down Expand Up @@ -543,6 +545,7 @@ def __init__(
self.transfer_incoming_log = deque(maxlen=100000)
self.transfer_outgoing_log = deque(maxlen=100000)
self.transfer_outgoing_count_total = 0
self.transfer_outgoing_bytes = 0
self.transfer_outgoing_count = 0
self.bandwidth = parse_bytes(dask.config.get("distributed.scheduler.bandwidth"))
self.bandwidth_workers = defaultdict(
Expand Down Expand Up @@ -983,7 +986,14 @@ async def get_metrics(self) -> dict:
"memory": spilled_memory,
"disk": spilled_disk,
},
comm_reserved_bytes=self.state.comm_nbytes,
transfer={
"incoming_bytes": self.state.transfer_incoming_bytes,
"incoming_count": self.state.transfer_incoming_count,
"incoming_count_total": self.state.transfer_incoming_count_total,
"outgoing_bytes": self.transfer_outgoing_bytes,
"outgoing_count": self.transfer_outgoing_count,
"outgoing_count_total": self.transfer_outgoing_count_total,
},
event_loop_interval=self._tick_interval_observed,
)
out.update(self.monitor.recent())
Expand Down Expand Up @@ -1685,6 +1695,7 @@ async def get_data(
return {"status": "busy"}

self.transfer_outgoing_count += 1
self.transfer_outgoing_count_total += 1
data = {k: self.data[k] for k in keys if k in self.data}

if len(data) < len(keys):
Expand All @@ -1697,7 +1708,11 @@ async def get_data(
)

msg = {"status": "OK", "data": {k: to_serialize(v) for k, v in data.items()}}
nbytes = {k: self.state.tasks[k].nbytes for k in data if k in self.state.tasks}
bytes_per_task = {
k: self.state.tasks[k].nbytes for k in data if k in self.state.tasks
}
total_bytes = sum(filter(None, bytes_per_task.values()))
self.transfer_outgoing_bytes += total_bytes
stop = time()
if self.digests is not None:
self.digests["get-data-load-duration"].add(stop - start)
Expand All @@ -1716,14 +1731,12 @@ async def get_data(
comm.abort()
raise
finally:
self.transfer_outgoing_bytes -= total_bytes
self.transfer_outgoing_count -= 1
stop = time()
if self.digests is not None:
self.digests["get-data-send-duration"].add(stop - start)

total_bytes = sum(filter(None, nbytes.values()))

self.transfer_outgoing_count_total += 1
duration = (stop - start) or 0.5 # windows
self.transfer_outgoing_log.append(
{
Expand All @@ -1732,7 +1745,7 @@ async def get_data(
"middle": (start + stop) / 2,
"duration": duration,
"who": who,
"keys": nbytes,
"keys": bytes_per_task,
"total": total_bytes,
"compressed": compressed,
"bandwidth": total_bytes / duration,
Expand Down
12 changes: 6 additions & 6 deletions distributed/worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1149,14 +1149,14 @@ class WorkerState:
#: dependencies until the current query returns.
in_flight_workers: dict[str, set[str]]

#: The total size of incoming data transfers for in-flight tasks.
#: Total size of open data transfers from other workers.
transfer_incoming_bytes: int

#: The maximum number of concurrent incoming data transfers from other workers.
#: Maximum number of concurrent incoming data transfers from other workers.
#: See also :attr:`distributed.worker.Worker.transfer_outgoing_count_limit`.
transfer_incoming_count_limit: int

#: Number of total data transfers from other workers since the worker was started.
#: Total number of data transfers from other workers since the worker was started.
transfer_incoming_count_total: int

#: Ignore :attr:`transfer_incoming_count_limit` as long as :attr:`transfer_incoming_bytes` is
Expand Down Expand Up @@ -1354,7 +1354,7 @@ def all_running_tasks(self) -> set[TaskState]:

@property
def in_flight_tasks_count(self) -> int:
"""Count of tasks currently being replicated from other workers to this one.
"""Number of tasks currently being replicated from other workers to this one.
See also
--------
Expand All @@ -1364,7 +1364,7 @@ def in_flight_tasks_count(self) -> int:

@property
def transfer_incoming_count(self) -> int:
"""Count of open data transfers from other workers.
"""Number of open data transfers from other workers.
See also
--------
Expand Down Expand Up @@ -1529,6 +1529,7 @@ def _ensure_communicating(self, *, stimulus_id: str) -> RecsInstrs:
)

self.in_flight_workers[worker] = to_gather_keys
self.transfer_incoming_count_total += 1
self.transfer_incoming_bytes += total_nbytes
if (
self.transfer_incoming_count >= self.transfer_incoming_count_limit
Expand Down Expand Up @@ -2784,7 +2785,6 @@ def _gather_dep_done_common(self, ev: GatherDepDoneEvent) -> Iterator[TaskState]
_execute_done_common
"""
self.transfer_incoming_bytes -= ev.total_nbytes
self.transfer_incoming_count_total += 1
keys = self.in_flight_workers.pop(ev.worker)
for key in keys:
ts = self.tasks[key]
Expand Down

0 comments on commit 15dce70

Please sign in to comment.