
Commit

Replace Pipeline with modular_pipeline.pipeline (#2189)
* Replace Pipeline with modular_pipeline.pipeline

Replace all instances of kedro.pipeline.Pipeline in the test codebase
with modular_pipeline.pipeline, except for the tests in test_spark_dataset.py,
due to local environment setup restrictions.

Signed-off-by: Adam Farley <adamfrly@gmail.com>
adamfrly authored Jan 24, 2023
1 parent 84275a4 commit a6919ef
Showing 14 changed files with 201 additions and 145 deletions.
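
Every change below follows the same mechanical pattern: test pipelines are built with the pipeline factory from kedro.pipeline.modular_pipeline (imported as modular_pipeline) instead of calling the Pipeline constructor directly. A minimal sketch of that pattern, for orientation only (not part of the commit; identity is a stand-in helper like the ones already defined in these test modules):

# Illustrative sketch only -- not part of the diff; assumes kedro is installed.
from kedro.pipeline import node
from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline


def identity(x):
    return x


# Before: test_pipeline = Pipeline([node(identity, "A", "B", name="node1")])
# After: the factory builds an equivalent Pipeline object.
test_pipeline = modular_pipeline([node(identity, "A", "B", name="node1")])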
1 change: 1 addition & 0 deletions RELEASE.md
@@ -25,6 +25,7 @@
* Fixed bug where `micropkg` manifest section in `pyproject.toml` isn't recognised as allowed configuration.
* Fixed bug causing `load_ipython_extension` not to register the `%reload_kedro` line magic when called in a directory that does not contain a Kedro project.
* Added anyconfig's `ac_context` parameter to `kedro.config.commons` module functions for more flexible `ConfigLoader` customizations.
* Changed references to the `kedro.pipeline.Pipeline` object throughout the test suite to the `kedro.pipeline.modular_pipeline.pipeline` factory.
* Fixed bug causing the `after_dataset_saved` hook only to be called for one output dataset when multiple are saved in a single node and async saving is in use.
* Log level for "Credentials not found in your Kedro project config" was changed from `WARNING` to `DEBUG`.

5 changes: 3 additions & 2 deletions tests/extras/datasets/spark/test_deltatable_dataset.py
@@ -6,7 +6,8 @@

from kedro.extras.datasets.spark import DeltaTableDataSet, SparkDataSet
from kedro.io import DataCatalog, DataSetError
from kedro.pipeline import Pipeline, node
from kedro.pipeline import node
from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline
from kedro.runner import ParallelRunner


@@ -80,7 +81,7 @@ def no_output(x):

delta_ds = DeltaTableDataSet(filepath="")
catalog = DataCatalog(data_sets={"delta_in": delta_ds})
pipeline = Pipeline([node(no_output, "delta_in", None)])
pipeline = modular_pipeline([node(no_output, "delta_in", None)])
pattern = (
r"The following data sets cannot be used with "
r"multiprocessing: \['delta_in'\]"
11 changes: 6 additions & 5 deletions tests/extras/datasets/spark/test_spark_dataset.py
@@ -28,7 +28,8 @@
)
from kedro.io import DataCatalog, DataSetError, Version
from kedro.io.core import generate_timestamp
from kedro.pipeline import Pipeline, node
from kedro.pipeline import node
from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline
from kedro.runner import ParallelRunner, SequentialRunner

FOLDER_NAME = "fake_folder"
@@ -417,7 +418,7 @@ def test_exists_raises_error(self, mocker):
def test_parallel_runner(self, is_async, spark_in):
"""Test ParallelRunner with SparkDataSet fails."""
catalog = DataCatalog(data_sets={"spark_in": spark_in})
pipeline = Pipeline([node(identity, "spark_in", "spark_out")])
pipeline = modular_pipeline([node(identity, "spark_in", "spark_out")])
pattern = (
r"The following data sets cannot be used with "
r"multiprocessing: \['spark_in'\]"
@@ -954,7 +955,7 @@ def data_catalog(tmp_path):
class TestDataFlowSequentialRunner:
def test_spark_load_save(self, is_async, data_catalog):
"""SparkDataSet(load) -> node -> Spark (save)."""
pipeline = Pipeline([node(identity, "spark_in", "spark_out")])
pipeline = modular_pipeline([node(identity, "spark_in", "spark_out")])
SequentialRunner(is_async=is_async).run(pipeline, data_catalog)

save_path = Path(data_catalog._data_sets["spark_out"]._filepath.as_posix())
@@ -963,15 +964,15 @@ def test_spark_load_save(self, is_async, data_catalog):

def test_spark_pickle(self, is_async, data_catalog):
"""SparkDataSet(load) -> node -> PickleDataSet (save)"""
pipeline = Pipeline([node(identity, "spark_in", "pickle_ds")])
pipeline = modular_pipeline([node(identity, "spark_in", "pickle_ds")])
pattern = ".* was not serialised due to.*"
with pytest.raises(DataSetError, match=pattern):
SequentialRunner(is_async=is_async).run(pipeline, data_catalog)

def test_spark_memory_spark(self, is_async, data_catalog):
"""SparkDataSet(load) -> node -> MemoryDataSet (save and then load) ->
node -> SparkDataSet (save)"""
pipeline = Pipeline(
pipeline = modular_pipeline(
[
node(identity, "spark_in", "memory_ds"),
node(identity, "memory_ds", "spark_out"),
10 changes: 7 additions & 3 deletions tests/framework/cli/test_catalog.py
@@ -6,7 +6,8 @@

from kedro.extras.datasets.pandas import CSVDataSet
from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline import Pipeline, node
from kedro.pipeline import node
from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline


@pytest.fixture
@@ -22,7 +23,10 @@ def fake_load_context(mocker):

@pytest.fixture
def mock_pipelines(mocker):
dummy_pipelines = {PIPELINE_NAME: Pipeline([]), "second": Pipeline([])}
dummy_pipelines = {
PIPELINE_NAME: modular_pipeline([]),
"second": modular_pipeline([]),
}
return mocker.patch("kedro.framework.cli.catalog.pipelines", dummy_pipelines)


@@ -246,7 +250,7 @@ def test_no_missing_datasets(
}
mocked_context.catalog = DataCatalog(data_sets=catalog_data_sets)
mocked_context.project_path = fake_repo_path
mock_pipelines[self.PIPELINE_NAME] = Pipeline(
mock_pipelines[self.PIPELINE_NAME] = modular_pipeline(
[node(identity, "input_data", "output_data")]
)

3 changes: 2 additions & 1 deletion tests/framework/session/conftest.py
@@ -21,6 +21,7 @@
from kedro.framework.session import KedroSession
from kedro.io import DataCatalog
from kedro.pipeline import Pipeline
from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline
from kedro.pipeline.node import Node, node

logger = logging.getLogger(__name__)
@@ -110,7 +111,7 @@ def dummy_dataframe() -> pd.DataFrame:

@pytest.fixture
def mock_pipeline() -> Pipeline:
return Pipeline(
return modular_pipeline(
[
node(identity_node, "cars", "planes", name="node1"),
node(identity_node, "boats", "ships", name="node2"),
8 changes: 4 additions & 4 deletions tests/ipython/test_ipython.py
@@ -8,7 +8,7 @@
from kedro.framework.project import pipelines
from kedro.framework.startup import ProjectMetadata
from kedro.ipython import _resolve_project_path, load_ipython_extension, reload_kedro
from kedro.pipeline import Pipeline
from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline

PACKAGE_NAME = "fake_package_name"
PROJECT_NAME = "fake_project_name"
@@ -76,7 +76,7 @@ def test_ipython_lazy_load_pipeline(
):
pipelines.configure("dummy_pipeline") # Setup the pipelines

my_pipelines = {"ds": Pipeline([])}
my_pipelines = {"ds": modular_pipeline([])}

def my_register_pipeline():
return my_pipelines
@@ -100,7 +100,7 @@ def test_ipython_load_objects(
mock_session_create = mocker.patch("kedro.ipython.KedroSession.create")
pipelines.configure("dummy_pipeline") # Setup the pipelines

my_pipelines = {"ds": Pipeline([])}
my_pipelines = {"ds": modular_pipeline([])}

def my_register_pipeline():
return my_pipelines
@@ -129,7 +129,7 @@ def test_ipython_load_objects_with_args(self, mocker, fake_metadata, ipython):
mock_session_create = mocker.patch("kedro.ipython.KedroSession.create")
pipelines.configure("dummy_pipeline") # Setup the pipelines

my_pipelines = {"ds": Pipeline([])}
my_pipelines = {"ds": modular_pipeline([])}

def my_register_pipeline():
return my_pipelines
51 changes: 29 additions & 22 deletions tests/pipeline/test_modular_pipeline.py
@@ -1,7 +1,8 @@
import pytest

from kedro.pipeline import Pipeline, node, pipeline
from kedro.pipeline import node, pipeline
from kedro.pipeline.modular_pipeline import ModularPipelineError
from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline

# Different dummy func based on the number of arguments

@@ -27,7 +28,7 @@ def test_transform_dataset_names(self):
"""
Rename some datasets, test string, list and dict formats.
"""
raw_pipeline = Pipeline(
raw_pipeline = modular_pipeline(
[
node(identity, "A", "B", name="node1"),
node(biconcat, ["C", "D"], ["E", "F"], name="node2"),
@@ -58,7 +59,7 @@ def test_prefix_dataset_names(self):
"""
Simple prefixing for dataset of all formats: str, list and dict
"""
raw_pipeline = Pipeline(
raw_pipeline = modular_pipeline(
[
node(identity, "A", "B", name="node1"),
node(biconcat, ["C", "D"], ["E", "F"], name="node2"),
@@ -83,7 +84,7 @@ def test_prefixing_and_renaming(self):
Prefixing and renaming at the same time.
Explicitly renamed datasets should not be prefixed anymore.
"""
raw_pipeline = Pipeline([node(biconcat, ["C", "D"], ["E", "F"])])
raw_pipeline = modular_pipeline([node(biconcat, ["C", "D"], ["E", "F"])])
resulting_pipeline = pipeline(
raw_pipeline,
namespace="PREFIX",
@@ -98,7 +99,7 @@ def test_prefixing_and_renaming(self):
[("A", "D"), (["A"], ["D"]), ({"A"}, {"D"}), ({"A": "A"}, {"D": "D"})],
)
def test_prefix_exclude_free_inputs(self, inputs, outputs):
raw_pipeline = Pipeline(
raw_pipeline = modular_pipeline(
[
node(identity, "A", "B", name="node1"),
node(identity, "B", "C", name="node2"),
@@ -122,7 +123,7 @@ def test_transform_params_prefix_and_parameters(self):
"""
Test that transform should prefix all parameters by default.
"""
raw_pipeline = Pipeline(
raw_pipeline = modular_pipeline(
[
node(identity, "parameters", "params:B", name="node1"),
node(biconcat, ["params:C", "D"], ["parameters", "F"], name="node2"),
@@ -147,7 +148,9 @@ def test_transform_params_prefix_and_parameters(self):
assert nodes[2].name == "PREFIX.node3"

def test_dataset_transcoding_mapping_base_name(self):
raw_pipeline = Pipeline([node(biconcat, ["C@pandas", "D"], ["E@spark", "F"])])
raw_pipeline = modular_pipeline(
[node(biconcat, ["C@pandas", "D"], ["E@spark", "F"])]
)
resulting_pipeline = pipeline(
raw_pipeline, namespace="PREFIX", inputs={"C": "C_new"}
)
@@ -156,7 +159,7 @@ def test_dataset_transcoding_mapping_base_name(self):
assert resulting_pipeline.nodes[0]._outputs == ["PREFIX.E@spark", "PREFIX.F"]

def test_dataset_transcoding_mapping_full_dataset(self):
raw_pipeline = Pipeline(
raw_pipeline = modular_pipeline(
[
node(biconcat, ["A@pandas", "B"], "C"),
node(biconcat, ["A@spark", "C"], "CC"),
@@ -173,7 +176,7 @@ def test_dataset_transcoding_mapping_full_dataset(self):
assert resulting_pipeline.nodes[1]._outputs == "PREFIX.CC"

def test_empty_input(self):
raw_pipeline = Pipeline([node(constant_output, None, ["A", "B"])])
raw_pipeline = modular_pipeline([node(constant_output, None, ["A", "B"])])

resulting_pipeline = pipeline(
raw_pipeline, namespace="PREFIX", outputs={"A": "A_new"}
@@ -182,7 +185,7 @@ def test_empty_input(self):
assert resulting_pipeline.nodes[0]._outputs == ["A_new", "PREFIX.B"]

def test_empty_output(self):
raw_pipeline = Pipeline([node(biconcat, ["A", "B"], None)])
raw_pipeline = modular_pipeline([node(biconcat, ["A", "B"], None)])

resulting_pipeline = pipeline(
raw_pipeline, namespace="PREFIX", inputs={"A": "A_new"}
@@ -217,7 +220,7 @@ def test_empty_output(self):
def test_missing_dataset_name(
self, func, inputs, outputs, inputs_map, outputs_map, expected_missing
): # pylint: disable=too-many-arguments
raw_pipeline = Pipeline([node(func, inputs, outputs)])
raw_pipeline = modular_pipeline([node(func, inputs, outputs)])

with pytest.raises(ModularPipelineError, match=r"Failed to map datasets") as e:
pipeline(
@@ -230,15 +233,17 @@ def test_node_properties_preserved(self):
Check that we don't lose any valuable properties on node cloning.
Also an explicitly defined name should get prefixed.
"""
raw_pipeline = Pipeline([node(identity, "A", "B", name="node1", tags=["tag1"])])
raw_pipeline = modular_pipeline(
[node(identity, "A", "B", name="node1", tags=["tag1"])]
)
resulting_pipeline = pipeline(raw_pipeline, namespace="PREFIX")

assert resulting_pipeline.nodes[0].name == "PREFIX.node1"
assert resulting_pipeline.nodes[0].tags == {"tag1"}

def test_default_node_name_is_namespaced(self):
"""Check that auto-generated node names are also namespaced"""
raw_pipeline = Pipeline([node(identity, "A", "B")])
raw_pipeline = modular_pipeline([node(identity, "A", "B")])
first_layer_nested_pipe = pipeline(raw_pipeline, namespace="PREFIX")
resulting_node = first_layer_nested_pipe.nodes[0]

@@ -254,7 +259,7 @@ def test_default_node_name_is_namespaced(self):
def test_expose_intermediate_output(self):
"""Check that we don't namespace an intermediary dataset, anywhere it
is used - either input or output"""
raw_pipeline = Pipeline(
raw_pipeline = modular_pipeline(
[
node(identity, "A", "B", name="node1"),
node(identity, "B", "C", name="node2"),
@@ -279,7 +284,9 @@ def test_expose_intermediate_output(self):
assert actual_nodes[3]._outputs == "ACTUAL.X"

def test_parameters_left_intact_when_defined_as_str(self):
raw_pipeline = Pipeline([node(biconcat, ["A", "params:x"], "AA", name="node1")])
raw_pipeline = modular_pipeline(
[node(biconcat, ["A", "params:x"], "AA", name="node1")]
)
resulting_pipeline = pipeline(
raw_pipeline, outputs={"AA": "B"}, parameters="x", namespace="PREFIX"
)
@@ -292,7 +299,7 @@ def test_parameters_left_intact_when_defined_as_str(self):
"parameters", ["params:x", {"params:x"}, {"params:x": "params:x"}]
)
def test_parameters_left_intact_when_defined_as_(self, parameters):
raw_pipeline = Pipeline(
raw_pipeline = modular_pipeline(
[node(triconcat, ["A", "params:x", "params:y"], "AA", name="node1")]
)
resulting_pipeline = pipeline(
@@ -308,7 +315,7 @@ def test_parameters_left_intact_when_defined_as_(self, parameters):
assert actual_nodes[0]._outputs == "B"

def test_parameters_updated_with_dict(self):
raw_pipeline = Pipeline(
raw_pipeline = modular_pipeline(
[
node(biconcat, ["A", "params:x"], "AA", name="node1"),
node(biconcat, ["AA", "params:y"], "B", name="node2"),
@@ -333,7 +340,7 @@ def test_parameters_updated_with_dict(self):
assert actual_nodes[2]._outputs == "ACTUAL.BB"

def test_parameters_defined_with_params_prefix(self):
raw_pipeline = Pipeline(
raw_pipeline = modular_pipeline(
[node(triconcat, ["A", "params:x", "params:y"], "AA", name="node1")]
)
resulting_pipeline = pipeline(
@@ -349,7 +356,7 @@ def test_parameters_defined_with_params_prefix(self):
assert actual_nodes[0]._outputs == "B"

def test_parameters_specified_under_inputs(self):
raw_pipeline = Pipeline(
raw_pipeline = modular_pipeline(
[
node(biconcat, ["A", "params:alpha"], "AA", name="node1"),
node(biconcat, ["AA", "parameters"], "BB", name="node2"),
@@ -364,7 +371,7 @@ def test_parameters_specified_under_inputs(self):
pipeline(raw_pipeline, inputs={"parameters": "some_yaml_dataset"})

def test_non_existent_parameters_mapped(self):
raw_pipeline = Pipeline(
raw_pipeline = modular_pipeline(
[
node(biconcat, ["A", "params:alpha"], "AA", name="node1"),
node(biconcat, ["AA", "CC"], "BB", name="node2"),
@@ -380,7 +387,7 @@ def test_non_existent_parameters_mapped(self):
pipeline(raw_pipeline, parameters={"parameters": "some_yaml_dataset"})

def test_bad_inputs_mapping(self):
raw_pipeline = Pipeline(
raw_pipeline = modular_pipeline(
[
node(biconcat, ["A", "params:alpha"], "AA", name="node1"),
node(biconcat, ["AA", "parameters"], "BB", name="node2"),
@@ -392,7 +399,7 @@ def test_bad_inputs_mapping(self):
pipeline(raw_pipeline, inputs={"AA": "CC"})

def test_bad_outputs_mapping(self):
raw_pipeline = Pipeline(
raw_pipeline = modular_pipeline(
[
node(biconcat, ["A", "params:alpha"], "AA", name="node1"),
node(biconcat, ["AA", "parameters"], "BB", name="node2"),
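
For orientation, a minimal sketch (not part of the commit) of the namespacing behaviour these modular-pipeline tests exercise: wrapping a pipeline with pipeline(..., namespace=...) prefixes node names and unmapped datasets, while datasets remapped through inputs or outputs keep their new names.

# Illustrative sketch only -- assumes a working kedro installation.
from kedro.pipeline import node, pipeline
from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline


def identity(x):
    return x


raw_pipeline = modular_pipeline([node(identity, "A", "B", name="node1")])
namespaced = pipeline(raw_pipeline, namespace="PREFIX", inputs={"A": "A_new"})

# The node name and the unmapped output gain the namespace prefix;
# the explicitly remapped input does not.
assert namespaced.nodes[0].name == "PREFIX.node1"
assert namespaced.nodes[0]._inputs == "A_new"
assert namespaced.nodes[0]._outputs == "PREFIX.B"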