Skip to content

Commit

Permalink
spans/metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Jun 8, 2023
1 parent 36c9121 commit 93bd3c1
Show file tree
Hide file tree
Showing 7 changed files with 386 additions and 139 deletions.
132 changes: 77 additions & 55 deletions distributed/dashboard/components/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from collections import OrderedDict, defaultdict
from collections.abc import Iterable
from datetime import datetime, timedelta
from itertools import chain
from numbers import Number
from typing import Any, TypeVar

Expand Down Expand Up @@ -3402,7 +3403,7 @@ def handle_selector_chng(attr, old, new):

self.function_selector = MultiChoice(value=[], options=[])
self.function_selector.placeholder = "Select specific functions"
self.unit_selector = Select(title="Unit selection", options=[])
self.unit_selector = Select(title="Unit selection", options=["seconds"])
self.unit_selector.on_change("value", handle_selector_chng)
self.unit_selected = "seconds"
self.task_exec_by_activity_chart = None
Expand All @@ -3418,76 +3419,97 @@ def format(self, unit: str, val: Any) -> str:
formatters = {"bytes": format_bytes, "seconds": format_time}
return formatters.get(unit, str)(val)

def get_metrics(
self,
) -> tuple[list[tuple[str, str, str, float]], list[tuple[str, str, float]]]:
"""Pre-process fine performance metrics
Returns
-------
- 'execute' metrics: [(function, activity, unit, value), ...]
- 'get_data' metrics: [(activity, unit, value), ...]
"""
execute: defaultdict[tuple[str, str, str], float] = defaultdict(float)
get_data = []

for k, v in self.scheduler.cumulative_worker_metrics.items():
if not isinstance(k, tuple):
continue
context, *other, activity, unit = k
assert isinstance(unit, str)

if context == "execute":
span_id, function = other
assert isinstance(function, str)
# Custom metrics can provide any hashable as the label
# Squash all span_ids together
# TODO offer a filter by span_id, like we already do by function
execute[function, str(activity), unit] += v
elif context == "get-data":
assert not other
assert isinstance(activity, str)
get_data.append((activity, unit, v))

# Ignore memory-monitor and gather-dep metrics

return (
[
(function, activity, unit, v)
for (function, activity, unit), v in sorted(execute.items())
],
sorted(get_data),
)

@without_property_validation
@log_errors
def update(self):
items = sorted(
# Custom metrics can be any hashable
(tuple(map(str, k)), v)
for k, v in self.scheduler.cumulative_worker_metrics.items()
if isinstance(k, tuple)
)
execute_metrics, get_data_metrics = self.get_metrics()

activities = {
activity
for (*_, activity, _), _ in items
if activity not in self.task_activities
activity for *_, activity, _, _ in chain(execute_metrics, get_data_metrics)
}
activities.difference_update(self.task_activities)
if activities:
self.substantial_change = True
self.task_activities.extend(activities)

functions = {
func
for (ctx, func, *_), _ in items
if ctx == "execute" and func not in self.task_exec_data["functions"]
}
units = {unit for *_, unit, _ in chain(execute_metrics, get_data_metrics)}
units.difference_update(self.unit_selector.options)
self.unit_selector.options.extend(units)

functions = {function for function, *_ in execute_metrics}
functions.difference_update(self.task_exec_data["functions"])
if functions:
self.substantial_change = True
self.task_exec_data["timestamp"].extend(datetime.now() for _ in functions)
self.function_selector.options.extend(functions)
self.task_exec_data["functions"].extend(functions)

for (context, *parts), val in items:
if context == "get-data":
activity, unit = parts

if unit not in self.unit_selector.options:
# note append doesn't work here
self.unit_selector.options += [unit]

if activity not in self.senddata["activity"]:
self.substantial_change = True
self.senddata["activity"].append(activity)

idx = self.senddata["activity"].index(activity)
while idx >= len(self.senddata[f"{activity}_{unit}"]):
self.senddata[f"{activity}_{unit}"].append(0)
self.senddata[f"{activity}_{unit}_text"].append("")
self.senddata[f"{activity}_{unit}_text"][idx] = self.format(unit, val)
self.senddata[f"{activity}_{unit}"][idx] = val

elif context == "execute":
prefix, activity, unit = parts

if unit not in self.unit_selector.options:
# note append doesn't work here
self.unit_selector.options += [unit]

idx = self.task_exec_data["functions"].index(prefix)

# Some function/activity combos missing, so need to keep columns aligned
for op in self.task_activities:
while len(self.task_exec_data[f"{op}_{unit}"]) != len(
self.task_exec_data["functions"]
):
self.task_exec_data[f"{op}_{unit}"].append(0)
self.task_exec_data[f"{op}_{unit}_text"].append("")

self.task_exec_data[f"{activity}_{unit}"][idx] = val
self.task_exec_data[f"{activity}_{unit}_text"][idx] = self.format(
unit, val
)
for activity, unit, val in get_data_metrics:
if activity not in self.senddata["activity"]:
self.substantial_change = True
self.senddata["activity"].append(activity)

idx = self.senddata["activity"].index(activity)
while idx >= len(self.senddata[f"{activity}_{unit}"]):
self.senddata[f"{activity}_{unit}"].append(0)
self.senddata[f"{activity}_{unit}_text"].append("")
self.senddata[f"{activity}_{unit}_text"][idx] = self.format(unit, val)
self.senddata[f"{activity}_{unit}"][idx] = val

