Add option to raise errors during find_pipelines #3823

Merged 8 commits on May 22, 2024
RELEASE.md: 2 additions & 1 deletion
@@ -1,6 +1,7 @@
# Upcoming Release 0.19.6

## Major features and improvements
+* Added `raise_errors` argument to `find_pipelines`. If `True`, the first pipeline for which autodiscovery fails will cause an error to be raised. The default behaviour is still to raise a warning for each failing pipeline.

* It is now possible to use Kedro without having `rich` installed.
* Added a `--telemetry` flag to `kedro run`, allowing consent to data usage to be granted or revoked at the same time the command is run.

@@ -45,7 +46,7 @@
* Updated `kedro pipeline create` and `kedro pipeline delete` to read the base environment from the project settings.
* Updated CLI command `kedro catalog resolve` to read credentials properly.
* Changed the path of where pipeline tests generated with `kedro pipeline create` from `<project root>/src/tests/pipelines/<pipeline name>` to `<project root>/tests/pipelines/<pipeline name>`.
-* Updated ``.gitignore`` to prevent pushing Mlflow local runs folder to a remote forge when using mlflow and git.
+* Updated ``.gitignore`` to prevent pushing MLflow local runs folder to a remote forge when using MLflow and Git.
* Fixed error handling message for malformed yaml/json files in OmegaConfigLoader.
* Fixed a bug in `node`-creation allowing self-dependencies when using transcoding, that is datasets named like `name@format`.
* Improved error message when passing wrong value to node.
docs/source/nodes_and_pipelines/pipeline_registry.md: 3 additions & 1 deletion
@@ -51,7 +51,9 @@
2. Calling the `create_pipeline()` function exposed by the `<package_name>.pipelines.<pipeline_name>` module
3. Validating that the constructed object is a {py:class}`~kedro.pipeline.Pipeline`

-If any of these steps fail, `find_pipelines()` raises an appropriate warning and skips the current pipeline but continues traversal.
+By default, if any of these steps fail, `find_pipelines()` (equivalently, `find_pipelines(raise_errors=False)`) raises an appropriate warning, skips the current pipeline, and continues traversal. During development, this lets you run your project with some pipelines even while other pipelines are broken or still in progress.
+
+If you specify `find_pipelines(raise_errors=True)`, the autodiscovery process fails upon the first error. In production, this ensures errors are caught up front and pipelines are not silently excluded.
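For example, a pipeline registry that opts into strict discovery might look like the following. This is a minimal sketch; `register_pipelines` is the hook a Kedro project defines in its `pipeline_registry.py`:

```python
from kedro.framework.project import find_pipelines
from kedro.pipeline import Pipeline


def register_pipelines() -> dict[str, Pipeline]:
    # Strict mode: the first pipeline that fails autodiscovery raises an
    # ImportError instead of being skipped with a warning.
    pipelines = find_pipelines(raise_errors=True)
    pipelines["__default__"] = sum(pipelines.values())
    return pipelines
```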

The mapping returned by `find_pipelines()` can be modified, meaning you are not limited to the pipelines returned by each of the `create_pipeline()` functions found above. For example, to add a data engineering pipeline that isn't part of the default pipeline, add it to the dictionary *after* constructing the default pipeline:
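A sketch of that pattern, assuming a hypothetical `data_engineering` pipeline package that exposes its own `create_pipeline` factory:

```python
from kedro.framework.project import find_pipelines
from kedro.pipeline import Pipeline

from my_project.pipelines.data_engineering import create_pipeline  # hypothetical import path


def register_pipelines() -> dict[str, Pipeline]:
    pipelines = find_pipelines()
    pipelines["__default__"] = sum(pipelines.values())
    # Added after __default__ was built, so it can be run by name but is
    # not folded into the default pipeline.
    pipelines["data_engineering"] = create_pipeline()
    return pipelines
```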

kedro/framework/project/__init__.py: 24 additions & 2 deletions
@@ -347,7 +347,7 @@ def _create_pipeline(pipeline_module: types.ModuleType) -> Pipeline | None:
    return obj


-def find_pipelines() -> dict[str, Pipeline]:  # noqa: PLR0912
+def find_pipelines(raise_errors: bool = False) -> dict[str, Pipeline]:  # noqa: PLR0912
    """Automatically find modular pipelines having a ``create_pipeline``
    function. By default, projects created using Kedro 0.18.3 and higher
    call this function to autoregister pipelines upon creation/addition.
@@ -359,13 +359,23 @@ def find_pipelines() -> dict[str, Pipeline]:  # noqa: PLR0912
    For more information on the pipeline registry and autodiscovery, see
    https://kedro.readthedocs.io/en/stable/nodes_and_pipelines/pipeline_registry.html

+    Args:
+        raise_errors: If ``True``, raise an error upon failed discovery.
+
    Returns:
        A generated mapping from pipeline names to ``Pipeline`` objects.

+    Raises:
+        ImportError: When a module does not expose a ``create_pipeline``
+            function, the ``create_pipeline`` function does not return a
+            ``Pipeline`` object, or if the module import fails up front.
+            If ``raise_errors`` is ``False``, see Warns section instead.
+
    Warns:
        UserWarning: When a module does not expose a ``create_pipeline``
            function, the ``create_pipeline`` function does not return a
            ``Pipeline`` object, or if the module import fails up front.
+            If ``raise_errors`` is ``True``, see Raises section instead.
    """
    pipeline_obj = None

@@ -375,6 +385,12 @@ def find_pipelines() -> dict[str, Pipeline]:  # noqa: PLR0912
        pipeline_module = importlib.import_module(pipeline_module_name)
    except Exception as exc:
        if str(exc) != f"No module named '{pipeline_module_name}'":
+            if raise_errors:
+                raise ImportError(
+                    f"An error occurred while importing the "
+                    f"'{pipeline_module_name}' module."
+                ) from exc
+
            warnings.warn(
                IMPORT_ERROR_MESSAGE.format(
                    module=pipeline_module_name, tb_exc=traceback.format_exc()
@@ -406,7 +422,13 @@
        pipeline_module_name = f"{PACKAGE_NAME}.pipelines.{pipeline_name}"
        try:
            pipeline_module = importlib.import_module(pipeline_module_name)
-        except:  # noqa: E722
+        except Exception as exc:
+            if raise_errors:
+                raise ImportError(
+                    f"An error occurred while importing the "
+                    f"'{pipeline_module_name}' module."
+                ) from exc
+
            warnings.warn(
                IMPORT_ERROR_MESSAGE.format(
                    module=pipeline_module_name, tb_exc=traceback.format_exc()
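Both import sites now share the same guard: raise an `ImportError` chained to the original failure when `raise_errors` is true, otherwise fall back to the existing warning. A condensed, self-contained sketch of the pattern; `_import_or_report` is a hypothetical helper, and the `IMPORT_ERROR_MESSAGE` wording here is assumed rather than copied from the module:

```python
import importlib
import traceback
import types
import warnings

# Assumed wording; the real module defines its own IMPORT_ERROR_MESSAGE template.
IMPORT_ERROR_MESSAGE = "An error occurred while importing the '{module}' module.\n\n{tb_exc}"


def _import_or_report(module_name: str, raise_errors: bool) -> types.ModuleType | None:
    """Import a pipeline module, raising on failure or warning and returning None."""
    try:
        return importlib.import_module(module_name)
    except Exception as exc:
        if raise_errors:
            # Chain the original exception so the root cause stays in the traceback.
            raise ImportError(
                f"An error occurred while importing the '{module_name}' module."
            ) from exc
        warnings.warn(
            IMPORT_ERROR_MESSAGE.format(module=module_name, tb_exc=traceback.format_exc())
        )
        return None
```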
kedro/pipeline/modular_pipeline.py: 4 additions & 0 deletions
@@ -99,8 +99,10 @@ def _get_dataset_names_mapping(
            the same as they are named in the provided pipeline.
            When dict[str, str] is provided, current names will be
            mapped to new names in the resultant pipeline.
+
    Returns:
        A dictionary that maps the old dataset names to the provided ones.
+
    Examples:
        >>> _get_dataset_names_mapping("dataset_name")
        {"dataset_name": "dataset_name"}  # a str name will stay the same
@@ -138,8 +140,10 @@ def _get_param_names_mapping(
            the same as they are named in the provided pipeline.
            When dict[str, str] is provided, current names will be
            mapped to new names in the resultant pipeline.
+
    Returns:
        A dictionary that maps the old parameter names to the provided ones.
+
    Examples:
        >>> _get_param_names_mapping("param_name")
        {"params:param_name": "params:param_name"}  # a str name will stay the same
tests/framework/project/test_pipeline_discovery.py: 30 additions & 18 deletions
@@ -177,12 +177,16 @@ def test_find_pipelines_skips_regular_files_within_the_pipelines_folder(


@pytest.mark.parametrize(
-    "mock_package_name_with_pipelines,pipeline_names",
-    [(x, x) for x in [set(), {"my_pipeline"}]],
-    indirect=True,
+    "mock_package_name_with_pipelines,pipeline_names,raise_errors",
+    [
+        (x, x, raise_errors)
+        for x in [set(), {"my_pipeline"}]
+        for raise_errors in [True, False]
+    ],
+    indirect=["mock_package_name_with_pipelines", "pipeline_names"],
)
def test_find_pipelines_skips_modules_that_cause_exceptions_upon_import(
-    mock_package_name_with_pipelines, pipeline_names
+    mock_package_name_with_pipelines, pipeline_names, raise_errors
):
    # Create a module that will result in errors when we try to load it.
    pipelines_dir = Path(sys.path[0]) / mock_package_name_with_pipelines / "pipelines"
@@ -191,12 +195,14 @@ def test_find_pipelines_skips_modules_that_cause_exceptions_upon_import(
    (pipeline_dir / "__init__.py").write_text("I walk a lonely road...")

    configure_project(mock_package_name_with_pipelines)
-    with pytest.warns(
-        UserWarning, match=r"An error occurred while importing the '\S+' module."
+    with getattr(pytest, "raises" if raise_errors else "warns")(
+        ImportError if raise_errors else UserWarning,
+        match=r"An error occurred while importing the '\S+' module.",
    ):
-        pipelines = find_pipelines()
-    assert set(pipelines) == pipeline_names | {"__default__"}
-    assert sum(pipelines.values()).outputs() == pipeline_names
+        pipelines = find_pipelines(raise_errors=raise_errors)
+    if not raise_errors:
+        assert set(pipelines) == pipeline_names | {"__default__"}
+        assert sum(pipelines.values()).outputs() == pipeline_names
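The `getattr(pytest, "raises" if raise_errors else "warns")` dispatch works because `pytest.raises` and `pytest.warns` share the same call shape. A standalone toy illustration of the trick; `emit` is a hypothetical stand-in for `find_pipelines`, not part of this PR:

```python
import warnings

import pytest


def emit(strict: bool) -> None:
    # Stand-in for find_pipelines: raise in strict mode, warn otherwise.
    if strict:
        raise ValueError("boom")
    warnings.warn("boom", UserWarning)


@pytest.mark.parametrize("strict", [True, False])
def test_emit(strict):
    # One code path checks both behaviours: getattr picks the matching
    # context manager (pytest.raises or pytest.warns) per parametrization.
    with getattr(pytest, "raises" if strict else "warns")(
        ValueError if strict else UserWarning, match="boom"
    ):
        emit(strict)
```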


@pytest.mark.parametrize(
@@ -226,12 +232,16 @@ def create_pipeline(**kwargs) -> Pipeline:


@pytest.mark.parametrize(
-    "mock_package_name_with_pipelines,pipeline_names",
-    [(x, x) for x in [set(), {"my_pipeline"}]],
-    indirect=True,
+    "mock_package_name_with_pipelines,pipeline_names,raise_errors",
+    [
+        (x, x, raise_errors)
+        for x in [set(), {"my_pipeline"}]
+        for raise_errors in [True, False]
+    ],
+    indirect=["mock_package_name_with_pipelines", "pipeline_names"],
)
def test_find_pipelines_skips_unimportable_pipeline_module(
-    mock_package_name_with_pipelines, pipeline_names
+    mock_package_name_with_pipelines, pipeline_names, raise_errors
):
    (Path(sys.path[0]) / mock_package_name_with_pipelines / "pipeline.py").write_text(
        textwrap.dedent(
@@ -248,12 +258,14 @@ def create_pipeline(**kwargs) -> Pipeline:
    )

    configure_project(mock_package_name_with_pipelines)
-    with pytest.warns(
-        UserWarning, match=r"An error occurred while importing the '\S+' module."
+    with getattr(pytest, "raises" if raise_errors else "warns")(
+        ImportError if raise_errors else UserWarning,
+        match=r"An error occurred while importing the '\S+' module.",
    ):
-        pipelines = find_pipelines()
-    assert set(pipelines) == pipeline_names | {"__default__"}
-    assert sum(pipelines.values()).outputs() == pipeline_names
+        pipelines = find_pipelines(raise_errors=raise_errors)
+    if not raise_errors:
+        assert set(pipelines) == pipeline_names | {"__default__"}
+        assert sum(pipelines.values()).outputs() == pipeline_names


@pytest.mark.parametrize(