Commit

[DataCatalog2.0]: Move pattern resolution logic to the separate component (#4123)

* Added a skeleton for AbstractDataCatalog and KedroDataCatalog
* Removed from_config method
* Implemented _init_datasets method
* Implemented get dataset
* Started resolve_patterns implementation
* Implemented resolve_patterns
* Fixed credentials resolution
* Updated match pattern
* Implemented add from dict method
* Updated io __init__
* Added list method
* Implemented _validate_missing_keys
* Added datasets access logic
* Added __contains__ and comments on lazy loading
* Renamed dataset_name to ds_name
* Updated some docstrings
* Fixed _update_ds_configs
* Fixed _init_datasets
* Implemented add_runtime_patterns
* Fixed runtime patterns usage
* Moved pattern logic out of data catalog, implemented KedroDataCatalog
* KedroDataCatalog updates
* Added property to return config
* Added list patterns method
* Renamed and moved ConfigResolver
* Renamed ConfigResolver
* Cleaned KedroDataCatalog
* Cleaned up DataCatalogConfigResolver
* Docs build fix attempt
* Removed KedroDataCatalog
* Updated from_config method
* Updated constructor and add methods
* Updated _get_dataset method
* Updated __contains__
* Updated __eq__ and shallow_copy
* Added __iter__ and __getitem__
* Removed unused imports
* Added TODO
* Updated runner.run()
* Updated session
* Added config_resolver property
* Updated catalog list command
* Updated catalog create command
* Updated catalog rank command
* Updated catalog resolve command
* Removed some methods
* Removed ds configs from catalog
* Fixed lint
* Fixed typo
* Added module docstring
* Removed None from Pattern type
* Fixed docs failing to find class reference
* Updated Patterns type
* Fix tests (#4149)
* Returned constants to avoid breaking changes
* Minor fix
* Updated test_sorting_order_with_other_dataset_through_extra_pattern
* Removed odd properties
* Updated tests
* Removed None from _fetch_credentials input
* Renamed DataCatalogConfigResolver to CatalogConfigResolver
* Renamed _init_configs to _resolve_config_credentials
* Moved functions to the class
* Refactored resolve_dataset_pattern
* Fixed refactored part
* Changed the order of arguments for DataCatalog constructor
* Replaced __getitem__ with .get()
* Updated catalog commands
* Moved warm-up block outside of the try block
* Fixed linter
* Removed odd copying

---------

Signed-off-by: Elena Khaustova <ymax70rus@gmail.com>
Signed-off-by: Ankita Katiyar <ankitakatiyar2401@gmail.com>
Co-authored-by: Ankita Katiyar <110245118+ankatiyar@users.noreply.github.com>
ElenaKhaustova and ankatiyar authored Sep 12, 2024
1 parent 66e5e07 commit 7e02653
Showing 11 changed files with 406 additions and 326 deletions.
2 changes: 2 additions & 0 deletions RELEASE.md
@@ -1,6 +1,8 @@
 # Upcoming Release

 ## Major features and improvements
+* Refactored `kedro run` and `kedro catalog` commands.
+* Moved pattern resolution logic from `DataCatalog` to a separate component - `CatalogConfigResolver`. Updated `DataCatalog` to use `CatalogConfigResolver` internally.
 * Made packaged Kedro projects return `session.run()` output to be used when running it in the interactive environment.
 * Enhanced `OmegaConfigLoader` configuration validation to detect duplicate keys at all parameter levels, ensuring comprehensive nested key checking.
 ## Bug fixes and other changes
2 changes: 2 additions & 0 deletions docs/source/conf.py
@@ -127,6 +127,7 @@
     "typing.Type",
     "typing.Set",
     "kedro.config.config.ConfigLoader",
+    "kedro.io.catalog_config_resolver.CatalogConfigResolver",
     "kedro.io.core.AbstractDataset",
     "kedro.io.core.AbstractVersionedDataset",
     "kedro.io.core.DatasetError",
@@ -168,6 +169,7 @@
         "D[k] if k in D, else d. d defaults to None.",
         "None. Update D from mapping/iterable E and F.",
         "Patterns",
+        "CatalogConfigResolver",
     ),
     "py:data": (
         "typing.Any",
83 changes: 29 additions & 54 deletions kedro/framework/cli/catalog.py
@@ -2,9 +2,8 @@

 from __future__ import annotations

-import copy
 from collections import defaultdict
-from itertools import chain
+from itertools import chain, filterfalse
 from typing import TYPE_CHECKING, Any

 import click
@@ -28,6 +27,11 @@ def _create_session(package_name: str, **kwargs: Any) -> KedroSession:
     return KedroSession.create(**kwargs)


+def is_parameter(dataset_name: str) -> bool:
+    """Check if dataset is a parameter."""
+    return dataset_name.startswith("params:") or dataset_name == "parameters"
+
+
 @click.group(name="Kedro")
 def catalog_cli() -> None:  # pragma: no cover
     pass
@@ -88,21 +92,15 @@ def list_datasets(metadata: ProjectMetadata, pipeline: str, env: str) -> None:

     # resolve any factory datasets in the pipeline
     factory_ds_by_type = defaultdict(list)
+
     for ds_name in default_ds:
-        matched_pattern = data_catalog._match_pattern(
-            data_catalog._dataset_patterns, ds_name
-        ) or data_catalog._match_pattern(data_catalog._default_pattern, ds_name)
-        if matched_pattern:
-            ds_config_copy = copy.deepcopy(
-                data_catalog._dataset_patterns.get(matched_pattern)
-                or data_catalog._default_pattern.get(matched_pattern)
-                or {}
-            )
-
-            ds_config = data_catalog._resolve_config(
-                ds_name, matched_pattern, ds_config_copy
-            )
-            factory_ds_by_type[ds_config["type"]].append(ds_name)
+        if data_catalog.config_resolver.match_pattern(ds_name):
+            ds_config = data_catalog.config_resolver.resolve_dataset_pattern(
+                ds_name
+            )
+            factory_ds_by_type[ds_config.get("type", "DefaultDataset")].append(
+                ds_name
+            )

     default_ds = default_ds - set(chain.from_iterable(factory_ds_by_type.values()))

@@ -128,12 +126,10 @@ def _map_type_to_datasets(
     datasets of the specific type as a value.
     """
     mapping = defaultdict(list)  # type: ignore[var-annotated]
-    for dataset in datasets:
-        is_param = dataset.startswith("params:") or dataset == "parameters"
-        if not is_param:
-            ds_type = datasets_meta[dataset].__class__.__name__
-            if dataset not in mapping[ds_type]:
-                mapping[ds_type].append(dataset)
+    for dataset_name in filterfalse(is_parameter, datasets):
+        ds_type = datasets_meta[dataset_name].__class__.__name__
+        if dataset_name not in mapping[ds_type]:
+            mapping[ds_type].append(dataset_name)
     return mapping


@@ -170,20 +166,12 @@ def create_catalog(metadata: ProjectMetadata, pipeline_name: str, env: str) -> N
             f"'{pipeline_name}' pipeline not found! Existing pipelines: {existing_pipelines}"
         )

-    pipe_datasets = {
-        ds_name
-        for ds_name in pipeline.datasets()
-        if not ds_name.startswith("params:") and ds_name != "parameters"
-    }
+    pipeline_datasets = set(filterfalse(is_parameter, pipeline.datasets()))

-    catalog_datasets = {
-        ds_name
-        for ds_name in context.catalog._datasets.keys()
-        if not ds_name.startswith("params:") and ds_name != "parameters"
-    }
+    catalog_datasets = set(filterfalse(is_parameter, context.catalog.list()))

     # Datasets that are missing in Data Catalog
-    missing_ds = sorted(pipe_datasets - catalog_datasets)
+    missing_ds = sorted(pipeline_datasets - catalog_datasets)
     if missing_ds:
         catalog_path = (
             context.project_path
@@ -221,12 +209,9 @@ def rank_catalog_factories(metadata: ProjectMetadata, env: str) -> None:
     session = _create_session(metadata.package_name, env=env)
     context = session.load_context()

-    catalog_factories = {
-        **context.catalog._dataset_patterns,
-        **context.catalog._default_pattern,
-    }
+    catalog_factories = context.catalog.config_resolver.list_patterns()
     if catalog_factories:
-        click.echo(yaml.dump(list(catalog_factories.keys())))
+        click.echo(yaml.dump(catalog_factories))
     else:
         click.echo("There are no dataset factories in the catalog.")
@@ -250,35 +235,25 @@ def resolve_patterns(metadata: ProjectMetadata, env: str) -> None:
     explicit_datasets = {
         ds_name: ds_config
         for ds_name, ds_config in catalog_config.items()
-        if not data_catalog._is_pattern(ds_name)
+        if not data_catalog.config_resolver.is_pattern(ds_name)
     }

     target_pipelines = pipelines.keys()
-    datasets = set()
+    pipeline_datasets = set()

     for pipe in target_pipelines:
         pl_obj = pipelines.get(pipe)
         if pl_obj:
-            datasets.update(pl_obj.datasets())
+            pipeline_datasets.update(pl_obj.datasets())

-    for ds_name in datasets:
-        is_param = ds_name.startswith("params:") or ds_name == "parameters"
-        if ds_name in explicit_datasets or is_param:
+    for ds_name in pipeline_datasets:
+        if ds_name in explicit_datasets or is_parameter(ds_name):
             continue

-        matched_pattern = data_catalog._match_pattern(
-            data_catalog._dataset_patterns, ds_name
-        ) or data_catalog._match_pattern(data_catalog._default_pattern, ds_name)
-        if matched_pattern:
-            ds_config_copy = copy.deepcopy(
-                data_catalog._dataset_patterns.get(matched_pattern)
-                or data_catalog._default_pattern.get(matched_pattern)
-                or {}
-            )
-
-            ds_config = data_catalog._resolve_config(
-                ds_name, matched_pattern, ds_config_copy
-            )
-            # Exclude MemoryDatasets not set in the catalog explicitly
-            if ds_config:
-                explicit_datasets[ds_name] = ds_config
+        ds_config = data_catalog.config_resolver.resolve_dataset_pattern(ds_name)
+
+        # Exclude MemoryDatasets not set in the catalog explicitly
+        if ds_config:
+            explicit_datasets[ds_name] = ds_config

     secho(yaml.dump(explicit_datasets))
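The `is_parameter` helper introduced in this file replaces the repeated inline `startswith("params:")` checks and pairs naturally with `itertools.filterfalse`. The behaviour can be exercised with the standard library alone; the dataset names below are made up for illustration:

```python
from itertools import filterfalse


def is_parameter(dataset_name: str) -> bool:
    """Check if dataset is a parameter."""
    return dataset_name.startswith("params:") or dataset_name == "parameters"


# Hypothetical names, as pipeline.datasets() might return them
datasets = ["companies", "params:model_options", "parameters", "reviews"]

# filterfalse keeps only entries for which is_parameter is False
non_params = list(filterfalse(is_parameter, datasets))
print(non_params)  # ['companies', 'reviews']
```

Using `filterfalse` keeps the parameter-exclusion rule in one place, so all three catalog commands filter identically.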
10 changes: 4 additions & 6 deletions kedro/framework/session/session.py
@@ -394,13 +394,11 @@ def run(  # noqa: PLR0913
             run_params=record_data, pipeline=filtered_pipeline, catalog=catalog
         )

+        if isinstance(runner, ThreadRunner):
+            for ds in filtered_pipeline.datasets():
+                if catalog.config_resolver.match_pattern(ds):
+                    _ = catalog._get_dataset(ds)
         try:
-            if isinstance(runner, ThreadRunner):
-                for ds in filtered_pipeline.datasets():
-                    if catalog._match_pattern(
-                        catalog._dataset_patterns, ds
-                    ) or catalog._match_pattern(catalog._default_pattern, ds):
-                        _ = catalog._get_dataset(ds)
             run_result = runner.run(
                 filtered_pipeline, catalog, hook_manager, session_id
             )
2 changes: 2 additions & 0 deletions kedro/io/__init__.py
@@ -5,6 +5,7 @@
 from __future__ import annotations

 from .cached_dataset import CachedDataset
+from .catalog_config_resolver import CatalogConfigResolver
 from .core import (
     AbstractDataset,
     AbstractVersionedDataset,
@@ -23,6 +24,7 @@
     "AbstractVersionedDataset",
     "CachedDataset",
     "DataCatalog",
+    "CatalogConfigResolver",
     "DatasetAlreadyExistsError",
     "DatasetError",
     "DatasetNotFoundError",
