Merge branch 'main' into ilongin/636-datachain-diff
ilongin committed Dec 15, 2024 · 2 parents 87df896 + eaf0412 · commit dc50a72
Showing 16 changed files with 112 additions and 83 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -24,7 +24,7 @@ repos:
- id: trailing-whitespace
exclude: '^LICENSES/'
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: 'v0.8.1'
rev: 'v0.8.2'
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
11 changes: 5 additions & 6 deletions README.rst
@@ -36,6 +36,11 @@ Use Cases
3. **Versioning.** DataChain doesn't store, require moving or copying data (unlike DVC).
Perfect use case is a bucket with thousands or millions of images, videos, audio, PDFs.

Getting Started
===============

Visit `Quick Start <https://docs.datachain.ai/quick-start>`_ and `Docs <https://docs.datachain.ai/>`_
to get started with `DataChain` and learn more.

Key Features
============
@@ -59,12 +64,6 @@ Key Features
- Pass datasets to Pytorch and Tensorflow, or export them back into storage.


Getting Started
===============

Visit `Quick Start <https://docs.datachain.ai/quick-start>`_ to get started with `DataChain` and learn more.


Contributing
============

45 changes: 25 additions & 20 deletions examples/get_started/torch-loader.py
@@ -5,24 +5,26 @@
"""

import multiprocessing
import os
from posixpath import basename

import torch
from torch import nn, optim
from torch.utils.data import DataLoader
from torchvision.transforms import v2
from tqdm import tqdm

from datachain import C, DataChain
from datachain.torch import label_to_int

STORAGE = "gs://datachain-demo/dogs-and-cats/"
NUM_EPOCHS = os.getenv("NUM_EPOCHS", "3")
NUM_EPOCHS = int(os.getenv("NUM_EPOCHS", "3"))

# Define transformation for data preprocessing
transform = v2.Compose(
[
v2.ToTensor(),
v2.Compose([v2.ToImage(), v2.ToDtype(torch.float32, scale=True)]),
v2.Resize((64, 64)),
v2.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
]
@@ -54,6 +56,7 @@ def forward(self, x):
if __name__ == "__main__":
ds = (
DataChain.from_storage(STORAGE, type="image")
.settings(cache=True, prefetch=25)
.filter(C("file.path").glob("*.jpg"))
.map(
label=lambda path: label_to_int(basename(path)[:3], CLASSES),
@@ -64,28 +67,30 @@ def forward(self, x):

train_loader = DataLoader(
ds.to_pytorch(transform=transform),
batch_size=16,
num_workers=2,
batch_size=25,
num_workers=max(4, os.cpu_count() or 2),
persistent_workers=True,
multiprocessing_context=multiprocessing.get_context("spawn"),
)

model = CNN()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train the model
for epoch in range(int(NUM_EPOCHS)):
for i, data in enumerate(train_loader):
inputs, labels = data
optimizer.zero_grad()

# Forward pass
outputs = model(inputs)
loss = criterion(outputs, labels)

# Backward pass and optimize
loss.backward()
optimizer.step()

print(f"[{epoch + 1}, {i + 1:5d}] loss: {loss.item():.3f}")

print("Finished Training")
for epoch in range(NUM_EPOCHS):
with tqdm(
train_loader, desc=f"epoch {epoch + 1}/{NUM_EPOCHS}", unit="batch"
) as loader:
for data in loader:
inputs, labels = data
optimizer.zero_grad()

# Forward pass
outputs = model(inputs)
loss = criterion(outputs, labels)

# Backward pass and optimize
loss.backward()
optimizer.step()
loader.set_postfix(loss=loss.item())
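
Note on the transform change above: v2.ToTensor() is deprecated in current torchvision, and the documented replacement is the v2.ToImage() + v2.ToDtype(torch.float32, scale=True) pair used in this hunk. A minimal, standalone sketch of what that pair does (the synthetic image and its size are arbitrary):

    import torch
    from PIL import Image
    from torchvision.transforms import v2

    # Synthetic RGB image; any PIL image goes through the same path.
    img = Image.new("RGB", (128, 128), color=(255, 0, 0))

    to_float_tensor = v2.Compose([v2.ToImage(), v2.ToDtype(torch.float32, scale=True)])
    tensor = to_float_tensor(img)

    # ToImage() produces a CHW image tensor; ToDtype(..., scale=True) rescales
    # uint8 values from [0, 255] to float32 values in [0, 1].
    assert tensor.shape == (3, 128, 128)
    assert tensor.dtype == torch.float32
    assert float(tensor.max()) <= 1.0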
12 changes: 7 additions & 5 deletions examples/llm_and_nlp/unstructured-embeddings-gen.py
@@ -12,11 +12,11 @@
group_broken_paragraphs,
replace_unicode_quotes,
)
from unstructured.embed.huggingface import (
from unstructured.partition.pdf import partition_pdf
from unstructured_ingest.embed.huggingface import (
HuggingFaceEmbeddingConfig,
HuggingFaceEmbeddingEncoder,
)
from unstructured.partition.pdf import partition_pdf

from datachain import C, DataChain, DataModel, File

@@ -43,6 +43,7 @@ def process_pdf(file: File) -> Iterator[Chunk]:
chunks = partition_pdf(file=f, chunking_strategy="by_title", strategy="fast")

# Clean the chunks and add new columns
text_chunks = []
for chunk in chunks:
chunk.apply(
lambda text: clean(
@@ -51,16 +52,17 @@ def process_pdf(file: File) -> Iterator[Chunk]:
)
chunk.apply(replace_unicode_quotes)
chunk.apply(group_broken_paragraphs)
text_chunks.append({"text": str(chunk)})

# create embeddings
chunks_embedded = embedding_encoder.embed_documents(chunks)
chunks_embedded = embedding_encoder.embed_documents(text_chunks)

# Add new rows to DataChain
for chunk in chunks_embedded:
yield Chunk(
key=file.path,
text=chunk.text,
embeddings=chunk.embeddings,
text=chunk.get("text"),
embeddings=chunk.get("embeddings"),
)


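Note on the embedding change above: after the move from unstructured.embed to unstructured_ingest.embed, the encoder works with plain dicts rather than unstructured Element objects, which is why the hunk builds text_chunks and reads results via .get(). A minimal sketch of that call shape; the encoder construction with a default HuggingFaceEmbeddingConfig is an assumption (it is not shown in this hunk), and the embedding model is whatever the config defaults to:

    from unstructured_ingest.embed.huggingface import (
        HuggingFaceEmbeddingConfig,
        HuggingFaceEmbeddingEncoder,
    )

    # Assumed construction; real code may pass an explicit model name.
    embedding_encoder = HuggingFaceEmbeddingEncoder(config=HuggingFaceEmbeddingConfig())

    # embed_documents takes a list of {"text": ...} dicts and returns dicts
    # carrying both the original "text" and the computed "embeddings".
    docs = embedding_encoder.embed_documents([{"text": "An example paragraph."}])
    print(len(docs[0].get("embeddings", [])))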
2 changes: 2 additions & 0 deletions noxfile.py
@@ -81,6 +81,8 @@ def examples(session: nox.Session) -> None:
session.install(".[examples]")
session.run(
"pytest",
"--durations=0",
"tests/examples",
"-m",
"examples",
*session.posargs,
6 changes: 3 additions & 3 deletions pyproject.toml
@@ -104,14 +104,14 @@ dev = [
]
examples = [
"datachain[tests]",
"numpy>=1,<2",
"defusedxml",
"accelerate",
"unstructured[pdf,embed-huggingface]<0.16.0",
"unstructured_ingest[embed-huggingface]",
"unstructured[pdf]",
"pdfplumber==0.11.4",
"huggingface_hub[hf_transfer]",
"onnx==1.16.1",
"ultralytics==8.3.37"
"ultralytics==8.3.48"
]

[project.urls]
3 changes: 1 addition & 2 deletions src/datachain/client/__init__.py
@@ -1,4 +1,3 @@
from .fsspec import Client
from .s3 import ClientS3

__all__ = ["Client", "ClientS3"]
__all__ = ["Client"]
6 changes: 5 additions & 1 deletion src/datachain/lib/dc.py
@@ -19,7 +19,6 @@
)

import orjson
import pandas as pd
import sqlalchemy
from pydantic import BaseModel
from sqlalchemy.sql.functions import GenericFunction
@@ -57,6 +56,7 @@
from datachain.utils import batched_it, inside_notebook, row_to_nested_dict

if TYPE_CHECKING:
import pandas as pd
from pyarrow import DataType as ArrowDataType
from typing_extensions import Concatenate, ParamSpec, Self

@@ -1777,6 +1777,8 @@ def to_pandas(self, flatten=False) -> "pd.DataFrame":
Parameters:
flatten : Whether to use a multiindex or flatten column names.
"""
import pandas as pd

headers, max_length = self._effective_signals_schema.get_headers_with_length()
if flatten or max_length < 2:
columns = [".".join(filter(None, header)) for header in headers]
@@ -1800,6 +1802,8 @@ def show(
transpose : Whether to transpose rows and columns.
truncate : Whether or not to truncate the contents of columns.
"""
import pandas as pd

dc = self.limit(limit) if limit > 0 else self # type: ignore[misc]
df = dc.to_pandas(flatten)

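Note: this hunk and the file.py, meta_formats.py, and query/dataset.py hunks below all apply the same deferred-import pattern: keep the heavy module under TYPE_CHECKING for annotations and import it inside the method that actually needs it, so importing datachain itself stays cheap. A minimal standalone sketch of the pattern (to_records_frame is an illustrative name, not DataChain API):

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        import pandas as pd  # seen by type checkers only, never imported at runtime


    def to_records_frame(records: list[dict]) -> "pd.DataFrame":
        import pandas as pd  # deferred: the import cost is paid only when this runs

        return pd.DataFrame(records)


    print(to_records_frame([{"fib": 1}, {"fib": 2}]).shape)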
3 changes: 2 additions & 1 deletion src/datachain/lib/file.py
@@ -17,7 +17,6 @@

from fsspec.callbacks import DEFAULT_CALLBACK, Callback
from PIL import Image
from pyarrow.dataset import dataset
from pydantic import Field, field_validator

from datachain.client.fileslice import FileSlice
@@ -452,6 +451,8 @@ class ArrowRow(DataModel):
@contextmanager
def open(self):
"""Stream row contents from indexed file."""
from pyarrow.dataset import dataset

if self.file._caching_enabled:
self.file.ensure_cached()
path = self.file.get_local_path()
3 changes: 2 additions & 1 deletion src/datachain/lib/meta_formats.py
@@ -6,7 +6,6 @@
from pathlib import Path
from typing import Callable

import datamodel_code_generator
import jmespath as jsp
from pydantic import BaseModel, ConfigDict, Field, ValidationError # noqa: F401

@@ -67,6 +66,8 @@ def read_schema(source_file, data_type="csv", expr=None, model_name=None):
data_type = "json" # treat json line as plain JSON in auto-schema
data_string = json.dumps(json_object)

import datamodel_code_generator

input_file_types = {i.value: i for i in datamodel_code_generator.InputFileType}
input_file_type = input_file_types[data_type]
with tempfile.TemporaryDirectory() as tmpdir:
6 changes: 1 addition & 5 deletions src/datachain/lib/pytorch.py
@@ -7,7 +7,6 @@
from torch.distributed import get_rank, get_world_size
from torch.utils.data import IterableDataset, get_worker_info
from torchvision.transforms import v2
from tqdm import tqdm

from datachain import Session
from datachain.asyn import AsyncMapper
@@ -112,10 +111,7 @@ def __iter__(self) -> Iterator[Any]:
from datachain.lib.udf import _prefetch_input

rows = AsyncMapper(_prefetch_input, rows, workers=self.prefetch).iterate()

desc = f"Parsed PyTorch dataset for rank={total_rank} worker"
with tqdm(rows, desc=desc, unit=" rows", position=total_rank) as rows_it:
yield from map(self._process_row, rows_it)
yield from map(self._process_row, rows)

def _process_row(self, row_features):
row = []
5 changes: 4 additions & 1 deletion src/datachain/query/dataset.py
@@ -35,7 +35,6 @@
from sqlalchemy.sql.selectable import Select

from datachain.asyn import ASYNC_WORKERS, AsyncMapper, OrderedMapper
from datachain.catalog import QUERY_SCRIPT_CANCELED_EXIT_CODE, get_catalog
from datachain.data_storage.schema import (
PARTITION_COLUMN_ID,
partition_col_names,
@@ -394,6 +393,8 @@ def create_result_query(
"""

def populate_udf_table(self, udf_table: "Table", query: Select) -> None:
from datachain.catalog import QUERY_SCRIPT_CANCELED_EXIT_CODE

use_partitioning = self.partition_by is not None
batching = self.udf.get_batching(use_partitioning)
workers = self.workers
@@ -1088,6 +1089,8 @@ def get_table() -> "TableClause":
def delete(
name: str, version: Optional[int] = None, catalog: Optional["Catalog"] = None
) -> None:
from datachain.catalog import get_catalog

catalog = catalog or get_catalog()
version = version or catalog.get_dataset(name).latest_version
catalog.remove_dataset(name, version)
25 changes: 19 additions & 6 deletions src/datachain/toolkit/split.py
@@ -1,7 +1,16 @@
import random
from typing import Optional

from datachain import C, DataChain

RESOLUTION = 2**31 - 1 # Maximum positive value for a 32-bit signed integer.


def train_test_split(dc: DataChain, weights: list[float]) -> list[DataChain]:
def train_test_split(
dc: DataChain,
weights: list[float],
seed: Optional[int] = None,
) -> list[DataChain]:
"""
Splits a DataChain into multiple subsets based on the provided weights.
@@ -18,6 +27,8 @@ def train_test_split(dc: DataChain, weights: list[float]) -> list[DataChain]:
For example:
- `[0.7, 0.3]` corresponds to a 70/30 split;
- `[2, 1, 1]` corresponds to a 50/25/25 split.
seed (int, optional):
The seed for the random number generator. Defaults to None.
Returns:
list[DataChain]:
@@ -58,14 +69,16 @@ def train_test_split(dc: DataChain, weights: list[float]) -> list[DataChain]:

weights_normalized = [weight / sum(weights) for weight in weights]

resolution = 2**31 - 1 # Maximum positive value for a 32-bit signed integer.
rand_col = C("sys.rand")
if seed is not None:
uniform_seed = random.Random(seed).randrange(1, RESOLUTION) # noqa: S311
rand_col = (rand_col % RESOLUTION) * uniform_seed # type: ignore[assignment]
rand_col = rand_col % RESOLUTION # type: ignore[assignment]

return [
dc.filter(
C("sys__rand") % resolution
>= round(sum(weights_normalized[:index]) * resolution),
C("sys__rand") % resolution
< round(sum(weights_normalized[: index + 1]) * resolution),
rand_col >= round(sum(weights_normalized[:index]) * (RESOLUTION - 1)),
rand_col < round(sum(weights_normalized[: index + 1]) * (RESOLUTION - 1)),
)
for index, _ in enumerate(weights_normalized)
]
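
Note: a usage sketch for the new seed parameter, following the docstring above. The import path assumes the module location in this diff (src/datachain/toolkit/split.py), and the storage URI is just the demo bucket used elsewhere in this commit:

    from datachain import DataChain
    from datachain.toolkit.split import train_test_split

    dc = DataChain.from_storage("gs://datachain-demo/dogs-and-cats/")

    # [0.7, 0.3] is a 70/30 split; a fixed seed makes the split reproducible,
    # while omitting it preserves the previous behaviour based only on sys.rand.
    train, test = train_test_split(dc, [0.7, 0.3], seed=42)
    print(train.count(), test.count())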
22 changes: 12 additions & 10 deletions tests/conftest.py
@@ -654,18 +654,20 @@ def studio_datasets(requests_mock):

@pytest.fixture
def not_random_ds(test_session):
# `sys__rand` column is carefully crafted to ensure that `train_test_split` func
# will always return columns in the `sys__id` order if no seed is provided.
return DataChain.from_records(
[
{"sys__id": 1, "sys__rand": 200000000, "fib": 0},
{"sys__id": 2, "sys__rand": 400000000, "fib": 1},
{"sys__id": 3, "sys__rand": 600000000, "fib": 1},
{"sys__id": 4, "sys__rand": 800000000, "fib": 2},
{"sys__id": 5, "sys__rand": 1000000000, "fib": 3},
{"sys__id": 6, "sys__rand": 1200000000, "fib": 5},
{"sys__id": 7, "sys__rand": 1400000000, "fib": 8},
{"sys__id": 8, "sys__rand": 1600000000, "fib": 13},
{"sys__id": 9, "sys__rand": 1800000000, "fib": 21},
{"sys__id": 10, "sys__rand": 2000000000, "fib": 34},
{"sys__id": 1, "sys__rand": 8025184816406567794, "fib": 0},
{"sys__id": 2, "sys__rand": 8264763963075908010, "fib": 1},
{"sys__id": 3, "sys__rand": 338514328625642097, "fib": 1},
{"sys__id": 4, "sys__rand": 508807229144041274, "fib": 2},
{"sys__id": 5, "sys__rand": 8730460072520445744, "fib": 3},
{"sys__id": 6, "sys__rand": 154987448000528066, "fib": 5},
{"sys__id": 7, "sys__rand": 6310705427500864020, "fib": 8},
{"sys__id": 8, "sys__rand": 2154127460471345108, "fib": 13},
{"sys__id": 9, "sys__rand": 2584481985215516118, "fib": 21},
{"sys__id": 10, "sys__rand": 5771949255753972681, "fib": 34},
],
session=test_session,
schema={"sys": Sys, "fib": int},