fix(listing): pick exact or create new one on update
shcheklein committed Dec 22, 2024
1 parent 20c73b2 commit 3bcaa78
Showing 3 changed files with 50 additions and 16 deletions.
src/datachain/lib/dc.py (26 changes: 14 additions & 12 deletions)
@@ -410,8 +410,9 @@ def parse_uri(
         """Returns correct listing dataset name that must be used for saving listing
         operation. It takes into account existing listings and reusability of those.
         It also returns boolean saying if returned dataset name is reused / already
-        exists or not, and it returns correct listing path that should be used to find
-        rows based on uri.
+        exists or not (on update it always returns False - just because there was no
+        reason to complicate it so far). And it returns correct listing path that should
+        be used to find rows based on uri.
         """
         catalog = session.catalog
         cache = catalog.cache
@@ -427,17 +428,18 @@
             if not ls.is_expired and ls.contains(ds_name)
         ]

-        if listings:
-            if update:
-                # choosing the smallest possible one to minimize update time
-                listing = sorted(listings, key=lambda ls: len(ls.name))[0]
-            else:
-                # no need to update, choosing the most recent one
-                listing = sorted(listings, key=lambda ls: ls.created_at)[-1]
-
+        # if no need to update - choosing the most recent one;
+        # otherwise, we'll use the exact original `ds_name` in this case:
+        #   - if a "bigger" listing exists, we don't want to update it; it's better
+        #     to create a new "smaller" one on "update=True"
+        #   - if an exact listing exists, it will have the same name as `ds_name`
+        #     anyway below
+        if listings and not update:
+            listing = sorted(listings, key=lambda ls: ls.created_at)[-1]

-        # for local file system we need to fix listing path / prefix
-        # if we are reusing existing listing
         if isinstance(client, FileClient) and listing and listing.name != ds_name:
+            # For local file system we need to fix listing path / prefix
+            # if we are reusing existing listing
             list_path = f'{ds_name.strip("/").removeprefix(listing.name)}/{list_path}'

         ds_name = listing.name if listing else ds_name
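For readers skimming the diff, a minimal sketch of the new selection rule and the local-filesystem path fix-up. It assumes listing dataset names are the listed URI prefixed with LISTING_PREFIX (the unit tests below assert this form); CachedListing, pick_name, and fix_local_path are illustrative stand-ins, not the datachain API.

from dataclasses import dataclass
from datetime import datetime

@dataclass
class CachedListing:
    name: str             # e.g. "lst__file:///data/" (assumed name format)
    created_at: datetime

def pick_name(listings: list[CachedListing], ds_name: str, update: bool) -> str:
    # Without update: reuse the most recently created cached listing that
    # covers ds_name. With update: keep the exact ds_name, so either the
    # exact listing is refreshed or a new "smaller" one is created; a
    # covering "bigger" listing is never re-fetched to update one sub-path.
    if listings and not update:
        return max(listings, key=lambda ls: ls.created_at).name
    return ds_name

def fix_local_path(ds_name: str, listing_name: str, list_path: str) -> str:
    # Mirrors the `removeprefix` expression in the hunk above: when a bigger
    # local-filesystem listing is reused, the listing path is re-rooted so
    # rows for the requested sub-path can still be found.
    return f'{ds_name.strip("/").removeprefix(listing_name)}/{list_path}'

# Reusing a cached "lst__file:///data/" listing for a "file:///data/dir1" request:
assert fix_local_path("lst__file:///data/dir1/", "lst__file:///data/", "") == "dir1/"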
tests/func/test_catalog.py (8 changes: 6 additions & 2 deletions)
@@ -840,13 +840,17 @@ def test_listing_stats(cloud_test_catalog):

     catalog.enlist_source(f"{src_uri}/dogs/", update=True)
     stats = listing_stats(src_uri, catalog)
     assert stats.num_objects == 7
     assert stats.size == 36

+    stats = listing_stats(f"{src_uri}/dogs/", catalog)
+    assert stats.num_objects == 4
+    assert stats.size == 15
+
     catalog.enlist_source(f"{src_uri}/dogs/")
     stats = listing_stats(src_uri, catalog)
-    assert stats.num_objects == 4
-    assert stats.size == 15
+    assert stats.num_objects == 7
+    assert stats.size == 36


 @pytest.mark.parametrize("cloud_type", ["s3", "azure", "gs"], indirect=True)
tests/unit/test_listing.py (32 changes: 30 additions & 2 deletions)
@@ -7,6 +7,7 @@
 from datachain.lib.dc import DataChain
 from datachain.lib.file import File
 from datachain.lib.listing import (
+    LISTING_PREFIX,
     is_listing_dataset,
     listing_uri_from_name,
     parse_listing_uri,
@@ -38,20 +39,47 @@ def _tree_to_entries(tree: dict, path=""):
 @pytest.fixture
 def listing(test_session):
     catalog = test_session.catalog
-    dataset_name, _, _, _ = DataChain.parse_uri("s3://whatever", test_session)
+    dataset_name, _, _, _ = DataChain.parse_uri("file:///whatever", test_session)
     DataChain.from_values(file=list(_tree_to_entries(TREE))).save(
         dataset_name, listing=True
     )

     return Listing(
         catalog.metastore.clone(),
         catalog.warehouse.clone(),
-        Client.get_client("s3://whatever", catalog.cache, **catalog.client_config),
+        Client.get_client("file:///whatever", catalog.cache, **catalog.client_config),
         dataset_name=dataset_name,
         object_name="file",
     )


+def test_parse_uri_returns_exact_match_on_update(test_session):
+    # Context: https://github.com/iterative/datachain/pull/726
+    # On update it should return the exact match (not a "bigger" one)
+    dataset_name_dir1, _, _, exists = DataChain.parse_uri(
+        "file:///whatever/dir1", test_session
+    )
+    DataChain.from_values(file=list(_tree_to_entries(TREE["dir1"]))).save(
+        dataset_name_dir1, listing=True
+    )
+    assert dataset_name_dir1 == f"{LISTING_PREFIX}file:///whatever/dir1/"
+    assert not exists
+
+    dataset_name, _, _, exists = DataChain.parse_uri("file:///whatever", test_session)
+    DataChain.from_values(file=list(_tree_to_entries(TREE))).save(
+        dataset_name, listing=True
+    )
+    assert dataset_name == f"{LISTING_PREFIX}file:///whatever/"
+    assert not exists
+
+    dataset_name_dir1, _, _, exists = DataChain.parse_uri(
+        "file:///whatever/dir1", test_session, update=True
+    )
+    assert dataset_name_dir1 == f"{LISTING_PREFIX}file:///whatever/dir1/"
+    # On update it is always False (there was no reason to complicate it for now)
+    assert not exists


 def test_resolve_path_in_root(listing):
     node = listing.resolve_path("dir1")
     assert node.dir_type == DirType.DIR
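As a footnote to the test above, a rough model of the ls.contains(ds_name) check that parse_uri relies on, assuming containment is plain name-prefix matching on these LISTING_PREFIX-style names; this is a sketch under that assumption, not the datachain implementation (the literal "lst__" value is likewise assumed).

LISTING_PREFIX = "lst__"  # assumed literal; the tests above only rely on the f-string form

def contains(listing_name: str, ds_name: str) -> bool:
    # A "bigger" listing covers ds_name when its name is a prefix of it.
    return ds_name.startswith(listing_name)

# The whole-tree listing covers the dir1 sub-listing, but not the reverse:
assert contains(f"{LISTING_PREFIX}file:///whatever/", f"{LISTING_PREFIX}file:///whatever/dir1/")
assert not contains(f"{LISTING_PREFIX}file:///whatever/dir1/", f"{LISTING_PREFIX}file:///whatever/")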
