Skip to content

Commit

Permalink
Moved to dask#6442
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed May 25, 2022
1 parent e1c4390 commit 4b3adf8
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 19 deletions.
17 changes: 6 additions & 11 deletions distributed/_stories.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import annotations

from typing import Iterable


Expand All @@ -21,29 +19,26 @@ def scheduler_story(keys: set, transition_log: Iterable) -> list:
return [t for t in transition_log if t[0] in keys or keys.intersection(t[3])]


def worker_story(keys_or_tags: set[str], log: Iterable) -> list:
def worker_story(keys: set, log: Iterable) -> list:
"""Creates a story from the worker log given a set of keys
describing tasks or stimuli.
Parameters
----------
keys_or_tags : set[str]
A set of task `keys` or arbitrary tags from the event log, e.g. `stimulus_id`'s
keys : set
A set of task `keys` or `stimulus_id`'s
log : iterable
The worker log
Returns
-------
story : list[str]
story : list
"""
return [
msg
for msg in log
if any(key in msg for key in keys_or_tags)
if any(key in msg for key in keys)
or any(
key in c
for key in keys_or_tags
for c in msg
if isinstance(c, (tuple, list, set))
key in c for key in keys for c in msg if isinstance(c, (tuple, list, set))
)
]
12 changes: 4 additions & 8 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2954,14 +2954,10 @@ def stateof(self, key: str) -> dict[str, Any]:
"data": key in self.data,
}

def story(self, *keys_or_tasks_or_tags: str | TaskState) -> list[tuple]:
"""Return all records from the transitions log involving one or more tasks;
it can also be used for arbitrary non-transition tags.
"""
keys_or_tags = {
e.key if isinstance(e, TaskState) else e for e in keys_or_tasks_or_tags
}
return worker_story(keys_or_tags, self.log)
def story(self, *keys_or_tasks: str | TaskState) -> list[tuple]:
"""Return all transitions involving one or more tasks"""
keys = {e.key if isinstance(e, TaskState) else e for e in keys_or_tasks}
return worker_story(keys, self.log)

async def get_story(self, keys=None):
return self.story(*keys)
Expand Down

0 comments on commit 4b3adf8

Please sign in to comment.