From 15ceb2f2de1db9d6d2a9ca87ac476fae110c9ed4 Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Thu, 18 Apr 2024 07:30:29 -0600 Subject: [PATCH 1/6] Add option to raise errors during `find_pipelines` Signed-off-by: Deepyaman Datta --- kedro/framework/project/__init__.py | 16 +++++++++++-- .../project/test_pipeline_discovery.py | 24 ++++++++++++------- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/kedro/framework/project/__init__.py b/kedro/framework/project/__init__.py index ea56f5d668..8d03136591 100644 --- a/kedro/framework/project/__init__.py +++ b/kedro/framework/project/__init__.py @@ -319,7 +319,7 @@ def _create_pipeline(pipeline_module: types.ModuleType) -> Pipeline | None: return obj -def find_pipelines() -> dict[str, Pipeline]: # noqa: PLR0912 +def find_pipelines(raise_errors=False) -> dict[str, Pipeline]: # noqa: PLR0912 """Automatically find modular pipelines having a ``create_pipeline`` function. By default, projects created using Kedro 0.18.3 and higher call this function to autoregister pipelines upon creation/addition. @@ -346,6 +346,12 @@ def find_pipelines() -> dict[str, Pipeline]: # noqa: PLR0912 try: pipeline_module = importlib.import_module(pipeline_module_name) except Exception as exc: + if raise_errors: + raise ImportError( + f"An error occurred while importing the " + f"'{pipeline_module_name}' module." + ) from exc + if str(exc) != f"No module named '{pipeline_module_name}'": warnings.warn( IMPORT_ERROR_MESSAGE.format( @@ -378,7 +384,13 @@ def find_pipelines() -> dict[str, Pipeline]: # noqa: PLR0912 pipeline_module_name = f"{PACKAGE_NAME}.pipelines.{pipeline_name}" try: pipeline_module = importlib.import_module(pipeline_module_name) - except: # noqa: E722 + except Exception as exc: + if raise_errors: + raise ImportError( + f"An error occurred while importing the " + f"'{pipeline_module_name}' module." 
+ ) from exc + warnings.warn( IMPORT_ERROR_MESSAGE.format( module=pipeline_module_name, tb_exc=traceback.format_exc() diff --git a/tests/framework/project/test_pipeline_discovery.py b/tests/framework/project/test_pipeline_discovery.py index 4525144fa5..bcb4320c52 100644 --- a/tests/framework/project/test_pipeline_discovery.py +++ b/tests/framework/project/test_pipeline_discovery.py @@ -226,12 +226,16 @@ def create_pipeline(**kwargs) -> Pipeline: @pytest.mark.parametrize( - "mock_package_name_with_pipelines,pipeline_names", - [(x, x) for x in [set(), {"my_pipeline"}]], - indirect=True, + "mock_package_name_with_pipelines,pipeline_names,raise_errors", + [ + (x, x, raise_errors) + for x in [set(), {"my_pipeline"}] + for raise_errors in [True, False] + ], + indirect=["mock_package_name_with_pipelines", "pipeline_names"], ) def test_find_pipelines_skips_unimportable_pipeline_module( - mock_package_name_with_pipelines, pipeline_names + mock_package_name_with_pipelines, pipeline_names, raise_errors ): (Path(sys.path[0]) / mock_package_name_with_pipelines / "pipeline.py").write_text( textwrap.dedent( @@ -248,12 +252,14 @@ def create_pipeline(**kwargs) -> Pipeline: ) configure_project(mock_package_name_with_pipelines) - with pytest.warns( - UserWarning, match=r"An error occurred while importing the '\S+' module." 
+ with getattr(pytest, "raises" if raise_errors else "warns")( + ImportError if raise_errors else UserWarning, + match=r"An error occurred while importing the '\S+' module.", ): - pipelines = find_pipelines() - assert set(pipelines) == pipeline_names | {"__default__"} - assert sum(pipelines.values()).outputs() == pipeline_names + pipelines = find_pipelines(raise_errors=raise_errors) + if not raise_errors: + assert set(pipelines) == pipeline_names | {"__default__"} + assert sum(pipelines.values()).outputs() == pipeline_names @pytest.mark.parametrize( From 55efdb9712bb335b5847ac784e23f7a9a079449b Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Thu, 18 Apr 2024 09:10:29 -0600 Subject: [PATCH 2/6] Add coverage for non-simplified project structures Signed-off-by: Deepyaman Datta --- .../project/test_pipeline_discovery.py | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/tests/framework/project/test_pipeline_discovery.py b/tests/framework/project/test_pipeline_discovery.py index bcb4320c52..afffbcfdeb 100644 --- a/tests/framework/project/test_pipeline_discovery.py +++ b/tests/framework/project/test_pipeline_discovery.py @@ -177,12 +177,16 @@ def test_find_pipelines_skips_regular_files_within_the_pipelines_folder( @pytest.mark.parametrize( - "mock_package_name_with_pipelines,pipeline_names", - [(x, x) for x in [set(), {"my_pipeline"}]], - indirect=True, + "mock_package_name_with_pipelines,pipeline_names,raise_errors", + [ + (x, x, raise_errors) + for x in [set(), {"my_pipeline"}] + for raise_errors in [True, False] + ], + indirect=["mock_package_name_with_pipelines", "pipeline_names"], ) def test_find_pipelines_skips_modules_that_cause_exceptions_upon_import( - mock_package_name_with_pipelines, pipeline_names + mock_package_name_with_pipelines, pipeline_names, raise_errors ): # Create a module that will result in errors when we try to load it. 
pipelines_dir = Path(sys.path[0]) / mock_package_name_with_pipelines / "pipelines" @@ -191,12 +195,14 @@ def test_find_pipelines_skips_modules_that_cause_exceptions_upon_import( (pipeline_dir / "__init__.py").write_text("I walk a lonely road...") configure_project(mock_package_name_with_pipelines) - with pytest.warns( - UserWarning, match=r"An error occurred while importing the '\S+' module." + with getattr(pytest, "raises" if raise_errors else "warns")( + ImportError if raise_errors else UserWarning, + match=r"An error occurred while importing the '\S+' module.", ): - pipelines = find_pipelines() - assert set(pipelines) == pipeline_names | {"__default__"} - assert sum(pipelines.values()).outputs() == pipeline_names + pipelines = find_pipelines(raise_errors=raise_errors) + if not raise_errors: + assert set(pipelines) == pipeline_names | {"__default__"} + assert sum(pipelines.values()).outputs() == pipeline_names @pytest.mark.parametrize( From e087efa7dea22d33c659b78330eaa9d25f99d10f Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Thu, 18 Apr 2024 09:40:29 -0600 Subject: [PATCH 3/6] Fix skipping for non-simplified project structures Signed-off-by: Deepyaman Datta --- kedro/framework/project/__init__.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/kedro/framework/project/__init__.py b/kedro/framework/project/__init__.py index 8d03136591..9858b6d149 100644 --- a/kedro/framework/project/__init__.py +++ b/kedro/framework/project/__init__.py @@ -346,13 +346,13 @@ def find_pipelines(raise_errors=False) -> dict[str, Pipeline]: # noqa: PLR0912 try: pipeline_module = importlib.import_module(pipeline_module_name) except Exception as exc: - if raise_errors: - raise ImportError( - f"An error occurred while importing the " - f"'{pipeline_module_name}' module." 
- ) from exc - if str(exc) != f"No module named '{pipeline_module_name}'": + if raise_errors: + raise ImportError( + f"An error occurred while importing the " + f"'{pipeline_module_name}' module." + ) from exc + warnings.warn( IMPORT_ERROR_MESSAGE.format( module=pipeline_module_name, tb_exc=traceback.format_exc() From 6da44f59d245167f5e25fc398e43acae58db057b Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Thu, 18 Apr 2024 13:31:35 -0600 Subject: [PATCH 4/6] Add required type hint for `raise_errors` argument Signed-off-by: Deepyaman Datta --- kedro/framework/project/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kedro/framework/project/__init__.py b/kedro/framework/project/__init__.py index 9858b6d149..6130b2c3f1 100644 --- a/kedro/framework/project/__init__.py +++ b/kedro/framework/project/__init__.py @@ -319,7 +319,7 @@ def _create_pipeline(pipeline_module: types.ModuleType) -> Pipeline | None: return obj -def find_pipelines(raise_errors=False) -> dict[str, Pipeline]: # noqa: PLR0912 +def find_pipelines(raise_errors: bool = False) -> dict[str, Pipeline]: # noqa: PLR0912 """Automatically find modular pipelines having a ``create_pipeline`` function. By default, projects created using Kedro 0.18.3 and higher call this function to autoregister pipelines upon creation/addition. From 47b90175397fe303c3217b88f99b9baaa2510b47 Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Thu, 18 Apr 2024 14:11:01 -0600 Subject: [PATCH 5/6] Add docstring and release notes for `raise_errors` Signed-off-by: Deepyaman Datta --- RELEASE.md | 3 ++- kedro/framework/project/__init__.py | 10 ++++++++++ kedro/pipeline/modular_pipeline.py | 4 ++++ 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/RELEASE.md b/RELEASE.md index 5049f8d920..9133e2f975 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,6 +1,7 @@ # Upcoming Release 0.19.5 ## Major features and improvements +* Added `raise_errors` argument to `find_pipelines`. 
If `True`, the first pipeline for which autodiscovery fails will cause an error to be raised. The default behavior is still to raise a warning for each failing pipeline. ## Bug fixes and other changes @@ -25,7 +26,7 @@ * Updated `kedro pipeline create` and `kedro pipeline delete` to read the base environment from the project settings. * Updated CLI command `kedro catalog resolve` to read credentials properly. * Changed the path of where pipeline tests generated with `kedro pipeline create` from `/src/tests/pipelines/` to `/tests/pipelines/`. -* Updated ``.gitignore`` to prevent pushing Mlflow local runs folder to a remote forge when using mlflow and git. +* Updated ``.gitignore`` to prevent pushing MLflow local runs folder to a remote forge when using MLflow and Git. * Fixed error handling message for malformed yaml/json files in OmegaConfigLoader. * Fixed a bug in `node`-creation allowing self-dependencies when using transcoding, that is datasets named like `name@format`. * Improved error message when passing wrong value to node. diff --git a/kedro/framework/project/__init__.py b/kedro/framework/project/__init__.py index 6130b2c3f1..821b78e253 100644 --- a/kedro/framework/project/__init__.py +++ b/kedro/framework/project/__init__.py @@ -331,13 +331,23 @@ def find_pipelines(raise_errors: bool = False) -> dict[str, Pipeline]: # noqa: For more information on the pipeline registry and autodiscovery, see https://kedro.readthedocs.io/en/stable/nodes_and_pipelines/pipeline_registry.html + Args: + raise_errors: If ``True``, raise an error upon failed discovery. + Returns: A generated mapping from pipeline names to ``Pipeline`` objects. + Raises: + ImportError: When a module does not expose a ``create_pipeline`` + function, the ``create_pipeline`` function does not return a + ``Pipeline`` object, or if the module import fails up front. + If ``raise_errors`` is ``False``, see Warns section instead. 
+ Warns: UserWarning: When a module does not expose a ``create_pipeline`` function, the ``create_pipeline`` function does not return a ``Pipeline`` object, or if the module import fails up front. + If ``raise_errors`` is ``True``, see Raises section instead. """ pipeline_obj = None diff --git a/kedro/pipeline/modular_pipeline.py b/kedro/pipeline/modular_pipeline.py index c600ca877c..f2d9376fac 100644 --- a/kedro/pipeline/modular_pipeline.py +++ b/kedro/pipeline/modular_pipeline.py @@ -99,8 +99,10 @@ def _get_dataset_names_mapping( the same as they are named in the provided pipeline. When dict[str, str] is provided, current names will be mapped to new names in the resultant pipeline. + Returns: A dictionary that maps the old dataset names to the provided ones. + Examples: >>> _get_dataset_names_mapping("dataset_name") {"dataset_name": "dataset_name"} # a str name will stay the same @@ -138,8 +140,10 @@ def _get_param_names_mapping( the same as they are named in the provided pipeline. When dict[str, str] is provided, current names will be mapped to new names in the resultant pipeline. + Returns: A dictionary that maps the old parameter names to the provided ones. 
+ Examples: >>> _get_param_names_mapping("param_name") {"params:param_name": "params:param_name"} # a str name will stay the same From 71e0f64f038929f4d127cbc9f092217894187592 Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Fri, 19 Apr 2024 10:46:48 -0600 Subject: [PATCH 6/6] Add documentation on the `raise_errors` flag to web Signed-off-by: Deepyaman Datta --- docs/source/nodes_and_pipelines/pipeline_registry.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/source/nodes_and_pipelines/pipeline_registry.md b/docs/source/nodes_and_pipelines/pipeline_registry.md index 7686c38eb1..030e8f9b7e 100644 --- a/docs/source/nodes_and_pipelines/pipeline_registry.md +++ b/docs/source/nodes_and_pipelines/pipeline_registry.md @@ -51,7 +51,9 @@ Under the hood, the `find_pipelines()` function traverses the `src/.pipelines.` module 3. Validating that the constructed object is a [`Pipeline`](/api/kedro.pipeline.Pipeline) -If any of these steps fail, `find_pipelines()` raises an appropriate warning and skips the current pipeline but continues traversal. +By default, if any of these steps fail, `find_pipelines()` (or `find_pipelines(raise_errors=False)`) raises an appropriate warning and skips the current pipeline but continues traversal. During development, this enables you to run your project with some pipelines, even if other pipelines are broken or works in progress. + +If you specify `find_pipelines(raise_errors=True)`, the autodiscovery process will fail upon the first error. In production, this ensures errors are caught up front, and pipelines do not get excluded accidentally. The mapping returned by `find_pipelines()` can be modified, meaning you are not limited to the pipelines returned by each of the `create_pipeline()` functions found above. For example, to add a data engineering pipeline that isn't part of the default pipeline, add it to the dictionary *after* constructing the default pipeline: