Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add CreateIndex commit type to python API #2883

Merged
merged 8 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
Literal,
NamedTuple,
Optional,
Set,
TypedDict,
Union,
)
Expand Down Expand Up @@ -2233,6 +2234,27 @@ def _to_inner(self):
rewritten_indices = [index._to_inner() for index in self.rewritten_indices]
return _Operation.rewrite(groups, rewritten_indices)

@dataclass
class CreateIndex(BaseOperation):
"""
Operation that creates an index on the dataset.
"""

uuid: str
name: str
fields: List[int]
dataset_version: int
fragment_ids: Set[int]

def _to_inner(self):
return _Operation.create_index(
self.uuid,
self.name,
self.fields,
self.dataset_version,
self.fragment_ids,
)


class ScannerBuilder:
def __init__(self, ds: LanceDataset):
Expand Down
86 changes: 86 additions & 0 deletions python/python/tests/test_commit_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors

import random
import shutil
import string
from pathlib import Path

import lance
import numpy as np
import pyarrow as pa
import pytest


@pytest.fixture()
def test_table():
num_rows = 1000
price = np.random.rand(num_rows) * 100

def gen_str(n, split="", char_set=string.ascii_letters + string.digits):
return "".join(random.choices(char_set, k=n))

meta = np.array([gen_str(100) for _ in range(num_rows)])
doc = [gen_str(10, " ", string.ascii_letters) for _ in range(num_rows)]
tbl = pa.Table.from_arrays(
[
pa.array(price),
pa.array(meta),
pa.array(doc, pa.large_string()),
pa.array(range(num_rows)),
],
names=["price", "meta", "doc", "id"],
)
return tbl


@pytest.fixture()
def dataset_with_index(test_table, tmp_path):
dataset = lance.write_dataset(test_table, tmp_path)
dataset.create_scalar_index("meta", index_type="BTREE")
return dataset


def test_commit_index(dataset_with_index, test_table, tmp_path):
index_id = dataset_with_index.list_indices()[0]["uuid"]

# Create a new dataset without index
dataset_without_index = lance.write_dataset(
test_table, tmp_path / "dataset_without_index"
)

# Copy the index from dataset_with_index to dataset_without_index
src_index_dir = Path(dataset_with_index.uri) / "_indices" / index_id
dest_index_dir = Path(dataset_without_index.uri) / "_indices" / index_id
shutil.copytree(src_index_dir, dest_index_dir)

# Commit the index to dataset_without_index
field_idx = dataset_without_index.schema.get_field_index("meta")
create_index_op = lance.LanceOperation.CreateIndex(
index_id,
"meta_idx",
[field_idx],
dataset_without_index.version,
set([f.fragment_id for f in dataset_without_index.get_fragments()]),
)
dataset_without_index = lance.LanceDataset.commit(
dataset_without_index.uri,
create_index_op,
read_version=dataset_without_index.version,
)

# Verify that both datasets have the index
assert len(dataset_with_index.list_indices()) == 1
assert len(dataset_without_index.list_indices()) == 1

assert (
dataset_without_index.list_indices()[0] == dataset_with_index.list_indices()[0]
)

# Check if the index is used in scans
for dataset in [dataset_with_index, dataset_without_index]:
scanner = dataset.scanner(
fast_search=True, prefilter=True, filter="meta = 'hello'"
)
plan = scanner.explain_plan()
assert "MaterializeIndex" in plan
Comment on lines +80 to +86
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm kind of shocked it is if you don't pass down the fragment bitmap. 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only works because there is a fallback that scans the full index to recalculate this. It's much preferable to be able to pass down fragment_bitmap, as this scan could be slow for large indices.

27 changes: 27 additions & 0 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use lance_index::{
use lance_io::object_store::ObjectStoreParams;
use lance_linalg::distance::MetricType;
use lance_table::format::Fragment;
use lance_table::format::Index;
use lance_table::io::commit::CommitHandler;
use object_store::path::Path;
use pyo3::exceptions::{PyStopIteration, PyTypeError};
Expand Down Expand Up @@ -320,6 +321,32 @@ impl Operation {
};
Ok(Self(op))
}

#[staticmethod]
fn create_index(
uuid: String,
name: String,
fields: Vec<i32>,
dataset_version: u64,
fragment_ids: &PySet,
) -> PyResult<Self> {
let fragment_ids: Vec<u32> = fragment_ids
.iter()
.map(|item| item.extract::<u32>())
.collect::<PyResult<Vec<u32>>>()?;
let new_indices = vec![Index {
uuid: Uuid::parse_str(&uuid).map_err(|e| PyValueError::new_err(e.to_string()))?,
name,
fields,
dataset_version,
fragment_bitmap: Some(fragment_ids.into_iter().collect()),
}];
let op = LanceOperation::CreateIndex {
new_indices,
removed_indices: vec![],
};
Ok(Self(op))
}
}

/// Lance Dataset that will be wrapped by another class in Python
Expand Down