Commit

Merge branch 'main' into amrit/stream-logs
0x2b3bfa0 authored Dec 17, 2024
2 parents 8e3a5d8 + 10e90c5 commit 0797abf
Showing 25 changed files with 895 additions and 125 deletions.
9 changes: 6 additions & 3 deletions .github/workflows/tests.yml
@@ -136,7 +136,7 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
os: [ubuntu-latest, windows-latest]
pyv: ['3.9', '3.12']
group: ['get_started', 'llm_and_nlp or computer_vision', 'multimodal']
exclude:
@@ -166,9 +166,12 @@ jobs:
- name: Install nox
run: uv pip install nox --system

# HF runs against actual API - thus run it only once
- name: Set hf token
if: matrix.os == 'ubuntu-latest' && matrix.pyv == '3.12'
run: echo 'HF_TOKEN=${{ secrets.HF_TOKEN }}' >> "$GITHUB_ENV"

- name: Run examples
env:
HF_TOKEN: ${{ secrets.HF_TOKEN }}
run: nox -s examples -p ${{ matrix.pyv }} -- -m "${{ matrix.group }}"

check:
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -24,7 +24,7 @@ repos:
- id: trailing-whitespace
exclude: '^LICENSES/'
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: 'v0.8.1'
rev: 'v0.8.3'
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
11 changes: 5 additions & 6 deletions README.rst
@@ -36,6 +36,11 @@ Use Cases
3. **Versioning.** DataChain doesn't store, require moving or copying data (unlike DVC).
Perfect use case is a bucket with thousands or millions of images, videos, audio, PDFs.

Getting Started
===============

Visit `Quick Start <https://docs.datachain.ai/quick-start>`_ and `Docs <https://docs.datachain.ai/>`_
to get started with `DataChain` and learn more.

Key Features
============
@@ -59,12 +64,6 @@ Key Features
- Pass datasets to Pytorch and Tensorflow, or export them back into storage.


Getting Started
===============

Visit `Quick Start <https://docs.datachain.ai/quick-start>`_ to get started with `DataChain` and learn more.


Contributing
============

45 changes: 25 additions & 20 deletions examples/get_started/torch-loader.py
@@ -5,24 +5,26 @@
"""

import multiprocessing
import os
from posixpath import basename

import torch
from torch import nn, optim
from torch.utils.data import DataLoader
from torchvision.transforms import v2
from tqdm import tqdm

from datachain import C, DataChain
from datachain.torch import label_to_int

STORAGE = "gs://datachain-demo/dogs-and-cats/"
NUM_EPOCHS = os.getenv("NUM_EPOCHS", "3")
NUM_EPOCHS = int(os.getenv("NUM_EPOCHS", "3"))

# Define transformation for data preprocessing
transform = v2.Compose(
[
v2.ToTensor(),
v2.Compose([v2.ToImage(), v2.ToDtype(torch.float32, scale=True)]),
v2.Resize((64, 64)),
v2.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
]
@@ -54,6 +56,7 @@ def forward(self, x):
if __name__ == "__main__":
ds = (
DataChain.from_storage(STORAGE, type="image")
.settings(cache=True, prefetch=25)
.filter(C("file.path").glob("*.jpg"))
.map(
label=lambda path: label_to_int(basename(path)[:3], CLASSES),
@@ -64,28 +67,30 @@ def forward(self, x):

train_loader = DataLoader(
ds.to_pytorch(transform=transform),
batch_size=16,
num_workers=2,
batch_size=25,
num_workers=max(4, os.cpu_count() or 2),
persistent_workers=True,
multiprocessing_context=multiprocessing.get_context("spawn"),
)

model = CNN()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train the model
for epoch in range(int(NUM_EPOCHS)):
for i, data in enumerate(train_loader):
inputs, labels = data
optimizer.zero_grad()

# Forward pass
outputs = model(inputs)
loss = criterion(outputs, labels)

# Backward pass and optimize
loss.backward()
optimizer.step()

print(f"[{epoch + 1}, {i + 1:5d}] loss: {loss.item():.3f}")

print("Finished Training")
for epoch in range(NUM_EPOCHS):
with tqdm(
train_loader, desc=f"epoch {epoch + 1}/{NUM_EPOCHS}", unit="batch"
) as loader:
for data in loader:
inputs, labels = data
optimizer.zero_grad()

# Forward pass
outputs = model(inputs)
loss = criterion(outputs, labels)

# Backward pass and optimize
loss.backward()
optimizer.step()
loader.set_postfix(loss=loss.item())
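
For context, a condensed sketch of the pattern the updated example now follows: DataChain caching/prefetching feeding a PyTorch DataLoader, with the training loop reporting progress through tqdm. The `CLASSES` list, the elided `.map()` arguments, and the stand-in model are illustrative assumptions, not part of this diff.

```python
import multiprocessing
import os
from posixpath import basename

import torch
from torch import nn, optim
from torch.utils.data import DataLoader
from torchvision.transforms import v2
from tqdm import tqdm

from datachain import C, DataChain
from datachain.torch import label_to_int

STORAGE = "gs://datachain-demo/dogs-and-cats/"
NUM_EPOCHS = int(os.getenv("NUM_EPOCHS", "3"))
CLASSES = ["cat", "dog"]  # placeholder; the real list is defined elsewhere in the example

transform = v2.Compose(
    [
        v2.Compose([v2.ToImage(), v2.ToDtype(torch.float32, scale=True)]),
        v2.Resize((64, 64)),
        v2.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ]
)

if __name__ == "__main__":
    ds = (
        DataChain.from_storage(STORAGE, type="image")
        # Cache downloaded files and prefetch 25 of them ahead of the training loop.
        .settings(cache=True, prefetch=25)
        .filter(C("file.path").glob("*.jpg"))
        .map(
            label=lambda path: label_to_int(basename(path)[:3], CLASSES),
            params=["file.path"],  # assumed; these arguments are elided in the diff
            output=int,
        )
    )

    train_loader = DataLoader(
        ds.to_pytorch(transform=transform),
        batch_size=25,
        num_workers=max(4, os.cpu_count() or 2),
        persistent_workers=True,
        multiprocessing_context=multiprocessing.get_context("spawn"),
    )

    # Stand-in for the example's CNN, kept trivial for the sketch.
    model = nn.Sequential(nn.Flatten(), nn.Linear(3 * 64 * 64, len(CLASSES)))
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    for epoch in range(NUM_EPOCHS):
        with tqdm(
            train_loader, desc=f"epoch {epoch + 1}/{NUM_EPOCHS}", unit="batch"
        ) as loader:
            for inputs, labels in loader:
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                loader.set_postfix(loss=loss.item())
```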
12 changes: 7 additions & 5 deletions examples/llm_and_nlp/unstructured-embeddings-gen.py
@@ -12,11 +12,11 @@
group_broken_paragraphs,
replace_unicode_quotes,
)
from unstructured.embed.huggingface import (
from unstructured.partition.pdf import partition_pdf
from unstructured_ingest.embed.huggingface import (
HuggingFaceEmbeddingConfig,
HuggingFaceEmbeddingEncoder,
)
from unstructured.partition.pdf import partition_pdf

from datachain import C, DataChain, DataModel, File

@@ -43,6 +43,7 @@ def process_pdf(file: File) -> Iterator[Chunk]:
chunks = partition_pdf(file=f, chunking_strategy="by_title", strategy="fast")

# Clean the chunks and add new columns
text_chunks = []
for chunk in chunks:
chunk.apply(
lambda text: clean(
@@ -51,16 +52,17 @@ def process_pdf(file: File) -> Iterator[Chunk]:
)
chunk.apply(replace_unicode_quotes)
chunk.apply(group_broken_paragraphs)
text_chunks.append({"text": str(chunk)})

# create embeddings
chunks_embedded = embedding_encoder.embed_documents(chunks)
chunks_embedded = embedding_encoder.embed_documents(text_chunks)

# Add new rows to DataChain
for chunk in chunks_embedded:
yield Chunk(
key=file.path,
text=chunk.text,
embeddings=chunk.embeddings,
text=chunk.get("text"),
embeddings=chunk.get("embeddings"),
)


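A minimal sketch of the updated embedding step, under assumptions: with unstructured_ingest, embed_documents consumes and returns plain dicts, so chunks are converted to `{"text": ...}` entries first and the results are read back with `.get()`. The encoder construction and the standalone helper below are assumptions; the `clean()` call is omitted because its arguments are not shown in this diff.

```python
from unstructured.cleaners.core import group_broken_paragraphs, replace_unicode_quotes
from unstructured.partition.pdf import partition_pdf
from unstructured_ingest.embed.huggingface import (
    HuggingFaceEmbeddingConfig,
    HuggingFaceEmbeddingEncoder,
)

# Assumed construction; the diff only shows the encoder being used, not created.
embedding_encoder = HuggingFaceEmbeddingEncoder(config=HuggingFaceEmbeddingConfig())


def embed_pdf_chunks(pdf_path: str) -> list[dict]:
    """Partition a PDF, clean the chunks, and embed them (illustrative helper)."""
    with open(pdf_path, "rb") as f:
        chunks = partition_pdf(file=f, chunking_strategy="by_title", strategy="fast")

    # Clean the chunks and convert them to plain dicts for unstructured_ingest.
    text_chunks = []
    for chunk in chunks:
        chunk.apply(replace_unicode_quotes)
        chunk.apply(group_broken_paragraphs)
        text_chunks.append({"text": str(chunk)})

    # embed_documents now returns dicts, so text and embeddings are read with .get().
    chunks_embedded = embedding_encoder.embed_documents(text_chunks)
    return [
        {"text": c.get("text"), "embeddings": c.get("embeddings")}
        for c in chunks_embedded
    ]
```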
2 changes: 2 additions & 0 deletions noxfile.py
@@ -81,6 +81,8 @@ def examples(session: nox.Session) -> None:
session.install(".[examples]")
session.run(
"pytest",
"--durations=0",
"tests/examples",
"-m",
"examples",
*session.posargs,
6 changes: 3 additions & 3 deletions pyproject.toml
@@ -105,14 +105,14 @@ dev = [
]
examples = [
"datachain[tests]",
"numpy>=1,<2",
"defusedxml",
"accelerate",
"unstructured[pdf,embed-huggingface]<0.16.0",
"unstructured_ingest[embed-huggingface]",
"unstructured[pdf]",
"pdfplumber==0.11.4",
"huggingface_hub[hf_transfer]",
"onnx==1.16.1",
"ultralytics==8.3.37"
"ultralytics==8.3.50"
]

[project.urls]
6 changes: 3 additions & 3 deletions src/datachain/catalog/catalog.py
@@ -1297,15 +1297,15 @@ def pull_dataset( # noqa: PLR0915
output: Optional[str] = None,
local_ds_name: Optional[str] = None,
local_ds_version: Optional[int] = None,
no_cp: bool = False,
cp: bool = False,
force: bool = False,
edatachain: bool = False,
edatachain_file: Optional[str] = None,
*,
client_config=None,
) -> None:
def _instantiate(ds_uri: str) -> None:
if no_cp:
if not cp:
return
assert output
self.cp(
@@ -1318,7 +1318,7 @@ def _instantiate(ds_uri: str) -> None:
)
print(f"Dataset {ds_uri} instantiated locally to {output}")

if not output and not no_cp:
if cp and not output:
raise ValueError("Please provide output directory for instantiation")

studio_client = StudioClient()
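In practice the flag flip makes pulling DB-only by default, with file instantiation opt-in. A minimal usage sketch under assumptions: the get_catalog() helper and the dataset URI are placeholders not shown in this diff.

```python
from datachain.catalog import get_catalog  # assumed helper for obtaining a Catalog

catalog = get_catalog()

# Default: pull the remote dataset into the local DB without copying files.
catalog.pull_dataset("ds://example-dataset")  # placeholder dataset URI

# Opt in to instantiation: also copy the files into an output directory.
catalog.pull_dataset("ds://example-dataset", output="./data", cp=True)
```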
28 changes: 25 additions & 3 deletions src/datachain/cli.py
@@ -294,6 +294,28 @@ def add_studio_parser(subparsers, parent_parser) -> None:
help="Python package requirement. Can be specified multiple times.",
)

studio_cancel_help = "Cancel a job in Studio"
studio_cancel_description = "This command cancels a job in Studio."

studio_cancel_parser = studio_subparser.add_parser(
"cancel",
parents=[parent_parser],
description=studio_cancel_description,
help=studio_cancel_help,
)

studio_cancel_parser.add_argument(
"job_id",
action="store",
help="The job ID to cancel.",
)
studio_cancel_parser.add_argument(
"--team",
action="store",
default=None,
help="The team to cancel a job for. By default, it will use team from config.",
)


def get_parser() -> ArgumentParser: # noqa: PLR0915
try:
@@ -457,10 +479,10 @@ def get_parser() -> ArgumentParser: # noqa: PLR0915
help="Copy directories recursively",
)
parse_pull.add_argument(
"--no-cp",
"--cp",
default=False,
action="store_true",
help="Do not copy files, just pull a remote dataset into local DB",
help="Copy actual files after pulling remote dataset into local DB",
)
parse_pull.add_argument(
"--edatachain",
@@ -1300,7 +1322,7 @@ def main(argv: Optional[list[str]] = None) -> int: # noqa: C901, PLR0912, PLR09
args.output,
local_ds_name=args.local_name,
local_ds_version=args.local_version,
no_cp=args.no_cp,
cp=args.cp,
force=bool(args.force),
edatachain=args.edatachain,
edatachain_file=args.edatachain_file,
3 changes: 1 addition & 2 deletions src/datachain/client/__init__.py
@@ -1,4 +1,3 @@
from .fsspec import Client
from .s3 import ClientS3

__all__ = ["Client", "ClientS3"]
__all__ = ["Client"]