sequential_runner.py
"""``SequentialRunner`` is an ``AbstractRunner`` implementation. It can be
used to run the ``Pipeline`` in a sequential manner using a topological sort
of provided nodes.
"""
from __future__ import annotations
from collections import Counter
from itertools import chain
from typing import TYPE_CHECKING, Any
from kedro.runner.runner import AbstractRunner
from kedro.runner.task import Task
if TYPE_CHECKING:
from pluggy import PluginManager
from kedro.io import CatalogProtocol
from kedro.pipeline import Pipeline


class SequentialRunner(AbstractRunner):
    """``SequentialRunner`` is an ``AbstractRunner`` implementation. It can
    be used to run the ``Pipeline`` in a sequential manner using a
    topological sort of provided nodes.
    """

    def __init__(
        self,
        is_async: bool = False,
        extra_dataset_patterns: dict[str, dict[str, Any]] | None = None,
    ):
        """Instantiates the runner class.

        Args:
            is_async: If True, the node inputs and outputs are loaded and saved
                asynchronously with threads. Defaults to False.
            extra_dataset_patterns: Extra dataset factory patterns to be added to the
                catalog during the run. This is used to set the default datasets to
                MemoryDataset for `SequentialRunner`.
        """
        default_dataset_pattern = {"{default}": {"type": "MemoryDataset"}}
        self._extra_dataset_patterns = extra_dataset_patterns or default_dataset_pattern
        super().__init__(
            is_async=is_async, extra_dataset_patterns=self._extra_dataset_patterns
        )
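
    # Note (illustrative, not part of the original module): the default pattern above
    # means that any dataset not declared in the catalog is materialised as a
    # ``MemoryDataset`` for the duration of the run. A hypothetical override could be:
    #
    #     SequentialRunner(
    #         extra_dataset_patterns={
    #             "{default}": {"type": "pickle.PickleDataset", "filepath": "data/{default}.pkl"}
    #         }
    #     )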

    def _run(
        self,
        pipeline: Pipeline,
        catalog: CatalogProtocol,
        hook_manager: PluginManager,
        session_id: str | None = None,
    ) -> None:
        """The method implementing sequential pipeline running.

        Args:
            pipeline: The ``Pipeline`` to run.
            catalog: An implemented instance of ``CatalogProtocol`` from which to fetch data.
            hook_manager: The ``PluginManager`` to activate hooks.
            session_id: The id of the session.

        Raises:
            Exception: in case of any downstream node failure.
        """
        if not self._is_async:
            self._logger.info(
                "Using synchronous mode for loading and saving data. Use the --async flag "
                "for potential performance gains. https://docs.kedro.org/en/stable/nodes_and_pipelines/run_a_pipeline.html#load-and-save-asynchronously"
            )
        nodes = pipeline.nodes
        done_nodes = set()
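
        # Count how many nodes consume each dataset as an input so that a dataset
        # can be released from the catalog once the last node needing it has run.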
        load_counts = Counter(chain.from_iterable(n.inputs for n in nodes))

        for exec_index, node in enumerate(nodes):
            try:
                Task(
                    node=node,
                    catalog=catalog,
                    hook_manager=hook_manager,
                    is_async=self._is_async,
                    session_id=session_id,
                ).execute()
                done_nodes.add(node)
            except Exception:
                self._suggest_resume_scenario(pipeline, done_nodes, catalog)
                raise

            self._release_datasets(node, catalog, load_counts, pipeline)

            self._logger.info(
                "Completed %d out of %d tasks", len(done_nodes), len(nodes)
            )
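

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the original module). Assuming a
# trivial catalog and a single-node pipeline, a sequential run could look like
# the commented example below; the dataset names and the ``double`` function are
# hypothetical.
#
#     from kedro.io import DataCatalog, MemoryDataset
#     from kedro.pipeline import node, pipeline
#     from kedro.runner import SequentialRunner
#
#     def double(x):
#         return x * 2
#
#     catalog = DataCatalog({"x": MemoryDataset(2)})
#     pipe = pipeline([node(double, inputs="x", outputs="x_doubled")])
#
#     # Outputs not declared in the catalog fall back to MemoryDataset via the
#     # runner's default "{default}" dataset factory pattern.
#     SequentialRunner().run(pipe, catalog)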