Skip to content

Commit

Permalink
[KED-1107] Add Pipeline.tags, deprecate Pipeline.name (kedro-org#283)
Browse files Browse the repository at this point in the history
  • Loading branch information
Anton Kirilenko authored Oct 10, 2019
1 parent 8c18ff4 commit 0760266
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 12 deletions.
1 change: 1 addition & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Release 0.15.3

## Major features and improvements
* `Pipeline.name` has been deprecated in favour of `Pipeline.tags`.

## Bug fixes and other changes

Expand Down
10 changes: 6 additions & 4 deletions docs/source/03_tutorial/04_create_pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ kedro.io.core.DataSetError: Failed while loading data from data set CSVLocalData
### Using tags
Another way to run partial pipelines without changing your code is to use tags. Each node within the pipeline can be tagged by passing **`name`** into the `Pipeline()`. Update the `create_pipelines()` code in `src/kedro_tutorial/pipeline.py` one more time:
Another way to run partial pipelines without changing your code is to use tags. Every node within a pipeline can be tagged at once by passing **`tags`** to the `Pipeline()` constructor. Update the `create_pipelines()` code in `src/kedro_tutorial/pipeline.py` one more time:
```python
def create_pipelines(**kwargs) -> Dict[str, Pipeline]:
Expand Down Expand Up @@ -616,7 +616,7 @@ def create_pipelines(**kwargs) -> Dict[str, Pipeline]:
name="master_table",
),
],
name="de_tag",
tags=["de_tag"],
)

ds_pipeline = Pipeline(
Expand All @@ -629,7 +629,7 @@ def create_pipelines(**kwargs) -> Dict[str, Pipeline]:
node(train_model, ["X_train", "y_train"], "regressor"),
node(evaluate_model, ["regressor", "X_test", "y_test"], None),
],
name="ds_tag",
tags=["ds_tag"],
)

return {
Expand All @@ -639,7 +639,7 @@ def create_pipelines(**kwargs) -> Dict[str, Pipeline]:
}
```
If the pipeline definition contains `name=` argument, Kedro will attach the corresponding tag (`de_tag` and `ds_tag` in the example above) to every node within that pipeline.
If the pipeline definition contains a `tags=` argument, Kedro will attach the corresponding tags (`de_tag` and `ds_tag` in the example above) to every node within that pipeline.
To run a partial pipeline using a tag:
Expand Down Expand Up @@ -670,6 +670,8 @@ node(
)
```
> ❗The `name` parameter of the `Pipeline` constructor and the `Pipeline.name` property are deprecated; use `tags` instead.
## Using decorators for nodes and pipelines
Expand Down
43 changes: 38 additions & 5 deletions kedro/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"""
import copy
import json
import warnings
from collections import Counter, defaultdict
from itertools import chain
from typing import Callable, Dict, Iterable, List, Optional, Set, Union
Expand Down Expand Up @@ -78,8 +79,14 @@ class Pipeline:
outputs and execution order.
"""

# pylint: disable=too-many-public-methods

def __init__(
self, nodes: Iterable[Union[Node, "Pipeline"]], *, name: str = None
self,
nodes: Iterable[Union[Node, "Pipeline"]],
*,
name: str = None,
tags: Iterable[str] = None
): # pylint: disable=missing-type-doc
"""Initialise ``Pipeline`` with a list of ``Node`` instances.
Expand All @@ -88,8 +95,10 @@ def __init__(
provide pipelines among the list of nodes, those pipelines will
be expanded and all their nodes will become part of this
new pipeline.
name: The name of the pipeline. If specified, this name
will be used to tag all of the nodes in the pipeline.
name: (DEPRECATED, use `tags` instead) The name of the pipeline.
If specified, this name will be used to tag all of the nodes
in the pipeline.
tags: Optional set of tags to be applied to all the pipeline nodes.
Raises:
ValueError:
Expand Down Expand Up @@ -134,8 +143,18 @@ def __init__(
_validate_duplicate_nodes(nodes)
_validate_transcoded_inputs_outputs(nodes)

self._tags = set(tags or [])

if name:
nodes = [n.tag([name]) for n in nodes]
warnings.warn(
"`name` parameter is deprecated for the `Pipeline`"
" constructor, use `tags` instead",
DeprecationWarning,
)
self._tags.add(name)

nodes = [n.tag(self._tags) for n in nodes]

self._name = name
self._nodes_by_name = {node.name: node for node in nodes}
_validate_unique_outputs(nodes)
Expand Down Expand Up @@ -312,14 +331,28 @@ def set_to_string(set_of_strings):

@property
def name(self) -> Optional[str]:
"""Get the pipeline name.
"""(DEPRECATED, use `tags` instead) Get the pipeline name.
Returns:
The name of the pipeline as provided in the constructor.
"""
warnings.warn(
"`Pipeline.name` is deprecated, use `Pipeline.tags` instead.",
DeprecationWarning,
)
return self._name

@property
def tags(self) -> Iterable[str]:
"""Get the pipeline tags.
Returns:
The set of tags applied to every node of the pipeline: the ``tags``
passed to the constructor plus, if it was given, the deprecated
``name``. This is the pipeline's internal set, not a copy.
"""
return self._tags

@property
def node_dependencies(self) -> Dict[Node, Set[Node]]:
"""All dependencies of nodes where the first Node has a direct dependency on
Expand Down
25 changes: 22 additions & 3 deletions tests/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,25 @@ def test_empty_case(self):
"""Empty pipeline is possible"""
Pipeline([])

def test_pipeline_name_is_deprecated(self):
"""The `name` constructor argument and the `name` property both warn"""
with pytest.warns(DeprecationWarning, match=r"`name` parameter is deprecated"):
pipeline = Pipeline([], name="p_name")

with pytest.warns(DeprecationWarning, match=r"`Pipeline\.name` is deprecated"):
assert pipeline.name == "p_name"

def test_initialized_with_tags(self):
"""Pipeline-level tags are merged into each node's own tags"""
pipeline = Pipeline(
[node(identity, "A", "B", tags=["node1", "p1"]), node(identity, "B", "C")],
tags=["p1", "p2"],
)

# Each group holds a single node here, so pop() yields that node.
node1 = pipeline.grouped_nodes[0].pop()
node2 = pipeline.grouped_nodes[1].pop()
assert node1.tags == {"node1", "p1", "p2"}
assert node2.tags == {"p1", "p2"}
assert pipeline.tags == {"p1", "p2"}


def pipeline_with_circle():
return [
Expand Down Expand Up @@ -516,7 +535,7 @@ def test_to_nodes_unknown(self, complex_pipeline):
def test_connected_pipeline(self, disjoint_pipeline):
"""Connect two separate pipelines."""
nodes = disjoint_pipeline["nodes"]
subpipeline = Pipeline(nodes, name="subpipeline")
subpipeline = Pipeline(nodes, tags=["subpipeline"])

assert len(subpipeline.inputs()) == 2
assert len(subpipeline.outputs()) == 2
Expand All @@ -528,9 +547,9 @@ def test_connected_pipeline(self, disjoint_pipeline):
assert len(pipeline.nodes) == 1 + len(nodes)
assert len(pipeline.inputs()) == 1
assert len(pipeline.outputs()) == 1
assert all(pipeline.name in n.tags for n in pipeline.nodes)
assert all(set(pipeline.tags) <= n.tags for n in pipeline.nodes)
assert all(
subpipeline.name in n.tags
set(subpipeline.tags) <= n.tags
for n in pipeline.nodes
if n.name != "connecting_node"
)
Expand Down

0 comments on commit 0760266

Please sign in to comment.