for prefix, activity, unit, val in execute_metrics:
idx = self.task_exec_data["functions"].index(prefix)

# Some function/activity combos missing, so need to keep columns aligned
for op in self.task_activities:
while len(self.task_exec_data[f"{op}_{unit}"]) != len(
self.task_exec_data["functions"]
):
self.task_exec_data[f"{op}_{unit}"].append(0)
self.task_exec_data[f"{op}_{unit}_text"].append("")

self.task_exec_data[f"{activity}_{unit}"][idx] = val
self.task_exec_data[f"{activity}_{unit}_text"][idx] = self.format(unit, val)

data = self.task_exec_data.copy()
# If user has manually selected function(s) then we are only showing them.
Expand Down
6 changes: 3 additions & 3 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
from distributed.security import Security
from distributed.semaphore import SemaphoreExtension
from distributed.shuffle import ShuffleSchedulerExtension
from distributed.spans import SpansExtension
from distributed.spans import SpansSchedulerExtension
from distributed.stealing import WorkStealing
from distributed.utils import (
All,
Expand Down Expand Up @@ -171,7 +171,7 @@
"amm": ActiveMemoryManagerExtension,
"memory_sampler": MemorySamplerExtension,
"shuffle": ShuffleSchedulerExtension,
"spans": SpansExtension,
"spans": SpansSchedulerExtension,
"stealing": WorkStealing,
}

Expand Down Expand Up @@ -4448,7 +4448,7 @@ def update_graph(
recommendations[ts.key] = "erred"
break

spans_ext: SpansExtension | None = self.extensions.get("spans")
spans_ext: SpansSchedulerExtension | None = self.extensions.get("spans")
if spans_ext:
# new_tasks does not necessarily contain all runnable tasks;
# _generate_taskstates is not the only thing that calls new_task(). A
Expand Down
72 changes: 69 additions & 3 deletions distributed/spans.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
from distributed.metrics import time

if TYPE_CHECKING:
from distributed import Scheduler
from distributed.scheduler import TaskGroup, TaskState, TaskStateState
from distributed import Scheduler, Worker
from distributed.scheduler import TaskGroup, TaskState, TaskStateState, WorkerState


@contextmanager
Expand Down Expand Up @@ -124,6 +124,8 @@ class Span:
#: stop
enqueued: float

_cumulative_worker_metrics: defaultdict[tuple[str, ...], float]

# Support for weakrefs to a class with __slots__
__weakref__: Any

Expand All @@ -136,6 +138,7 @@ def __init__(self, name: tuple[str, ...], id_: str, parent: Span | None):
self.enqueued = time()
self.children = []
self.groups = set()
self._cumulative_worker_metrics = defaultdict(float)

def __repr__(self) -> str:
return f"Span<name={self.name}, id={self.id}>"
Expand Down Expand Up @@ -262,8 +265,25 @@ def nbytes_total(self) -> int:
"""
return sum(tg.nbytes_total for tg in self.traverse_groups())

@property
def cumulative_worker_metrics(self) -> defaultdict[tuple[str, ...], float]:
"""Replica of Worker.digests_total and Scheduler.cumulative_worker_metrics, but
only for the metrics that can be attributed to the current span tree.
The span_id has been removed from the key.
At the moment of writing, all keys are
``("execute", <task prefix>, <activity>, <unit>)``
but more may be added in the future with a different format; please test for
``k[0] == "execute"``.
"""
out: defaultdict[tuple[str, ...], float] = defaultdict(float)
for child in self.traverse_spans():
for k, v in child._cumulative_worker_metrics.items():
out[k] += v
return out


class SpansExtension:
class SpansSchedulerExtension:
"""Scheduler extension for spans support"""

#: All Span objects by id
Expand Down Expand Up @@ -358,3 +378,49 @@ def _ensure_span(self, name: tuple[str, ...], ids: tuple[str, ...]) -> Span:
self.root_spans.append(span)

return span

def heartbeat(
self, ws: WorkerState, data: dict[str, dict[tuple[str, ...], float]]
) -> None:
"""Triggered by SpansWorkerExtension.heartbeat().
Populate :meth:`Span.cumulative_worker_metrics` with data from the worker.
See also
--------
SpansWorkerExtension.heartbeat
Span.cumulative_worker_metrics
"""
for span_id, metrics in data.items():
span = self.spans[span_id]
for k, v in metrics.items():
span._cumulative_worker_metrics[k] += v


class SpansWorkerExtension:
"""Worker extension for spans support"""

worker: Worker

def __init__(self, worker: Worker):
self.worker = worker

def heartbeat(self) -> dict[str, dict[tuple[str, ...], float]]:
"""Apportion the metrics that do have a span to the Spans on the scheduler
Returns
-------
``{span_id: {("execute", prefix, activity, unit): value}}``
See also
--------
SpansSchedulerExtension.heartbeat
Span.cumulative_worker_metrics
"""
out: defaultdict[str, dict[tuple[str, ...], float]] = defaultdict(dict)
for k, v in self.worker.digests_total_since_heartbeat.items():
if isinstance(k, tuple) and k[0] == "execute":
_, span_id, prefix, activity, unit = k
assert span_id is not None
out[span_id]["execute", prefix, activity, unit] = v
return dict(out)
Loading

0 comments on commit 93bd3c1

Please sign in to comment.