diff --git a/RELEASE.md b/RELEASE.md index 716f3c5c7c..669176a2a1 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,8 +1,15 @@ # Release 0.17.7 +## Major features and improvements +* `pipeline` now accepts `tags` and a collection of `Node`s and/or `Pipeline`s rather than just a single `Pipeline` object. `pipeline` should be used in preference to `Pipeline` when creating a Kedro pipeline. + ## Bug fixes and other changes -* Added tutorial documentation for experiment tracking in Kedro docs. (`03_tutorial/07_set_up_experiment_tracking.md`). -* Added Plotly documentation in Kedro docs. (`03_tutorial/06_visualise_pipeline.md`). +* Added tutorial documentation for experiment tracking in Kedro docs (`03_tutorial/07_set_up_experiment_tracking.md`). +* Added Plotly documentation in Kedro docs (`03_tutorial/06_visualise_pipeline.md`). + +## Minor breaking changes to the API + +## Upcoming deprecations for Kedro 0.18.0 # Release 0.17.6 diff --git a/features/steps/cli_steps.py b/features/steps/cli_steps.py index b6f98b6f78..d5b4ae56a5 100644 --- a/features/steps/cli_steps.py +++ b/features/steps/cli_steps.py @@ -496,7 +496,7 @@ def check_empty_pipeline_exists(context): / context.project_name.replace("-", "_") / "pipeline_registry.py" ) - assert '"__default__": Pipeline([])' in pipeline_file.read_text("utf-8") + assert '"__default__": pipeline([])' in pipeline_file.read_text("utf-8") @then("the pipeline should contain nodes") @@ -510,7 +510,7 @@ def check_pipeline_not_empty(context): / context.project_name.replace("-", "_") / "pipeline_registry.py" ) - assert "pipeline = Pipeline([])" not in pipeline_file.read_text("utf-8") + assert "pipeline = pipeline([])" not in pipeline_file.read_text("utf-8") @then("the console log should show that {number} nodes were run") diff --git a/features/steps/test_plugin/plugin.py b/features/steps/test_plugin/plugin.py index 0abed7c9f6..f979f8c096 100644 --- a/features/steps/test_plugin/plugin.py +++ b/features/steps/test_plugin/plugin.py @@ -2,7 
+2,7 @@ import logging from kedro.framework.hooks import hook_impl -from kedro.pipeline import Pipeline, node +from kedro.pipeline import node, pipeline class MyPluginHook: @@ -15,7 +15,7 @@ def after_catalog_created( @hook_impl def register_pipelines(self): # pylint: disable=no-self-use return { - "from_plugin": Pipeline([node(lambda: "sth", inputs=None, outputs="x")]) + "from_plugin": pipeline([node(lambda: "sth", inputs=None, outputs="x")]) } diff --git a/features/steps/test_starter/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipelines/data_engineering/pipeline.py b/features/steps/test_starter/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipelines/data_engineering/pipeline.py index 0a9e863d87..dee78699fd 100644 --- a/features/steps/test_starter/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipelines/data_engineering/pipeline.py +++ b/features/steps/test_starter/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipelines/data_engineering/pipeline.py @@ -4,13 +4,13 @@ Delete this when you start working on your own Kedro project. 
""" -from kedro.pipeline import Pipeline, node +from kedro.pipeline import node, pipeline from .nodes import split_data def create_pipeline(**kwargs): - return Pipeline( + return pipeline( [ node( split_data, diff --git a/features/steps/test_starter/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipelines/data_science/pipeline.py b/features/steps/test_starter/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipelines/data_science/pipeline.py index 4a2d6c1b3e..a2e44d9424 100644 --- a/features/steps/test_starter/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipelines/data_science/pipeline.py +++ b/features/steps/test_starter/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipelines/data_science/pipeline.py @@ -4,13 +4,13 @@ Delete this when you start working on your own Kedro project. """ -from kedro.pipeline import Pipeline, node +from kedro.pipeline import node, pipeline from .nodes import predict, report_accuracy, train_model def create_pipeline(**kwargs): - return Pipeline( + return pipeline( [ node( train_model, diff --git a/kedro/pipeline/modular_pipeline.py b/kedro/pipeline/modular_pipeline.py index ec19628a6f..ac864388e5 100644 --- a/kedro/pipeline/modular_pipeline.py +++ b/kedro/pipeline/modular_pipeline.py @@ -1,6 +1,6 @@ """Helper to integrate modular pipelines into a master pipeline.""" import copy -from typing import AbstractSet, Dict, List, Set, Union +from typing import AbstractSet, Dict, Iterable, List, Set, Union from kedro.pipeline.node import Node from kedro.pipeline.pipeline import ( @@ -69,27 +69,32 @@ def _validate_datasets_exist( def pipeline( - pipe: Pipeline, + pipe: Union[Iterable[Union[Node, Pipeline]], Pipeline], *, inputs: Union[str, Set[str], Dict[str, str]] = None, outputs: Union[str, Set[str], Dict[str, str]] = None, parameters: Dict[str, str] = None, + tags: Union[str, Iterable[str]] = None, namespace: str = None, ) -> Pipeline: - """Create a copy of the 
pipeline and its nodes, - with some dataset names and node names modified. + """Create a ``Pipeline`` from a collection of nodes and/or ``Pipeline``s. Args: - pipe: Original modular pipeline to integrate + pipe: The nodes the ``Pipeline`` will be made of. If you + provide pipelines among the list of nodes, those pipelines will + be expanded and all their nodes will become part of this + new pipeline. inputs: A name or collection of input names to be exposed as connection points - to other pipelines upstream. + to other pipelines upstream. This is optional; if not provided, the + pipeline inputs are automatically inferred from the pipeline structure. When str or Set[str] is provided, the listed input names will stay the same as they are named in the provided pipeline. When Dict[str, str] is provided, current input names will be mapped to new names. Must only refer to the pipeline's free inputs. outputs: A name or collection of names to be exposed as connection points - to other pipelines downstream. + to other pipelines downstream. This is optional; if not provided, the + pipeline outputs are automatically inferred from the pipeline structure. When str or Set[str] is provided, the listed output names will stay the same as they are named in the provided pipeline. When Dict[str, str] is provided, current output names will be @@ -97,6 +102,7 @@ def pipeline( Can refer to both the pipeline's free outputs, as well as intermediate results that need to be exposed. parameters: A map of existing parameter to the new one. + tags: Optional set of tags to be applied to all the pipeline nodes. namespace: A prefix to give to all dataset names, except those explicitly named with the `inputs`/`outputs` arguments, and parameter references (`params:` and `parameters`). @@ -108,8 +114,17 @@ def pipeline( any of the expected types (str, dict, list, or None). Returns: - A new ``Pipeline`` object with the new nodes, modified as requested. + A new ``Pipeline`` object.
""" + if isinstance(pipe, Pipeline): + # To ensure that we are always dealing with a *copy* of pipe. + pipe = Pipeline([pipe], tags=tags) + else: + pipe = Pipeline(pipe, tags=tags) + + if not any([inputs, outputs, parameters, namespace]): + return pipe + # pylint: disable=protected-access inputs = _to_dict(inputs) outputs = _to_dict(outputs) @@ -181,7 +196,7 @@ def _copy_node(node: Node) -> Node: new_nodes = [_copy_node(n) for n in pipe.nodes] - return Pipeline(new_nodes) + return Pipeline(new_nodes, tags=tags) def _to_dict(element: Union[None, str, Set[str], Dict[str, str]]) -> Dict[str, str]: diff --git a/kedro/pipeline/node.py b/kedro/pipeline/node.py index 9c0aae2340..781cb2b634 100644 --- a/kedro/pipeline/node.py +++ b/kedro/pipeline/node.py @@ -599,7 +599,7 @@ def node( outputs: Union[None, str, List[str], Dict[str, str]], *, name: str = None, - tags: Iterable[str] = None, + tags: Union[str, Iterable[str]] = None, confirms: Union[str, List[str]] = None, namespace: str = None, ) -> Node: diff --git a/kedro/pipeline/pipeline.py b/kedro/pipeline/pipeline.py index 88032bbc24..17ad1187fd 100644 --- a/kedro/pipeline/pipeline.py +++ b/kedro/pipeline/pipeline.py @@ -692,10 +692,13 @@ def decorate(self, *decorators: Callable) -> "Pipeline": return Pipeline(nodes) def tag(self, tags: Union[str, Iterable[str]]) -> "Pipeline": - """ - Return a copy of the pipeline, with each node tagged accordingly. - :param tags: The tags to be added to the nodes. - :return: New `Pipeline` object. + """Tags all the nodes in the pipeline. + + Args: + tags: The tags to be added to the nodes. + + Returns: + New ``Pipeline`` object with nodes tagged. 
""" nodes = [n.tag(tags) for n in self.nodes] return Pipeline(nodes) diff --git a/kedro/templates/pipeline/{{ cookiecutter.pipeline_name }}/pipeline.py b/kedro/templates/pipeline/{{ cookiecutter.pipeline_name }}/pipeline.py index 512812aea2..670fd43fff 100644 --- a/kedro/templates/pipeline/{{ cookiecutter.pipeline_name }}/pipeline.py +++ b/kedro/templates/pipeline/{{ cookiecutter.pipeline_name }}/pipeline.py @@ -3,8 +3,8 @@ generated using Kedro {{ cookiecutter.kedro_version }} """ -from kedro.pipeline import Pipeline, node +from kedro.pipeline import Pipeline, node, pipeline -def create_pipeline(**kwargs): - return Pipeline([]) +def create_pipeline(**kwargs) -> Pipeline: + return pipeline([]) diff --git a/kedro/templates/project/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipeline_registry.py b/kedro/templates/project/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipeline_registry.py index f13473196e..275bf93f84 100644 --- a/kedro/templates/project/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipeline_registry.py +++ b/kedro/templates/project/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipeline_registry.py @@ -1,7 +1,7 @@ """Project pipelines.""" from typing import Dict -from kedro.pipeline import Pipeline +from kedro.pipeline import Pipeline, pipeline def register_pipelines() -> Dict[str, Pipeline]: @@ -10,4 +10,4 @@ def register_pipelines() -> Dict[str, Pipeline]: Returns: A mapping from a pipeline name to a ``Pipeline`` object. 
""" - return {"__default__": Pipeline([])} + return {"__default__": pipeline([])} diff --git a/tests/pipeline/test_pipeline_helper.py b/tests/pipeline/test_pipeline_helper.py index 77364efc2b..7173a2bcbe 100644 --- a/tests/pipeline/test_pipeline_helper.py +++ b/tests/pipeline/test_pipeline_helper.py @@ -355,3 +355,17 @@ def test_bad_outputs_mapping(self): pattern = "Outputs can't contain free inputs to the pipeline" with pytest.raises(ModularPipelineError, match=pattern): pipeline(raw_pipeline, outputs={"A": "C"}) + + def test_pipeline_always_copies(self): + original_pipeline = pipeline([node(constant_output, None, "A")]) + new_pipeline = pipeline(original_pipeline) + assert new_pipeline.nodes == original_pipeline.nodes + assert new_pipeline is not original_pipeline + + def test_pipeline_tags(self): + tagged_pipeline = pipeline( + [node(constant_output, None, "A"), node(constant_output, None, "B")], + tags="tag", + ) + + assert all(n.tags == {"tag"} for n in tagged_pipeline.nodes)