Skip to content

Commit

Permalink
Implement autodiscovery of project pipelines 🔍
Browse files Browse the repository at this point in the history
  • Loading branch information
deepyaman committed Jul 13, 2022
1 parent db27a0a commit 2197b46
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 5 deletions.
26 changes: 21 additions & 5 deletions kedro/framework/project/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from dynaconf import LazySettings
from dynaconf.validator import ValidationError, Validator

from kedro.pipeline import Pipeline
from kedro.pipeline import Pipeline, pipeline


def _get_default_class(class_import_path):
Expand Down Expand Up @@ -150,7 +150,7 @@ def _get_pipelines_registry_callable(pipelines_module: str):
return register_pipelines

def _load_data(self):
"""Lazily read pipelines defined in the pipelines registry module"""
"""Lazily read pipelines defined in the pipelines registry module."""

# If the pipelines dictionary has not been configured with a pipelines module
# or if data has been loaded
Expand All @@ -167,7 +167,7 @@ def _load_data(self):

def configure(self, pipelines_module: Optional[str] = None) -> None:
"""Configure the pipelines_module to load the pipelines dictionary.
Reset the data loading state so that after every `configure` call,
Reset the data loading state so that after every ``configure`` call,
data are reloaded.
"""
self._pipelines_module = pipelines_module
Expand Down Expand Up @@ -206,7 +206,7 @@ def __init__(self):
rich.pretty.install()

def configure(self, logging_config: Dict[str, Any]) -> None:
"""Configure project logging using `logging_config` (e.g. from project
"""Configure project logging using ``logging_config`` (e.g. from project
logging.yml). We store this in the UserDict data so that it can be reconfigured
in _bootstrap_subprocess.
"""
Expand Down Expand Up @@ -241,7 +241,7 @@ def configure_project(package_name: str):


def configure_logging(logging_config: Dict[str, Any]) -> None:
"""Configure logging according to `logging_config` dictionary."""
"""Configure logging according to ``logging_config`` dictionary."""
LOGGING.configure(logging_config)


Expand All @@ -254,3 +254,19 @@ def validate_settings():
More info on the dynaconf issue: https://github.com/rochacbruno/dynaconf/issues/460
"""
importlib.import_module(f"{PACKAGE_NAME}.settings")


def find_pipelines() -> Dict[str, Pipeline]:
    """Return the mapping of automatically discovered project pipelines.

    The intended contract is to find modular pipelines having a
    ``create_pipeline`` function. By default, projects created using
    Kedro 0.18.3 and higher call this function to autoregister pipelines
    upon creation/addition.

    Note:
        The body below is a placeholder: it builds only the
        ``"__default__"`` entry from an empty node list. It does not
        import or scan any module, and it emits no warnings yet.

    Returns:
        A generated mapping from pipeline names to ``Pipeline`` objects.

    Warns:
        UserWarning: (Intended contract, not yet implemented here.) When
            a module does not expose a ``create_pipeline`` function, the
            ``create_pipeline`` function does not return a ``Pipeline``
            object, or if the module import fails up front.
    """
    discovered = {"__default__": pipeline([])}
    return discovered
48 changes: 48 additions & 0 deletions tests/framework/project/test_pipeline_discovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import sys
import textwrap

import pytest

from kedro.framework.project import find_pipelines


@pytest.fixture
def mock_package_with_pipelines(tmp_path, request):
    """Create an importable ``test_package`` with one pipeline per name.

    For every name in ``request.param`` a sub-package
    ``test_package/pipelines/<name>/__init__.py`` is written under
    ``tmp_path``. Each generated module defines ``create_pipeline``, which
    returns a single-node pipeline whose only output is named after the
    pipeline. ``tmp_path`` is put at the front of ``sys.path`` for the
    duration of the test.

    Args:
        tmp_path: Built-in pytest fixture providing a temporary directory.
        request: Built-in pytest fixture; ``request.param`` is the iterable
            of pipeline names to generate (supplied through indirect
            parametrization).

    Yields:
        None. The fixture is used only for its side effects.
    """
    package_name = "test_package"
    pipelines_dir = tmp_path / package_name / "pipelines"
    pipelines_dir.mkdir(parents=True)
    for pipeline_name in request.param:
        pipeline_dir = pipelines_dir / pipeline_name
        pipeline_dir.mkdir()
        (pipeline_dir / "__init__.py").write_text(
            textwrap.dedent(
                f"""
                from kedro.pipeline import Pipeline, node, pipeline
                def create_pipeline(**kwargs) -> Pipeline:
                    return pipeline([node(lambda: 1, None, "{pipeline_name}")])
                """
            )
        )

    path_entry = str(tmp_path)
    sys.path.insert(0, path_entry)
    try:
        yield
    finally:
        # Remove our entry by value: index 0 may no longer be ours if the
        # code under test (or another fixture) also prepended to sys.path.
        if path_entry in sys.path:
            sys.path.remove(path_entry)
        # Drop cached imports of the generated package so that the next
        # parametrization, which lives in a different tmp_path, does not
        # pick up stale modules from sys.modules.
        package_prefix = f"{package_name}."
        for module_name in list(sys.modules):
            if module_name == package_name or module_name.startswith(
                package_prefix
            ):
                del sys.modules[module_name]


@pytest.fixture
def pipeline_names(request):
    """Hand the parametrized value through to the test unchanged.

    With indirect parametrization, ``request.param`` holds the value given
    for this fixture in the ``parametrize`` argument list.
    """
    expected_names = request.param
    return expected_names


@pytest.mark.parametrize(
    "mock_package_with_pipelines,pipeline_names",
    [(names, names) for names in [set()]],
    indirect=True,
)
def test_pipelines_without_configure_project_is_empty(
    mock_package_with_pipelines,  # pylint: disable=unused-argument
    pipeline_names,
):
    """Check the keys and combined outputs returned by ``find_pipelines``.

    The same set of names is passed to both fixtures: one generates the
    package on disk, the other exposes the names for the assertions.
    """
    discovered = find_pipelines()

    # Every expected pipeline must be registered, plus the default entry.
    expected_keys = pipeline_names | {"__default__"}
    assert set(discovered) == expected_keys

    # Each generated pipeline has a single output named after itself, so
    # the outputs of the summed pipelines must equal the set of names.
    combined = sum(discovered.values())
    assert combined.outputs() == pipeline_names

0 comments on commit 2197b46

Please sign in to comment.