Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: change the default data storage version to "stable" (e.g. v2.0) #2829

Merged
merged 9 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .github/workflows/python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,17 @@ jobs:
prefix-key: "manylinux2014" # use this to flush the cache
- uses: ./.github/workflows/build_linux_wheel
- uses: ./.github/workflows/run_tests
- name: Generate forward compatibility files
run: python python/tests/forward_compat/datagen.py
- name: Install old wheel
run: |
python -m venv venv
source venv/bin/activate
pip install pytest pylance==0.16.0
- name: Run forward compatibility tests
run: |
source venv/bin/activate
pytest python/tests/forward_compat --run-forward
Comment on lines +115 to +118
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fancy! ✨

# Make sure wheels are not included in the Rust cache
- name: Delete wheels
run: sudo rm -rf target/wheels
Expand Down
5 changes: 3 additions & 2 deletions docs/format.rst
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,15 @@ The following values are supported:
- Any
- This is the initial Lance format.
* - 2.0
- 0.15.0
- 0.16.0
- Any
- Rework of the Lance file format that removed row groups and introduced null
support for lists, fixed size lists, and primitives
* - 2.1 (unstable)
- None
- Any
- Adds FSST string compression and bit packing
- Enhances integer and string compression, adds support for nulls in struct fields,
and improves random access performance with nested fields.
* - legacy
- N/A
- N/A
Expand Down
1 change: 1 addition & 0 deletions java/core/lance-jni/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ crate-type = ["cdylib"]

[dependencies]
lance = { workspace = true, features = ["substrait"] }
lance-encoding = { path = "../../../rust/lance-encoding" }
lance-linalg = { path = "../../../rust/lance-linalg" }
lance-index = { path = "../../../rust/lance-index" }
lance-io.workspace = true
Expand Down
3 changes: 3 additions & 0 deletions java/core/lance-jni/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use jni::objects::{JObject, JString};
use jni::JNIEnv;
use lance::dataset::{WriteMode, WriteParams};
use lance::index::vector::{StageParams, VectorIndexParams};
use lance_encoding::version::LanceFileVersion;
use lance_index::vector::hnsw::builder::HnswBuildParams;
use lance_index::vector::ivf::IvfBuildParams;
use lance_index::vector::pq::PQBuildParams;
Expand Down Expand Up @@ -52,6 +53,8 @@ pub fn extract_write_params(
if let Some(mode_val) = env.get_string_opt(mode)? {
write_params.mode = WriteMode::try_from(mode_val.as_str())?;
}
// Java code always sets the data storage version to Legacy for now
write_params.data_storage_version = Some(LanceFileVersion::Legacy);
Ok(write_params)
}

