Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(listing): pick exact or create new one on update #726

Merged
merged 1 commit into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions src/datachain/lib/dc.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,8 +410,9 @@ def parse_uri(
"""Returns correct listing dataset name that must be used for saving listing
operation. It takes into account existing listings and reusability of those.
It also returns boolean saying if returned dataset name is reused / already
exists or not, and it returns correct listing path that should be used to find
rows based on uri.
exists or not (on update it always returns False - just because there was no
reason to complicate it so far). And it returns correct listing path that should
be used to find rows based on uri.
"""
catalog = session.catalog
cache = catalog.cache
Expand All @@ -427,17 +428,18 @@ def parse_uri(
if not ls.is_expired and ls.contains(ds_name)
]

if listings:
if update:
# choosing the smallest possible one to minimize update time
listing = sorted(listings, key=lambda ls: len(ls.name))[0]
else:
# no need to update, choosing the most recent one
listing = sorted(listings, key=lambda ls: ls.created_at)[-1]

# if no need to update - choosing the most recent one;
# otherwise, we'll use the exact original `ds_name` in this case:
# - if a "bigger" listing exists, we don't want to update it, it's better
# to create a new "smaller" one on "update=True"
# - if an exact listing exists it will have the same name as `ds_name`
# anyway below
if listings and not update:
listing = sorted(listings, key=lambda ls: ls.created_at)[-1]

# for local file system we need to fix listing path / prefix
# if we are reusing existing listing
if isinstance(client, FileClient) and listing and listing.name != ds_name:
# For local file system we need to fix listing path / prefix
# if we are reusing existing listing
list_path = f'{ds_name.strip("/").removeprefix(listing.name)}/{list_path}'

ds_name = listing.name if listing else ds_name
Expand Down
8 changes: 6 additions & 2 deletions tests/func/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,13 +840,17 @@ def test_listing_stats(cloud_test_catalog):

catalog.enlist_source(f"{src_uri}/dogs/", update=True)
stats = listing_stats(src_uri, catalog)
assert stats.num_objects == 7
shcheklein marked this conversation as resolved.
Show resolved Hide resolved
assert stats.size == 36

stats = listing_stats(f"{src_uri}/dogs/", catalog)
assert stats.num_objects == 4
assert stats.size == 15

catalog.enlist_source(f"{src_uri}/dogs/")
stats = listing_stats(src_uri, catalog)
assert stats.num_objects == 4
assert stats.size == 15
assert stats.num_objects == 7
assert stats.size == 36


@pytest.mark.parametrize("cloud_type", ["s3", "azure", "gs"], indirect=True)
Expand Down
32 changes: 30 additions & 2 deletions tests/unit/test_listing.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from datachain.lib.dc import DataChain
from datachain.lib.file import File
from datachain.lib.listing import (
LISTING_PREFIX,
is_listing_dataset,
listing_uri_from_name,
parse_listing_uri,
Expand Down Expand Up @@ -38,20 +39,47 @@ def _tree_to_entries(tree: dict, path=""):
@pytest.fixture
def listing(test_session):
catalog = test_session.catalog
dataset_name, _, _, _ = DataChain.parse_uri("s3://whatever", test_session)
dataset_name, _, _, _ = DataChain.parse_uri("file:///whatever", test_session)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[C]: minor semi-related change - there is no reason to use s3; tests were failing since it was hitting an actual token update w/o proper fixtures. cc @ilongin

DataChain.from_values(file=list(_tree_to_entries(TREE))).save(
dataset_name, listing=True
)

return Listing(
catalog.metastore.clone(),
catalog.warehouse.clone(),
Client.get_client("s3://whatever", catalog.cache, **catalog.client_config),
Client.get_client("file:///whatever", catalog.cache, **catalog.client_config),
dataset_name=dataset_name,
object_name="file",
)


def test_parse_uri_returns_exact_math_on_update(test_session):
    # Context: https://github.com/iterative/datachain/pull/726
    # When update=True, parse_uri must return the exact-match listing name
    # rather than reusing an existing "bigger" (parent-directory) listing.
    uri_root = "file:///whatever"
    uri_dir1 = "file:///whatever/dir1"

    # First, create a listing dataset for the subdirectory.
    name_dir1, _, _, reused = DataChain.parse_uri(uri_dir1, test_session)
    assert name_dir1 == f"{LISTING_PREFIX}{uri_dir1}/"
    assert not reused
    DataChain.from_values(file=list(_tree_to_entries(TREE["dir1"]))).save(
        name_dir1, listing=True
    )

    # Then create a listing for the parent ("bigger") directory.
    name_root, _, _, reused = DataChain.parse_uri(uri_root, test_session)
    assert name_root == f"{LISTING_PREFIX}{uri_root}/"
    assert not reused
    DataChain.from_values(file=list(_tree_to_entries(TREE))).save(
        name_root, listing=True
    )

    # With update=True the exact subdirectory listing must be picked,
    # not the parent one that also contains it.
    name_dir1, _, _, reused = DataChain.parse_uri(
        uri_dir1, test_session, update=True
    )
    assert name_dir1 == f"{LISTING_PREFIX}{uri_dir1}/"
    # On update the "exists" flag is always False (no reason to
    # complicate it for now).
    assert not reused


def test_resolve_path_in_root(listing):
node = listing.resolve_path("dir1")
assert node.dir_type == DirType.DIR
Expand Down
Loading