
[DataCatalog2.0]: Move pattern resolution logic to the separate component #4123

Merged
merged 89 commits on Sep 12, 2024

Changes from 84 commits
a8f4fb3
Added a skeleton for AbstractDataCatalog and KedroDataCatalog
ElenaKhaustova Jul 31, 2024
7d56818
Removed from_config method
ElenaKhaustova Jul 31, 2024
787e121
Merge branch 'main' into refactor-pattern-logic
ElenaKhaustova Aug 2, 2024
0b80f23
Implemented _init_datasets method
ElenaKhaustova Aug 2, 2024
5c727df
Implemented get dataset
ElenaKhaustova Aug 2, 2024
05c9171
Started resolve_patterns implementation
ElenaKhaustova Aug 2, 2024
5c804d6
Implemented resolve_patterns
ElenaKhaustova Aug 5, 2024
e9ba5c4
Merge branch 'main' into refactor-pattern-logic
ElenaKhaustova Aug 5, 2024
530f7d6
Fixed credentials resolving
ElenaKhaustova Aug 5, 2024
64be83c
Updated match pattern
ElenaKhaustova Aug 6, 2024
c29828a
Implemented add from dict method
ElenaKhaustova Aug 6, 2024
957403a
Updated io __init__
ElenaKhaustova Aug 6, 2024
14908ff
Added list method
ElenaKhaustova Aug 6, 2024
c5e925b
Implemented _validate_missing_keys
ElenaKhaustova Aug 6, 2024
b9a92b0
Added datasets access logic
ElenaKhaustova Aug 7, 2024
2cb794f
Merge branch 'main' into refactor-pattern-logic
ElenaKhaustova Aug 7, 2024
2f32593
Added __contains__ and comments on lazy loading
ElenaKhaustova Aug 7, 2024
d1ea64e
Renamed dataset_name to ds_name
ElenaKhaustova Aug 8, 2024
fb89fca
Updated some docstrings
ElenaKhaustova Aug 8, 2024
4486939
Merge branch 'main' into refactor-pattern-logic
ElenaKhaustova Aug 12, 2024
c667645
Fixed _update_ds_configs
ElenaKhaustova Aug 12, 2024
be8e929
Fixed _init_datasets
ElenaKhaustova Aug 12, 2024
ec7ac39
Implemented add_runtime_patterns
ElenaKhaustova Aug 12, 2024
8e23450
Fixed runtime patterns usage
ElenaKhaustova Aug 13, 2024
529e61a
Merge branch 'main' into refactor-pattern-logic
ElenaKhaustova Aug 19, 2024
e4cb21c
Merge branch 'main' into refactor-pattern-logic
ElenaKhaustova Aug 21, 2024
50bc816
Moved pattern logic out of data catalog, implemented KedroDataCatalog
ElenaKhaustova Aug 21, 2024
6dfbcb0
Merge branch 'main' into 4110-move-pattern-resolution-logic
ElenaKhaustova Aug 22, 2024
9346f08
KedroDataCatalog updates
ElenaKhaustova Aug 22, 2024
9568e29
Added property to return config
ElenaKhaustova Aug 28, 2024
86efdfe
Merge branch 'main' into 4110-move-pattern-resolution-logic
ElenaKhaustova Aug 28, 2024
5e27660
Added list patterns method
ElenaKhaustova Aug 28, 2024
72b11d0
Renamed and moved ConfigResolver
ElenaKhaustova Aug 29, 2024
f0a4090
Renamed ConfigResolver
ElenaKhaustova Aug 29, 2024
a4da52a
Merge branch 'main' into 4110-move-pattern-resolution-logic
ElenaKhaustova Aug 29, 2024
7d6227f
Cleaned KedroDataCatalog
ElenaKhaustova Aug 29, 2024
4092291
Cleaned up DataCatalogConfigResolver
ElenaKhaustova Aug 29, 2024
63e47f9
Docs build fix attempt
ElenaKhaustova Aug 30, 2024
68f6527
Removed KedroDataCatalog
ElenaKhaustova Sep 5, 2024
2ac4a2f
Updated from_config method
ElenaKhaustova Sep 5, 2024
cb5879d
Updated constructor and add methods
ElenaKhaustova Sep 5, 2024
9038e96
Updated _get_dataset method
ElenaKhaustova Sep 5, 2024
cc89565
Updated __contains__
ElenaKhaustova Sep 5, 2024
59b6764
Updated __eq__ and shallow_copy
ElenaKhaustova Sep 5, 2024
4f5a3fb
Added __iter__ and __getitem__
ElenaKhaustova Sep 5, 2024
12ed6f2
Removed unused imports
ElenaKhaustova Sep 5, 2024
a106cec
Added TODO
ElenaKhaustova Sep 5, 2024
6df04f7
Updated runner.run()
ElenaKhaustova Sep 5, 2024
8566e27
Updated session
ElenaKhaustova Sep 5, 2024
2dcea33
Added confil_resolver property
ElenaKhaustova Sep 5, 2024
a46597f
Updated catalog list command
ElenaKhaustova Sep 5, 2024
3787545
Updated catalog create command
ElenaKhaustova Sep 5, 2024
68d612d
Updated catalog rank command
ElenaKhaustova Sep 5, 2024
af5bee9
Updated catalog resolve command
ElenaKhaustova Sep 5, 2024
acc4d6e
Merge branch 'main' into 4110-move-pattern-resolution-logic
ElenaKhaustova Sep 5, 2024
e67ff0f
Remove some methods
ElenaKhaustova Sep 5, 2024
7b3afa2
Removed ds configs from catalog
ElenaKhaustova Sep 6, 2024
658a759
Fixed lint
ElenaKhaustova Sep 6, 2024
7be2a8e
Fixed typo
ElenaKhaustova Sep 6, 2024
09f3f26
Merge branch 'main' into 4110-move-pattern-resolution-logic
ElenaKhaustova Sep 6, 2024
9e43a9a
Added module docstring
ElenaKhaustova Sep 6, 2024
25b6501
Removed None from Pattern type
ElenaKhaustova Sep 6, 2024
3a646de
Fixed docs failing to find class reference
ElenaKhaustova Sep 6, 2024
5e5df4a
Fixed docs failing to find class reference
ElenaKhaustova Sep 6, 2024
aa59a35
Updated Patterns type
ElenaKhaustova Sep 6, 2024
c7efa3e
Fix tests (#4149)
ankatiyar Sep 6, 2024
023ffc6
Returned constants to avoid breaking changes
ElenaKhaustova Sep 6, 2024
585b44f
Minor fix
ElenaKhaustova Sep 6, 2024
e447078
Updated test_sorting_order_with_other_dataset_through_extra_pattern
ElenaKhaustova Sep 9, 2024
beb0165
Merge branch 'main' into 4110-move-pattern-resolution-logic
ElenaKhaustova Sep 9, 2024
975e968
Removed odd properties
ElenaKhaustova Sep 9, 2024
11d782c
Updated tests
ElenaKhaustova Sep 9, 2024
e4abd23
Removed None from _fetch_credentials input
ElenaKhaustova Sep 9, 2024
6433dd8
Renamed DataCatalogConfigResolver to CatalogConfigResolver
ElenaKhaustova Sep 10, 2024
355576f
Renamed _init_configs to _resolve_config_credentials
ElenaKhaustova Sep 10, 2024
39d9ff6
Moved functions to the class
ElenaKhaustova Sep 10, 2024
659c9da
Refactored resolve_dataset_pattern
ElenaKhaustova Sep 10, 2024
840b32a
Fixed refactored part
ElenaKhaustova Sep 10, 2024
77f551c
Changed the order of arguments for DataCatalog constructor
ElenaKhaustova Sep 10, 2024
6e079a1
Replaced __getitem__ with .get()
ElenaKhaustova Sep 10, 2024
1f7e5f8
Updated catalog commands
ElenaKhaustova Sep 10, 2024
80f0e3d
Moved warm up block outside of the try block
ElenaKhaustova Sep 10, 2024
017cda3
Fixed linter
ElenaKhaustova Sep 10, 2024
cab6f06
Removed odd copying
ElenaKhaustova Sep 10, 2024
8f604d1
Updated release notes
ElenaKhaustova Sep 11, 2024
9a4db18
Returned DatasetError
ElenaKhaustova Sep 11, 2024
0a6946a
Added _dataset_patterns and _default_pattern to _config_resolver to a…
ElenaKhaustova Sep 11, 2024
fee7bd6
Made resolve_dataset_pattern return just dict
ElenaKhaustova Sep 11, 2024
f5a7992
Fixed linter
ElenaKhaustova Sep 11, 2024
2 changes: 2 additions & 0 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
@@ -127,6 +127,7 @@
"typing.Type",
"typing.Set",
"kedro.config.config.ConfigLoader",
"kedro.io.catalog_config_resolver.CatalogConfigResolver",
"kedro.io.core.AbstractDataset",
"kedro.io.core.AbstractVersionedDataset",
"kedro.io.core.DatasetError",
@@ -168,6 +169,7 @@
"D[k] if k in D, else d. d defaults to None.",
"None. Update D from mapping/iterable E and F.",
"Patterns",
"CatalogConfigResolver",
),
"py:data": (
"typing.Any",
85 changes: 30 additions & 55 deletions kedro/framework/cli/catalog.py
@@ -2,9 +2,8 @@

from __future__ import annotations

import copy
from collections import defaultdict
from itertools import chain
from itertools import chain, filterfalse
from typing import TYPE_CHECKING, Any

import click
@@ -28,6 +27,11 @@ def _create_session(package_name: str, **kwargs: Any) -> KedroSession:
return KedroSession.create(**kwargs)


def is_parameter(dataset_name: str) -> bool:
"""Check if dataset is a parameter."""
return dataset_name.startswith("params:") or dataset_name == "parameters"


@click.group(name="Kedro")
def catalog_cli() -> None: # pragma: no cover
pass
@@ -88,21 +92,15 @@ def list_datasets(metadata: ProjectMetadata, pipeline: str, env: str) -> None:

# resolve any factory datasets in the pipeline
factory_ds_by_type = defaultdict(list)
for ds_name in default_ds:
matched_pattern = data_catalog._match_pattern(
data_catalog._dataset_patterns, ds_name
) or data_catalog._match_pattern(data_catalog._default_pattern, ds_name)
if matched_pattern:
ds_config_copy = copy.deepcopy(
data_catalog._dataset_patterns.get(matched_pattern)
or data_catalog._default_pattern.get(matched_pattern)
or {}
)

ds_config = data_catalog._resolve_config(
ds_name, matched_pattern, ds_config_copy
resolved_configs = data_catalog.config_resolver.resolve_dataset_pattern(
default_ds
)
for ds_name, ds_config in zip(default_ds, resolved_configs):
if data_catalog.config_resolver.match_pattern(ds_name):
factory_ds_by_type[ds_config.get("type", "DefaultDataset")].append( # type: ignore[attr-defined]
ds_name
)
factory_ds_by_type[ds_config["type"]].append(ds_name)

default_ds = default_ds - set(chain.from_iterable(factory_ds_by_type.values()))

@@ -128,12 +126,10 @@ def _map_type_to_datasets(
datasets of the specific type as a value.
"""
mapping = defaultdict(list) # type: ignore[var-annotated]
for dataset in datasets:
is_param = dataset.startswith("params:") or dataset == "parameters"
if not is_param:
ds_type = datasets_meta[dataset].__class__.__name__
if dataset not in mapping[ds_type]:
mapping[ds_type].append(dataset)
for dataset_name in filterfalse(is_parameter, datasets):
ds_type = datasets_meta[dataset_name].__class__.__name__
if dataset_name not in mapping[ds_type]:
mapping[ds_type].append(dataset_name)
return mapping


@@ -170,20 +166,12 @@ def create_catalog(metadata: ProjectMetadata, pipeline_name: str, env: str) -> N
f"'{pipeline_name}' pipeline not found! Existing pipelines: {existing_pipelines}"
)

pipe_datasets = {
ds_name
for ds_name in pipeline.datasets()
if not ds_name.startswith("params:") and ds_name != "parameters"
}
pipeline_datasets = set(filterfalse(is_parameter, pipeline.datasets()))

catalog_datasets = {
ds_name
for ds_name in context.catalog._datasets.keys()
if not ds_name.startswith("params:") and ds_name != "parameters"
}
catalog_datasets = set(filterfalse(is_parameter, context.catalog.list()))

# Datasets that are missing in Data Catalog
missing_ds = sorted(pipe_datasets - catalog_datasets)
missing_ds = sorted(pipeline_datasets - catalog_datasets)
if missing_ds:
catalog_path = (
context.project_path
@@ -221,12 +209,9 @@ def rank_catalog_factories(metadata: ProjectMetadata, env: str) -> None:
session = _create_session(metadata.package_name, env=env)
context = session.load_context()

catalog_factories = {
**context.catalog._dataset_patterns,
**context.catalog._default_pattern,
}
catalog_factories = context.catalog.config_resolver.list_patterns()
if catalog_factories:
click.echo(yaml.dump(list(catalog_factories.keys())))
click.echo(yaml.dump(catalog_factories))
else:
click.echo("There are no dataset factories in the catalog.")

@@ -250,35 +235,25 @@ def resolve_patterns(metadata: ProjectMetadata, env: str) -> None:
explicit_datasets = {
ds_name: ds_config
for ds_name, ds_config in catalog_config.items()
if not data_catalog._is_pattern(ds_name)
if not data_catalog.config_resolver.is_pattern(ds_name)
}

target_pipelines = pipelines.keys()
datasets = set()
pipeline_datasets = set()

for pipe in target_pipelines:
pl_obj = pipelines.get(pipe)
if pl_obj:
datasets.update(pl_obj.datasets())
pipeline_datasets.update(pl_obj.datasets())

for ds_name in datasets:
is_param = ds_name.startswith("params:") or ds_name == "parameters"
if ds_name in explicit_datasets or is_param:
for ds_name in pipeline_datasets:
Review comment (Member):

Not a big fan of continue since it harms the flow of thinking while reading the code. Could we do this instead:

for ds_name in (pipeline_datasets - explicit_datasets - filter(is_parameter, pipeline_datasets)):

Reply (Contributor, Author):


We will have to do this since explicit_datasets is a dictionary:

for ds_name in (pipeline_datasets - set(explicit_datasets.keys()) - filter(is_parameter, pipeline_datasets)):

and for me, it looks way more complex to understand than

for ds_name in pipeline_datasets:
    if ds_name in explicit_datasets or is_parameter(ds_name):
        continue

It's also slightly worse from the complexity point of view though I agree both cost nothing.

Curious what others think.

if ds_name in explicit_datasets or is_parameter(ds_name):
continue

matched_pattern = data_catalog._match_pattern(
data_catalog._dataset_patterns, ds_name
) or data_catalog._match_pattern(data_catalog._default_pattern, ds_name)
if matched_pattern:
ds_config_copy = copy.deepcopy(
data_catalog._dataset_patterns.get(matched_pattern)
or data_catalog._default_pattern.get(matched_pattern)
or {}
)
ds_config = data_catalog.config_resolver.resolve_dataset_pattern(ds_name)

ds_config = data_catalog._resolve_config(
ds_name, matched_pattern, ds_config_copy
)
# Exclude MemoryDatasets not set in the catalog explicitly
if ds_config is not None:
explicit_datasets[ds_name] = ds_config

secho(yaml.dump(explicit_datasets))
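The parameter-filtering idiom debated in the thread above can be sketched standalone. `is_parameter` mirrors the helper added in this diff; the dataset names and configs below are made-up illustrations, not from the Kedro codebase:

```python
from itertools import filterfalse


def is_parameter(dataset_name: str) -> bool:
    """Check if a dataset name refers to a parameter entry."""
    return dataset_name.startswith("params:") or dataset_name == "parameters"


# Hypothetical pipeline datasets and explicitly configured catalog entries
pipeline_datasets = {"companies", "params:model_options", "parameters", "reviews"}
explicit_datasets = {"companies": {"type": "pandas.CSVDataset"}}

# filterfalse, as used in the diff, keeps names that are NOT parameters
non_params = set(filterfalse(is_parameter, pipeline_datasets))
assert non_params == {"companies", "reviews"}

# The loop-with-continue style kept in the PR ...
to_resolve = []
for ds_name in pipeline_datasets:
    if ds_name in explicit_datasets or is_parameter(ds_name):
        continue
    to_resolve.append(ds_name)

# ... is equivalent to the set-arithmetic style suggested in review
# (note filter() must be wrapped in set() before set subtraction works)
also_to_resolve = (
    pipeline_datasets
    - set(explicit_datasets)
    - set(filter(is_parameter, pipeline_datasets))
)
assert set(to_resolve) == also_to_resolve == {"reviews"}
```

As the author notes, both versions are cheap; the difference is purely readability.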
10 changes: 4 additions & 6 deletions kedro/framework/session/session.py
@@ -394,13 +394,11 @@ def run( # noqa: PLR0913
run_params=record_data, pipeline=filtered_pipeline, catalog=catalog
)

if isinstance(runner, ThreadRunner):
for ds in filtered_pipeline.datasets():
if catalog.config_resolver.match_pattern(ds):
_ = catalog._get_dataset(ds)
try:
if isinstance(runner, ThreadRunner):
for ds in filtered_pipeline.datasets():
if catalog._match_pattern(
catalog._dataset_patterns, ds
) or catalog._match_pattern(catalog._default_pattern, ds):
_ = catalog._get_dataset(ds)
run_result = runner.run(
filtered_pipeline, catalog, hook_manager, session_id
)
2 changes: 2 additions & 0 deletions kedro/io/__init__.py
@@ -5,6 +5,7 @@
from __future__ import annotations

from .cached_dataset import CachedDataset
from .catalog_config_resolver import CatalogConfigResolver
from .core import (
AbstractDataset,
AbstractVersionedDataset,
@@ -23,6 +24,7 @@
"AbstractVersionedDataset",
"CachedDataset",
"DataCatalog",
"CatalogConfigResolver",
"DatasetAlreadyExistsError",
"DatasetError",
"DatasetNotFoundError",