Skip to content

Commit

Permalink
[KED-2419] Make pipeline and Pipeline consistent, take 2 (kedro-org#1147
Browse files Browse the repository at this point in the history
)

Signed-off-by: Laurens Vijnck <laurens_vijnck@mckinsey.com>
  • Loading branch information
antonymilne authored and lvijnck committed Apr 7, 2022
1 parent 34f8738 commit 0bde80e
Show file tree
Hide file tree
Showing 11 changed files with 68 additions and 29 deletions.
11 changes: 9 additions & 2 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
# Release 0.17.7

## Major features and improvements
* `pipeline` now accepts `tags` and a collection of `Node`s and/or `Pipeline`s rather than just a single `Pipeline` object. `pipeline` should be used in preference to `Pipeline` when creating a Kedro pipeline.

## Bug fixes and other changes
* Added tutorial documentation for experiment tracking in Kedro docs. (`03_tutorial/07_set_up_experiment_tracking.md`).
* Added Plotly documentation in Kedro docs. (`03_tutorial/06_visualise_pipeline.md`).
* Added tutorial documentation for experiment tracking in Kedro docs (`03_tutorial/07_set_up_experiment_tracking.md`).
* Added Plotly documentation in Kedro docs (`03_tutorial/06_visualise_pipeline.md`).

## Minor breaking changes to the API

## Upcoming deprecations for Kedro 0.18.0

# Release 0.17.6

Expand Down
4 changes: 2 additions & 2 deletions features/steps/cli_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ def check_empty_pipeline_exists(context):
/ context.project_name.replace("-", "_")
/ "pipeline_registry.py"
)
assert '"__default__": Pipeline([])' in pipeline_file.read_text("utf-8")
assert '"__default__": pipeline([])' in pipeline_file.read_text("utf-8")


@then("the pipeline should contain nodes")
Expand All @@ -510,7 +510,7 @@ def check_pipeline_not_empty(context):
/ context.project_name.replace("-", "_")
/ "pipeline_registry.py"
)
assert "pipeline = Pipeline([])" not in pipeline_file.read_text("utf-8")
assert "pipeline = pipeline([])" not in pipeline_file.read_text("utf-8")


@then("the console log should show that {number} nodes were run")
Expand Down
4 changes: 2 additions & 2 deletions features/steps/test_plugin/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging

from kedro.framework.hooks import hook_impl
from kedro.pipeline import Pipeline, node
from kedro.pipeline import node, pipeline


