Add option to raise errors during find_pipelines (#3823)
* Add option to raise errors during `find_pipelines`

Signed-off-by: Deepyaman Datta <deepyaman.datta@utexas.edu>

* Add coverage for non-simplified project structures

Signed-off-by: Deepyaman Datta <deepyaman.datta@utexas.edu>

* Fix skipping for non-simplified project structures

Signed-off-by: Deepyaman Datta <deepyaman.datta@utexas.edu>

* Add required type hint for `raise_errors` argument

Signed-off-by: Deepyaman Datta <deepyaman.datta@utexas.edu>

* Add docstring and release notes for `raise_errors`

Signed-off-by: Deepyaman Datta <deepyaman.datta@utexas.edu>

* Add documentation on the `raise_errors` flag to web

Signed-off-by: Deepyaman Datta <deepyaman.datta@utexas.edu>

---------

Signed-off-by: Deepyaman Datta <deepyaman.datta@utexas.edu>
deepyaman authored May 22, 2024
1 parent 570f66a commit 56f2095
Showing 5 changed files with 63 additions and 22 deletions.
3 changes: 2 additions & 1 deletion RELEASE.md
@@ -1,6 +1,7 @@
# Upcoming Release 0.19.6

## Major features and improvements
* Added `raise_errors` argument to `find_pipelines`. If `True`, the first pipeline for which autodiscovery fails will cause an error to be raised. The default behaviour is still to raise a warning for each failing pipeline.
* It is now possible to use Kedro without having `rich` installed.
* Added a `--telemetry` flag to `kedro run`, allowing consent to data usage to be granted or revoked at the same time the command is run.

@@ -45,7 +46,7 @@ Many thanks to the following Kedroids for contributing PRs to this release:
* Updated `kedro pipeline create` and `kedro pipeline delete` to read the base environment from the project settings.
* Updated CLI command `kedro catalog resolve` to read credentials properly.
* Changed the path of where pipeline tests generated with `kedro pipeline create` from `<project root>/src/tests/pipelines/<pipeline name>` to `<project root>/tests/pipelines/<pipeline name>`.
* Updated ``.gitignore`` to prevent pushing Mlflow local runs folder to a remote forge when using mlflow and git.
* Updated ``.gitignore`` to prevent pushing MLflow local runs folder to a remote forge when using MLflow and Git.
* Fixed error handling message for malformed yaml/json files in OmegaConfigLoader.
* Fixed a bug in `node`-creation allowing self-dependencies when using transcoding, that is datasets named like `name@format`.
* Improved error message when passing wrong value to node.
4 changes: 3 additions & 1 deletion docs/source/nodes_and_pipelines/pipeline_registry.md
@@ -51,7 +51,9 @@ Under the hood, the `find_pipelines()` function traverses the `src/<package_name
2. Calling the `create_pipeline()` function exposed by the `<package_name>.pipelines.<pipeline_name>` module
3. Validating that the constructed object is a {py:class}`~kedro.pipeline.Pipeline`

If any of these steps fail, `find_pipelines()` raises an appropriate warning and skips the current pipeline but continues traversal.
By default — that is, with `find_pipelines()` or, equivalently, `find_pipelines(raise_errors=False)` — if any of these steps fails, `find_pipelines()` raises an appropriate warning, skips the current pipeline, and continues traversal. During development, this lets you run your project with some pipelines even while other pipelines are broken or unfinished.

If you specify `find_pipelines(raise_errors=True)`, the autodiscovery process fails upon the first error. In production, this ensures that errors are surfaced up front and that pipelines are not excluded accidentally.
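The two modes can be sketched with a stripped-down stand-in for the discovery loop (a simplified illustration, not Kedro's actual implementation — `discover` and its module list are hypothetical):

```python
import importlib
import warnings


def discover(module_names, raise_errors=False):
    """Import each module, warning (default) or raising on failure."""
    found = {}
    for name in module_names:
        try:
            found[name] = importlib.import_module(name)
        except Exception as exc:
            message = f"An error occurred while importing the '{name}' module."
            if raise_errors:
                # raise_errors=True: fail fast on the first broken module.
                raise ImportError(message) from exc
            # Default: warn, skip this module, and continue traversal.
            warnings.warn(message)
    return found
```

With the default, a broken module produces a warning and the remaining modules are still returned; with `raise_errors=True`, the first failure stops discovery.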

The mapping returned by `find_pipelines()` can be modified, meaning you are not limited to the pipelines returned by each of the `create_pipeline()` functions found above. For example, to add a data engineering pipeline that isn't part of the default pipeline, add it to the dictionary *after* constructing the default pipeline:

26 changes: 24 additions & 2 deletions kedro/framework/project/__init__.py
@@ -347,7 +347,7 @@ def _create_pipeline(pipeline_module: types.ModuleType) -> Pipeline | None:
return obj


def find_pipelines() -> dict[str, Pipeline]: # noqa: PLR0912
def find_pipelines(raise_errors: bool = False) -> dict[str, Pipeline]: # noqa: PLR0912
"""Automatically find modular pipelines having a ``create_pipeline``
function. By default, projects created using Kedro 0.18.3 and higher
call this function to autoregister pipelines upon creation/addition.
@@ -359,13 +359,23 @@ def find_pipelines() -> dict[str, Pipeline]:  # noqa: PLR0912
For more information on the pipeline registry and autodiscovery, see
https://kedro.readthedocs.io/en/stable/nodes_and_pipelines/pipeline_registry.html
Args:
raise_errors: If ``True``, raise an error upon failed discovery.
Returns:
A generated mapping from pipeline names to ``Pipeline`` objects.
Raises:
ImportError: When a module does not expose a ``create_pipeline``
function, the ``create_pipeline`` function does not return a
``Pipeline`` object, or if the module import fails up front.
If ``raise_errors`` is ``False``, see Warns section instead.
Warns:
UserWarning: When a module does not expose a ``create_pipeline``
function, the ``create_pipeline`` function does not return a
``Pipeline`` object, or if the module import fails up front.
If ``raise_errors`` is ``True``, see Raises section instead.
"""
pipeline_obj = None

@@ -375,6 +385,12 @@ def find_pipelines() -> dict[str, Pipeline]:  # noqa: PLR0912
pipeline_module = importlib.import_module(pipeline_module_name)
except Exception as exc:
if str(exc) != f"No module named '{pipeline_module_name}'":
if raise_errors:
raise ImportError(
f"An error occurred while importing the "
f"'{pipeline_module_name}' module."
) from exc

warnings.warn(
IMPORT_ERROR_MESSAGE.format(
module=pipeline_module_name, tb_exc=traceback.format_exc()
@@ -406,7 +422,13 @@ def find_pipelines() -> dict[str, Pipeline]:  # noqa: PLR0912
pipeline_module_name = f"{PACKAGE_NAME}.pipelines.{pipeline_name}"
try:
pipeline_module = importlib.import_module(pipeline_module_name)
except: # noqa: E722
except Exception as exc:
if raise_errors:
raise ImportError(
f"An error occurred while importing the "
f"'{pipeline_module_name}' module."
) from exc

warnings.warn(
IMPORT_ERROR_MESSAGE.format(
module=pipeline_module_name, tb_exc=traceback.format_exc()
4 changes: 4 additions & 0 deletions kedro/pipeline/modular_pipeline.py
@@ -99,8 +99,10 @@ def _get_dataset_names_mapping(
the same as they are named in the provided pipeline.
When dict[str, str] is provided, current names will be
mapped to new names in the resultant pipeline.
Returns:
A dictionary that maps the old dataset names to the provided ones.
Examples:
>>> _get_dataset_names_mapping("dataset_name")
{"dataset_name": "dataset_name"} # a str name will stay the same
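The docstring example above (a `str` in, an identity mapping out) describes a small normalisation helper. A standalone sketch of that behaviour, assuming a `str`, iterable, or `dict` (or `None`) is accepted — simplified, not Kedro's exact code, and `names_mapping` is a hypothetical name:

```python
def names_mapping(names=None):
    """Normalise dataset names to an old-name -> new-name dict.

    Sketch of the documented behaviour: a str maps to itself, an
    iterable maps each name to itself, and a dict is used as-is.
    """
    if names is None:
        return {}
    if isinstance(names, str):
        return {names: names}
    if isinstance(names, dict):
        return dict(names)
    return {name: name for name in names}
```

For example, `names_mapping("dataset_name")` returns `{"dataset_name": "dataset_name"}`, matching the docstring example.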
@@ -138,8 +140,10 @@ def _get_param_names_mapping(
the same as they are named in the provided pipeline.
When dict[str, str] is provided, current names will be
mapped to new names in the resultant pipeline.
Returns:
A dictionary that maps the old parameter names to the provided ones.
Examples:
>>> _get_param_names_mapping("param_name")
{"params:param_name": "params:param_name"} # a str name will stay the same
48 changes: 30 additions & 18 deletions tests/framework/project/test_pipeline_discovery.py
@@ -177,12 +177,16 @@ def test_find_pipelines_skips_regular_files_within_the_pipelines_folder(


@pytest.mark.parametrize(
"mock_package_name_with_pipelines,pipeline_names",
[(x, x) for x in [set(), {"my_pipeline"}]],
indirect=True,
"mock_package_name_with_pipelines,pipeline_names,raise_errors",
[
(x, x, raise_errors)
for x in [set(), {"my_pipeline"}]
for raise_errors in [True, False]
],
indirect=["mock_package_name_with_pipelines", "pipeline_names"],
)
def test_find_pipelines_skips_modules_that_cause_exceptions_upon_import(
mock_package_name_with_pipelines, pipeline_names
mock_package_name_with_pipelines, pipeline_names, raise_errors
):
# Create a module that will result in errors when we try to load it.
pipelines_dir = Path(sys.path[0]) / mock_package_name_with_pipelines / "pipelines"
@@ -191,12 +195,14 @@ def test_find_pipelines_skips_modules_that_cause_exceptions_upon_import(
(pipeline_dir / "__init__.py").write_text("I walk a lonely road...")

configure_project(mock_package_name_with_pipelines)
with pytest.warns(
UserWarning, match=r"An error occurred while importing the '\S+' module."
with getattr(pytest, "raises" if raise_errors else "warns")(
ImportError if raise_errors else UserWarning,
match=r"An error occurred while importing the '\S+' module.",
):
pipelines = find_pipelines()
assert set(pipelines) == pipeline_names | {"__default__"}
assert sum(pipelines.values()).outputs() == pipeline_names
pipelines = find_pipelines(raise_errors=raise_errors)
if not raise_errors:
assert set(pipelines) == pipeline_names | {"__default__"}
assert sum(pipelines.values()).outputs() == pipeline_names
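The parametrised test above picks `pytest.raises` or `pytest.warns` via `getattr`, which works because both accept the same `(expected, match=...)` call shape. The same dispatch idea can be sketched with stdlib context managers (`expect_raises`, `expect_warns`, and `run_check` are hypothetical helpers, not from the Kedro test suite):

```python
import contextlib
import re
import warnings


@contextlib.contextmanager
def expect_raises(expected, match):
    """Assert the block raises `expected` with a message matching `match`."""
    try:
        yield
    except expected as exc:
        assert re.search(match, str(exc))
    else:
        raise AssertionError(f"expected {expected.__name__} to be raised")


@contextlib.contextmanager
def expect_warns(expected, match):
    """Assert the block emits a warning of `expected` matching `match`."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        yield
    assert any(
        issubclass(w.category, expected) and re.search(match, str(w.message))
        for w in caught
    )


def run_check(raise_errors):
    # Same getattr-style dispatch as the test: a single `with` statement
    # covers both the error path and the warning path.
    checker = expect_raises if raise_errors else expect_warns
    with checker(
        ImportError if raise_errors else UserWarning,
        r"An error occurred while importing the '\S+' module.",
    ):
        message = "An error occurred while importing the 'demo' module."
        if raise_errors:
            raise ImportError(message)
        warnings.warn(message)
```

The design choice mirrors the test: because the expectation differs only in the context manager and exception class, one parametrised body exercises both the warn and raise behaviours.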


@pytest.mark.parametrize(
@@ -226,12 +232,16 @@ def create_pipeline(**kwargs) -> Pipeline:


@pytest.mark.parametrize(
"mock_package_name_with_pipelines,pipeline_names",
[(x, x) for x in [set(), {"my_pipeline"}]],
indirect=True,
"mock_package_name_with_pipelines,pipeline_names,raise_errors",
[
(x, x, raise_errors)
for x in [set(), {"my_pipeline"}]
for raise_errors in [True, False]
],
indirect=["mock_package_name_with_pipelines", "pipeline_names"],
)
def test_find_pipelines_skips_unimportable_pipeline_module(
mock_package_name_with_pipelines, pipeline_names
mock_package_name_with_pipelines, pipeline_names, raise_errors
):
(Path(sys.path[0]) / mock_package_name_with_pipelines / "pipeline.py").write_text(
textwrap.dedent(
@@ -248,12 +258,14 @@ def create_pipeline(**kwargs) -> Pipeline:
)

configure_project(mock_package_name_with_pipelines)
with pytest.warns(
UserWarning, match=r"An error occurred while importing the '\S+' module."
with getattr(pytest, "raises" if raise_errors else "warns")(
ImportError if raise_errors else UserWarning,
match=r"An error occurred while importing the '\S+' module.",
):
pipelines = find_pipelines()
assert set(pipelines) == pipeline_names | {"__default__"}
assert sum(pipelines.values()).outputs() == pipeline_names
pipelines = find_pipelines(raise_errors=raise_errors)
if not raise_errors:
assert set(pipelines) == pipeline_names | {"__default__"}
assert sum(pipelines.values()).outputs() == pipeline_names


@pytest.mark.parametrize(
