Cluster dump utilities (#5920)
sjperkins authored Mar 23, 2022
1 parent 4ddb006 commit b872d45
Showing 6 changed files with 487 additions and 34 deletions.
271 changes: 270 additions & 1 deletion distributed/cluster_dump.py
@@ -2,12 +2,17 @@

from __future__ import annotations

from typing import IO, Any, Awaitable, Callable, Literal
from collections import defaultdict
from collections.abc import Mapping
from pathlib import Path
from typing import IO, Any, Awaitable, Callable, Collection, Literal

import fsspec
import msgpack

from distributed.compatibility import to_thread
from distributed.stories import scheduler_story as _scheduler_story
from distributed.stories import worker_story as _worker_story


def _tuple_to_list(node):
@@ -57,3 +62,267 @@ def writer(state: dict, f: IO):
# Write from a thread so we don't block the event loop quite as badly
# (the writer will still hold the GIL a lot though).
await to_thread(writer, state, f)


def load_cluster_dump(url: str, **kwargs) -> dict:
"""Loads a cluster dump from a disk artefact
Parameters
----------
url : str
Name of the disk artefact. This should have either a
``.msgpack.gz`` or ``yaml`` suffix, depending on the dump format.
**kwargs :
Extra arguments passed to :func:`fsspec.open`.
Returns
-------
state : dict
The cluster state at the time of the dump.
"""
if url.endswith(".msgpack.gz"):
mode = "rb"
reader = msgpack.unpack
elif url.endswith(".yaml"):
import yaml

mode = "r"
reader = yaml.safe_load
else:
raise ValueError(f"url ({url}) must have a .msgpack.gz or .yaml suffix")

kwargs.setdefault("compression", "infer")

with fsspec.open(url, mode, **kwargs) as f:
return reader(f)
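A minimal usage sketch (the file name is hypothetical); the suffix picks the reader, and ``compression="infer"`` lets fsspec handle the gzip layer:

from distributed.cluster_dump import load_cluster_dump

# ".msgpack.gz" selects msgpack.unpack, ".yaml" selects yaml.safe_load
state = load_cluster_dump("dump.msgpack.gz")  # hypothetical file
assert "scheduler" in state and "workers" in state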


class DumpArtefact(Mapping):
"""
Utility class for inspecting the state of a cluster dump

.. code-block:: python

    dump = DumpArtefact.from_url("dump.msgpack.gz")

    memory_tasks = dump.scheduler_tasks_in_state("memory")
    executing_tasks = dump.worker_tasks_in_state("executing")
"""

def __init__(self, state: dict):
self.dump = state

@classmethod
def from_url(cls, url: str, **kwargs) -> DumpArtefact:
"""Loads a cluster dump from a disk artefact
Parameters
----------
url : str
Name of the disk artefact. This should have either a
``.msgpack.gz`` or ``yaml`` suffix, depending on the dump format.
**kwargs :
Extra arguments passed to :func:`fsspec.open`.
Returns
-------
state : dict
The cluster state at the time of the dump.
"""
return DumpArtefact(load_cluster_dump(url, **kwargs))

def __getitem__(self, key):
return self.dump[key]

def __iter__(self):
return iter(self.dump)

def __len__(self):
return len(self.dump)

def _extract_tasks(self, state: str | None, context: dict):
if state:
return [v for v in context.values() if v["state"] == state]
else:
return list(context.values())

def scheduler_tasks_in_state(self, state: str | None = None) -> list:
"""
Parameters
----------
state : str, optional
    If provided, only tasks in the given state are returned.
    Otherwise, all tasks are returned.

Returns
-------
tasks : list
    The list of scheduler tasks in ``state``.
"""
return self._extract_tasks(state, self.dump["scheduler"]["tasks"])

def worker_tasks_in_state(self, state: str | None = None) -> list:
"""
Parameters
----------
state : str, optional
    If provided, only tasks in the given state are returned.
    Otherwise, all tasks are returned.

Returns
-------
tasks : list
    The list of worker tasks in ``state``.
"""
tasks = []

for worker_dump in self.dump["workers"].values():
if isinstance(worker_dump, dict) and "tasks" in worker_dump:
tasks.extend(self._extract_tasks(state, worker_dump["tasks"]))

return tasks
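A short sketch of filtering tasks from a loaded artefact (file name and states illustrative):

from distributed.cluster_dump import DumpArtefact

dump = DumpArtefact.from_url("dump.msgpack.gz")  # hypothetical file
memory_tasks = dump.scheduler_tasks_in_state("memory")
executing_tasks = dump.worker_tasks_in_state("executing")
all_worker_tasks = dump.worker_tasks_in_state()  # no state filter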

def scheduler_story(self, *key_or_stimulus_id: str) -> dict:
"""
Returns
-------
stories : dict
    A dict of stories for the keys/stimulus IDs in ``*key_or_stimulus_id``.
"""
stories = defaultdict(list)

log = self.dump["scheduler"]["transition_log"]
keys = set(key_or_stimulus_id)

for story in _scheduler_story(keys, log):
stories[story[0]].append(tuple(story))

return dict(stories)

def worker_story(self, *key_or_stimulus_id: str) -> dict:
"""
Returns
-------
stories : dict
    A dict of stories for the keys/stimulus IDs in ``*key_or_stimulus_id``.
"""
keys = set(key_or_stimulus_id)
stories = defaultdict(list)

for worker_dump in self.dump["workers"].values():
if isinstance(worker_dump, dict) and "log" in worker_dump:
for story in _worker_story(keys, worker_dump["log"]):
stories[story[0]].append(tuple(story))

return dict(stories)
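A sketch of pulling stories out of a loaded artefact; the task key is hypothetical, and each result maps a key or stimulus ID to its list of story tuples:

dump = DumpArtefact.from_url("dump.msgpack.gz")  # hypothetical file
scheduler_stories = dump.scheduler_story("inc-abc123")  # hypothetical key
worker_stories = dump.worker_story("inc-abc123")
for key, events in scheduler_stories.items():
    print(key, len(events))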

def missing_workers(self) -> list:
"""
Returns
-------
missing : list
A list of workers connected to the scheduler, but which
did not respond to requests for a state dump.
"""
scheduler_workers = self.dump["scheduler"]["workers"]
responsive_workers = self.dump["workers"]
return [
w
for w in scheduler_workers
if w not in responsive_workers
or not isinstance(responsive_workers[w], dict)
]

def _compact_state(self, state: dict, expand_keys: set[str]):
"""Compacts ``state`` keys into a general key,
unless the key is in ``expand_keys``"""
assert "general" not in state
result = {}
general = {}

for k, v in state.items():
if k in expand_keys:
result[k] = v
else:
general[k] = v

result["general"] = general
return result
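For illustration, with invented values: everything outside ``expand_keys`` is folded under a ``general`` key (``_compact_state`` is private; this only demonstrates the transformation):

from distributed.cluster_dump import DumpArtefact

state = {"tasks": {"inc-1": {}}, "address": "tcp://1.2.3.4:4567", "nthreads": 4}
compacted = DumpArtefact({})._compact_state(state, {"tasks"})
assert compacted == {
    "tasks": {"inc-1": {}},
    "general": {"address": "tcp://1.2.3.4:4567", "nthreads": 4},
}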

def to_yamls(
self,
root_dir: str | Path | None = None,
worker_expand_keys: Collection[str] = ("config", "log", "logs", "tasks"),
scheduler_expand_keys: Collection[str] = (
"events",
"extensions",
"log",
"task_groups",
"tasks",
"transition_log",
"workers",
),
):
"""
Splits the dump artefact into a tree of yaml files with
``root_dir`` as its base.

The root level of the tree contains a directory for the scheduler
and a directory for each individual worker.
Each directory contains yaml files describing the state of the scheduler
or worker when the artefact was created.

In general, keys associated with the state are compacted into a
``general.yaml`` file, unless they are in ``scheduler_expand_keys`` or
``worker_expand_keys``.

Parameters
----------
root_dir : str or Path, optional
    The root directory into which the tree is written.
    Defaults to the current working directory if ``None``.
worker_expand_keys : iterable of str
    An iterable of artefact worker keys that will be expanded
    into separate yaml files.
    Keys that are not in this iterable are compacted into a
    ``general.yaml`` file.
scheduler_expand_keys : iterable of str
    An iterable of artefact scheduler keys that will be expanded
    into separate yaml files.
    Keys that are not in this iterable are compacted into a
    ``general.yaml`` file.
"""
import yaml

root_dir = Path(root_dir) if root_dir else Path.cwd()
dumper = yaml.CSafeDumper
scheduler_expand_keys = set(scheduler_expand_keys)
worker_expand_keys = set(worker_expand_keys)

workers = self.dump["workers"]
for info in workers.values():
try:
worker_id = info["id"]
except KeyError:
continue

worker_state = self._compact_state(info, worker_expand_keys)

log_dir = root_dir / worker_id
log_dir.mkdir(parents=True, exist_ok=True)

for name, _logs in worker_state.items():
filename = str(log_dir / f"{name}.yaml")
with open(filename, "w") as fd:
yaml.dump(_logs, fd, Dumper=dumper)

context = "scheduler"
# Compact smaller keys into a general dict
scheduler_state = self._compact_state(self.dump[context], scheduler_expand_keys)

log_dir = root_dir / context
log_dir.mkdir(parents=True, exist_ok=True)

for name, _logs in scheduler_state.items():
filename = str(log_dir / f"{name}.yaml")

with open(filename, "w") as fd:
yaml.dump(_logs, fd, Dumper=dumper)
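A sketch of the resulting layout (directory name, worker ID, and exact file set are illustrative; expanded keys follow the defaults above):

dump = DumpArtefact.from_url("dump.msgpack.gz")  # hypothetical file
dump.to_yamls(root_dir="dump-tree")

# dump-tree/
#     scheduler/
#         general.yaml  tasks.yaml  transition_log.yaml  workers.yaml  ...
#     <worker id>/
#         general.yaml  tasks.yaml  log.yaml  logs.yaml  config.yaml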
5 changes: 2 additions & 3 deletions distributed/scheduler.py
@@ -80,6 +80,7 @@
from distributed.security import Security
from distributed.semaphore import SemaphoreExtension
from distributed.stealing import WorkStealing
from distributed.stories import scheduler_story
from distributed.utils import (
All,
TimeoutError,
@@ -7534,9 +7535,7 @@ def transitions(self, recommendations: dict):
def story(self, *keys):
"""Get all transitions that touch one of the input keys"""
keys = {key.key if isinstance(key, TaskState) else key for key in keys}
return [
t for t in self.transition_log if t[0] in keys or keys.intersection(t[3])
]
return scheduler_story(keys, self.transition_log)

transition_story = story

44 changes: 44 additions & 0 deletions distributed/stories.py
@@ -0,0 +1,44 @@
from typing import Iterable


def scheduler_story(keys: set, transition_log: Iterable) -> list:
"""Creates a story from the scheduler transition log given a set of keys
describing tasks or stimuli.
Parameters
----------
keys : set
A set of task `keys` or `stimulus_id`'s
log : iterable
The scheduler transition log
Returns
-------
story : list
"""
return [t for t in transition_log if t[0] in keys or keys.intersection(t[3])]
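A self-contained sketch of the matching rule; the record shape is illustrative (key first, a recommendations mapping at index 3), which is all the function relies on:

from distributed.stories import scheduler_story

log = [
    ("inc-1", "released", "waiting", {"inc-2": "waiting"}, 1647000000.0),
    ("dec-9", "waiting", "processing", {}, 1647000001.0),
]
# A record matches on its own key (t[0]) or any recommended key (t[3])
assert scheduler_story({"inc-2"}, log) == [log[0]]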


def worker_story(keys: set, log: Iterable) -> list:
"""Creates a story from the worker log given a set of keys
describing tasks or stimuli.
Parameters
----------
keys : set
A set of task `keys` or `stimulus_id`'s
log : iterable
The worker log
Returns
-------
story : list
"""
return [
msg
for msg in log
if any(key in msg for key in keys)
or any(
key in c for key in keys for c in msg if isinstance(c, (tuple, list, set))
)
]
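And a matching sketch for the worker side, with invented log entries; a key matches when it appears directly in an entry or inside a nested tuple/list/set member of that entry:

from distributed.stories import worker_story

log = [
    ("inc-1", "compute-task"),
    ("free-keys", ("inc-2", "inc-3")),
]
assert worker_story({"inc-2"}, log) == [log[1]]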
20 changes: 4 additions & 16 deletions distributed/tests/test_client.py
@@ -63,6 +63,7 @@
tokenize,
wait,
)
from distributed.cluster_dump import load_cluster_dump
from distributed.comm import CommClosedError
from distributed.compatibility import LINUX, WINDOWS
from distributed.core import Status
@@ -7261,22 +7262,9 @@ def test_print_simple(capsys):


def _verify_cluster_dump(url, format: str, addresses: set[str]) -> dict:
fsspec = pytest.importorskip("fsspec")

url = str(url)
if format == "msgpack":
import msgpack

url += ".msgpack.gz"
loader = msgpack.unpack
else:
import yaml

url += ".yaml"
loader = yaml.safe_load

with fsspec.open(url, mode="rb", compression="infer") as f:
state = loader(f)
fsspec = pytest.importorskip("fsspec") # for load_cluster_dump
url = str(url) + (".msgpack.gz" if format == "msgpack" else ".yaml")
state = load_cluster_dump(url)

assert isinstance(state, dict)
assert "scheduler" in state