Adding DataChain.export_files(...) #30

Merged

merged 33 commits into main from ilongin/1721-datachain-export-files on Jul 18, 2024

Commits
facabdb
added logic to export files from chain
ilongin Jul 13, 2024
21e95d5
added logic to export files from chain
ilongin Jul 13, 2024
a47f219
fixing mypy
ilongin Jul 13, 2024
f382739
fixing progress bar
ilongin Jul 13, 2024
c8625cc
Merge branch 'main' into ilongin/1721-datachain-export-files
ilongin Jul 15, 2024
8ba52a7
added simplified method to export files
ilongin Jul 15, 2024
4f0b2df
Merge branch 'main' into ilongin/1721-datachain-export-files
ilongin Jul 16, 2024
9b0fca1
refactoring output directory generation
ilongin Jul 16, 2024
3217021
returning old fetcher classes and files
ilongin Jul 16, 2024
90c3ac1
returning old fetcher classes and files
ilongin Jul 16, 2024
86355dd
added file export strategy and distinct method in dataset query
ilongin Jul 16, 2024
f66cac3
adding tests
ilongin Jul 16, 2024
55b2253
fixing strategy type
ilongin Jul 16, 2024
bbba52d
merging with main
ilongin Jul 16, 2024
2285554
using posixpath
ilongin Jul 16, 2024
bb6fb50
added tests for export_files
ilongin Jul 16, 2024
505b1e7
added file fixtures
ilongin Jul 16, 2024
0a3fa78
merging with main
ilongin Jul 17, 2024
e82a48a
using fixtures in tests
ilongin Jul 17, 2024
e04d0dd
adding tests
ilongin Jul 17, 2024
6a9c43b
removed tests
ilongin Jul 17, 2024
406feb6
removed tests
ilongin Jul 17, 2024
4c91fdb
added print
ilongin Jul 17, 2024
e630077
better prints
ilongin Jul 17, 2024
175d40d
fixing tests
ilongin Jul 17, 2024
4775cb1
fixing tests
ilongin Jul 17, 2024
caaaa5f
removed only one test
ilongin Jul 17, 2024
a8ceb22
removed not needed function
ilongin Jul 17, 2024
35b47b5
merging with main
ilongin Jul 18, 2024
1b1f15a
added using cache
ilongin Jul 18, 2024
9459eb6
added using cache
ilongin Jul 18, 2024
cdd6a87
refactored tests and added print
ilongin Jul 18, 2024
8fffba0
renamed strategy to placement
ilongin Jul 18, 2024
17 changes: 17 additions & 0 deletions src/datachain/lib/dc.py
@@ -19,6 +19,7 @@
from datachain.lib.convert.values_to_tuples import values_to_tuples
from datachain.lib.data_model import DataType
from datachain.lib.dataset_info import DatasetInfo
from datachain.lib.file import ExportPlacement as FileExportPlacement
from datachain.lib.file import File, IndexedFile, get_file
from datachain.lib.meta_formats import read_meta, read_schema
from datachain.lib.model_store import ModelStore
@@ -1009,3 +1010,19 @@ def setup(self, **kwargs) -> "Self":

        self._setup = self._setup | kwargs
        return self

    def export_files(
        self,
        output: str,
        signal="file",
        placement: FileExportPlacement = "fullpath",
        use_cache: bool = True,
    ) -> None:
        """Export all files from the chain into the given output directory."""
        if placement == "filename":
            print("Checking if file names are unique")
            if self.select(f"{signal}.name").distinct().count() != self.count():
                raise ValueError("Files with the same name found")

        for file in self.collect_one(signal):
            file.export(output, placement, use_cache)  # type: ignore[union-attr]
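
For orientation, a minimal usage sketch of the new method (the bucket URI and output directories below are illustrative, not taken from this PR):

from datachain.lib.dc import DataChain

chain = DataChain.from_storage("s3://my-bucket/photos/")

# Copy every file in the chain into ./export, preserving each file's full source path.
chain.export_files("export", placement="fullpath")

# Or flatten to bare file names; this raises ValueError if two files share a name.
chain.export_files("export_flat", placement="filename")
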
43 changes: 43 additions & 0 deletions src/datachain/lib/file.py
@@ -1,5 +1,7 @@
import io
import json
import os
import posixpath
from abc import ABC, abstractmethod
from contextlib import contextmanager
from datetime import datetime
@@ -24,6 +26,9 @@
if TYPE_CHECKING:
    from datachain.catalog import Catalog

# how to create file path when exporting
ExportPlacement = Literal["filename", "etag", "fullpath", "checksum"]


class VFileError(DataChainError):
    def __init__(self, file: "File", message: str, vtype: str = ""):

@@ -186,6 +191,21 @@ def open(self):
        ) as f:
            yield f

    def export(
        self,
        output: str,
        placement: ExportPlacement = "fullpath",
        use_cache: bool = True,
    ) -> None:
        if use_cache:
            self._caching_enabled = use_cache
        dst = self.get_destination_path(output, placement)
        dst_dir = os.path.dirname(dst)
        os.makedirs(dst_dir, exist_ok=True)

        with open(dst, mode="wb") as f:
            f.write(self.read())

    def _set_stream(
        self,
        catalog: "Catalog",

@@ -233,6 +253,29 @@ def get_path(self) -> str:
        path = url2pathname(path)
        return path

    def get_destination_path(self, output: str, placement: ExportPlacement) -> str:
        """
        Returns the full destination path of a file for exporting to the given
        output directory, based on the export placement.
        """
        if placement == "filename":
            path = unquote(self.name)
        elif placement == "etag":
            path = f"{self.etag}{self.get_file_suffix()}"
        elif placement == "fullpath":
            fs = self.get_fs()
            if isinstance(fs, LocalFileSystem):
                path = unquote(self.get_full_name())
            else:
                path = (
                    Path(urlparse(self.source).netloc) / unquote(self.get_full_name())
                ).as_posix()
        elif placement == "checksum":
            raise NotImplementedError("Checksum placement not implemented yet")
        else:
            raise ValueError(f"Unsupported file export placement: {placement}")
        return posixpath.join(output, path)  # type: ignore[union-attr]

    def get_fs(self):
        return self._catalog.get_client(self.source).fs

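To make the placement options concrete, here is a small illustration of get_destination_path (the file name and etag are made up; it mirrors the unit tests added further down in this PR):

from datachain.lib.file import File

f = File(
    name="report.pdf",
    parent="dir1/dir2",
    source="s3://my-bucket",
    etag="0123456789abcdef",
)
assert f.get_destination_path("output", "filename") == "output/report.pdf"
assert f.get_destination_path("output", "etag") == "output/0123456789abcdef.pdf"
# "fullpath" would also prefix the bucket name ("output/my-bucket/dir1/dir2/report.pdf"),
# but it needs a catalog attached via _set_stream, so it is exercised in the unit tests below.
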
12 changes: 12 additions & 0 deletions src/datachain/query/dataset.py
@@ -865,6 +865,12 @@ def apply_sql_clause(self, query):
        return sqlalchemy.select(f.count(1)).select_from(query.subquery())


@frozen
class SQLDistinct(SQLClause):
    def apply_sql_clause(self, query):
        return query.distinct()


@frozen
class SQLUnion(Step):
    query1: "DatasetQuery"

@@ -1407,6 +1413,12 @@ def offset(self, offset: int) -> "Self":
        query.steps.append(SQLOffset(offset))
        return query

    @detach
    def distinct(self) -> "Self":
Contributor commented:

This signature is not comprehensive.

The main use case for distinct() in the datasets is removal of duplicate entries - for that, the function should take a signal (or a list of signals) as an argument.

Member commented:

Right! @ilongin, could you please implement this as a follow-up issue?

Member commented:

Created #89

Contributor Author commented:

Yes, I will create a follow-up issue. It seems we need something like PostgreSQL's DISTINCT ON, which is not available in SQLite (it only has a "normal" DISTINCT that returns unique columns), so we will probably need to implement it with a GROUP BY or something similar under the hood.

Member @dmpetrov commented on Jul 19, 2024:

    if self.select(f"{signal}.name").distinct().count() != self.count():
        raise ValueError("Files with the same name found")

This statement might not be ideal for two reasons:

  1. There might be an issue if the original dataset contains duplicates (we cannot guarantee it does not).
  2. It calls count() twice.

A GROUP BY with a count seems like the right way to solve this, not DISTINCT.
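
As a rough sketch of that suggestion (not part of this PR; the table and column names are hypothetical), the uniqueness check could be a single GROUP BY ... HAVING query instead of comparing two counts:

import sqlalchemy as sa

def has_duplicate_names(conn, files_table):
    # files_table is assumed to be a SQLAlchemy Table with a "name" column; any name
    # occurring more than once means the "filename" placement would overwrite files.
    query = (
        sa.select(files_table.c.name)
        .group_by(files_table.c.name)
        .having(sa.func.count() > 1)
        .limit(1)
    )
    return conn.execute(query).first() is not None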

        query = self.clone()
        query.steps.append(SQLDistinct())
        return query

    def as_scalar(self) -> Any:
        with self.as_iterable() as rows:
            row = next(iter(rows))
57 changes: 57 additions & 0 deletions tests/func/test_datachain.py
@@ -1,3 +1,5 @@
import os

import pytest

from datachain.lib.dc import DataChain
@@ -56,3 +58,58 @@ def test_read_file(cloud_test_catalog, use_cache):
    assert file.get_local_path() is None
    file.read()
    assert bool(file.get_local_path()) is use_cache


@pytest.mark.parametrize("placement", ["fullpath", "filename"])
@pytest.mark.parametrize("use_map", [True, False])
@pytest.mark.parametrize("use_cache", [True, False])
@pytest.mark.parametrize("cloud_type", ["file"], indirect=True)
def test_export_files(tmp_dir, cloud_test_catalog, placement, use_map, use_cache):
ctc = cloud_test_catalog
df = DataChain.from_storage(ctc.src_uri)
if use_map:
df.export_files(tmp_dir / "output", placement=placement, use_cache=use_cache)
df.map(
res=lambda file: file.export(
tmp_dir / "output", placement=placement, use_cache=use_cache
)
).exec()
else:
df.export_files(tmp_dir / "output", placement=placement)

expected = {
"description": "Cats and Dogs",
"cat1": "meow",
"cat2": "mrow",
"dog1": "woof",
"dog2": "arf",
"dog3": "bark",
"dog4": "ruff",
}

for file in df.collect_one("file"):
if placement == "filename":
file_path = file.name
else:
file_path = file.get_full_name()
with open(tmp_dir / "output" / file_path) as f:
assert f.read() == expected[file.name]


def test_export_files_filename_placement_not_unique_files(tmp_dir, catalog):
data = b"some\x00data\x00is\x48\x65\x6c\x57\x6f\x72\x6c\x64\xff\xffheRe"
bucket_name = "mybucket"
files = ["dir1/a.json", "dir1/dir2/a.json"]

# create bucket dir with duplicate file names
bucket_dir = tmp_dir / bucket_name
bucket_dir.mkdir(parents=True)
for file_path in files:
file_path = bucket_dir / file_path
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, "wb") as fd:
fd.write(data)

df = DataChain.from_storage((tmp_dir / bucket_name).as_uri())
with pytest.raises(ValueError):
df.export_files(tmp_dir / "output", placement="filename")
34 changes: 34 additions & 0 deletions tests/func/test_dataset_query.py
@@ -434,6 +434,40 @@ def test_select_except(cloud_test_catalog):
    ]


@pytest.mark.parametrize(
    "cloud_type,version_aware",

Member commented:

Can we check distinct without File? It should not touch other parts if there is no need.

Member commented:

I'd try to distinct on a list of integers...

Contributor Author commented:

As we discussed, I would leave this for a separate issue, as there are multiple tests in this file that could be refactored in this way.

    [("s3", True)],
    indirect=True,
)
def test_distinct(cloud_test_catalog):
    catalog = cloud_test_catalog.catalog
    path = cloud_test_catalog.src_uri
    ds = DatasetQuery(path=path, catalog=catalog)

    q = ds.select(C.parent).order_by(C.parent).distinct()
    result = q.results()

    assert result == [
        ("",),
        ("cats",),
        ("dogs",),
        ("dogs/others",),
    ]


@pytest.mark.parametrize(
    "cloud_type,version_aware",
    [("s3", True)],
    indirect=True,
)
def test_distinct_count(cloud_test_catalog):
    catalog = cloud_test_catalog.catalog
    path = cloud_test_catalog.src_uri
    ds = DatasetQuery(path=path, catalog=catalog)

    assert ds.select(C.parent).order_by(C.parent).distinct().count() == 4


@pytest.mark.parametrize("save", [True, False])
@pytest.mark.parametrize(
    "cloud_type,version_aware",
50 changes: 50 additions & 0 deletions tests/unit/lib/test_file.py
@@ -8,6 +8,15 @@
from datachain.lib.file import File, TextFile


def create_file(source: str):
    return File(
        name="test.txt",
        parent="dir1/dir2",
        source=source,
        etag="ed779276108738fdb2179ccabf9680d9",
    )


def test_uid_missing_location():
    name = "my_name"
    vtype = "vt1"

@@ -65,6 +74,47 @@ def test_cache_get_path(catalog: Catalog):
    assert f.read() == data


def test_get_destination_path_wrong_strategy():
    file = create_file("s3://mybkt")
    with pytest.raises(ValueError):
        file.get_destination_path("", "wrong")


def test_get_destination_path_filename_strategy():
    file = create_file("s3://mybkt")
    assert file.get_destination_path("output", "filename") == "output/test.txt"


def test_get_destination_path_empty_output():
    file = create_file("s3://mybkt")
    assert file.get_destination_path("", "filename") == "test.txt"


def test_get_destination_path_etag_strategy():
    file = create_file("s3://mybkt")
    assert (
        file.get_destination_path("output", "etag")
        == "output/ed779276108738fdb2179ccabf9680d9.txt"
    )


def test_get_destination_path_fullpath_strategy(catalog):
    file = create_file("s3://mybkt")
    file._set_stream(catalog, False)
    assert (
        file.get_destination_path("output", "fullpath")
        == "output/mybkt/dir1/dir2/test.txt"
    )


def test_get_destination_path_fullpath_strategy_file_source(catalog, tmp_path):
    file = create_file("file:///")
    file._set_stream(catalog, False)
    assert (
        file.get_destination_path("output", "fullpath") == "output/dir1/dir2/test.txt"
    )


def test_read_binary_data(tmp_path, catalog: Catalog):
    file_name = "myfile"
    data = b"some\x00data\x00is\x48\x65\x6c\x57\x6f\x72\x6c\x64\xff\xffheRe"