Skip to content

Commit

Permalink
Store perf metrics on scheduler
Browse files Browse the repository at this point in the history
Part of dask#7665
  • Loading branch information
milesgranger committed Mar 23, 2023
1 parent 9bd90c0 commit db061dd
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 0 deletions.
3 changes: 3 additions & 0 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ def stop() -> bool:
# In case crick is not installed, also log cumulative totals (reset at server
# restart) and local maximums (reset by prometheus poll)
self.digests_total = defaultdict(float)
self.digests_total_since_heartbeat = defaultdict(float)
self.digests_max = defaultdict(float)

self.counters = defaultdict(Counter)
Expand Down Expand Up @@ -960,6 +961,8 @@ def digest_metric(self, name: Hashable, value: float) -> None:
self.digests[name].add(value)
# Cumulative data (reset by server restart)
self.digests_total[name] += value
# Cumulative data sent to scheduler and reset on heartbeat
self.digests_total_since_heartbeat[name] += value
# Local maximums (reset by Prometheus poll)
self.digests_max[name] = max(self.digests_max[name], value)

Expand Down
5 changes: 5 additions & 0 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3442,6 +3442,8 @@ def __init__(
self.bandwidth_workers = defaultdict(float)
self.bandwidth_types = defaultdict(float)

self.cumulative_worker_metrics = defaultdict(float)

if not preload:
preload = dask.config.get("distributed.scheduler.preload")
if not preload_argv:
Expand Down Expand Up @@ -4045,6 +4047,9 @@ def heartbeat_worker(
ws.executing[ts] = duration
ts.prefix.add_exec_time(duration)

for name, value in metrics["digests_total_since_heartbeat"].items():
self.cumulative_worker_metrics[name] += value

ws.metrics = metrics

# Calculate RSS - dask keys, separating "old" and "new" usage
Expand Down
23 changes: 23 additions & 0 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2344,6 +2344,29 @@ async def test_idle_timeout_no_workers(c, s):
assert s.check_idle()


@gen_cluster(client=True)
async def test_cumulative_worker_metrics(c, s, a, b):
    """Worker digests accumulate on the scheduler across heartbeats."""
    # Before any work or heartbeat, nothing has been accumulated.
    assert s.cumulative_worker_metrics == {}

    def do_work():
        pass

    await c.submit(do_work)
    await asyncio.sleep(0.1)

    # Trigger both workers to ship their since-last-heartbeat digests.
    await a.heartbeat()
    await b.heartbeat()

    metrics = s.cumulative_worker_metrics
    # Now populated with per-worker digest totals.
    assert metrics

    # Spot-check a subset of the keys we expect to see.
    assert "latency" in metrics
    assert ("execute", "do_work", "deserialize", "seconds") in metrics

    # Every accumulated value is a float total.
    assert all(isinstance(v, float) for v in metrics.values())

@gen_cluster(client=True, config={"distributed.scheduler.bandwidth": "100 GB"})
async def test_bandwidth(c, s, a, b):
start = s.bandwidth
Expand Down
3 changes: 3 additions & 0 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1042,6 +1042,7 @@ async def get_metrics(self) -> dict:
"workers": dict(self.bandwidth_workers),
"types": keymap(typename, self.bandwidth_types),
},
digests_total_since_heartbeat=dict(self.digests_total_since_heartbeat),
managed_bytes=self.state.nbytes,
spilled_bytes={
"memory": spilled_memory,
Expand All @@ -1058,6 +1059,8 @@ async def get_metrics(self) -> dict:
event_loop_interval=self._tick_interval_observed,
)

self.digests_total_since_heartbeat.clear()

monitor_recent = self.monitor.recent()
# Convert {foo.bar: 123} to {foo: {bar: 123}}
for k, v in monitor_recent.items():
Expand Down

0 comments on commit db061dd

Please sign in to comment.