
Commit

Fix after_dataset_saved hook so that it saves multiple outputs when run in async mode (#2159)

* Add mapping between futures and dataset info to pass to after_dataset_saved

Signed-off-by: Jannic Holzer <jannic.holzer@quantumblack.com>

* Fix linting

Signed-off-by: Jannic Holzer <jannic.holzer@quantumblack.com>

* Combine future_dataset_mapping and save_futures to reduce number of local variables

Signed-off-by: Jannic Holzer <jannic.holzer@quantumblack.com>

* Add test for after_dataset_load hook in async mode with multiple outputs

Signed-off-by: Jannic Holzer <jannic.holzer@quantumblack.com>

* Remove spurious comment

Signed-off-by: Jannic Holzer <jannic.holzer@quantumblack.com>

* Add release note

Signed-off-by: Jannic Holzer <jannic.holzer@quantumblack.com>

* Remove unused argument from test_after_dataset_load_hook_async_multiple_outputs

Signed-off-by: Jannic Holzer <jannic.holzer@quantumblack.com>

* Change tense of verb under bug fixes subheading

Signed-off-by: Jannic Holzer <jannic.holzer@quantumblack.com>

* Add hook_manager fixture, remove mock_session from test_after_dataset_load_hook_async_multiple_outputs

Signed-off-by: Jannic Holzer <jannic.holzer@quantumblack.com>

Signed-off-by: Jannic Holzer <jannic.holzer@quantumblack.com>
jmholzer committed Jan 9, 2023
1 parent d611c03 commit 8a1bd5f
Showing 3 changed files with 68 additions and 12 deletions.
5 changes: 3 additions & 2 deletions RELEASE.md
@@ -19,9 +19,10 @@
 * Save node outputs after every `yield` before proceeding with next chunk.

 ## Bug fixes and other changes
-* Fix bug where `micropkg` manifest section in `pyproject.toml` isn't recognised as allowed configuration.
-* Fix bug causing `load_ipython_extension` not to register the `%reload_kedro` line magic when called in a directory that does not contain a Kedro project.
+* Fixed bug where `micropkg` manifest section in `pyproject.toml` isn't recognised as allowed configuration.
+* Fixed bug causing `load_ipython_extension` not to register the `%reload_kedro` line magic when called in a directory that does not contain a Kedro project.
 * Added anyconfig's `ac_context` parameter to `kedro.config.commons` module functions for more flexible `ConfigLoader` customizations.
+* Fixed bug causing the `after_dataset_saved` hook only to be called for one output dataset when multiple are saved in a single node and async saving is in use.

 ## Breaking changes to the API
13 changes: 6 additions & 7 deletions kedro/runner/runner.py
@@ -469,17 +469,16 @@ def _synchronous_dataset_load(dataset_name: str):
             node, catalog, inputs, is_async, hook_manager, session_id=session_id
         )

-        save_futures = set()
-
+        future_dataset_mapping = {}
         for name, data in outputs.items():
             hook_manager.hook.before_dataset_saved(dataset_name=name, data=data)
-            save_futures.add(pool.submit(catalog.save, name, data))
+            future = pool.submit(catalog.save, name, data)
+            future_dataset_mapping[future] = (name, data)

-        for future in as_completed(save_futures):
+        for future in as_completed(future_dataset_mapping):
             exception = future.exception()
             if exception:
                 raise exception
-        hook_manager.hook.after_dataset_saved(
-            dataset_name=name, data=data  # pylint: disable=undefined-loop-variable
-        )
+            name, data = future_dataset_mapping[future]
+            hook_manager.hook.after_dataset_saved(dataset_name=name, data=data)
     return node
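
For readers skimming the diff, the fix follows a standard `concurrent.futures` pattern: the old code called `after_dataset_saved` once, outside the loop, with whatever `name` and `data` the submission loop last assigned (hence the pylint `undefined-loop-variable` suppression), while the new code keys each future by its dataset so the right metadata can be recovered when that particular save completes. Below is a minimal, self-contained sketch of the pattern outside Kedro; `fake_save` and `on_saved` are illustrative stand-ins, not Kedro APIs.

from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_save(name: str, data: dict) -> None:
    # Stand-in for catalog.save; illustrative only.
    print(f"saving {name}")


def on_saved(name: str, data: dict) -> None:
    # Stand-in for the after_dataset_saved hook; illustrative only.
    print(f"after_dataset_saved: {name}")


outputs = {"ds3": {"data": 42}, "ds4": {"data": 42}}

with ThreadPoolExecutor() as pool:
    # Key each future by its (name, data) pair so the correct metadata is
    # available whenever that future completes, regardless of order.
    future_dataset_mapping = {
        pool.submit(fake_save, name, data): (name, data)
        for name, data in outputs.items()
    }
    for future in as_completed(future_dataset_mapping):
        exception = future.exception()
        if exception:
            raise exception
        name, data = future_dataset_mapping[future]
        on_saved(name, data)  # fires once per saved output

Running the sketch prints one `after_dataset_saved` line per output, which is the behaviour the changed runner code now guarantees.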
62 changes: 59 additions & 3 deletions tests/framework/session/test_session_extension_hooks.py
@@ -9,8 +9,14 @@
 from dynaconf.validator import Validator

 from kedro.framework.context.context import _convert_paths_to_absolute_posix
-from kedro.framework.hooks import hook_impl
-from kedro.framework.project import _ProjectPipelines, _ProjectSettings, pipelines
+from kedro.framework.hooks import _create_hook_manager, hook_impl
+from kedro.framework.hooks.manager import _register_hooks, _register_hooks_setuptools
+from kedro.framework.project import (
+    _ProjectPipelines,
+    _ProjectSettings,
+    pipelines,
+    settings,
+)
 from kedro.framework.session import KedroSession
 from kedro.io import DataCatalog, MemoryDataSet
 from kedro.pipeline import node, pipeline
@@ -526,6 +532,20 @@ def wait_and_identity(x: Any):
     return node(wait_and_identity, inputs="ds1", outputs="ds2", name="test-node")


+@pytest.fixture
+def sample_node_multiple_outputs():
+    def wait_and_identity(x: Any, y: Any):
+        time.sleep(0.1)
+        return (x, y)
+
+    return node(
+        wait_and_identity,
+        inputs=["ds1", "ds2"],
+        outputs=["ds3", "ds4"],
+        name="test-node",
+    )
+
+
 class LogCatalog(DataCatalog):
     def load(self, name: str, version: str = None) -> Any:
         dataset = super().load(name=name, version=version)
@@ -537,7 +557,17 @@ def load(self, name: str, version: str = None) -> Any:
 def memory_catalog():
     ds1 = MemoryDataSet({"data": 42})
     ds2 = MemoryDataSet({"data": 42})
-    return LogCatalog({"ds1": ds1, "ds2": ds2})
+    ds3 = MemoryDataSet({"data": 42})
+    ds4 = MemoryDataSet({"data": 42})
+    return LogCatalog({"ds1": ds1, "ds2": ds2, "ds3": ds3, "ds4": ds4})


+@pytest.fixture
+def hook_manager():
+    hook_manager = _create_hook_manager()
+    _register_hooks(hook_manager, settings.HOOKS)
+    _register_hooks_setuptools(hook_manager, settings.DISABLE_HOOKS_FOR_PLUGINS)
+    return hook_manager
+
+
 class TestAsyncNodeDatasetHooks:
@@ -562,6 +592,32 @@ def test_after_dataset_load_hook_async(
             ["Before dataset loaded", "Catalog load", "After dataset loaded"]
         ).strip("[]") in str(hooks_log_messages).strip("[]")

+    def test_after_dataset_load_hook_async_multiple_outputs(
+        self,
+        mocker,
+        memory_catalog,
+        hook_manager,
+        sample_node_multiple_outputs,
+    ):
+        after_dataset_saved_mock = mocker.patch.object(
+            hook_manager.hook, "after_dataset_saved"
+        )
+
+        _run_node_async(
+            node=sample_node_multiple_outputs,
+            catalog=memory_catalog,
+            hook_manager=hook_manager,
+        )
+
+        after_dataset_saved_mock.assert_has_calls(
+            [
+                mocker.call(dataset_name="ds3", data={"data": 42}),
+                mocker.call(dataset_name="ds4", data={"data": 42}),
+            ],
+            any_order=True,
+        )
+        assert after_dataset_saved_mock.call_count == 2
+

 class TestKedroContextSpecsHook:
     """Test the behavior of `after_context_created` when updating node inputs."""
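
To run just the new regression test, a pytest `-k` filter on the test name added above should suffice when invoked from the repository root; note the assertion uses `any_order=True` because async saving gives no guarantee about whether `ds3` or `ds4` finishes first.

pytest tests/framework/session/test_session_extension_hooks.py -k test_after_dataset_load_hook_async_multiple_outputs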