Expand Down
2 changes: 1 addition & 1 deletion python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2840,7 +2840,7 @@ def write_dataset(
commit_lock: Optional[CommitLock] = None,
progress: Optional[FragmentWriteProgress] = None,
storage_options: Optional[Dict[str, str]] = None,
data_storage_version: str = "legacy",
data_storage_version: str = "stable",
use_legacy_format: Optional[bool] = None,
enable_v2_manifest_paths: bool = False,
) -> LanceDataset:
Expand Down
4 changes: 2 additions & 2 deletions python/python/lance/fragment.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def create(
progress: Optional[FragmentWriteProgress] = None,
mode: str = "append",
*,
data_storage_version: str = "legacy",
data_storage_version: str = "stable",
use_legacy_format: Optional[bool] = None,
storage_options: Optional[Dict[str, str]] = None,
) -> FragmentMetadata:
Expand Down Expand Up @@ -528,7 +528,7 @@ def write_fragments(
max_rows_per_group: int = 1024,
max_bytes_per_file: int = DEFAULT_MAX_BYTES_PER_FILE,
progress: Optional[FragmentWriteProgress] = None,
data_storage_version: str = "legacy",
data_storage_version: str = "stable",
use_legacy_format: Optional[bool] = None,
storage_options: Optional[Dict[str, str]] = None,
) -> List[FragmentMetadata]:
Expand Down
8 changes: 4 additions & 4 deletions python/python/lance/ray/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def _write_fragment(
max_rows_per_file: int = 1024 * 1024,
max_bytes_per_file: Optional[int] = None,
max_rows_per_group: int = 1024, # Only useful for v1 writer.
data_storage_version: str = "legacy",
data_storage_version: str = "stable",
storage_options: Optional[Dict[str, Any]] = None,
) -> Tuple[FragmentMetadata, pa.Schema]:
from ..dependencies import _PANDAS_AVAILABLE
Expand Down Expand Up @@ -188,7 +188,7 @@ def __init__(
schema: Optional[pa.Schema] = None,
mode: Literal["create", "append", "overwrite"] = "create",
max_rows_per_file: int = 1024 * 1024,
data_storage_version: str = "legacy",
data_storage_version: str = "stable",
use_legacy_format: Optional[bool] = None,
storage_options: Optional[Dict[str, Any]] = None,
*args,
Expand Down Expand Up @@ -295,7 +295,7 @@ def __init__(
max_rows_per_file: int = 1024 * 1024,
max_bytes_per_file: Optional[int] = None,
max_rows_per_group: Optional[int] = None, # Only useful for v1 writer.
data_storage_version: str = "legacy",
data_storage_version: str = "stable",
use_legacy_format: Optional[bool] = False,
storage_options: Optional[Dict[str, Any]] = None,
):
Expand Down Expand Up @@ -387,7 +387,7 @@ def write_lance(
max_rows_per_file: int = 1024 * 1024,
max_bytes_per_file: Optional[int] = None,
storage_options: Optional[Dict[str, Any]] = None,
data_storage_version: str = "legacy",
data_storage_version: str = "stable",
) -> None:
"""Write Ray dataset at scale.

Expand Down
21 changes: 21 additions & 0 deletions python/python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,34 @@ def pytest_addoption(parser):
default=False,
help="Run slow tests",
)
parser.addoption(
"--run-forward",
action="store_true",
default=False,
help="Run forward compatibility tests (requires files to be generated already)",
)


def pytest_configure(config):
    """Register the custom markers used by this test suite.

    Registering them here keeps ``pytest --strict-markers`` runs clean and
    documents each marker in ``pytest --markers`` output.
    """
    markers = [
        "forward: mark tests that require forward compatibility datagen files",
        "integration: mark test that requires object storage integration",
        "slow: mark tests that require large CPU or RAM resources",
    ]
    for marker in markers:
        config.addinivalue_line("markers", marker)


def pytest_collection_modifyitems(config, items):
if not config.getoption("--run-integration"):
disable_items_with_mark(items, "integration", "--run-integration not specified")
if not config.getoption("--run-slow"):
disable_items_with_mark(items, "slow", "--run-slow not specified")
if not config.getoption("--run-forward"):
disable_items_with_mark(items, "forward", "--run-forward not specified")
try:
import torch

Expand Down
2 changes: 2 additions & 0 deletions python/python/tests/forward_compat/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors
97 changes: 97 additions & 0 deletions python/python/tests/forward_compat/datagen.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors

# This script generates Lance files that are read by test_forward_compat.py

from pathlib import Path

import pyarrow as pa
from lance.file import LanceFileWriter


def get_path(name: str):
    """Resolve ``name`` inside the repository-level ``test_data/forward_compat`` dir.

    This file lives at ``python/python/tests/forward_compat/datagen.py``, so the
    repository root is four parents above this file's directory.
    """
    here = Path(__file__).parent
    repo_root = here.parent.parent.parent.parent
    return repo_root.joinpath("test_data", "forward_compat", name)


def build_basic_types():
    """Construct the 1000-row table of basic types used by the forward-compat tests.

    Covers integers, floats, strings, variable-length lists, structs, an
    explicit dictionary column, and a low-cardinality string column
    ("str_as_dict") that a writer may choose to dictionary-encode.
    """
    dict_type = pa.dictionary(pa.int16(), pa.string())
    schema = pa.schema(
        [
            pa.field("int", pa.int64()),
            pa.field("float", pa.float32()),
            pa.field("str", pa.string()),
            pa.field("list_int", pa.list_(pa.int64())),
            pa.field("list_str", pa.list_(pa.string())),
            pa.field("struct", pa.struct([pa.field("a", pa.int64())])),
            pa.field("dict", dict_type),
            pa.field("str_as_dict", pa.string()),
        ]
    )

    columns = [
        pa.array(range(1000)),
        pa.array(range(1000), pa.float32()),
        pa.array([str(i) for i in range(1000)]),
        # Row i holds [0, 1, ..., i-1]; row 0 is an empty list.
        pa.array([list(range(i)) for i in range(1000)]),
        pa.array([[str(i)] for i in range(1000)]),
        pa.array([{"a": i} for i in range(1000)]),
        # 10 distinct values cycled over 1000 rows.
        pa.array([str(i % 10) for i in range(1000)], dict_type),
        # Two runs of a single value each — ideal for dictionary encoding.
        pa.array(["a"] * 500 + ["b"] * 500),
    ]
    return pa.table(columns, schema=schema)


def write_basic_types():
    """Write the basic-types table to ``test_data/forward_compat/basic_types.lance``."""
    out = str(get_path("basic_types.lance"))
    with LanceFileWriter(out) as writer:
        writer.write_batch(build_basic_types())


def build_large():
    """Build a large (~80 MB) table of vectors and binary blobs for the
    forward-compat tests, exercising wide fixed-size-list and binary columns.
    """
    # ~40MB of vector embedding data (10K 1024-float32)
    fsl_data = pa.array(range(1024 * 1000 * 10), pa.float32())
    fsls = pa.FixedSizeListArray.from_arrays(fsl_data, 1024)
    # ~40 MiB of binary data (10k 4KiB chunks)
    # NOTE(review): allocate_buffer's contents are not explicitly initialized
    # here — presumably the round-trip comparison only requires determinism
    # within a single process; confirm zero-fill if cross-run stability matters.
    bindata = pa.allocate_buffer(1024 * 1000 * 40)
    # 10,001 int32 offsets at 4 KiB strides (one more offset than elements, as
    # required by the binary layout); buffers()[1] pulls the raw values buffer
    # backing the int32 array.
    offsets = pa.array(
        range(0, (1024 * 1000 * 40) + 4 * 1024, 4 * 1024), pa.int32()
    ).buffers()[1]
    # Assemble the BinaryArray directly from [validity, offsets, data] buffers;
    # validity is None because no values are null.
    bins = pa.BinaryArray.from_buffers(pa.binary(), 10000, [None, offsets, bindata])

    schema = pa.schema(
        [
            pa.field("int", pa.int32()),
            # NOTE(review): "fsl" is declared as a variable-size list even though
            # the column data is a FixedSizeListArray — confirm the implicit cast
            # by pa.table is intended here.
            pa.field("fsl", pa.list_(pa.float32())),
            pa.field("bin", pa.binary()),
        ]
    )

    return pa.table(
        [
            pa.array(range(10000), pa.int32()),
            fsls,
            bins,
        ],
        schema=schema,
    )


def write_large():
    """Write the large table to ``test_data/forward_compat/large.lance``."""
    out = str(get_path("large.lance"))
    with LanceFileWriter(out) as writer:
        writer.write_batch(build_large())


if __name__ == "__main__":
    # Run as a script (see the python.yml workflow) to generate the files that
    # test_compat.py later reads back with an older pylance wheel.
    write_basic_types()
    write_large()
20 changes: 20 additions & 0 deletions python/python/tests/forward_compat/test_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors

import pytest
from lance.file import LanceFileReader

from .datagen import build_basic_types, build_large, get_path


@pytest.mark.forward
def test_scans():
    """Read the pre-generated datagen files and verify they round-trip exactly.

    Requires ``datagen.py`` to have been run first (gated by ``--run-forward``).
    """
    cases = [
        ("basic_types.lance", build_basic_types),
        ("large.lance", build_large),
    ]
    for file_name, build_expected in cases:
        expected = build_expected()
        actual = LanceFileReader(str(get_path(file_name))).read_all().to_table()
        assert actual.equals(expected)
49 changes: 48 additions & 1 deletion python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ def test_roundtrip_types(tmp_path: Path):
}
)

dataset = lance.write_dataset(table, tmp_path)
# TODO: V2 does not currently handle large_dict
dataset = lance.write_dataset(table, tmp_path, data_storage_version="legacy")
assert dataset.schema == table.schema
assert dataset.to_table() == table

Expand Down Expand Up @@ -538,6 +539,28 @@ def test_pickle(tmp_path: Path):
assert dataset.to_table() == unpickled.to_table()


def test_nested_projection(tmp_path: Path):
    """Projecting individual struct fields yields flattened dotted columns."""
    num_rows = 100
    table = pa.Table.from_pydict(
        {
            "a": range(num_rows),
            "b": range(num_rows),
            "struct": [{"x": i, "y": i % 2 == 0} for i in range(num_rows)],
        }
    )
    base_dir = tmp_path / "test"
    lance.write_dataset(table, base_dir)

    dataset = lance.dataset(base_dir)

    x_only = dataset.to_table(columns=["struct.x"])
    assert x_only == pa.Table.from_pydict({"struct.x": range(num_rows)})

    y_only = dataset.to_table(columns=["struct.y"])
    assert y_only == pa.Table.from_pydict(
        {"struct.y": [i % 2 == 0 for i in range(num_rows)]}
    )


def test_polar_scan(tmp_path: Path):
some_structs = [{"x": counter, "y": counter} for counter in range(100)]
table = pa.Table.from_pydict(
Expand Down Expand Up @@ -2273,3 +2296,27 @@ def test_late_materialization_batch_size(tmp_path: Path):
columns=["values"], filter="filter % 2 == 0", batch_size=32
):
assert batch.num_rows == 32


# With the new default, datasets are written using Lance file format 2.0.
EXPECTED_DEFAULT_STORAGE_VERSION = "2.0"
EXPECTED_MAJOR_VERSION = 2
EXPECTED_MINOR_VERSION = 0


def test_default_storage_version(tmp_path: Path):
    """Datasets and fragments written without an explicit version use 2.0."""
    table = pa.table({"x": [0]})
    dataset = lance.write_dataset(table, tmp_path)
    assert dataset.data_storage_version == EXPECTED_DEFAULT_STORAGE_VERSION

    def check_file_version(fragment):
        # Every file in a newly created fragment should report the stable
        # (2.0) file version.
        sample_file = fragment.to_json()["files"][0]
        assert sample_file["file_major_version"] == EXPECTED_MAJOR_VERSION
        assert sample_file["file_minor_version"] == EXPECTED_MINOR_VERSION

    check_file_version(lance.LanceFragment.create(dataset.uri, table))

    from lance.fragment import write_fragments

    check_file_version(write_fragments(table, dataset.uri)[0])
Loading
Loading