Skip to content

Commit

Permalink
[KED-2026] Use Session to run the pipeline (#836)
Browse files Browse the repository at this point in the history
  • Loading branch information
921kiyo authored Oct 28, 2020
1 parent 60539ad commit b42845e
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 73 deletions.
1 change: 1 addition & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Upcoming 0.17.0 release

## Major features and improvements
* Introduced `KedroSession` which is responsible for managing the lifecycle of a Kedro run.
* Added `kedro catalog create` command. It creates `<conf_root>/<env>/catalog/<pipeline_name>.yml` configuration file with `MemoryDataSet` datasets for each dataset in a registered pipeline if it is missing from Data Catalog.

## Bug fixes and other changes
Expand Down
13 changes: 9 additions & 4 deletions docs/source/11_tools_integration/02_ipython.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,12 @@ def reload_kedro(project_path, line=None):
global parameters
try:
# ...
context = load_context(path)
session = KedroSession.create(path)
_push_session(session)
context = session.context
parameters = context.params
# ...
logging.info("Defined global variable `context`, `catalog` and `parameters`")
logging.info("Defined global variable `context`, `session`, `catalog` and `parameters`")
except:
pass
```
Expand Down Expand Up @@ -306,11 +308,14 @@ In certain cases, you may not be able to run `kedro jupyter notebook`, which mea

```python
from pathlib import Path
from kedro.framework.context import load_context
from kedro.framework.session import KedroSession
from kedro.framework.session.session import _push_session

current_dir = Path.cwd() # this points to 'notebooks/' folder
proj_path = current_dir.parent # point back to the root of the project
context = load_context(proj_path)
session = KedroSession.create(proj_path)
_push_session(session)
context = session.context
```

#### How can I reload the `context`, `catalog` and `startup_error` variables?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ def reload_kedro(path, line=None):

try:
import kedro.config.default_logger
from kedro.framework.context import load_context
from kedro.framework.session import KedroSession
from kedro.framework.session.session import _push_session
from kedro.framework.cli.jupyter import collect_line_magic
except ImportError:
logging.error(
Expand All @@ -31,7 +32,9 @@ def reload_kedro(path, line=None):
path = path or project_path

# remove cached user modules
context = load_context(path)
session = KedroSession.create(path)
_push_session(session)
context = session.context
to_remove = [mod for mod in sys.modules if mod.startswith(context.package_name)]
# `del` is used instead of `reload()` because: If the new version of a module does not
# define a name that was defined by the old version, the old definition remains.
Expand All @@ -42,7 +45,7 @@ def reload_kedro(path, line=None):
# Reload context to fix `pickle` related error (it is unable to serialize reloaded objects)
# Some details can be found here:
# https://modwsgi.readthedocs.io/en/develop/user-guides/issues-with-pickle-module.html#packing-and-script-reloading
context = load_context(path)
context = session.context
catalog = context.catalog

logging.info("** Kedro project %s", str(context.project_name))
Expand Down
43 changes: 30 additions & 13 deletions kedro/framework/session/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def get_current_session(silent: bool = False) -> Optional["KedroSession"]:
Returns:
KedroSession instance.
"""
session = None

Expand Down Expand Up @@ -107,11 +108,20 @@ def _jsonify_cli_context(ctx: click.core.Context):

class KedroSession:
"""``KedroSession`` is the object that is responsible for managing the lifecycle
of a Kedro project.
IMPORTANT: ``KedroSession`` is currently under development and is not
integrated into existing Kedro workflow. Its public (and private) interface
may change between minor releases without notice.
of a Kedro run.
- Use `KedroSession.create()` as a context manager to construct a new KedroSession with
session data provided (see the example below).
- Use `KedroSession(session_id=<id>)` to instantiate an existing session with a given
ID.
Example:
::
>>> from kedro.framework.session import KedroSession
>>>
>>> with KedroSession.create() as session:
>>> session.run()
>>>
"""

def __init__(
Expand All @@ -131,13 +141,17 @@ def create(
project_path: Union[Path, str] = None,
save_on_close: bool = True,
env: str = None,
extra_params: Dict[str, Any] = None,
) -> "KedroSession":
"""Create a new instance of ``KedroSession``.
"""Create a new instance of ``KedroSession`` with the session data.
Args:
project_path: Path to the project root directory.
save_on_close: Whether or not to save the session when it's closed.
env: Environment for the KedroContext.
extra_params: Optional dictionary containing extra project parameters
for underlying KedroContext. If specified, will update (and therefore take
precedence over) the parameters retrieved from the project configuration.
Returns:
A new ``KedroSession`` instance.
Expand All @@ -161,12 +175,14 @@ def create(
if env:
session_data["env"] = env

if extra_params:
session_data["extra_params"] = extra_params

session._store.update(session_data)
return session

def _init_store(self) -> BaseSessionStore:
static_data = get_static_project_data(self._project_path)

config = deepcopy(static_data.get("session_store", {}))
config.setdefault("path", (self._project_path / "sessions").as_posix())
config["session_id"] = self.session_id
Expand All @@ -193,7 +209,8 @@ def store(self) -> Dict[str, Any]:
def context(self) -> KedroContext:
"""An instance of the project context."""
env = self.store.get("env")
context = load_context(project_path=self._project_path, env=env)
extra_params = self.store.get("extra_params") or {}
context = load_context(project_path=self._project_path, env=env, **extra_params)
return context

def close(self):
Expand Down Expand Up @@ -278,7 +295,7 @@ def run( # pylint: disable=too-many-arguments,too-many-locals
"run_id": run_id,
"project_path": self._project_path.as_posix(),
"env": context.env,
"kedro_version": self.store["kedro_version"],
"kedro_version": self.store["project_version"],
"tags": tags,
"from_nodes": from_nodes,
"to_nodes": to_nodes,
Expand All @@ -295,23 +312,23 @@ def run( # pylint: disable=too-many-arguments,too-many-locals

# Run the runner
runner = runner or SequentialRunner()
hook = get_hook_manager().hook
hook.before_pipeline_run(
hook_manager = get_hook_manager()
hook_manager.hook.before_pipeline_run( # pylint: disable=no-member
run_params=record_data, pipeline=filtered_pipeline, catalog=catalog
)

try:
run_result = runner.run(filtered_pipeline, catalog, run_id)
except Exception as error:
hook.on_pipeline_error(
hook_manager.hook.on_pipeline_error(
error=error,
run_params=record_data,
pipeline=filtered_pipeline,
catalog=catalog,
)
raise

hook.after_pipeline_run(
hook_manager.hook.after_pipeline_run(
run_params=record_data,
run_result=run_result,
pipeline=filtered_pipeline,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ def reload_kedro(path, line=None):

try:
import kedro.config.default_logger
from kedro.framework.context import load_context
from kedro.framework.session import KedroSession
from kedro.framework.session.session import _push_session
from kedro.framework.cli.jupyter import collect_line_magic
except ImportError:
logging.error(
Expand All @@ -32,15 +33,17 @@ def reload_kedro(path, line=None):
path = path or project_path

# remove cached user modules
context = load_context(path)
session = KedroSession.create(path)
_push_session(session)
context = session.context
to_remove = [mod for mod in sys.modules if mod.startswith(context.package_name)]
# `del` is used instead of `reload()` because: If the new version of a module does not
# define a name that was defined by the old version, the old definition remains.
for module in to_remove:
del sys.modules[module]

# clear hook manager; hook implementations will be re-registered when the
# context is instantiated again in `load_context()` below
# context is instantiated again in `session.context` below
hook_manager = get_hook_manager()
name_plugin_pairs = hook_manager.list_name_plugin()
for name, plugin in name_plugin_pairs:
Expand All @@ -50,11 +53,11 @@ def reload_kedro(path, line=None):
# Reload context to fix `pickle` related error (it is unable to serialize reloaded objects)
# Some details can be found here:
# https://modwsgi.readthedocs.io/en/develop/user-guides/issues-with-pickle-module.html#packing-and-script-reloading
context = load_context(path)
context = session.context
catalog = context.catalog

logging.info("** Kedro project %s", str(context.project_name))
logging.info("Defined global variable `context` and `catalog`")
logging.info("Defined global variable `context`, `session` and `catalog`")

for line_magic in collect_line_magic():
register_line_magic(needs_local_scope(line_magic))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from kedro.framework.cli.pipeline import pipeline as pipeline_group
from kedro.framework.cli.project import project_group
from kedro.framework.cli.utils import KedroCliError, env_option, split_string
from kedro.framework.context import load_context
from kedro.framework.session import KedroSession
from kedro.utils import load_obj

CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
Expand Down Expand Up @@ -223,17 +223,19 @@ def run(
tag = _get_values_as_tuple(tag) if tag else tag
node_names = _get_values_as_tuple(node_names) if node_names else node_names

context = load_context(Path.cwd(), env=env, extra_params=params)
context.run(
tags=tag,
runner=runner_class(is_async=is_async),
node_names=node_names,
from_nodes=from_nodes,
to_nodes=to_nodes,
from_inputs=from_inputs,
load_versions=load_version,
pipeline_name=pipeline,
)
with KedroSession.create(
project_path=Path.cwd(), env=env, extra_params=params
) as session:
session.run(
tags=tag,
runner=runner_class(is_async=is_async),
node_names=node_names,
from_nodes=from_nodes,
to_nodes=to_nodes,
from_inputs=from_inputs,
load_versions=load_version,
pipeline_name=pipeline,
)


cli.add_command(pipeline_group)
Expand Down
53 changes: 48 additions & 5 deletions tests/framework/session/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def fake_project(tmp_path, mock_load_context): # pylint: disable=unused-argumen
(fake_project_dir / "src").mkdir(parents=True)

kedro_yml_path = fake_project_dir / ".kedro.yml"
payload = {"kedro_version": _FAKE_KEDRO_VERSION}
payload = {"project_version": _FAKE_KEDRO_VERSION}
with kedro_yml_path.open("w") as _f:
yaml.safe_dump(payload, _f)

Expand All @@ -79,11 +79,18 @@ class FakeException(Exception):
@pytest.mark.usefixtures("mock_load_context")
class TestKedroSession:
@pytest.mark.parametrize("env", [None, "env1"])
@pytest.mark.parametrize("extra_params", [None, {"key": "val"}])
def test_create(
self, fake_project, mock_load_context, fake_session_id, mocker, env
self,
fake_project,
mock_load_context,
fake_session_id,
mocker,
env,
extra_params,
):
mock_click_ctx = mocker.patch("click.get_current_context").return_value
session = KedroSession.create(fake_project, env=env)
session = KedroSession.create(fake_project, env=env, extra_params=extra_params)

expected_cli_data = {
"args": mock_click_ctx.args,
Expand All @@ -96,17 +103,53 @@ def test_create(
"project_path": fake_project,
"source_dir": fake_project / "src",
"session_id": fake_session_id,
"kedro_version": _FAKE_KEDRO_VERSION,
"project_version": _FAKE_KEDRO_VERSION,
"cli": expected_cli_data,
}
if env:
expected_store["env"] = env
if extra_params:
expected_store["extra_params"] = extra_params

assert session.store == expected_store

mock_load_context.assert_not_called()
assert session.context is mock_load_context.return_value
if extra_params:
mock_load_context.assert_called_once_with(
project_path=fake_project, env=env, **extra_params
)
else:
mock_load_context.assert_called_once_with(
project_path=fake_project, env=env
)

def test_create_no_env_extra_params(
self, fake_project, mock_load_context, fake_session_id, mocker
):
mock_click_ctx = mocker.patch("click.get_current_context").return_value
session = KedroSession.create(fake_project)

expected_cli_data = {
"args": mock_click_ctx.args,
"params": mock_click_ctx.params,
"command_name": mock_click_ctx.command.name,
"command_path": mock_click_ctx.command_path,
}
expected_store = {
"config_file": fake_project / ".kedro.yml",
"project_path": fake_project,
"source_dir": fake_project / "src",
"session_id": fake_session_id,
"project_version": _FAKE_KEDRO_VERSION,
"cli": expected_cli_data,
}

assert session.store == expected_store

mock_load_context.assert_not_called()
assert session.context is mock_load_context.return_value
mock_load_context.assert_called_once_with(project_path=fake_project, env=env)
mock_load_context.assert_called_once_with(project_path=fake_project, env=None)

def test_default_store(self, fake_project, fake_session_id, caplog):
caplog.set_level(logging.WARN, logger="kedro.framework.session.store")
Expand Down
Loading

0 comments on commit b42845e

Please sign in to comment.