From 3bcaa78438822037f50498e2b1905e786422c5c4 Mon Sep 17 00:00:00 2001 From: Ivan Shcheklein Date: Fri, 20 Dec 2024 15:41:38 -0800 Subject: [PATCH] fix(listing): pick exact or create new one on update --- src/datachain/lib/dc.py | 26 ++++++++++++++------------ tests/func/test_catalog.py | 8 ++++++-- tests/unit/test_listing.py | 32 ++++++++++++++++++++++++++++++-- 3 files changed, 50 insertions(+), 16 deletions(-) diff --git a/src/datachain/lib/dc.py b/src/datachain/lib/dc.py index a3278b232..5e61ed965 100644 --- a/src/datachain/lib/dc.py +++ b/src/datachain/lib/dc.py @@ -410,8 +410,9 @@ def parse_uri( """Returns correct listing dataset name that must be used for saving listing operation. It takes into account existing listings and reusability of those. It also returns boolean saying if returned dataset name is reused / already - exists or not, and it returns correct listing path that should be used to find - rows based on uri. + exists or not (on update it always returns False - just because there was no + reason to complicate it so far). And it returns correct listing path that should + be used to find rows based on uri. """ catalog = session.catalog cache = catalog.cache @@ -427,17 +428,18 @@ def parse_uri( if not ls.is_expired and ls.contains(ds_name) ] - if listings: - if update: - # choosing the smallest possible one to minimize update time - listing = sorted(listings, key=lambda ls: len(ls.name))[0] - else: - # no need to update, choosing the most recent one - listing = sorted(listings, key=lambda ls: ls.created_at)[-1] - + # if no need to update - choosing the most recent one; + # otherwise, we'll using the exact original `ds_name`` in this case: + # - if a "bigger" listing exists, we don't want to update it, it's better + # to create a new "smaller" one on "update=True" + # - if an exact listing exists it will have the same name as `ds_name` + # anyway below + if listings and not update: + listing = sorted(listings, key=lambda ls: ls.created_at)[-1] + + # for local file system we need to fix listing path / prefix + # if we are reusing existing listing if isinstance(client, FileClient) and listing and listing.name != ds_name: - # For local file system we need to fix listing path / prefix - # if we are reusing existing listing list_path = f'{ds_name.strip("/").removeprefix(listing.name)}/{list_path}' ds_name = listing.name if listing else ds_name diff --git a/tests/func/test_catalog.py b/tests/func/test_catalog.py index 5f1b27ae6..fb353a911 100644 --- a/tests/func/test_catalog.py +++ b/tests/func/test_catalog.py @@ -840,13 +840,17 @@ def test_listing_stats(cloud_test_catalog): catalog.enlist_source(f"{src_uri}/dogs/", update=True) stats = listing_stats(src_uri, catalog) + assert stats.num_objects == 7 + assert stats.size == 36 + + stats = listing_stats(f"{src_uri}/dogs/", catalog) assert stats.num_objects == 4 assert stats.size == 15 catalog.enlist_source(f"{src_uri}/dogs/") stats = listing_stats(src_uri, catalog) - assert stats.num_objects == 4 - assert stats.size == 15 + assert stats.num_objects == 7 + assert stats.size == 36 @pytest.mark.parametrize("cloud_type", ["s3", "azure", "gs"], indirect=True) diff --git a/tests/unit/test_listing.py b/tests/unit/test_listing.py index 06884353e..338f97bfb 100644 --- a/tests/unit/test_listing.py +++ b/tests/unit/test_listing.py @@ -7,6 +7,7 @@ from datachain.lib.dc import DataChain from datachain.lib.file import File from datachain.lib.listing import ( + LISTING_PREFIX, is_listing_dataset, listing_uri_from_name, parse_listing_uri, @@ -38,7 +39,7 @@ def _tree_to_entries(tree: dict, path=""): @pytest.fixture def listing(test_session): catalog = test_session.catalog - dataset_name, _, _, _ = DataChain.parse_uri("s3://whatever", test_session) + dataset_name, _, _, _ = DataChain.parse_uri("file:///whatever", test_session) DataChain.from_values(file=list(_tree_to_entries(TREE))).save( dataset_name, listing=True ) @@ -46,12 +47,39 @@ def listing(test_session): return Listing( catalog.metastore.clone(), catalog.warehouse.clone(), - Client.get_client("s3://whatever", catalog.cache, **catalog.client_config), + Client.get_client("file:///whatever", catalog.cache, **catalog.client_config), dataset_name=dataset_name, object_name="file", ) +def test_parse_uri_returns_exact_math_on_update(test_session): + # Context: https://github.com/iterative/datachain/pull/726 + # On update it should be returning the exact match (not a "bigger" one) + dataset_name_dir1, _, _, exists = DataChain.parse_uri( + "file:///whatever/dir1", test_session + ) + DataChain.from_values(file=list(_tree_to_entries(TREE["dir1"]))).save( + dataset_name_dir1, listing=True + ) + assert dataset_name_dir1 == f"{LISTING_PREFIX}file:///whatever/dir1/" + assert not exists + + dataset_name, _, _, exists = DataChain.parse_uri("file:///whatever", test_session) + DataChain.from_values(file=list(_tree_to_entries(TREE))).save( + dataset_name, listing=True + ) + assert dataset_name == f"{LISTING_PREFIX}file:///whatever/" + assert not exists + + dataset_name_dir1, _, _, exists = DataChain.parse_uri( + "file:///whatever/dir1", test_session, update=True + ) + assert dataset_name_dir1 == f"{LISTING_PREFIX}file:///whatever/dir1/" + # On update it is always false (there was no reason to complicate it for now) + assert not exists + + def test_resolve_path_in_root(listing): node = listing.resolve_path("dir1") assert node.dir_type == DirType.DIR