Skip to content

Commit

Permalink
Raise exception when more than 1 run executed within the same session
Browse files Browse the repository at this point in the history
Signed-off-by: Merel Theisen <merel.theisen@quantumblack.com>
  • Loading branch information
merelcht committed Mar 9, 2022
1 parent 1af6cce commit 55fb343
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 1 deletion.
22 changes: 21 additions & 1 deletion kedro/framework/session/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ def _jsonify_cli_context(ctx: click.core.Context) -> Dict[str, Any]:
}


class KedroSessionError(Exception):
"""``KedroSessionError`` raised by ``KedroSession``
in case of run failure as part of a session.
"""

pass


class KedroSession:
"""``KedroSession`` is the object that is responsible for managing the lifecycle
of a Kedro run.
Expand All @@ -94,18 +102,20 @@ class KedroSession:
>>>
"""

def __init__(
def __init__( # pylint: disable=too-many-arguments
self,
session_id: str,
package_name: str = None,
project_path: Union[Path, str] = None,
save_on_close: bool = False,
run_called: bool = False,
):
self._project_path = Path(project_path or Path.cwd()).resolve()
self.session_id = session_id
self.save_on_close = save_on_close
self._package_name = package_name
self._store = self._init_store()
self._run_called = run_called

hook_manager = _create_hook_manager()
_register_hooks(hook_manager, settings.HOOKS)
Expand Down Expand Up @@ -318,6 +328,8 @@ def run( # pylint: disable=too-many-arguments,too-many-locals
defined by `register_pipelines`.
Exception: Any uncaught exception during the run will be re-raised
after being passed to ``on_pipeline_error`` hook.
KedroSessionError: If more than one run is attempted to be executed during
a single session.
Returns:
Any node outputs that cannot be processed by the ``DataCatalog``.
These are returned in a dictionary, where the keys are defined
Expand All @@ -327,6 +339,13 @@ def run( # pylint: disable=too-many-arguments,too-many-locals
# Report project name
self._logger.info("** Kedro project %s", self._project_path.name)

if self._run_called:
raise KedroSessionError(
"A run has already been executed as part of the"
" active KedroSession. KedroSession has a 1-1 mapping with"
" runs, and thus only one run should be executed per session."
)

save_version = self.store["session_id"]
extra_params = self.store.get("extra_params") or {}
context = self.load_context()
Expand Down Expand Up @@ -380,6 +399,7 @@ def run( # pylint: disable=too-many-arguments,too-many-locals

try:
run_result = runner.run(filtered_pipeline, catalog, hook_manager)
self._run_called = True
except Exception as error:
hook_manager.hook.on_pipeline_error(
error=error,
Expand Down
67 changes: 67 additions & 0 deletions tests/framework/session/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,73 @@ def test_run(
catalog=mock_catalog,
)

@pytest.mark.usefixtures("mock_settings_context_class")
@pytest.mark.parametrize("fake_pipeline_name", [None, _FAKE_PIPELINE_NAME])
def test_run_multiple_times( # pylint: disable=too-many-locals
self,
fake_project,
fake_session_id,
fake_pipeline_name,
mock_context_class,
mock_package_name,
mocker,
):
"""Test running the project more than once via the session"""

mock_hook = mocker.patch(
"kedro.framework.session.session._create_hook_manager"
).return_value.hook
mock_pipelines = mocker.patch(
"kedro.framework.session.session.pipelines",
return_value={
_FAKE_PIPELINE_NAME: mocker.Mock(),
"__default__": mocker.Mock(),
},
)
mock_context = mock_context_class.return_value
mock_catalog = mock_context._get_catalog.return_value
mock_runner = mocker.Mock()
mock_pipeline = mock_pipelines.__getitem__.return_value.filter.return_value

message = (
"A run has already been executed as part of the active KedroSession. "
"KedroSession has a 1-1 mapping with runs, and thus only one run should be"
" executed per session."
)
with pytest.raises(Exception, match=message):
with KedroSession.create(mock_package_name, fake_project) as session:
session.run(runner=mock_runner, pipeline_name=fake_pipeline_name)
session.run(runner=mock_runner, pipeline_name=fake_pipeline_name)

record_data = {
"session_id": fake_session_id,
"project_path": fake_project.as_posix(),
"env": mock_context.env,
"kedro_version": kedro_version,
"tags": None,
"from_nodes": None,
"to_nodes": None,
"node_names": None,
"from_inputs": None,
"to_outputs": None,
"load_versions": None,
"extra_params": {},
"pipeline_name": fake_pipeline_name,
}

mock_hook.before_pipeline_run.assert_called_once_with(
run_params=record_data, pipeline=mock_pipeline, catalog=mock_catalog
)
mock_runner.run.assert_called_once_with(
mock_pipeline, mock_catalog, session._hook_manager
)
mock_hook.after_pipeline_run.assert_called_once_with(
run_params=record_data,
run_result=mock_runner.run.return_value,
pipeline=mock_pipeline,
catalog=mock_catalog,
)

@pytest.mark.usefixtures("mock_settings_context_class")
def test_run_non_existent_pipeline(self, fake_project, mock_package_name, mocker):
mock_runner = mocker.Mock()
Expand Down

0 comments on commit 55fb343

Please sign in to comment.