class MyPluginHook:
Expand All @@ -15,7 +15,7 @@ def after_catalog_created(
@hook_impl
def register_pipelines(self): # pylint: disable=no-self-use
return {
"from_plugin": Pipeline([node(lambda: "sth", inputs=None, outputs="x")])
"from_plugin": pipeline([node(lambda: "sth", inputs=None, outputs="x")])
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
Delete this when you start working on your own Kedro project.
"""

from kedro.pipeline import Pipeline, node
from kedro.pipeline import node, pipeline

from .nodes import split_data


def create_pipeline(**kwargs):
return Pipeline(
return pipeline(
[
node(
split_data,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
Delete this when you start working on your own Kedro project.
"""

from kedro.pipeline import Pipeline, node
from kedro.pipeline import node, pipeline

from .nodes import predict, report_accuracy, train_model


def create_pipeline(**kwargs):
return Pipeline(
return pipeline(
[
node(
train_model,
Expand Down
33 changes: 24 additions & 9 deletions kedro/pipeline/modular_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Helper to integrate modular pipelines into a master pipeline."""
import copy
from typing import AbstractSet, Dict, List, Set, Union
from typing import AbstractSet, Dict, Iterable, List, Set, Union

from kedro.pipeline.node import Node
from kedro.pipeline.pipeline import (
Expand Down Expand Up @@ -69,34 +69,40 @@ def _validate_datasets_exist(


def pipeline(
pipe: Pipeline,
pipe: Union[Iterable[Union[Node, Pipeline]], Pipeline],
*,
inputs: Union[str, Set[str], Dict[str, str]] = None,
outputs: Union[str, Set[str], Dict[str, str]] = None,
parameters: Dict[str, str] = None,
tags: Union[str, Iterable[str]] = None,
namespace: str = None,
) -> Pipeline:
"""Create a copy of the pipeline and its nodes,
with some dataset names and node names modified.
"""Create a ``Pipeline`` from a collection of nodes and/or ``Pipeline``s.
Args:
pipe: Original modular pipeline to integrate
pipe: The nodes the ``Pipeline`` will be made of. If you
provide pipelines among the list of nodes, those pipelines will
be expanded and all their nodes will become part of this
new pipeline.
inputs: A name or collection of input names to be exposed as connection points
to other pipelines upstream.
to other pipelines upstream. This is optional; if not provided, the
pipeline inputs are automatically inferred from the pipeline structure.
When str or Set[str] is provided, the listed input names will stay
the same as they are named in the provided pipeline.
When Dict[str, str] is provided, current input names will be
mapped to new names.
Must only refer to the pipeline's free inputs.
outputs: A name or collection of names to be exposed as connection points
to other pipelines downstream.
to other pipelines downstream. This is optional; if not provided, the
pipeline inputs are automatically inferred from the pipeline structure.
When str or Set[str] is provided, the listed output names will stay
the same as they are named in the provided pipeline.
When Dict[str, str] is provided, current output names will be
mapped to new names.
Can refer to both the pipeline's free outputs, as well as
intermediate results that need to be exposed.
parameters: A map of existing parameter to the new one.
tags: Optional set of tags to be applied to all the pipeline nodes.
namespace: A prefix to give to all dataset names,
except those explicitly named with the `inputs`/`outputs`
arguments, and parameter references (`params:` and `parameters`).
Expand All @@ -108,8 +114,17 @@ def pipeline(
any of the expected types (str, dict, list, or None).
Returns:
A new ``Pipeline`` object with the new nodes, modified as requested.
A new ``Pipeline`` object.
"""
if isinstance(pipe, Pipeline):
# To ensure that we are always dealing with a *copy* of pipe.
pipe = Pipeline([pipe], tags=tags)
else:
pipe = Pipeline(pipe, tags=tags)

if not any([inputs, outputs, parameters, namespace]):
return pipe

# pylint: disable=protected-access
inputs = _to_dict(inputs)
outputs = _to_dict(outputs)
Expand Down Expand Up @@ -181,7 +196,7 @@ def _copy_node(node: Node) -> Node:

new_nodes = [_copy_node(n) for n in pipe.nodes]

return Pipeline(new_nodes)
return Pipeline(new_nodes, tags=tags)


def _to_dict(element: Union[None, str, Set[str], Dict[str, str]]) -> Dict[str, str]:
Expand Down
2 changes: 1 addition & 1 deletion kedro/pipeline/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ def node(
outputs: Union[None, str, List[str], Dict[str, str]],
*,
name: str = None,
tags: Iterable[str] = None,
tags: Union[str, Iterable[str]] = None,
confirms: Union[str, List[str]] = None,
namespace: str = None,
) -> Node:
Expand Down
11 changes: 7 additions & 4 deletions kedro/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,10 +692,13 @@ def decorate(self, *decorators: Callable) -> "Pipeline":
return Pipeline(nodes)

def tag(self, tags: Union[str, Iterable[str]]) -> "Pipeline":
"""
Return a copy of the pipeline, with each node tagged accordingly.
:param tags: The tags to be added to the nodes.
:return: New `Pipeline` object.
"""Tags all the nodes in the pipeline.
Args:
tags: The tags to be added to the nodes.
Returns:
New ``Pipeline`` object with nodes tagged.
"""
nodes = [n.tag(tags) for n in self.nodes]
return Pipeline(nodes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
generated using Kedro {{ cookiecutter.kedro_version }}
"""

from kedro.pipeline import Pipeline, node
from kedro.pipeline import Pipeline, node, pipeline


def create_pipeline(**kwargs):
return Pipeline([])
def create_pipeline(**kwargs) -> Pipeline:
return pipeline([])
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Project pipelines."""
from typing import Dict

from kedro.pipeline import Pipeline
from kedro.pipeline import Pipeline, pipeline


def register_pipelines() -> Dict[str, Pipeline]:
Expand All @@ -10,4 +10,4 @@ def register_pipelines() -> Dict[str, Pipeline]:
Returns:
A mapping from a pipeline name to a ``Pipeline`` object.
"""
return {"__default__": Pipeline([])}
return {"__default__": pipeline([])}
14 changes: 14 additions & 0 deletions tests/pipeline/test_pipeline_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,3 +355,17 @@ def test_bad_outputs_mapping(self):
pattern = "Outputs can't contain free inputs to the pipeline"
with pytest.raises(ModularPipelineError, match=pattern):
pipeline(raw_pipeline, outputs={"A": "C"})

def test_pipeline_always_copies(self):
original_pipeline = pipeline([node(constant_output, None, "A")])
new_pipeline = pipeline(original_pipeline)
assert new_pipeline.nodes == original_pipeline.nodes
assert new_pipeline is not original_pipeline

def test_pipeline_tags(self):
tagged_pipeline = pipeline(
[node(constant_output, None, "A"), node(constant_output, None, "B")],
tags="tag",
)

assert all(n.tags == {"tag"} for n in tagged_pipeline.nodes)

0 comments on commit 0bde80e

Please sign in to comment.