From dac80bd437986dc9ee95ec52a03b019668c6d301 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Thu, 14 Sep 2023 12:13:03 -0400 Subject: [PATCH 1/5] buildable --- rust/lance-arrow/Cargo.toml | 1 + rust/lance-arrow/src/bfloat16.rs | 2 + rust/lance-arrow/src/floats.rs | 61 +++++++++++ rust/lance-arrow/src/lib.rs | 2 + rust/lance-linalg/src/kmeans.rs | 18 ++-- rust/lance-linalg/src/matrix.rs | 101 +++++++++--------- .../lance/src/index/vector/diskann/builder.rs | 26 ++--- rust/lance/src/index/vector/graph/builder.rs | 7 +- rust/lance/src/index/vector/ivf.rs | 42 +++++--- rust/lance/src/index/vector/pq.rs | 16 +-- rust/lance/src/index/vector/traits.rs | 6 +- rust/lance/src/index/vector/utils.rs | 13 ++- 12 files changed, 195 insertions(+), 100 deletions(-) create mode 100644 rust/lance-arrow/src/floats.rs diff --git a/rust/lance-arrow/Cargo.toml b/rust/lance-arrow/Cargo.toml index f796e91e70..66999294df 100644 --- a/rust/lance-arrow/Cargo.toml +++ b/rust/lance-arrow/Cargo.toml @@ -18,3 +18,4 @@ arrow-schema = { workspace = true } serde_json = { workspace = true } serde = { workspace = true } half = { workspace = true } +num-traits = { workspace = true } diff --git a/rust/lance-arrow/src/bfloat16.rs b/rust/lance-arrow/src/bfloat16.rs index 6eca8b99f0..4acdf0449b 100644 --- a/rust/lance-arrow/src/bfloat16.rs +++ b/rust/lance-arrow/src/bfloat16.rs @@ -25,6 +25,8 @@ use arrow_data::ArrayData; use arrow_schema::{ArrowError, DataType}; use half::bf16; +pub struct BFloat16Type {} + pub struct BFloat16Array { inner: FixedSizeBinaryArray, } diff --git a/rust/lance-arrow/src/floats.rs b/rust/lance-arrow/src/floats.rs new file mode 100644 index 0000000000..30a9c5b2bb --- /dev/null +++ b/rust/lance-arrow/src/floats.rs @@ -0,0 +1,61 @@ +// Copyright 2023 Lance Developers. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Floats Array + +use arrow_array::{Array, Float16Array, Float32Array, Float64Array}; +use half::{bf16, f16}; +use num_traits::{Float, FromPrimitive}; + +use super::bfloat16::BFloat16Array; + +/// [FloatArray] is a trait that is implemented by all float type arrays. +pub trait FloatArray: Array + From> { + type Native: Float + FromPrimitive; + + /// Returns a reference to the underlying data as a slice. + fn as_slice(&self) -> &[Self::Native]; +} + +impl FloatArray for BFloat16Array { + type Native = bf16; + + fn as_slice(&self) -> &[Self::Native] { + todo!() + } +} + +impl FloatArray for Float16Array { + type Native = f16; + + fn as_slice(&self) -> &[Self::Native] { + self.values() + } +} + +impl FloatArray for Float32Array { + type Native = f32; + + fn as_slice(&self) -> &[Self::Native] { + self.values() + } +} + +impl FloatArray for Float64Array { + type Native = f64; + + fn as_slice(&self) -> &[Self::Native] { + self.values() + } +} diff --git a/rust/lance-arrow/src/lib.rs b/rust/lance-arrow/src/lib.rs index 1c7954ee9a..cd32218b35 100644 --- a/rust/lance-arrow/src/lib.rs +++ b/rust/lance-arrow/src/lib.rs @@ -28,6 +28,8 @@ use arrow_schema::{ArrowError, DataType, Field, FieldRef, Fields, Schema}; pub mod schema; pub use schema::*; pub mod bfloat16; +pub mod floats; +pub use floats::*; type Result = std::result::Result; diff --git a/rust/lance-linalg/src/kmeans.rs b/rust/lance-linalg/src/kmeans.rs index aa40de01d8..376b95a4c4 100644 --- a/rust/lance-linalg/src/kmeans.rs +++ b/rust/lance-linalg/src/kmeans.rs @@ -16,10 +16,10 @@ use std::cmp::min; use std::collections::HashSet; use std::sync::Arc; -use arrow_array::cast::AsArray; -use arrow_array::FixedSizeListArray; +use arrow_array::types::Float32Type; use arrow_array::{ - builder::Float32Builder, cast::as_primitive_array, new_empty_array, Array, Float32Array, + cast::{as_primitive_array, AsArray}, + new_empty_array, Array, FixedSizeListArray, Float32Array, }; use arrow_schema::{ArrowError, DataType}; use arrow_select::concat::concat; @@ -154,12 +154,12 @@ async fn kmeans_random_init( let chosen = (0..data.len() / dimension) .choose_multiple(&mut rng, k) .to_vec(); - let mut builder = Float32Builder::with_capacity(k * dimension); + let mut builder: Vec = Vec::with_capacity(k * dimension); for i in chosen { - builder.append_slice(&data.values()[i * dimension..(i + 1) * dimension]); + builder.extend(data.values()[i * dimension..(i + 1) * dimension].iter()); } let mut kmeans = KMeans::empty(k, dimension, metric_type); - kmeans.centroids = Arc::new(builder.finish()); + kmeans.centroids = Arc::new(builder.into()); Ok(kmeans) } @@ -246,7 +246,7 @@ impl KMeanMembership { } let centroids = concat(&mean_refs).unwrap(); Ok(KMeans { - centroids: Arc::new(as_primitive_array(centroids.as_ref()).clone()), + centroids: Arc::new(centroids.as_ref().as_primitive::().clone()), dimension, k: self.k, metric_type: self.metric_type, @@ -319,7 +319,7 @@ impl KMeans { /// - *metric_type*: the metric type to calculate distance. /// - *rng*: random generator. pub async fn init_random( - data: &MatrixView, + data: &MatrixView, k: usize, metric_type: MetricType, rng: impl Rng, @@ -431,7 +431,7 @@ impl KMeans { /// let kmeans = membership.to_kmeans(); /// } /// ``` - pub async fn train_once(&self, data: &MatrixView) -> KMeanMembership { + pub async fn train_once(&self, data: &MatrixView) -> KMeanMembership { self.compute_membership(data.data().clone()).await } diff --git a/rust/lance-linalg/src/matrix.rs b/rust/lance-linalg/src/matrix.rs index 09d2aadabf..9f6e9c75bc 100644 --- a/rust/lance-linalg/src/matrix.rs +++ b/rust/lance-linalg/src/matrix.rs @@ -17,18 +17,14 @@ use std::sync::Arc; -use arrow_array::{ - builder::Float32Builder, cast::AsArray, Array, FixedSizeListArray, Float32Array, -}; -use arrow_schema::DataType; +use lance_arrow::FloatArray; +use num_traits::{AsPrimitive, Float, FromPrimitive, ToPrimitive}; use rand::{distributions::Standard, rngs::SmallRng, seq::IteratorRandom, Rng, SeedableRng}; -use crate::{Error, Result}; - /// Transpose a matrix. -fn transpose(input: &[f32], dimension: usize) -> Vec { +fn transpose(input: &[T], dimension: usize) -> Vec { let n = input.len() / dimension; - let mut mat = vec![0_f32; input.len()]; + let mut mat = vec![T::zero(); input.len()]; for row in 0..dimension { for col in 0..n { mat[row * n + col] = input[col * dimension + row]; @@ -38,26 +34,32 @@ fn transpose(input: &[f32], dimension: usize) -> Vec { mat } -/// A 2-D matrix view on top of Arrow Arrays. +/// A 2-D dense matrix on top of Arrow Arrays. /// #[derive(Debug, Clone)] -pub struct MatrixView { +pub struct MatrixView +where + Standard: rand::distributions::Distribution<::Native>, +{ /// Underneath data array. - data: Arc, + data: Arc, - /// The number of + /// The number of columns in the matrix. num_columns: usize, /// Is this matrix transposed or not. pub transpose: bool, } -impl MatrixView { +impl MatrixView +where + Standard: rand::distributions::Distribution<::Native>, +{ /// Create a MatrixView from a f32 data. - pub fn new(data: Arc, num_columns: usize) -> Self { + pub fn new(data: Arc, num_columns: impl AsPrimitive) -> Self { Self { data, - num_columns, + num_columns: num_columns.as_(), transpose: false, } } @@ -65,10 +67,11 @@ impl MatrixView { /// Randomly initialize a matrix of shape `(num_rows, num_columns)`. pub fn random(num_rows: usize, num_columns: usize) -> Self { let mut rng = SmallRng::from_entropy(); - let data = Arc::new(Float32Array::from_iter_values( + let data = Arc::new(T::from( (&mut rng) .sample_iter(Standard) - .take(num_columns * num_rows), + .take(num_columns * num_rows) + .collect::>(), )); Self { data, @@ -79,14 +82,18 @@ impl MatrixView { /// Create a identity matrix, with number of rows `n`. pub fn identity(n: usize) -> Self { - let mut builder = Float32Builder::with_capacity(n * n); + let mut builder = Vec::::with_capacity(n * n); for i in 0..n { for j in 0..n { - builder.append_value(if i == j { 1.0 } else { 0.0 }); + builder.push(if i == j { + T::Native::from_f32(1.0).unwrap() + } else { + T::Native::from_f32(0.0).unwrap() + }); } } Self { - data: Arc::new(builder.finish()), + data: Arc::new(builder.into()), num_columns: n, transpose: false, } @@ -101,6 +108,7 @@ impl MatrixView { } } + /// Number of the columns (dimension) in the matrix. pub fn num_columns(&self) -> usize { if self.transpose { self.data.len() / self.num_columns @@ -109,9 +117,9 @@ impl MatrixView { } } - pub fn data(&self) -> Arc { + pub fn data(&self) -> Arc { if self.transpose { - Arc::new(transpose(self.data.values(), self.num_rows()).into()) + Arc::new(transpose(self.data.as_slice(), self.num_rows()).into()) } else { self.data.clone() } @@ -120,7 +128,7 @@ impl MatrixView { /// Returns a row at index `i`. Returns `None` if the index is out of bound. /// /// # Panics if the matrix is transposed. - pub fn row(&self, i: usize) -> Option<&[f32]> { + pub fn row(&self, i: usize) -> Option<&[T::Native]> { assert!( !self.transpose, "Centroid is not defined for transposed matrix." @@ -129,14 +137,14 @@ impl MatrixView { None } else { let dim = self.num_columns(); - Some(&self.data.values()[i * dim..(i + 1) * dim]) + Some(&self.data.as_slice()[i * dim..(i + 1) * dim]) } } /// Compute the centroid from all the rows. Returns `None` if this matrix is empty. /// /// # Panics if the matrix is transposed. - pub fn centroid(&self) -> Option { + pub fn centroid(&self) -> Option { assert!( !self.transpose, "Centroid is not defined for transposed matrix." @@ -149,13 +157,18 @@ impl MatrixView { // Add all rows with only one memory allocation. let mut sum = vec![0_f64; dim]; // TODO: can SIMD work better here? Is it memory-bandwidth bound?. - self.data.values().chunks(dim).for_each(|row| { + self.data.as_slice().chunks(dim).for_each(|row| { row.iter().enumerate().for_each(|(i, v)| { - sum[i] += *v as f64; + sum[i] += v.to_f64().unwrap(); }) }); let total = self.num_rows() as f64; - let arr = Float32Array::from_iter(sum.iter().map(|v| (v / total) as f32)); + let arr: T = sum + .iter() + .map(|v| T::Native::from_f64(v / total).unwrap()) + .collect::>() + .into(); + Some(arr) } @@ -239,16 +252,19 @@ impl MatrixView { "Does not support sampling on transposed matrix" ); if n > self.num_rows() { - return self.clone(); + return Self { + data: self.data.clone(), + num_columns: self.num_columns, + transpose: self.transpose, + }; } let chosen = (0..self.num_rows()).choose_multiple(&mut rng, n); let dim = self.num_columns(); - let mut builder = Float32Builder::with_capacity(n * dim); + let mut builder = Vec::with_capacity(n * dim); for idx in chosen.iter() { - let s = self.data.slice(idx * dim, dim); - builder.append_slice(s.values()); + builder.extend(self.data.as_slice()[idx * dim..(idx + 1) * dim].iter()); } - let data = Arc::new(builder.finish()); + let data = Arc::new(builder.into()); Self { data, num_columns: self.num_columns, @@ -257,25 +273,6 @@ impl MatrixView { } } -impl TryFrom<&FixedSizeListArray> for MatrixView { - type Error = Error; - - fn try_from(fsl: &FixedSizeListArray) -> Result { - if !matches!(fsl.value_type(), DataType::Float32) { - return Err(Error::ComputeError(format!( - "Only support convert f32 FixedSizeListArray to MatrixView, got {}", - fsl.data_type() - ))); - } - let values = fsl.values(); - Ok(Self { - data: Arc::new(values.as_primitive().clone()), - num_columns: fsl.value_length() as usize, - transpose: false, - }) - } -} - #[cfg(test)] mod tests { use std::collections::HashSet; diff --git a/rust/lance/src/index/vector/diskann/builder.rs b/rust/lance/src/index/vector/diskann/builder.rs index e91164d9f5..8b8e8bacf6 100644 --- a/rust/lance/src/index/vector/diskann/builder.rs +++ b/rust/lance/src/index/vector/diskann/builder.rs @@ -15,8 +15,7 @@ use std::collections::{BinaryHeap, HashSet}; use std::sync::Arc; -use arrow_array::UInt32Array; -use arrow_array::{cast::as_primitive_array, types::UInt64Type}; +use arrow_array::{cast::AsArray, types::UInt64Type, Float32Array, UInt32Array}; use arrow_select::concat::concat_batches; use futures::stream::{self, StreamExt, TryStreamExt}; use lance_arrow::*; @@ -136,16 +135,19 @@ async fn init_graph( let batches = stream.try_collect::>().await?; let batch = concat_batches(&batches[0].schema(), &batches)?; - let row_ids = as_primitive_array::(batch.column_by_qualified_name(ROW_ID).ok_or( - Error::Index { + let row_ids = batch + .column_by_qualified_name(ROW_ID) + .ok_or(Error::Index { message: "row_id not found".to_string(), - }, - )?); - let vectors = - as_fixed_size_list_array(batch.column_by_qualified_name(column).ok_or(Error::Index { + })? + .as_primitive::(); + let vectors = batch + .column_by_qualified_name(column) + .ok_or(Error::Index { message: format!("column {} not found", column), - })?); - let matrix: MatrixView = vectors.try_into()?; + })? + .as_fixed_size_list(); + let matrix: MatrixView = vectors.try_into()?; let nodes = row_ids .values() .iter() @@ -176,7 +178,7 @@ async fn init_graph( } /// Distance between two vectors in the matrix. -fn distance(matrix: &MatrixView, i: usize, j: usize) -> Result { +fn distance(matrix: &MatrixView, i: usize, j: usize) -> Result { let vector_i = matrix.row(i).ok_or(Error::Index { message: "Invalid row index".to_string(), })?; @@ -245,7 +247,7 @@ async fn robust_prune( } /// Find the index of the medoid vector in all vectors. -async fn find_medoid(vectors: &MatrixView, metric_type: MetricType) -> Result { +async fn find_medoid(vectors: &MatrixView, metric_type: MetricType) -> Result { let centroid = vectors.centroid().ok_or_else(|| Error::Index { message: "Cannot find the medoid of an empty matrix".to_string(), })?; diff --git a/rust/lance/src/index/vector/graph/builder.rs b/rust/lance/src/index/vector/graph/builder.rs index 2525c01347..a468b258a4 100644 --- a/rust/lance/src/index/vector/graph/builder.rs +++ b/rust/lance/src/index/vector/graph/builder.rs @@ -16,8 +16,9 @@ use std::sync::Arc; -use arrow_array::UInt32Array; +use arrow_array::{Float32Array, UInt32Array}; use async_trait::async_trait; +use lance_arrow::FloatArray; use lance_linalg::distance::{DistanceFunc, MetricType}; use super::{Graph, Vertex}; @@ -42,7 +43,7 @@ pub struct GraphBuilder { pub(crate) nodes: Vec>, /// Hold all vectors in memory for fast access at the moment. - pub(crate) data: MatrixView, + pub(crate) data: MatrixView, /// Metric type. metric_type: MetricType, @@ -52,7 +53,7 @@ pub struct GraphBuilder { } impl GraphBuilder { - pub fn new(vertices: &[V], data: MatrixView, metric_type: MetricType) -> Self { + pub fn new(vertices: &[V], data: MatrixView, metric_type: MetricType) -> Self { Self { nodes: vertices .iter() diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index ee32139835..6ef7648e69 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -393,7 +393,7 @@ impl Ivf { /// Returns a `RecordBatch` with schema `{__part_id: u32, __residual: FixedSizeList}` pub fn compute_partition_and_residual( &self, - data: &MatrixView, + data: &MatrixView, metric_type: MetricType, ) -> Result { let mut part_id_builder = UInt32Builder::with_capacity(data.num_rows()); @@ -402,7 +402,7 @@ impl Ivf { let dim = data.num_columns(); let dist_func = metric_type.batch_func(); - let centroids: MatrixView = self.centroids.as_ref().try_into()?; + let centroids: MatrixView = self.centroids.as_ref().try_into()?; for i in 0..data.num_rows() { let vector = data.row(i).unwrap(); let part_id = @@ -571,8 +571,8 @@ impl IvfBuildParams { /// - *centroids*: the centroids to compute residual vectors. /// - *metric_type*: the metric type to compute distance. fn compute_residual_matrix( - data: &MatrixView, - centroids: &MatrixView, + data: &MatrixView, + centroids: &MatrixView, metric_type: MetricType, ) -> Result> { assert_eq!(centroids.num_columns(), data.num_columns()); @@ -691,7 +691,11 @@ pub async fn build_ivf_pq_index( ) } else { // Compute the residual vector for training PQ - let ivf_centroids = ivf_model.centroids.as_ref().try_into()?; + let centroids = &ivf_model.centroids; + let ivf_centroids = MatrixView::new( + Arc::new(centroids.values().as_primitive::().clone()), + centroids.value_length() as usize, + ); let residual_data = compute_residual_matrix(&training_data, &ivf_centroids, metric_type)?; let pq_training_data = MatrixView::new(residual_data, training_data.num_columns()); @@ -718,7 +722,17 @@ pub async fn build_ivf_pq_index( let arr = batch.column_by_name(column).ok_or_else(|| Error::IO { message: format!("Dataset does not have column {column}"), })?; - let mut vectors: MatrixView = as_fixed_size_list_array(arr).try_into()?; + let fixed_size_list_arr = arr.as_fixed_size_list(); + + let mut vectors = MatrixView::new( + Arc::new( + fixed_size_list_arr + .values() + .as_primitive::() + .clone(), + ), + fixed_size_list_arr.value_length() as usize, + ); // Transform the vectors if pre-transforms are used. for transform in transform_ref.iter() { @@ -731,13 +745,15 @@ pub async fn build_ivf_pq_index( }) .await??; - let residual_col = part_id_and_residual + let residual_arr = part_id_and_residual .column_by_name(RESIDUAL_COLUMN) - .unwrap(); - let residual_data = as_fixed_size_list_array(&residual_col); - let pq_code = pq_ref - .transform(&residual_data.try_into()?, metric_type) - .await?; + .unwrap() + .as_fixed_size_list(); + let residual_data = MatrixView::new( + Arc::new(residual_arr.values().as_primitive::().clone()), + residual_arr.value_length() as usize, + ); + let pq_code = pq_ref.transform(&residual_data, metric_type).await?; let row_ids = batch .column_by_name(ROW_ID) @@ -852,7 +868,7 @@ async fn write_index_file( /// Train IVF partitions using kmeans. async fn train_ivf_model( - data: &MatrixView, + data: &MatrixView, metric_type: MetricType, params: &IvfBuildParams, ) -> Result { diff --git a/rust/lance/src/index/vector/pq.rs b/rust/lance/src/index/vector/pq.rs index 474f4a9223..34c34dfd1c 100644 --- a/rust/lance/src/index/vector/pq.rs +++ b/rust/lance/src/index/vector/pq.rs @@ -429,7 +429,11 @@ impl ProductQuantizer { /// /// Quantization distortion is the difference between the centroids /// from the PQ code to the actual vector. - pub async fn distortion(&self, data: &MatrixView, metric_type: MetricType) -> Result { + pub async fn distortion( + &self, + data: &MatrixView, + metric_type: MetricType, + ) -> Result { let sub_vectors = divide_to_subvectors(data, self.num_sub_vectors); debug_assert_eq!(sub_vectors.len(), self.num_sub_vectors); @@ -465,7 +469,7 @@ impl ProductQuantizer { /// Transform the vector array to PQ code array. pub async fn transform( &self, - data: &MatrixView, + data: &MatrixView, metric_type: MetricType, ) -> Result { let all_centroids = (0..self.num_sub_vectors) @@ -508,7 +512,7 @@ impl ProductQuantizer { /// Train [`ProductQuantizer`] using vectors. pub async fn train( &mut self, - data: &MatrixView, + data: &MatrixView, metric_type: MetricType, max_iters: usize, ) -> Result<()> { @@ -555,7 +559,7 @@ impl ProductQuantizer { /// Reset the centroids from the OPQ training. pub fn reset_centroids( &mut self, - data: &MatrixView, + data: &MatrixView, pq_code: &FixedSizeListArray, ) -> Result<()> { assert_eq!(data.num_rows(), pq_code.len()); @@ -634,7 +638,7 @@ impl From<&ProductQuantizer> for pb::Pq { /// /// For example, for a `[1024x1M]` matrix, when `n = 8`, this function divides /// the matrix into `[128x1M; 8]` vector of matrix. -fn divide_to_subvectors(data: &MatrixView, m: usize) -> Vec> { +fn divide_to_subvectors(data: &MatrixView, m: usize) -> Vec> { assert!(!data.num_rows() > 0); let sub_vector_length = data.num_columns() / m; @@ -723,7 +727,7 @@ impl PQBuildParams { /// Train product quantization over (OPQ-rotated) residual vectors. pub(crate) async fn train_pq( - data: &MatrixView, + data: &MatrixView, params: &PQBuildParams, ) -> Result { let mut pq = ProductQuantizer::new( diff --git a/rust/lance/src/index/vector/traits.rs b/rust/lance/src/index/vector/traits.rs index 47cdf04c40..3d06e270a2 100644 --- a/rust/lance/src/index/vector/traits.rs +++ b/rust/lance/src/index/vector/traits.rs @@ -17,7 +17,7 @@ use std::sync::Arc; -use arrow_array::RecordBatch; +use arrow_array::{Float32Array, RecordBatch}; use async_trait::async_trait; use lance_linalg::MatrixView; @@ -73,12 +73,12 @@ pub trait Transformer: std::fmt::Debug + Sync + Send { /// /// Parameters: /// - *data*: training vectors. - async fn train(&mut self, data: &MatrixView) -> Result<()>; + async fn train(&mut self, data: &MatrixView) -> Result<()>; /// Apply transform on the matrix `data`. /// /// Returns a new Matrix instead. - async fn transform(&self, data: &MatrixView) -> Result; + async fn transform(&self, data: &MatrixView) -> Result>; async fn save(&self, writer: &mut ObjectWriter) -> Result; } diff --git a/rust/lance/src/index/vector/utils.rs b/rust/lance/src/index/vector/utils.rs index 66fb0cbc46..426ddc42da 100644 --- a/rust/lance/src/index/vector/utils.rs +++ b/rust/lance/src/index/vector/utils.rs @@ -12,8 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use arrow_array::cast::AsArray; use std::sync::Arc; +use arrow_array::types::Float32Type; +use arrow_array::Float32Array; use arrow_schema::Schema as ArrowSchema; use arrow_select::concat::concat_batches; use futures::stream::TryStreamExt; @@ -29,7 +32,7 @@ pub async fn maybe_sample_training_data( dataset: &Dataset, column: &str, sample_size_hint: usize, -) -> Result { +) -> Result> { let num_rows = dataset.count_rows().await?; let projection = dataset.schema().project(&[column])?; let batch = if num_rows > sample_size_hint { @@ -52,5 +55,11 @@ pub async fn maybe_sample_training_data( ), })?; let fixed_size_array = as_fixed_size_list_array(array); - Ok(fixed_size_array.try_into()?) + let values = Arc::new( + fixed_size_array + .values() + .as_primitive::() + .clone(), + ); + Ok(MatrixView::new(values, fixed_size_array.value_length())) } From cefff34391aa42240a6fa2a3b5aea11deac11446 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Thu, 14 Sep 2023 14:25:20 -0400 Subject: [PATCH 2/5] at least it compiles --- rust/lance-linalg/src/distance.rs | 2 +- rust/lance-linalg/src/matrix.rs | 3 ++- rust/lance/src/index/vector/diskann/builder.rs | 6 +++++- rust/lance/src/index/vector/graph/builder.rs | 3 +-- rust/lance/src/index/vector/ivf.rs | 4 +++- rust/lance/src/index/vector/utils.rs | 4 +--- 6 files changed, 13 insertions(+), 9 deletions(-) diff --git a/rust/lance-linalg/src/distance.rs b/rust/lance-linalg/src/distance.rs index 672001da5d..79df808670 100644 --- a/rust/lance-linalg/src/distance.rs +++ b/rust/lance-linalg/src/distance.rs @@ -96,7 +96,7 @@ impl TryFrom<&str> for DistanceType { "dot" => Ok(Self::Dot), _ => Err(ArrowError::InvalidArgumentError( format!("Metric type '{s}' is not supported,") - + "only 'l2'/'euclidean', 'cosine', and 'dot' are supported.", + + "only 'l2'/'euclidean, 'cosine', and 'dot' are supported.", )), } } diff --git a/rust/lance-linalg/src/matrix.rs b/rust/lance-linalg/src/matrix.rs index 9f6e9c75bc..9d4eb82446 100644 --- a/rust/lance-linalg/src/matrix.rs +++ b/rust/lance-linalg/src/matrix.rs @@ -17,10 +17,11 @@ use std::sync::Arc; -use lance_arrow::FloatArray; use num_traits::{AsPrimitive, Float, FromPrimitive, ToPrimitive}; use rand::{distributions::Standard, rngs::SmallRng, seq::IteratorRandom, Rng, SeedableRng}; +use lance_arrow::FloatArray; + /// Transpose a matrix. fn transpose(input: &[T], dimension: usize) -> Vec { let n = input.len() / dimension; diff --git a/rust/lance/src/index/vector/diskann/builder.rs b/rust/lance/src/index/vector/diskann/builder.rs index 8b8e8bacf6..4ef3c1f9ef 100644 --- a/rust/lance/src/index/vector/diskann/builder.rs +++ b/rust/lance/src/index/vector/diskann/builder.rs @@ -15,6 +15,7 @@ use std::collections::{BinaryHeap, HashSet}; use std::sync::Arc; +use arrow_array::types::Float32Type; use arrow_array::{cast::AsArray, types::UInt64Type, Float32Array, UInt32Array}; use arrow_select::concat::concat_batches; use futures::stream::{self, StreamExt, TryStreamExt}; @@ -147,7 +148,10 @@ async fn init_graph( message: format!("column {} not found", column), })? .as_fixed_size_list(); - let matrix: MatrixView = vectors.try_into()?; + let matrix = MatrixView::::new( + Arc::new(vectors.values().as_primitive::().clone()), + vectors.value_length(), + ); let nodes = row_ids .values() .iter() diff --git a/rust/lance/src/index/vector/graph/builder.rs b/rust/lance/src/index/vector/graph/builder.rs index a468b258a4..0e992da45d 100644 --- a/rust/lance/src/index/vector/graph/builder.rs +++ b/rust/lance/src/index/vector/graph/builder.rs @@ -18,12 +18,11 @@ use std::sync::Arc; use arrow_array::{Float32Array, UInt32Array}; use async_trait::async_trait; -use lance_arrow::FloatArray; use lance_linalg::distance::{DistanceFunc, MetricType}; +use lance_linalg::matrix::MatrixView; use super::{Graph, Vertex}; use crate::{Error, Result}; -use lance_linalg::matrix::MatrixView; /// A graph node to hold the vertex data and its neighbors. #[derive(Debug)] diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index 6ef7648e69..8884c6de45 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -402,7 +402,9 @@ impl Ivf { let dim = data.num_columns(); let dist_func = metric_type.batch_func(); - let centroids: MatrixView = self.centroids.as_ref().try_into()?; + let centroids_arr = self.centroids.values().as_primitive::(); + let centroids: MatrixView = + MatrixView::new(Arc::new(centroids_arr.clone()), dim); for i in 0..data.num_rows() { let vector = data.row(i).unwrap(); let part_id = diff --git a/rust/lance/src/index/vector/utils.rs b/rust/lance/src/index/vector/utils.rs index 426ddc42da..df434b95a7 100644 --- a/rust/lance/src/index/vector/utils.rs +++ b/rust/lance/src/index/vector/utils.rs @@ -12,11 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use arrow_array::cast::AsArray; use std::sync::Arc; -use arrow_array::types::Float32Type; -use arrow_array::Float32Array; +use arrow_array::{cast::AsArray, types::Float32Type, Float32Array}; use arrow_schema::Schema as ArrowSchema; use arrow_select::concat::concat_batches; use futures::stream::TryStreamExt; From cde408111e2a9ccf70490fcd5419a08161b0c420 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Thu, 14 Sep 2023 14:36:07 -0400 Subject: [PATCH 3/5] iter --- rust/lance-linalg/src/matrix.rs | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/rust/lance-linalg/src/matrix.rs b/rust/lance-linalg/src/matrix.rs index 9d4eb82446..69bacd0022 100644 --- a/rust/lance-linalg/src/matrix.rs +++ b/rust/lance-linalg/src/matrix.rs @@ -272,6 +272,35 @@ where transpose: false, } } + + pub fn iter(&self) -> MatrixRowIter { + MatrixRowIter { + data: self, + cur_idx: 0, + } + } +} + +/// Iterator over the matrix one row at a time. +pub struct MatrixRowIter<'a, T: FloatArray> +where + Standard: rand::distributions::Distribution<::Native>, +{ + data: &'a MatrixView, + cur_idx: usize, +} + +impl<'a, T: FloatArray> Iterator for MatrixRowIter<'a, T> +where + Standard: rand::distributions::Distribution<::Native>, +{ + type Item = &'a [T::Native]; + + fn next(&mut self) -> Option { + let cur_idx = self.cur_idx; + self.cur_idx += 1; + self.data.row(cur_idx) + } } #[cfg(test)] From 10141336dbd43cd23d4d11a28ad9901e71de35b0 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Thu, 14 Sep 2023 15:08:03 -0400 Subject: [PATCH 4/5] fix lint --- rust/lance-linalg/src/distance.rs | 7 +++---- rust/lance-linalg/src/matrix.rs | 6 ++++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/rust/lance-linalg/src/distance.rs b/rust/lance-linalg/src/distance.rs index 79df808670..2ebc4e8edc 100644 --- a/rust/lance-linalg/src/distance.rs +++ b/rust/lance-linalg/src/distance.rs @@ -94,10 +94,9 @@ impl TryFrom<&str> for DistanceType { "l2" | "euclidean" => Ok(Self::L2), "cosine" => Ok(Self::Cosine), "dot" => Ok(Self::Dot), - _ => Err(ArrowError::InvalidArgumentError( - format!("Metric type '{s}' is not supported,") - + "only 'l2'/'euclidean, 'cosine', and 'dot' are supported.", - )), + _ => Err(ArrowError::InvalidArgumentError(format!( + "Metric type '{s}' is not supported" + ))), } } } diff --git a/rust/lance-linalg/src/matrix.rs b/rust/lance-linalg/src/matrix.rs index 69bacd0022..6b778f5c90 100644 --- a/rust/lance-linalg/src/matrix.rs +++ b/rust/lance-linalg/src/matrix.rs @@ -307,6 +307,8 @@ where mod tests { use std::collections::HashSet; + use arrow_array::Float32Array; + #[cfg(feature = "opq")] use approx::assert_relative_eq; @@ -337,13 +339,13 @@ mod tests { fn test_dot_on_transposed_mat() { // A[2,3] let a_data = Arc::new(Float32Array::from_iter((1..=6).map(|v| v as f32))); - let a = MatrixView::new(a_data, 3); + let a = MatrixView::::new(a_data, 3); // B[3,2] let b_data = Arc::new(Float32Array::from_iter_values([ 2.0, 3.0, 6.0, 7.0, 10.0, 11.0, ])); - let b = MatrixView::new(b_data, 2); + let b = MatrixView::::new(b_data, 2); let c_t = b.transpose().dot(&a.transpose()).unwrap(); let expected = vec![44.0, 98.0, 50.0, 113.0]; From ce7430d92d60e9e8fa6e2dac3164ea9daf3d0a7e Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Fri, 15 Sep 2023 09:53:44 -0400 Subject: [PATCH 5/5] add todo --- rust/lance-arrow/src/floats.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/lance-arrow/src/floats.rs b/rust/lance-arrow/src/floats.rs index 30a9c5b2bb..0721ecf847 100644 --- a/rust/lance-arrow/src/floats.rs +++ b/rust/lance-arrow/src/floats.rs @@ -32,6 +32,7 @@ impl FloatArray for BFloat16Array { type Native = bf16; fn as_slice(&self) -> &[Self::Native] { + // TODO: apache/arrow-rs#4820 todo!() } }