[KED-2419] Make pipeline and Pipeline consistent, take 2 #1147

Merged: 15 commits, Jan 18, 2022
Changes from 13 commits
11 changes: 11 additions & 0 deletions RELEASE.md
@@ -1,3 +1,14 @@
# Release 0.17.7
Review comment (Contributor Author): Will this exist? 🤔

Review comment (Member): Yes, I think it makes sense to make a small release before 0.18.0.

## Major features and improvements
* `pipeline` now accepts `tags` and a collection of `Node`s and/or `Pipeline`s rather than just a single `Pipeline` object. `pipeline` should be used in preference to `Pipeline` when creating a Kedro pipeline.
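For illustration only (the node function and dataset names below are hypothetical, not part of this PR), the new call style looks like this:

```python
from kedro.pipeline import node, pipeline


def preprocess(raw):
    # Hypothetical node function used only for this sketch.
    return raw


# `pipeline` now accepts a plain list of nodes (and/or other pipelines) plus tags,
# so there is no need to instantiate `Pipeline` directly.
data_pipeline = pipeline(
    [node(preprocess, inputs="raw_data", outputs="clean_data")],
    tags="preprocessing",
)
```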

## Bug fixes and other changes

## Minor breaking changes to the API

## Upcoming deprecations for Kedro 0.18.0

# Release 0.17.6

## Major features and improvements
4 changes: 2 additions & 2 deletions features/steps/cli_steps.py
@@ -496,7 +496,7 @@ def check_empty_pipeline_exists(context):
/ context.project_name.replace("-", "_")
/ "pipeline_registry.py"
)
assert '"__default__": Pipeline([])' in pipeline_file.read_text("utf-8")
assert '"__default__": pipeline([])' in pipeline_file.read_text("utf-8")


@then("the pipeline should contain nodes")
@@ -510,7 +510,7 @@ def check_pipeline_not_empty(context):
/ context.project_name.replace("-", "_")
/ "pipeline_registry.py"
)
assert "pipeline = Pipeline([])" not in pipeline_file.read_text("utf-8")
assert "pipeline = pipeline([])" not in pipeline_file.read_text("utf-8")


@then("the console log should show that {number} nodes were run")
4 changes: 2 additions & 2 deletions features/steps/test_plugin/plugin.py
@@ -2,7 +2,7 @@
import logging

from kedro.framework.hooks import hook_impl
from kedro.pipeline import Pipeline, node
from kedro.pipeline import node, pipeline


class MyPluginHook:
@@ -15,7 +15,7 @@ def after_catalog_created(
@hook_impl
def register_pipelines(self): # pylint: disable=no-self-use
return {
"from_plugin": Pipeline([node(lambda: "sth", inputs=None, outputs="x")])
"from_plugin": pipeline([node(lambda: "sth", inputs=None, outputs="x")])
}


@@ -4,13 +4,13 @@
Delete this when you start working on your own Kedro project.
"""

from kedro.pipeline import Pipeline, node
from kedro.pipeline import node, pipeline

from .nodes import split_data


def create_pipeline(**kwargs):
return Pipeline(
return pipeline(
[
node(
split_data,
@@ -4,13 +4,13 @@
Delete this when you start working on your own Kedro project.
"""

from kedro.pipeline import Pipeline, node
from kedro.pipeline import node, pipeline

from .nodes import predict, report_accuracy, train_model


def create_pipeline(**kwargs):
return Pipeline(
return pipeline(
[
node(
train_model,
33 changes: 24 additions & 9 deletions kedro/pipeline/modular_pipeline.py
@@ -1,6 +1,6 @@
"""Helper to integrate modular pipelines into a master pipeline."""
import copy
from typing import AbstractSet, Dict, List, Set, Union
from typing import AbstractSet, Dict, Iterable, List, Set, Union

from kedro.pipeline.node import Node
from kedro.pipeline.pipeline import (
@@ -69,34 +69,40 @@ def _validate_datasets_exist(


def pipeline(
pipe: Pipeline,
pipe: Union[Iterable[Union[Node, Pipeline]], Pipeline],
*,
inputs: Union[str, Set[str], Dict[str, str]] = None,
outputs: Union[str, Set[str], Dict[str, str]] = None,
parameters: Dict[str, str] = None,
tags: Union[str, Iterable[str]] = None,
namespace: str = None,
) -> Pipeline:
"""Create a copy of the pipeline and its nodes,
with some dataset names and node names modified.
"""Create a ``Pipeline`` from a collection of nodes and/or ``Pipeline``s.

Args:
pipe: Original modular pipeline to integrate
pipe: The nodes the ``Pipeline`` will be made of. If you
provide pipelines among the list of nodes, those pipelines will
be expanded and all their nodes will become part of this
new pipeline.
inputs: A name or collection of input names to be exposed as connection points
to other pipelines upstream.
to other pipelines upstream. This is optional; if not provided, the
Review comment (Member): 👍
pipeline inputs are automatically inferred from the pipeline structure.
When str or Set[str] is provided, the listed input names will stay
the same as they are named in the provided pipeline.
When Dict[str, str] is provided, current input names will be
mapped to new names.
Must only refer to the pipeline's free inputs.
outputs: A name or collection of names to be exposed as connection points
to other pipelines downstream.
to other pipelines downstream. This is optional; if not provided, the
pipeline outputs are automatically inferred from the pipeline structure.
When str or Set[str] is provided, the listed output names will stay
the same as they are named in the provided pipeline.
When Dict[str, str] is provided, current output names will be
mapped to new names.
Can refer to both the pipeline's free outputs, as well as
intermediate results that need to be exposed.
parameters: A map of existing parameter names to new ones.
Review comment (Member): Should we also explicitly say that parameters are optional, now it's mentioned for inputs, outputs and tags?

Review comment (Contributor Author): I think this is ok without because, unlike the others, parameters is not a required argument to node so there's no real possible confusion here 🙂

tags: Optional set of tags to be applied to all the pipeline nodes.
namespace: A prefix to give to all dataset names,
except those explicitly named with the `inputs`/`outputs`
arguments, and parameter references (`params:` and `parameters`).
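As a hedged illustration of how these arguments combine (the node function and dataset names are made up for this sketch):

```python
from kedro.pipeline import node, pipeline


def train(features):
    # Hypothetical node function used only for this sketch.
    return "model"


base = pipeline([node(train, inputs="features", outputs="model")])

# Reuse the pipeline under a namespace: "features" is exposed unchanged (str form),
# while "model" is renamed to "candidate_model" for downstream pipelines (dict form).
namespaced = pipeline(
    base,
    inputs="features",
    outputs={"model": "candidate_model"},
    namespace="training",
)
```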
@@ -108,8 +114,17 @@ def pipeline(
any of the expected types (str, dict, list, or None).

Returns:
A new ``Pipeline`` object with the new nodes, modified as requested.
A new ``Pipeline`` object.
"""
if isinstance(pipe, Pipeline):
# To ensure that we are always dealing with a *copy* of pipe.
pipe = Pipeline([pipe], tags=tags)
Review comment (Contributor Author): Pipeline only takes an iterable of pipelines/nodes, hence wrapping this in [].

else:
pipe = Pipeline(pipe, tags=tags)

if not any([inputs, outputs, parameters, namespace]):
return pipe

# pylint: disable=protected-access
inputs = _to_dict(inputs)
outputs = _to_dict(outputs)
@@ -181,7 +196,7 @@ def _copy_node(node: Node) -> Node:

new_nodes = [_copy_node(n) for n in pipe.nodes]

return Pipeline(new_nodes)
return Pipeline(new_nodes, tags=tags)


def _to_dict(element: Union[None, str, Set[str], Dict[str, str]]) -> Dict[str, str]:
2 changes: 1 addition & 1 deletion kedro/pipeline/node.py
@@ -599,7 +599,7 @@ def node(
outputs: Union[None, str, List[str], Dict[str, str]],
*,
name: str = None,
tags: Iterable[str] = None,
tags: Union[str, Iterable[str]] = None,
Review comment (Contributor Author): This should always have been this way so that node and Node match identically.
confirms: Union[str, List[str]] = None,
namespace: str = None,
) -> Node:
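For illustration (hypothetical function), the widened annotation matches what ``Node`` already allowed, so both a single tag string and an iterable of tags type-check for ``node``:

```python
from kedro.pipeline import node


def identity(x):
    # Hypothetical node function used only for this sketch.
    return x


n1 = node(identity, "in", "out_a", name="n1", tags="quick")             # single tag string
n2 = node(identity, "in", "out_b", name="n2", tags=["quick", "daily"])  # iterable of tags
```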
11 changes: 7 additions & 4 deletions kedro/pipeline/pipeline.py
@@ -692,10 +692,13 @@ def decorate(self, *decorators: Callable) -> "Pipeline":
return Pipeline(nodes)

def tag(self, tags: Union[str, Iterable[str]]) -> "Pipeline":
"""
Return a copy of the pipeline, with each node tagged accordingly.
:param tags: The tags to be added to the nodes.
:return: New `Pipeline` object.
"""Tags all the nodes in the pipeline.

Args:
tags: The tags to be added to the nodes.

Returns:
New ``Pipeline`` object with nodes tagged.
"""
nodes = [n.tag(tags) for n in self.nodes]
return Pipeline(nodes)
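A small usage sketch of ``tag`` (node function hypothetical): it returns a tagged copy and leaves the original pipeline untouched.

```python
from kedro.pipeline import node, pipeline


def load():
    # Hypothetical node function used only for this sketch.
    return 1


p = pipeline([node(load, None, "x")])
tagged = p.tag(["nightly", "smoke"])  # new Pipeline object; `p` is unchanged
assert all({"nightly", "smoke"} <= n.tags for n in tagged.nodes)
```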
@@ -3,8 +3,8 @@
generated using Kedro {{ cookiecutter.kedro_version }}
"""

from kedro.pipeline import Pipeline, node
from kedro.pipeline import Pipeline, node, pipeline


def create_pipeline(**kwargs):
return Pipeline([])
def create_pipeline(**kwargs) -> Pipeline:
return pipeline([])
@@ -1,7 +1,7 @@
"""Project pipelines."""
from typing import Dict

from kedro.pipeline import Pipeline
from kedro.pipeline import Pipeline, pipeline


def register_pipelines() -> Dict[str, Pipeline]:
@@ -10,4 +10,4 @@ def register_pipelines() -> Dict[str, Pipeline]:
Returns:
A mapping from a pipeline name to a ``Pipeline`` object.
"""
return {"__default__": Pipeline([])}
return {"__default__": pipeline([])}
13 changes: 13 additions & 0 deletions tests/pipeline/test_pipeline_helper.py
@@ -355,3 +355,16 @@ def test_bad_outputs_mapping(self):
pattern = "Outputs can't contain free inputs to the pipeline"
with pytest.raises(ModularPipelineError, match=pattern):
pipeline(raw_pipeline, outputs={"A": "C"})

def test_pipeline_always_copies(self):
original_pipeline = pipeline([node(constant_output, None, "A")])
new_pipeline = pipeline(original_pipeline)
assert new_pipeline is not original_pipeline

def test_pipeline_tags(self):
tagged_pipeline = pipeline(
[node(constant_output, None, "A"), node(constant_output, None, "B")],
tags="tag",
)

assert all(n.tags == {"tag"} for n in tagged_pipeline.nodes)