Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

143 changes: 105 additions & 38 deletions python/python/ci_benchmarks/benchmarks/test_ivf_pq_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
"""Benchmarks for IVF_PQ vector search performance."""

import math
import multiprocessing as mp
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

import lance
Expand Down Expand Up @@ -35,8 +37,8 @@
K_LABELS = ["k10", "k100"]


# Global cache for datasets, keyed by (num_rows, dim)
_DATASET_CACHE = {}
# Datasets are stored in fixed temporary directories and reused between runs
# to avoid retraining indexes


def _generate_vector_dataset(num_rows: int, dim: int = 1024):
Expand Down Expand Up @@ -73,46 +75,57 @@ def _generate_vector_dataset(num_rows: int, dim: int = 1024):
def _get_or_create_dataset(num_rows: int, dim: int = 1024) -> str:
"""Get or create a dataset with the specified parameters.

Datasets are cached globally per process to avoid expensive recreation.
Uses a fixed temporary directory so datasets persist between benchmark runs.
If the dataset exists and has the correct number of rows, it will be reused.
Returns the URI to the dataset.
"""
cache_key = (num_rows, dim)

if cache_key not in _DATASET_CACHE:
# Create a persistent temporary directory for this dataset
tmpdir = tempfile.mkdtemp(prefix=f"lance_bench_{num_rows}_{dim}_")
dataset_uri = str(Path(tmpdir) / "vector_dataset.lance")

# Create schema
schema = pa.schema(
[
pa.field("vector", pa.list_(pa.float32(), dim)),
pa.field("id", pa.int64()),
]
)

# Generate and write dataset
data = _generate_vector_dataset(num_rows, dim)
ds = lance.write_dataset(
data,
dataset_uri,
schema=schema,
mode="create",
)
# Use a fixed directory path based on parameters
tmpdir = Path(tempfile.gettempdir()) / f"lance_bench_{num_rows}_{dim}"
tmpdir.mkdir(exist_ok=True)
dataset_uri = "file://" + str(tmpdir / "vector_dataset.lance")

# Check if dataset already exists and has correct row count
try:
ds = lance.dataset(dataset_uri)
if ds.count_rows() == num_rows:
print(f"Reusing existing dataset at {dataset_uri}")
return dataset_uri
else:
print(
"Dataset exists but has wrong row count "
f"({ds.count_rows()} vs {num_rows}), recreating..."
)
except Exception:
print(f"Creating new dataset at {dataset_uri}")

# Create schema
schema = pa.schema(
[
pa.field("vector", pa.list_(pa.float32(), dim)),
pa.field("id", pa.int64()),
]
)

num_partitions = min(num_rows // 4000, int(math.sqrt(num_rows)))
# Generate and write dataset
data = _generate_vector_dataset(num_rows, dim)
ds = lance.write_dataset(
data,
dataset_uri,
schema=schema,
mode="overwrite", # Use overwrite to handle recreation
)

# Create IVF_PQ index
ds.create_index(
"vector",
index_type="IVF_PQ",
num_partitions=num_partitions,
num_sub_vectors=dim // 16,
)
num_partitions = min(num_rows // 4000, int(math.sqrt(num_rows)))

_DATASET_CACHE[cache_key] = dataset_uri
# Create IVF_PQ index
ds.create_index(
"vector",
index_type="IVF_PQ",
num_partitions=num_partitions,
num_sub_vectors=dim // 16,
)

return _DATASET_CACHE[cache_key]
return dataset_uri


@pytest.mark.parametrize("num_rows", DATASET_SIZES, ids=DATASET_SIZE_LABELS)
Expand All @@ -139,7 +152,7 @@ def test_ivf_pq_search(

Uses 1024-dimensional float32 vectors with IVF_PQ index.
"""
# Get or create the dataset (cached globally per process)
# Get or create the dataset (reused from fixed temp directory between runs)
dataset_uri = _get_or_create_dataset(num_rows, dim=VECTOR_DIM)
ds = lance.dataset(dataset_uri)

Expand Down Expand Up @@ -204,7 +217,7 @@ def test_ivf_pq_search_with_payload(
Similar to test_ivf_pq_search but includes retrieving vector data
along with results, which tests data loading performance.
"""
# Get or create the dataset (cached globally per process)
# Get or create the dataset (reused from fixed temp directory between runs)
dataset_uri = _get_or_create_dataset(num_rows, dim=VECTOR_DIM)
ds = lance.dataset(dataset_uri)

Expand Down Expand Up @@ -248,3 +261,57 @@ def bench():
iterations=1,
setup=setup,
)


@pytest.mark.parametrize("use_cache", [True, False], ids=["cache", "no_cache"])
def test_ivf_pq_throughput(
benchmark,
use_cache: bool,
):
"""Benchmark IVF_PQ vector search throughput (with payload)"""
# Get or create the dataset (reused from fixed temp directory between runs)
dataset_uri = _get_or_create_dataset(1_000_000, dim=768)
ds = lance.dataset(dataset_uri)

NUM_QUERIES = 1000

# Generate query vectors
query_vectors = [
np.random.randn(768).astype(np.float32) for _ in range(NUM_QUERIES)
]

def clear_cache():
if not use_cache:
wipe_os_cache(dataset_uri)

def bench():
with ThreadPoolExecutor(max_workers=2 * (mp.cpu_count() - 2)) as executor:
futures = [
executor.submit(
ds.to_table,
nearest={
"column": "vector",
"q": query_vector,
"k": 50,
"nprobes": 20,
"refine_factor": 10,
},
columns=["vector", "_distance"],
)
for query_vector in query_vectors
]
for future in futures:
future.result()

if use_cache:
setup = None
else:
setup = clear_cache

benchmark.pedantic(
bench,
warmup_rounds=1,
rounds=1,
iterations=1,
setup=setup,
)
2 changes: 0 additions & 2 deletions python/python/ci_benchmarks/datagen/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ def _create(dataset_uri: str):
dataset_uri,
schema=SCHEMA,
mode="append",
use_legacy_format=False,
)
else:
raise Exception(
Expand All @@ -72,7 +71,6 @@ def _create(dataset_uri: str):
dataset_uri,
schema=SCHEMA,
mode="create",
use_legacy_format=False,
)
if ds.list_indices() == []:
ds.create_scalar_index("row_number", "BTREE")
Expand Down
5 changes: 5 additions & 0 deletions rust/lance/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ lzma-sys = { version = "0.1" }
lance-test-macros = { workspace = true }
lance-datagen = { workspace = true }
pretty_assertions = { workspace = true }
libc = { workspace = true }
clap = { workspace = true, features = ["derive"] }
criterion = { workspace = true }
approx.workspace = true
Expand Down Expand Up @@ -165,5 +166,9 @@ harness = false
name = "random_access"
harness = false

[[bench]]
name = "vector_throughput"
harness = false

[lints]
workspace = true
Loading
Loading