feat: standalone vector transform stage (#2566)
This makes the vector transform stage a standalone operation.
westonpace authored Jul 26, 2024
1 parent 9535186 commit a23ba62
Showing 7 changed files with 270 additions and 58 deletions.
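For orientation, here is a minimal sketch of the workflow this commit enables, stitched together from the APIs in the diff below (the dataset path and column name are placeholders):

import lance
from lance.indices import IndicesBuilder

ds = lance.dataset("./dataset")  # any dataset with a fixed-size-list vector column
builder = IndicesBuilder(ds, "vectors")

# Stage 1: train the models (or reload saved ones via IvfModel.load / PqModel.load)
ivf = builder.train_ivf(sample_rate=16)
pq = builder.train_pq(ivf, sample_rate=16)

# Stage 2 (new in this commit): run the vector transforms as a standalone step,
# writing an unsorted storage file with a row id column plus transform outputs.
builder.transform_vectors(ivf, pq, "./transformed.lance")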
32 changes: 31 additions & 1 deletion python/python/benchmarks/test_index.py
@@ -3,10 +3,11 @@
from pathlib import Path

import lance
import numpy as np
import pyarrow as pa
import pyarrow.compute as pc
import pytest
from lance.indices import IndicesBuilder
from lance.indices import IndicesBuilder, IvfModel, PqModel

N_DIMS = 512

@@ -127,3 +128,32 @@ def test_partition_assignment(test_large_dataset, benchmark, num_partitions):
benchmark.pedantic(
builder.assign_ivf_partitions, args=[ivf, None, "cuda"], iterations=1, rounds=1
)


def rand_ivf(rand_dataset):
dtype = rand_dataset.schema.field("vector").type.value_type.to_pandas_dtype()
centroids = np.random.rand(N_DIMS * 35000).astype(dtype)
centroids = pa.FixedSizeListArray.from_arrays(centroids, N_DIMS)
return IvfModel(centroids, "l2")


def rand_pq(rand_dataset, rand_ivf):
dtype = rand_dataset.schema.field("vector").type.value_type.to_pandas_dtype()
codebook = np.random.rand(N_DIMS * 256).astype(dtype)
codebook = pa.FixedSizeListArray.from_arrays(codebook, N_DIMS)
pq = PqModel(512 // 16, codebook)
return pq


@pytest.mark.benchmark(group="transform_vectors")
def test_transform_vectors(test_dataset, tmpdir_factory, benchmark):
ivf = rand_ivf(test_dataset)
pq = rand_pq(test_dataset, ivf)
builder = IndicesBuilder(test_dataset, "vector")
dst_uri = str(tmpdir_factory.mktemp("transformed") / "output.lance")
benchmark.pedantic(
builder.transform_vectors,
args=["vector", ivf, pq, dst_uri],
iterations=1,
rounds=1,
)
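For what it's worth, the new benchmark can be exercised in isolation with pytest-benchmark (assuming it is installed, as the existing benchmarks in this file already require; the path follows this repo's layout):

pytest python/python/benchmarks/test_index.py -k transform_vectors --benchmark-only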
50 changes: 47 additions & 3 deletions python/python/lance/indices.py
@@ -137,6 +137,12 @@ def load(cls, uri: str):
return cls(centroids, distance_type)


# Some transforms hardcode their output column names
PARTITION_COLUMN = "__ivf_part_id"
RESIDUAL_COLUMN = "__residual_vector"
PQ_COLUMN = "__pq_code"


class IndicesBuilder:
"""
A class with helper functions for building indices on a dataset.
@@ -197,9 +203,6 @@ def train_ivf(
Parameters
----------
column: str
The vector column to partition, must be a fixed size list of floats
or 1-dimensional fixed-shape tensor column.
num_partitions: int
The number of partitions to train. Large values are more expensive to
train and can lead to longer search times. Smaller values could lead to
@@ -371,6 +374,47 @@ def assign_ivf_partitions(
self.dataset, self.column[0], kmeans, dst_dataset_uri=output_uri
)

def transform_vectors(
self,
ivf: IvfModel,
pq: PqModel,
dest_uri: str,
partition_ds_uri: Optional[str] = None,
):
"""
Apply transformations to the vectors in the dataset and create an unsorted
storage file. The unsorted storage file is a lance file that will at least
have a row id column. Normally it will have other columns containing the
transform outputs (such as partition id and PQ code).

Parameters
----------
ivf: IvfModel
The IVF model to use for the transformations (e.g. partition assignment)
pq: PqModel
The PQ model to use for the transformations (e.g. quantization)
dest_uri: str
The URI to save the transformed vectors to. The URI can be a local file
path or a cloud storage path.
partition_ds_uri: str
The URI of a precomputed partitions dataset. This allows the partition
transform to be skipped, using the precomputed value instead. This is
optional.
"""
dimension = self.dataset.schema.field(self.column[0]).type.list_size
num_subvectors = pq.num_subvectors
distance_type = ivf.distance_type
indices.transform_vectors(
self.dataset._ds,
self.column[0],
dimension,
num_subvectors,
distance_type,
ivf.centroids,
pq.codebook,
dest_uri,
)

def _determine_num_partitions(self, num_partitions: Optional[int], num_rows: int):
if num_partitions is None:
return round(math.sqrt(num_rows))
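Since both models round-trip through save/load, the transform can run in a separate process from training. A minimal sketch (paths and column name are placeholders); the output file will contain a row id column plus the hardcoded __ivf_part_id and __pq_code columns from the constants above:

import lance
from lance.indices import IndicesBuilder, IvfModel, PqModel

dataset = lance.dataset("./dataset")  # placeholder path
ivf = IvfModel.load("./ivf")          # models saved earlier with .save()
pq = PqModel.load("./pq")
IndicesBuilder(dataset, "vectors").transform_vectors(ivf, pq, "./transformed.lance")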
10 changes: 10 additions & 0 deletions python/python/lance/lance/indices/__init__.pyi
@@ -33,3 +33,13 @@ def train_pq_model(
max_iters: int,
ivf_model: pa.Array,
) -> pa.Array: ...
def transform_vectors(
dataset,
column: str,
dimension: int,
num_subvectors: int,
distance_type: str,
ivf_centroids: pa.Array,
pq_codebook: pa.Array,
dst_uri: str,
): ...
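The high-level wrapper in indices.py derives the scalar arguments from the models before delegating to this binding. A direct call would look roughly like the following sketch; note that dataset._ds and the lance.lance.indices module path are internal details used by the wrapper itself, not public API:

from lance.lance import indices  # internal binding module, as imported by lance.indices

dim = dataset.schema.field("vectors").type.list_size  # vector dimensionality
indices.transform_vectors(
    dataset._ds,        # raw Rust dataset handle
    "vectors",
    dim,
    pq.num_subvectors,
    ivf.distance_type,
    ivf.centroids,
    pq.codebook,
    "./transformed.lance",
)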
117 changes: 66 additions & 51 deletions python/python/tests/test_indices.py
@@ -4,26 +4,31 @@
import numpy as np
import pyarrow as pa
import pytest
from lance.file import LanceFileReader
from lance.indices import IndicesBuilder, IvfModel, PqModel

NUM_ROWS = 10000
DIMENSION = 128
NUM_SUBVECTORS = 8
NUM_PARTITIONS = 100

def gen_dataset(tmpdir, datatype=np.float32):
vectors = np.random.randn(10000, 128).astype(datatype)

@pytest.fixture(params=[np.float16, np.float32, np.float64], ids=["f16", "f32", "f64"])
def rand_dataset(tmpdir, request):
vectors = np.random.randn(NUM_ROWS, DIMENSION).astype(request.param)
vectors.shape = -1
vectors = pa.FixedSizeListArray.from_arrays(vectors, 128)
vectors = pa.FixedSizeListArray.from_arrays(vectors, DIMENSION)
table = pa.Table.from_arrays([vectors], names=["vectors"])
ds = lance.write_dataset(table, str(tmpdir / "dataset"))

return ds


def test_ivf_centroids(tmpdir):
ds = gen_dataset(tmpdir)

ivf = IndicesBuilder(ds, "vectors").train_ivf(sample_rate=16)
def test_ivf_centroids(tmpdir, rand_dataset):
ivf = IndicesBuilder(rand_dataset, "vectors").train_ivf(sample_rate=16)

assert ivf.distance_type == "l2"
assert len(ivf.centroids) == 100
assert len(ivf.centroids) == NUM_PARTITIONS

ivf.save(str(tmpdir / "ivf"))
reloaded = IvfModel.load(str(tmpdir / "ivf"))
@@ -32,33 +37,18 @@ def test_ivf_centroids(tmpdir):


@pytest.mark.cuda
def test_ivf_centroids_cuda(tmpdir):
ds = gen_dataset(tmpdir)
ivf = IndicesBuilder(ds, "vectors").train_ivf(sample_rate=16, accelerator="cuda")
def test_ivf_centroids_cuda(rand_dataset):
ivf = IndicesBuilder(rand_dataset, "vectors").train_ivf(
sample_rate=16, accelerator="cuda"
)

assert ivf.distance_type == "l2"
assert len(ivf.centroids) == 100


def test_ivf_centroids_column_type(tmpdir):
def check(column_type, typename):
ds = gen_dataset(tmpdir / typename, column_type)
ivf = IndicesBuilder(ds, "vectors").train_ivf(sample_rate=16)
assert len(ivf.centroids) == 100
ivf.save(str(tmpdir / f"ivf_{typename}"))
reloaded = IvfModel.load(str(tmpdir / f"ivf_{typename}"))
assert ivf.centroids == reloaded.centroids

check(np.float16, "f16")
check(np.float32, "f32")
check(np.float64, "f64")
assert len(ivf.centroids) == NUM_PARTITIONS


def test_ivf_centroids_distance_type(tmpdir):
ds = gen_dataset(tmpdir)

def test_ivf_centroids_distance_type(tmpdir, rand_dataset):
def check(distance_type):
ivf = IndicesBuilder(ds, "vectors").train_ivf(
ivf = IndicesBuilder(rand_dataset, "vectors").train_ivf(
sample_rate=16, distance_type=distance_type
)
assert ivf.distance_type == distance_type
Expand All @@ -71,26 +61,25 @@ def check(distance_type):
check("dot")


def test_num_partitions(tmpdir):
ds = gen_dataset(tmpdir)

ivf = IndicesBuilder(ds, "vectors").train_ivf(sample_rate=16, num_partitions=10)
def test_num_partitions(rand_dataset):
ivf = IndicesBuilder(rand_dataset, "vectors").train_ivf(
sample_rate=16, num_partitions=10
)
assert ivf.num_partitions == 10


@pytest.fixture
def ds_with_ivf(tmpdir):
ds = gen_dataset(tmpdir)
ivf = IndicesBuilder(ds, "vectors").train_ivf(sample_rate=16)
return ds, ivf
def rand_ivf(rand_dataset):
dtype = rand_dataset.schema.field("vectors").type.value_type.to_pandas_dtype()
centroids = np.random.rand(DIMENSION * 100).astype(dtype)
centroids = pa.FixedSizeListArray.from_arrays(centroids, DIMENSION)
return IvfModel(centroids, "l2")


def test_gen_pq(tmpdir, ds_with_ivf):
ds, ivf = ds_with_ivf

pq = IndicesBuilder(ds, "vectors").train_pq(ivf, sample_rate=16)
assert pq.dimension == 128
assert pq.num_subvectors == 8
def test_gen_pq(tmpdir, rand_dataset, rand_ivf):
pq = IndicesBuilder(rand_dataset, "vectors").train_pq(rand_ivf, sample_rate=2)
assert pq.dimension == DIMENSION
assert pq.num_subvectors == NUM_SUBVECTORS

pq.save(str(tmpdir / "pq"))
reloaded = PqModel.load(str(tmpdir / "pq"))
@@ -99,12 +88,10 @@ def test_gen_pq(tmpdir, ds_with_ivf):


@pytest.mark.cuda
def test_assign_partitions(tmpdir):
ds = gen_dataset(tmpdir)
builder = IndicesBuilder(ds, "vectors")
def test_assign_partitions(rand_dataset, rand_ivf):
builder = IndicesBuilder(rand_dataset, "vectors")

ivf = builder.train_ivf(sample_rate=16, num_partitions=20)
partitions_uri = builder.assign_ivf_partitions(ivf, accelerator="cuda")
partitions_uri = builder.assign_ivf_partitions(rand_ivf, accelerator="cuda")

partitions = lance.dataset(partitions_uri)
found_row_ids = set()
@@ -114,5 +101,33 @@ def test_assign_partitions(tmpdir):
found_row_ids.add(row_id)
part_ids = batch["partition"]
for part_id in part_ids:
assert part_id.as_py() < 20
assert len(found_row_ids) == ds.count_rows()
assert part_id.as_py() < 100
assert len(found_row_ids) == rand_dataset.count_rows()


@pytest.fixture
def rand_pq(rand_dataset, rand_ivf):
dtype = rand_dataset.schema.field("vectors").type.value_type.to_pandas_dtype()
codebook = np.random.rand(DIMENSION * 256).astype(dtype)
codebook = pa.FixedSizeListArray.from_arrays(codebook, DIMENSION)
pq = PqModel(NUM_SUBVECTORS, codebook)
return pq


def test_vector_transform(tmpdir, rand_dataset, rand_ivf, rand_pq):
builder = IndicesBuilder(rand_dataset, "vectors")
builder.transform_vectors(rand_ivf, rand_pq, str(tmpdir / "transformed"))

reader = LanceFileReader(str(tmpdir / "transformed"))

assert reader.metadata().num_rows == 10000
data = next(reader.read_all(batch_size=10000).to_batches())

row_id = data.column("_rowid")
assert row_id.type == pa.uint64()

pq_code = data.column("__pq_code")
assert pq_code.type == pa.list_(pa.uint8(), 8)

part_id = data.column("__ivf_part_id")
assert part_id.type == pa.uint32()
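Beyond the schema assertions, the unsorted output is easy to sanity-check with plain pyarrow. A small sketch continuing from the batch read above (column names follow the constants hardcoded in indices.py; the partition bound matches the 100-centroid rand_ivf fixture):

import pyarrow.compute as pc

# Every partition id should fall inside [0, num_partitions).
assert pc.max(data.column("__ivf_part_id")).as_py() < 100

# Distribution of rows across partitions, useful for spotting skew.
print(pc.value_counts(data.column("__ivf_part_id")))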
71 changes: 70 additions & 1 deletion python/src/indices.rs
@@ -4,9 +4,10 @@
use arrow::pyarrow::{PyArrowType, ToPyArrow};
use arrow_array::{Array, FixedSizeListArray};
use arrow_data::ArrayData;
use lance::{index::vector::ivf::builder::write_vector_storage, io::ObjectStore};
use lance_index::vector::{
ivf::{storage::IvfModel, IvfBuildParams},
pq::PQBuildParams,
pq::{PQBuildParams, ProductQuantizer},
};
use lance_linalg::distance::DistanceType;
use pyo3::{pyfunction, types::PyModule, wrap_pyfunction, PyObject, PyResult, Python};
@@ -139,10 +140,78 @@ fn train_pq_model(
codebook.to_pyarrow(py)
}

async fn do_transform_vectors(
dataset: &Dataset,
column: &str,
distance_type: DistanceType,
ivf_centroids: FixedSizeListArray,
pq_model: ProductQuantizer,
dst_uri: &str,
) -> PyResult<()> {
let num_rows = dataset.ds.count_rows(None).await.infer_error()?;
let transform_input = dataset
.ds
.scan()
.project(&[column])
.infer_error()?
.with_row_id()
.batch_size(8192)
.try_into_stream()
.await
.infer_error()?;

let (obj_store, path) = ObjectStore::from_uri(dst_uri).await.infer_error()?;
let writer = obj_store.create(&path).await.infer_error()?;
write_vector_storage(
transform_input,
num_rows as u64,
ivf_centroids,
pq_model,
distance_type,
column,
writer,
)
.await
.infer_error()?;
Ok(())
}

#[pyfunction]
#[allow(clippy::too_many_arguments)]
pub fn transform_vectors(
py: Python<'_>,
dataset: &Dataset,
column: &str,
dimension: usize,
num_subvectors: u32,
distance_type: &str,
ivf_centroids: PyArrowType<ArrayData>,
pq_codebook: PyArrowType<ArrayData>,
dst_uri: &str,
) -> PyResult<()> {
let ivf_centroids = ivf_centroids.0;
let ivf_centroids = FixedSizeListArray::from(ivf_centroids);
let codebook = pq_codebook.0;
let codebook = FixedSizeListArray::from(codebook);
let distance_type = DistanceType::try_from(distance_type).unwrap();
let pq = ProductQuantizer::new(
num_subvectors as usize,
/*num_bits=*/ 8,
dimension,
codebook,
distance_type,
);
RT.block_on(
Some(py),
do_transform_vectors(dataset, column, distance_type, ivf_centroids, pq, dst_uri),
)?
}

pub fn register_indices(py: Python, m: &PyModule) -> PyResult<()> {
let indices = PyModule::new(py, "indices")?;
indices.add_wrapped(wrap_pyfunction!(train_ivf_model))?;
indices.add_wrapped(wrap_pyfunction!(train_pq_model))?;
indices.add_wrapped(wrap_pyfunction!(transform_vectors))?;
m.add_submodule(indices)?;
Ok(())
}
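The input stream consumed by do_transform_vectors above is just a projection of the vector column plus row ids in 8192-row batches. The same feed can be reproduced from Python for inspection; a sketch, using standard lance scanner options (the path is a placeholder):

import lance

ds = lance.dataset("./dataset")
scanner = ds.scanner(columns=["vectors"], with_row_id=True, batch_size=8192)
for batch in scanner.to_batches():
    # Each batch carries the "vectors" column plus the implicit "_rowid" column,
    # mirroring what do_transform_vectors feeds into write_vector_storage.
    pass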