From 076026602854b3b9c06ce0df5c71813aa92caf83 Mon Sep 17 00:00:00 2001 From: Anton Kirilenko Date: Thu, 10 Oct 2019 16:09:19 +0100 Subject: [PATCH] [KED-1107] Add Pipeline.tags, deprecate Pipeline.name (#283) --- RELEASE.md | 1 + .../source/03_tutorial/04_create_pipelines.md | 10 +++-- kedro/pipeline/pipeline.py | 43 ++++++++++++++++--- tests/pipeline/test_pipeline.py | 25 +++++++++-- 4 files changed, 67 insertions(+), 12 deletions(-) diff --git a/RELEASE.md b/RELEASE.md index a9330f5046..441ffe95ab 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,6 +1,7 @@ # Release 0.15.3 ## Major features and improvements +* `Pipeline.name` has been deprecated in favour of `Pipeline.tags`. ## Bug fixes and other changes diff --git a/docs/source/03_tutorial/04_create_pipelines.md b/docs/source/03_tutorial/04_create_pipelines.md index 612e962bb7..10e51555cf 100644 --- a/docs/source/03_tutorial/04_create_pipelines.md +++ b/docs/source/03_tutorial/04_create_pipelines.md @@ -582,7 +582,7 @@ kedro.io.core.DataSetError: Failed while loading data from data set CSVLocalData ### Using tags -Another way to run partial pipelines without changing your code is to use tags. Each node within the pipeline can be tagged by passing **`name`** into the `Pipeline()`. Update the `create_pipelines()` code in `src/kedro_tutorial/pipeline.py` one more time: +Another way to run partial pipelines without changing your code is to use tags. Each node within the pipeline can be tagged by passing **`tags`** into the `Pipeline()`. Update the `create_pipelines()` code in `src/kedro_tutorial/pipeline.py` one more time: ```python def create_pipelines(**kwargs) -> Dict[str, Pipeline]: @@ -616,7 +616,7 @@ def create_pipelines(**kwargs) -> Dict[str, Pipeline]: name="master_table", ), ], - name="de_tag", + tags=["de_tag"], ) ds_pipeline = Pipeline( @@ -629,7 +629,7 @@ def create_pipelines(**kwargs) -> Dict[str, Pipeline]: node(train_model, ["X_train", "y_train"], "regressor"), node(evaluate_model, ["regressor", "X_test", "y_test"], None), ], - name="ds_tag", + tags=["ds_tag"], ) return { @@ -639,7 +639,7 @@ def create_pipelines(**kwargs) -> Dict[str, Pipeline]: } ``` -If the pipeline definition contains `name=` argument, Kedro will attach the corresponding tag (`de_tag` and `ds_tag` in the example above) to every node within that pipeline. +If the pipeline definition contains `tags=` argument, Kedro will attach the corresponding tags (`de_tag` and `ds_tag` in the example above) to every node within that pipeline. To run a partial pipeline using a tag: @@ -670,6 +670,8 @@ node( ) ``` +> ❗The `name` `Pipeline` constructor parameter and object property are deprecated, use `tags` instead. + ## Using decorators for nodes and pipelines diff --git a/kedro/pipeline/pipeline.py b/kedro/pipeline/pipeline.py index e935784da9..fd8efa38db 100644 --- a/kedro/pipeline/pipeline.py +++ b/kedro/pipeline/pipeline.py @@ -32,6 +32,7 @@ """ import copy import json +import warnings from collections import Counter, defaultdict from itertools import chain from typing import Callable, Dict, Iterable, List, Optional, Set, Union @@ -78,8 +79,14 @@ class Pipeline: outputs and execution order. """ + # pylint: disable=too-many-public-methods + def __init__( - self, nodes: Iterable[Union[Node, "Pipeline"]], *, name: str = None + self, + nodes: Iterable[Union[Node, "Pipeline"]], + *, + name: str = None, + tags: Iterable[str] = None ): # pylint: disable=missing-type-doc """Initialise ``Pipeline`` with a list of ``Node`` instances. @@ -88,8 +95,10 @@ def __init__( provide pipelines among the list of nodes, those pipelines will be expanded and all their nodes will become part of this new pipeline. - name: The name of the pipeline. If specified, this name - will be used to tag all of the nodes in the pipeline. + name: (DEPRECATED, use `tags` instead) The name of the pipeline. + If specified, this name will be used to tag all of the nodes + in the pipeline. + tags: Optional set of tags to be applied to all the pipeline nodes. Raises: ValueError: @@ -134,8 +143,18 @@ def __init__( _validate_duplicate_nodes(nodes) _validate_transcoded_inputs_outputs(nodes) + self._tags = set(tags or []) + if name: - nodes = [n.tag([name]) for n in nodes] + warnings.warn( + "`name` parameter is deprecated for the `Pipeline`" + " constructor, use `tags` instead", + DeprecationWarning, + ) + self._tags.add(name) + + nodes = [n.tag(self._tags) for n in nodes] + self._name = name self._nodes_by_name = {node.name: node for node in nodes} _validate_unique_outputs(nodes) @@ -312,14 +331,28 @@ def set_to_string(set_of_strings): @property def name(self) -> Optional[str]: - """Get the pipeline name. + """(DEPRECATED, use `tags` instead) Get the pipeline name. Returns: The name of the pipeline as provided in the constructor. """ + warnings.warn( + "`Pipeline.name` is deprecated, use `Pipeline.tags` instead.", + DeprecationWarning, + ) return self._name + @property + def tags(self) -> Iterable[str]: + """Get the pipeline tags. + + Returns: + The list of the pipeline tags as provided in the constructor. + + """ + return self._tags + @property def node_dependencies(self) -> Dict[Node, Set[Node]]: """All dependencies of nodes where the first Node has a direct dependency on diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index e99ca257e8..816810effd 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -347,6 +347,25 @@ def test_empty_case(self): """Empty pipeline is possible""" Pipeline([]) + def test_pipeline_name_is_deprecated(self): + with pytest.warns(DeprecationWarning, match=r"`name` parameter is deprecated"): + pipeline = Pipeline([], name="p_name") + + with pytest.warns(DeprecationWarning, match=r"`Pipeline\.name` is deprecated"): + assert pipeline.name == "p_name" + + def test_initialized_with_tags(self): + pipeline = Pipeline( + [node(identity, "A", "B", tags=["node1", "p1"]), node(identity, "B", "C")], + tags=["p1", "p2"], + ) + + node1 = pipeline.grouped_nodes[0].pop() + node2 = pipeline.grouped_nodes[1].pop() + assert node1.tags == {"node1", "p1", "p2"} + assert node2.tags == {"p1", "p2"} + assert pipeline.tags == {"p1", "p2"} + def pipeline_with_circle(): return [ @@ -516,7 +535,7 @@ def test_to_nodes_unknown(self, complex_pipeline): def test_connected_pipeline(self, disjoint_pipeline): """Connect two separate pipelines.""" nodes = disjoint_pipeline["nodes"] - subpipeline = Pipeline(nodes, name="subpipeline") + subpipeline = Pipeline(nodes, tags=["subpipeline"]) assert len(subpipeline.inputs()) == 2 assert len(subpipeline.outputs()) == 2 @@ -528,9 +547,9 @@ def test_connected_pipeline(self, disjoint_pipeline): assert len(pipeline.nodes) == 1 + len(nodes) assert len(pipeline.inputs()) == 1 assert len(pipeline.outputs()) == 1 - assert all(pipeline.name in n.tags for n in pipeline.nodes) + assert all(set(pipeline.tags) <= n.tags for n in pipeline.nodes) assert all( - subpipeline.name in n.tags + set(subpipeline.tags) <= n.tags for n in pipeline.nodes if n.name != "connecting_node" )