From b872d4504582b2b09fe35f6b41e17b930a3f99cc Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 23 Mar 2022 12:52:15 +0200 Subject: [PATCH] Cluster dump utilities (#5920) --- distributed/cluster_dump.py | 271 ++++++++++++++++++++++++- distributed/scheduler.py | 5 +- distributed/stories.py | 44 ++++ distributed/tests/test_client.py | 20 +- distributed/tests/test_cluster_dump.py | 166 ++++++++++++++- distributed/worker.py | 15 +- 6 files changed, 487 insertions(+), 34 deletions(-) create mode 100644 distributed/stories.py diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index bf00708f3b8..2d3a400b256 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -2,12 +2,17 @@ from __future__ import annotations -from typing import IO, Any, Awaitable, Callable, Literal +from collections import defaultdict +from collections.abc import Mapping +from pathlib import Path +from typing import IO, Any, Awaitable, Callable, Collection, Literal import fsspec import msgpack from distributed.compatibility import to_thread +from distributed.stories import scheduler_story as _scheduler_story +from distributed.stories import worker_story as _worker_story def _tuple_to_list(node): @@ -57,3 +62,267 @@ def writer(state: dict, f: IO): # Write from a thread so we don't block the event loop quite as badly # (the writer will still hold the GIL a lot though). await to_thread(writer, state, f) + + +def load_cluster_dump(url: str, **kwargs) -> dict: + """Loads a cluster dump from a disk artefact + + Parameters + ---------- + url : str + Name of the disk artefact. This should have either a + ``.msgpack.gz`` or ``yaml`` suffix, depending on the dump format. + **kwargs : + Extra arguments passed to :func:`fsspec.open`. + + Returns + ------- + state : dict + The cluster state at the time of the dump. + """ + if url.endswith(".msgpack.gz"): + mode = "rb" + reader = msgpack.unpack + elif url.endswith(".yaml"): + import yaml + + mode = "r" + reader = yaml.safe_load + else: + raise ValueError(f"url ({url}) must have a .msgpack.gz or .yaml suffix") + + kwargs.setdefault("compression", "infer") + + with fsspec.open(url, mode, **kwargs) as f: + return reader(f) + + +class DumpArtefact(Mapping): + """ + Utility class for inspecting the state of a cluster dump + + .. code-block:: python + + dump = DumpArtefact.from_url("dump.msgpack.gz") + memory_tasks = dump.scheduler_tasks("memory") + executing_tasks = dump.worker_tasks("executing") + """ + + def __init__(self, state: dict): + self.dump = state + + @classmethod + def from_url(cls, url: str, **kwargs) -> DumpArtefact: + """Loads a cluster dump from a disk artefact + + Parameters + ---------- + url : str + Name of the disk artefact. This should have either a + ``.msgpack.gz`` or ``yaml`` suffix, depending on the dump format. + **kwargs : + Extra arguments passed to :func:`fsspec.open`. + + Returns + ------- + state : dict + The cluster state at the time of the dump. + """ + return DumpArtefact(load_cluster_dump(url, **kwargs)) + + def __getitem__(self, key): + return self.dump[key] + + def __iter__(self): + return iter(self.dump) + + def __len__(self): + return len(self.dump) + + def _extract_tasks(self, state: str | None, context: dict): + if state: + return [v for v in context.values() if v["state"] == state] + else: + return list(context.values()) + + def scheduler_tasks_in_state(self, state: str | None = None) -> list: + """ + Parameters + ---------- + state : optional, str + If provided, only tasks in the given state are returned. 
+ Otherwise, all tasks are returned. + + Returns + ------- + tasks : list + The list of scheduler tasks in ``state``. + """ + return self._extract_tasks(state, self.dump["scheduler"]["tasks"]) + + def worker_tasks_in_state(self, state: str | None = None) -> list: + """ + Parameters + ---------- + state : optional, str + If provided, only tasks in the given state are returned. + Otherwise, all tasks are returned. + + Returns + ------- + tasks : list + The list of worker tasks in ``state`` + """ + tasks = [] + + for worker_dump in self.dump["workers"].values(): + if isinstance(worker_dump, dict) and "tasks" in worker_dump: + tasks.extend(self._extract_tasks(state, worker_dump["tasks"])) + + return tasks + + def scheduler_story(self, *key_or_stimulus_id: str) -> dict: + """ + Returns + ------- + stories : dict + A list of stories for the keys/stimulus ID's in ``*key_or_stimulus_id``. + """ + stories = defaultdict(list) + + log = self.dump["scheduler"]["transition_log"] + keys = set(key_or_stimulus_id) + + for story in _scheduler_story(keys, log): + stories[story[0]].append(tuple(story)) + + return dict(stories) + + def worker_story(self, *key_or_stimulus_id: str) -> dict: + """ + Returns + ------- + stories : dict + A dict of stories for the keys/stimulus ID's in ``*key_or_stimulus_id`.` + """ + keys = set(key_or_stimulus_id) + stories = defaultdict(list) + + for worker_dump in self.dump["workers"].values(): + if isinstance(worker_dump, dict) and "log" in worker_dump: + for story in _worker_story(keys, worker_dump["log"]): + stories[story[0]].append(tuple(story)) + + return dict(stories) + + def missing_workers(self) -> list: + """ + Returns + ------- + missing : list + A list of workers connected to the scheduler, but which + did not respond to requests for a state dump. + """ + scheduler_workers = self.dump["scheduler"]["workers"] + responsive_workers = self.dump["workers"] + return [ + w + for w in scheduler_workers + if w not in responsive_workers + or not isinstance(responsive_workers[w], dict) + ] + + def _compact_state(self, state: dict, expand_keys: set[str]): + """Compacts ``state`` keys into a general key, + unless the key is in ``expand_keys``""" + assert "general" not in state + result = {} + general = {} + + for k, v in state.items(): + if k in expand_keys: + result[k] = v + else: + general[k] = v + + result["general"] = general + return result + + def to_yamls( + self, + root_dir: str | Path | None = None, + worker_expand_keys: Collection[str] = ("config", "log", "logs", "tasks"), + scheduler_expand_keys: Collection[str] = ( + "events", + "extensions", + "log", + "task_groups", + "tasks", + "transition_log", + "workers", + ), + ): + """ + Splits the Dump Artefact into a tree of yaml files with + ``root_dir`` as it's base. + + The root level of the tree contains a directory for the scheduler + and directories for each individual worker. + Each directory contains yaml files describing the state of the scheduler + or worker when the artefact was created. + + In general, keys associated with the state are compacted into a ``general.yaml`` + file, unless they are in ``scheduler_expand_keys`` and ``worker_expand_keys``. + + Parameters + ---------- + root_dir : str or Path + The root directory into which the tree is written. + Defaults to the current working directory if ``None``. + worker_expand_keys : iterable of str + An iterable of artefact worker keys that will be expanded + into separate yaml files. + Keys that are not in this iterable are compacted into a + `general.yaml` file. 
+ scheduler_expand_keys : iterable of str + An iterable of artefact scheduler keys that will be expanded + into separate yaml files. + Keys that are not in this iterable are compacted into a + ``general.yaml`` file. + """ + import yaml + + root_dir = Path(root_dir) if root_dir else Path.cwd() + dumper = yaml.CSafeDumper + scheduler_expand_keys = set(scheduler_expand_keys) + worker_expand_keys = set(worker_expand_keys) + + workers = self.dump["workers"] + for info in workers.values(): + try: + worker_id = info["id"] + except KeyError: + continue + + worker_state = self._compact_state(info, worker_expand_keys) + + log_dir = root_dir / worker_id + log_dir.mkdir(parents=True, exist_ok=True) + + for name, _logs in worker_state.items(): + filename = str(log_dir / f"{name}.yaml") + with open(filename, "w") as fd: + yaml.dump(_logs, fd, Dumper=dumper) + + context = "scheduler" + scheduler_state = self._compact_state(self.dump[context], scheduler_expand_keys) + + log_dir = root_dir / context + log_dir.mkdir(parents=True, exist_ok=True) + # Compact smaller keys into a general dict + + for name, _logs in scheduler_state.items(): + filename = str(log_dir / f"{name}.yaml") + + with open(filename, "w") as fd: + yaml.dump(_logs, fd, Dumper=dumper) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 8a9d99c53d0..85fe15b6542 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -80,6 +80,7 @@ from distributed.security import Security from distributed.semaphore import SemaphoreExtension from distributed.stealing import WorkStealing +from distributed.stories import scheduler_story from distributed.utils import ( All, TimeoutError, @@ -7534,9 +7535,7 @@ def transitions(self, recommendations: dict): def story(self, *keys): """Get all transitions that touch one of the input keys""" keys = {key.key if isinstance(key, TaskState) else key for key in keys} - return [ - t for t in self.transition_log if t[0] in keys or keys.intersection(t[3]) - ] + return scheduler_story(keys, self.transition_log) transition_story = story diff --git a/distributed/stories.py b/distributed/stories.py new file mode 100644 index 00000000000..d17e54df53f --- /dev/null +++ b/distributed/stories.py @@ -0,0 +1,44 @@ +from typing import Iterable + + +def scheduler_story(keys: set, transition_log: Iterable) -> list: + """Creates a story from the scheduler transition log given a set of keys + describing tasks or stimuli. + + Parameters + ---------- + keys : set + A set of task `keys` or `stimulus_id`'s + log : iterable + The scheduler transition log + + Returns + ------- + story : list + """ + return [t for t in transition_log if t[0] in keys or keys.intersection(t[3])] + + +def worker_story(keys: set, log: Iterable) -> list: + """Creates a story from the worker log given a set of keys + describing tasks or stimuli. 
+ + Parameters + ---------- + keys : set + A set of task `keys` or `stimulus_id`'s + log : iterable + The worker log + + Returns + ------- + story : list + """ + return [ + msg + for msg in log + if any(key in msg for key in keys) + or any( + key in c for key in keys for c in msg if isinstance(c, (tuple, list, set)) + ) + ] diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 1df31fc80ce..f9558d2dd3f 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -63,6 +63,7 @@ tokenize, wait, ) +from distributed.cluster_dump import load_cluster_dump from distributed.comm import CommClosedError from distributed.compatibility import LINUX, WINDOWS from distributed.core import Status @@ -7261,22 +7262,9 @@ def test_print_simple(capsys): def _verify_cluster_dump(url, format: str, addresses: set[str]) -> dict: - fsspec = pytest.importorskip("fsspec") - - url = str(url) - if format == "msgpack": - import msgpack - - url += ".msgpack.gz" - loader = msgpack.unpack - else: - import yaml - - url += ".yaml" - loader = yaml.safe_load - - with fsspec.open(url, mode="rb", compression="infer") as f: - state = loader(f) + fsspec = pytest.importorskip("fsspec") # for load_cluster_dump + url = str(url) + (".msgpack.gz" if format == "msgpack" else ".yaml") + state = load_cluster_dump(url) assert isinstance(state, dict) assert "scheduler" in state diff --git a/distributed/tests/test_cluster_dump.py b/distributed/tests/test_cluster_dump.py index 5653b06732e..963d7563725 100644 --- a/distributed/tests/test_cluster_dump.py +++ b/distributed/tests/test_cluster_dump.py @@ -1,10 +1,14 @@ +import asyncio +from pathlib import Path + import fsspec import msgpack import pytest import yaml -from distributed.cluster_dump import _tuple_to_list, write_state -from distributed.utils_test import gen_test +import distributed +from distributed.cluster_dump import DumpArtefact, _tuple_to_list, write_state +from distributed.utils_test import assert_worker_story, gen_cluster, gen_test, inc @pytest.mark.parametrize( @@ -44,3 +48,161 @@ async def test_write_state_yaml(tmp_path): assert readback == _tuple_to_list(await get_state()) f.seek(0) assert "!!python/tuple" not in f.read() + + +def blocked_inc(x, event): + event.wait() + return x + 1 + + +@gen_cluster(client=True) +async def test_cluster_dump_state(c, s, a, b, tmp_path): + filename = tmp_path / "dump" + futs = c.map(inc, range(2)) + fut_keys = {f.key for f in futs} + await c.gather(futs) + + event = distributed.Event() + blocked_fut = c.submit(blocked_inc, 1, event) + await asyncio.sleep(0.05) + await c.dump_cluster_state(filename, format="msgpack") + + scheduler_tasks = list(s.tasks.values()) + worker_tasks = [t for w in (a, b) for t in w.tasks.values()] + + smem_tasks = [t for t in scheduler_tasks if t.state == "memory"] + wmem_tasks = [t for t in worker_tasks if t.state == "memory"] + + assert len(smem_tasks) == 2 + assert len(wmem_tasks) == 2 + + sproc_tasks = [t for t in scheduler_tasks if t.state == "processing"] + wproc_tasks = [t for t in worker_tasks if t.state == "executing"] + + assert len(sproc_tasks) == 1 + assert len(wproc_tasks) == 1 + + await c.gather(event.set(), blocked_fut) + + dump = DumpArtefact.from_url(f"{filename}.msgpack.gz") + + smem_keys = {t["key"] for t in dump.scheduler_tasks_in_state("memory")} + wmem_keys = {t["key"] for t in dump.worker_tasks_in_state("memory")} + + assert smem_keys == fut_keys + assert smem_keys == {t.key for t in smem_tasks} + assert wmem_keys == fut_keys + assert 
wmem_keys == {t.key for t in wmem_tasks} + + sproc_keys = {t["key"] for t in dump.scheduler_tasks_in_state("processing")} + wproc_keys = {t["key"] for t in dump.worker_tasks_in_state("executing")} + + assert sproc_keys == {t.key for t in sproc_tasks} + assert wproc_keys == {t.key for t in wproc_tasks} + + sall_keys = {t["key"] for t in dump.scheduler_tasks_in_state()} + wall_keys = {t["key"] for t in dump.worker_tasks_in_state()} + + assert fut_keys | {blocked_fut.key} == sall_keys + assert fut_keys | {blocked_fut.key} == wall_keys + + # Mapping API works + assert "transition_log" in dump["scheduler"] + assert "log" in dump["workers"][a.address] + assert len(dump) == 3 + + +@gen_cluster(client=True) +async def test_cluster_dump_story(c, s, a, b, tmp_path): + filename = tmp_path / "dump" + futs = c.map(inc, range(2)) + fut_keys = {f.key for f in futs} + await c.gather(futs) + await c.dump_cluster_state(filename, format="msgpack") + + dump = DumpArtefact.from_url(f"{filename}.msgpack.gz") + task_key = next(iter(fut_keys)) + + def _expected_story(task_key): + return + + story = dump.scheduler_story(*fut_keys) + assert len(story) == len(fut_keys) + + for k, task_story in story.items(): + expected = [ + (k, "released", "waiting", {k: "processing"}), + (k, "waiting", "processing", {}), + (k, "processing", "memory", {}), + ] + + for event, expected_event in zip(task_story, expected): + for e1, e2 in zip(event, expected_event): + assert e1 == e2 + + story = dump.worker_story(*fut_keys) + assert len(story) == len(fut_keys) + + for k, task_story in story.items(): + assert_worker_story( + task_story, + [ + (k, "compute-task"), + (k, "released", "waiting", "waiting", {k: "ready"}), + (k, "waiting", "ready", "ready", {}), + (k, "ready", "executing", "executing", {}), + (k, "put-in-memory"), + (k, "executing", "memory", "memory", {}), + ], + ) + + +@gen_cluster(client=True) +async def test_cluster_dump_to_yamls(c, s, a, b, tmp_path): + futs = c.map(inc, range(2)) + await c.gather(futs) + + event = distributed.Event() + blocked_fut = c.submit(blocked_inc, 1, event) + filename = tmp_path / "dump" + await asyncio.sleep(0.05) + await c.dump_cluster_state(filename, format="msgpack") + await event.set() + await blocked_fut + + dump = DumpArtefact.from_url(f"{filename}.msgpack.gz") + yaml_path = Path(tmp_path / "dump") + dump.to_yamls(yaml_path) + + scheduler_files = { + "events.yaml", + "extensions.yaml", + "general.yaml", + "log.yaml", + "task_groups.yaml", + "tasks.yaml", + "transition_log.yaml", + "workers.yaml", + } + + scheduler_yaml_path = yaml_path / "scheduler" + expected = {scheduler_yaml_path / f for f in scheduler_files} + assert expected == set(scheduler_yaml_path.iterdir()) + + worker_files = { + "config.yaml", + "general.yaml", + "log.yaml", + "logs.yaml", + "tasks.yaml", + } + + for worker in (a, b): + worker_yaml_path = yaml_path / worker.id + expected = {worker_yaml_path / f for f in worker_files} + assert expected == set(worker_yaml_path.iterdir()) + + # Internal dictionary state compaction + # has not been destructive of the original dictionary + assert "id" in dump["scheduler"] + assert "address" in dump["scheduler"] diff --git a/distributed/worker.py b/distributed/worker.py index 210ecaeb3fe..13e5adeff00 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -69,6 +69,7 @@ from distributed.security import Security from distributed.shuffle import ShuffleWorkerExtension from distributed.sizeof import safe_sizeof as sizeof +from distributed.stories import worker_story from 
distributed.threadpoolexecutor import ThreadPoolExecutor from distributed.threadpoolexecutor import secede as tpe_secede from distributed.utils import ( @@ -2616,18 +2617,8 @@ def stateof(self, key: str) -> dict[str, Any]: } def story(self, *keys_or_tasks: str | TaskState) -> list[tuple]: - keys = [e.key if isinstance(e, TaskState) else e for e in keys_or_tasks] - return [ - msg - for msg in self.log - if any(key in msg for key in keys) - or any( - key in c - for key in keys - for c in msg - if isinstance(c, (tuple, list, set)) - ) - ] + keys = {e.key if isinstance(e, TaskState) else e for e in keys_or_tasks} + return worker_story(keys, self.log) def ensure_communicating(self) -> None: stimulus_id = f"ensure-communicating-{time()}"
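
A minimal usage sketch of the inspection utilities this patch adds. It only uses the API defined above (DumpArtefact, load_cluster_dump, the *_tasks_in_state, *_story, missing_workers and to_yamls methods); the dump path "dump.msgpack.gz", the task key "inc-123", and the output directory "dump-tree" are placeholders, and the dump is assumed to have been written beforehand with Client.dump_cluster_state(..., format="msgpack") as in the tests above.

# Illustrative only -- not part of the patch.
from distributed.cluster_dump import DumpArtefact

# Load a dump previously produced with e.g.
#   client.dump_cluster_state("dump", format="msgpack")
# which writes "dump.msgpack.gz".
dump = DumpArtefact.from_url("dump.msgpack.gz")

# The artefact is a Mapping over the raw dumped state
# (e.g. "scheduler" and "workers" sections).
print(list(dump))

# Tasks still processing on the scheduler / executing on workers
processing = dump.scheduler_tasks_in_state("processing")
executing = dump.worker_tasks_in_state("executing")
print(f"{len(processing)} processing, {len(executing)} executing")

# Workers the scheduler knows about but which did not answer
# the state-dump request.
print("unresponsive workers:", dump.missing_workers())

# Transition history for a suspicious task; both methods return a
# dict mapping each key to its list of story tuples.
key = "inc-123"  # hypothetical task key
for task_key, events in dump.scheduler_story(key).items():
    print(task_key, events)
for task_key, events in dump.worker_story(key).items():
    print(task_key, events)

# Expand the dump into a tree of YAML files (one directory for the
# scheduler and one per worker) for easier browsing.
dump.to_yamls("dump-tree")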