diff --git a/rust/lance-encoding/Cargo.toml b/rust/lance-encoding/Cargo.toml
index 668b95fc4ee..5a95137b2bf 100644
--- a/rust/lance-encoding/Cargo.toml
+++ b/rust/lance-encoding/Cargo.toml
@@ -24,6 +24,7 @@ arrow-select.workspace = true
 bytes.workspace = true
 futures.workspace = true
 fsst.workspace = true
+hex = "0.4.3"
 log.workspace = true
 num-traits.workspace = true
 prost.workspace = true
@@ -52,3 +53,7 @@ pprof = { workspace = true }
 [[bench]]
 name = "decoder"
 harness = false
+
+[[bench]]
+name = "buffer"
+harness = false
diff --git a/rust/lance-encoding/benches/buffer.rs b/rust/lance-encoding/benches/buffer.rs
new file mode 100644
index 00000000000..0cecdebf537
--- /dev/null
+++ b/rust/lance-encoding/benches/buffer.rs
@@ -0,0 +1,78 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
+use lance_encoding::buffer::LanceBuffer;
+
+/// Number of values zipped per iteration (1Mi, 32Ki and 8Ki)
+const NUM_VALUES: &[usize] = &[1024 * 1024, 32 * 1024, 8 * 1024];
+
+/// Benchmarks `LanceBuffer::zip_into_one` for two input layouts that both produce
+/// 6 bytes per zipped value (the throughput is reported in output bytes).
+fn bench_zip(c: &mut Criterion) {
+    for num_values in NUM_VALUES {
+        let num_values = *num_values;
+        let mut group = c.benchmark_group(format!("zip_{}Ki", num_values / 1024));
+
+        group.throughput(Throughput::Bytes((num_values * 6) as u64));
+
+        group.bench_function("2_4_zip_into_6", move |b| {
+            // Zip together a 2-byte-per-value and a 4-byte-per-value buffer, each with num_values items
+            let random_shorts: Vec<u8> =
+                (0..num_values * 2).map(|_| rand::random::<u8>()).collect();
+            let random_ints: Vec<u8> = (0..num_values * 4).map(|_| rand::random::<u8>()).collect();
+            let mut buffers = vec![
+                (LanceBuffer::Owned(random_shorts), 16),
+                (LanceBuffer::Owned(random_ints), 32),
+            ];
+            let buffers = &mut buffers;
+
+            b.iter(move || {
+                let buffers = buffers
+                    .iter_mut()
+                    .map(|(buf, bits_per_value)| (buf.borrow_and_clone(), *bits_per_value))
+                    .collect::<Vec<_>>();
+                black_box(LanceBuffer::zip_into_one(buffers, num_values as u64).unwrap());
+            })
+        });
+
+        group.bench_function("2_2_2_zip_into_6", move |b| {
+            // Zip together three 2-byte-per-value buffers, each with num_values items
+            let random_shorts1: Vec<u8> =
+                (0..num_values * 2).map(|_| rand::random::<u8>()).collect();
+            let random_shorts2: Vec<u8> =
+                (0..num_values * 2).map(|_| rand::random::<u8>()).collect();
+            let random_shorts3: Vec<u8> =
+                (0..num_values * 2).map(|_| rand::random::<u8>()).collect();
+            let mut buffers = vec![
+                (LanceBuffer::Owned(random_shorts1), 16),
+                (LanceBuffer::Owned(random_shorts2), 16),
+                (LanceBuffer::Owned(random_shorts3), 16),
+            ];
+            let buffers = &mut buffers;
+
+            b.iter(move || {
+                let buffers = buffers
+                    .iter_mut()
+                    .map(|(buf, bits_per_value)| (buf.borrow_and_clone(), *bits_per_value))
+                    .collect::<Vec<_>>();
+                black_box(LanceBuffer::zip_into_one(buffers, num_values as u64).unwrap());
+            })
+        });
+    }
+}
+
+#[cfg(target_os = "linux")]
+criterion_group!(
+    name=benches;
+    config = Criterion::default().significance_level(0.1).sample_size(10)
+        .with_profiler(pprof::criterion::PProfProfiler::new(100, pprof::criterion::Output::Flamegraph(None)));
+    targets = bench_zip);
+
+// Non-linux targets are built without the pprof profiler, so define the group without it.
+#[cfg(not(target_os = "linux"))]
+criterion_group!(
+    name=benches;
+    config = Criterion::default().significance_level(0.1).sample_size(10);
+    targets = bench_zip);
+criterion_main!(benches);
diff --git a/rust/lance-encoding/src/buffer.rs b/rust/lance-encoding/src/buffer.rs
index 65a7405025b..23af12cc29c 100644
--- a/rust/lance-encoding/src/buffer.rs
+++ b/rust/lance-encoding/src/buffer.rs
@@ -5,7 +5,7 @@
 
 use std::{ops::Deref, ptr::NonNull, sync::Arc};
 
-use arrow_buffer::Buffer;
+use arrow_buffer::{ArrowNativeType, Buffer, ScalarBuffer};
 use snafu::{location, Location};
 
 use lance_core::{Error, Result};
@@ -22,12 +22,51 @@ use lance_core::{Error, Result};
 ///
 /// If you need to clone a LanceBuffer you can use borrow_and_clone() which will make sure that the buffer
 /// is in borrowed mode before cloning. This is a zero copy operation (but requires &mut self).
-#[derive(Debug)]
 pub enum LanceBuffer {
     Borrowed(Buffer),
     Owned(Vec<u8>),
 }
 
+// Compares equality of the buffers, ignoring owned / unowned status
+impl PartialEq for LanceBuffer {
+    fn eq(&self, other: &Self) -> bool {
+        match (self, other) {
+            (Self::Borrowed(l0), Self::Borrowed(r0)) => l0 == r0,
+            (Self::Owned(l0), Self::Owned(r0)) => l0 == r0,
+            (Self::Borrowed(l0), Self::Owned(r0)) => l0.as_slice() == r0.as_slice(),
+            (Self::Owned(l0), Self::Borrowed(r0)) => l0.as_slice() == r0.as_slice(),
+        }
+    }
+}
+
+impl Eq for LanceBuffer {}
+
+impl std::fmt::Debug for LanceBuffer {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let preview = if self.len() > 10 {
+            format!("0x{}...", hex::encode_upper(&self[..10]))
+        } else {
+            format!("0x{}", hex::encode_upper(self.as_ref()))
+        };
+        match self {
+            Self::Borrowed(buffer) => write!(
+                f,
+                "LanceBuffer::Borrowed(bytes={} #bytes={})",
+                preview,
+                buffer.len()
+            ),
+            Self::Owned(buffer) => {
+                write!(
+                    f,
+                    "LanceBuffer::Owned(bytes={} #bytes={})",
+                    preview,
+                    buffer.len()
+                )
+            }
+        }
+    }
+}
+
 impl LanceBuffer {
     /// Convert into a mutable buffer. If this is a borrowed buffer, the data will be copied.
     pub fn into_owned(self) -> Vec<u8> {
@@ -45,6 +84,26 @@ impl LanceBuffer {
         }
     }
 
+    /// Returns an owned buffer of the given size with all bits set to 0
+    pub fn all_unset(len: usize) -> Self {
+        Self::Owned(vec![0; len])
+    }
+
+    /// Returns an owned buffer of the given size with all bits set to 1
+    pub fn all_set(len: usize) -> Self {
+        Self::Owned(vec![0xff; len])
+    }
+
+    /// Creates an empty buffer
+    pub fn empty() -> Self {
+        Self::Owned(Vec::new())
+    }
+
+    /// Converts the buffer into a hex string
+    pub fn as_hex(&self) -> String {
+        hex::encode_upper(self)
+    }
+
     /// Create a LanceBuffer from a bytes::Bytes object
     ///
     /// The alignment must be specified (as `bytes_per_value`) since we want to make
@@ -114,6 +173,164 @@ impl LanceBuffer {
             }),
         }
     }
+
+    /// Make an owned copy of the buffer (always does a copy of the data)
+    pub fn deep_copy(&self) -> Self {
+        match self {
+            Self::Borrowed(buffer) => Self::Owned(buffer.to_vec()),
+            Self::Owned(buffer) => Self::Owned(buffer.clone()),
+        }
+    }
+
+    /// Reinterprets a Vec<T> as a LanceBuffer
+    ///
+    /// Note that this creates a borrowed buffer. It is not possible to safely
+    /// reinterpret a Vec<T> into a Vec<u8> in rust due to this constraint from
+    /// [`Vec::from_raw_parts`]:
+    ///
+    /// > `T` needs to have the same alignment as what `ptr` was allocated with.
+    /// > (`T` having a less strict alignment is not sufficient, the alignment really
+    /// > needs to be equal to satisfy the [`dealloc`] requirement that memory must be
+    /// > allocated and deallocated with the same layout.)
+    ///
+    /// However, we can safely reinterpret Vec<T> into &[u8] which is what happens here.
+    pub fn reinterpret_vec<T: ArrowNativeType>(vec: Vec<T>) -> Self {
+        Self::Borrowed(Buffer::from_vec(vec))
+    }
+
+    /// Reinterprets a LanceBuffer as a slice of `T`
+    ///
+    /// Unfortunately, there is no way to safely turn the bytes into a `Vec<T>` in Rust
+    /// without a copy, even if the source is `Vec<u8>`, so this returns a typed view of the
+    /// bytes instead. The buffer is put into borrowed mode first (see `borrow_and_clone`),
+    /// which is why `&mut self` is required.
+    ///
+    /// NOTE(review): `ScalarBuffer::from(Buffer)` is expected to panic if the bytes are not
+    /// aligned for `T` -- confirm against the arrow-buffer documentation.
+    pub fn borrow_to_typed_slice<T: ArrowNativeType>(&mut self) -> impl AsRef<[T]> {
+        ScalarBuffer::<T>::from(self.borrow_and_clone().into_buffer())
+    }
+
+    /// Concatenates multiple buffers into a single buffer, consuming the input buffers
+    ///
+    /// If there is only one buffer, it will be returned as is
+    pub fn concat_into_one(buffers: Vec<Self>) -> Self {
+        if buffers.len() == 1 {
+            return buffers.into_iter().next().unwrap();
+        }
+
+        let mut total_len = 0;
+        for buffer in &buffers {
+            total_len += buffer.len();
+        }
+
+        let mut data = Vec::with_capacity(total_len);
+        for buffer in buffers {
+            data.extend_from_slice(buffer.as_ref());
+        }
+
+        Self::Owned(data)
+    }
+
+    /// Zips multiple buffers into a single buffer, consuming the input buffers
+    ///
+    /// Unlike concat_into_one this "zips" the buffers, interleaving the values
+    ///
+    /// Each input is a `(buffer, bits_per_value)` pair. The output holds `num_values`
+    /// rows and each row is the concatenation of one value from every input, in order.
+    ///
+    /// # Errors
+    ///
+    /// Returns `Error::InvalidInput` if any `bits_per_value` is not a multiple of 8, if
+    /// any buffer holds fewer than `num_values` values, or if the size of the zipped
+    /// buffer does not fit in a `usize`.
+    pub fn zip_into_one(buffers: Vec<(Self, u64)>, num_values: u64) -> Result<Self> {
+        let bytes_per_value = buffers
+            .iter()
+            .map(|(_, bits_per_value)| {
+                if bits_per_value % 8 == 0 {
+                    Ok(bits_per_value / 8)
+                } else {
+                    Err(Error::InvalidInput { source: format!("LanceBuffer::zip_into_one only supports full-byte buffers currently and received a buffer with {} bits per value", bits_per_value).into(), location: location!() })
+                }
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // The copy loop below reads `num_values` values from every buffer through raw
+        // pointers, so reject any buffer that is too short before touching memory.
+        for ((buffer, _), value_width) in buffers.iter().zip(bytes_per_value.iter()) {
+            let has_enough_bytes = value_width
+                .checked_mul(num_values)
+                .map_or(false, |required| buffer.len() as u64 >= required);
+            if !has_enough_bytes {
+                return Err(Error::InvalidInput {
+                    source: format!(
+                        "LanceBuffer::zip_into_one needs {} values of {} bytes each but received a buffer with only {} bytes",
+                        num_values,
+                        value_width,
+                        buffer.len()
+                    )
+                    .into(),
+                    location: location!(),
+                });
+            }
+        }
+
+        let total_bytes_per_value = bytes_per_value.iter().sum::<u64>();
+        let total_bytes = total_bytes_per_value
+            .checked_mul(num_values)
+            .and_then(|total| usize::try_from(total).ok())
+            .ok_or_else(|| Error::InvalidInput {
+                source: format!(
+                    "LanceBuffer::zip_into_one cannot zip {} values of {} bytes each into a single buffer",
+                    num_values, total_bytes_per_value
+                )
+                .into(),
+                location: location!(),
+            })?;
+
+        let mut zipped = vec![0_u8; total_bytes];
+        let mut buffer_ptrs = buffers
+            .iter()
+            .zip(bytes_per_value)
+            .map(|((buffer, _), bytes_per_value)| (buffer.as_ptr(), bytes_per_value as usize))
+            .collect::<Vec<_>>();
+
+        let mut zipped_ptr = zipped.as_mut_ptr();
+        // SAFETY: every source holds at least `num_values * bytes_per_value` bytes (checked
+        // above) and `zipped` holds exactly `num_values` rows, so each pass of the outer
+        // loop reads one in-bounds value per source and writes one in-bounds row. The
+        // sources are only read and `zipped` is a fresh allocation, so the copied
+        // ranges never overlap.
+        unsafe {
+            let end = zipped_ptr.add(total_bytes);
+            while zipped_ptr < end {
+                for (buf, bytes_per_value) in buffer_ptrs.iter_mut() {
+                    std::ptr::copy_nonoverlapping(*buf, zipped_ptr, *bytes_per_value);
+                    zipped_ptr = zipped_ptr.add(*bytes_per_value);
+                    *buf = buf.add(*bytes_per_value);
+                }
+            }
+        }
+
+        Ok(Self::Owned(zipped))
+    }
+
+    /// Create a LanceBuffer from a slice
+    ///
+    /// This is NOT a zero-copy operation. We can't even create a borrowed buffer because
+    /// we have no way of extending the lifetime of the slice.
+    pub fn copy_slice(slice: &[u8]) -> Self {
+        Self::Owned(slice.to_vec())
+    }
+
+    /// Create a LanceBuffer from an array (fixed-size slice)
+    ///
+    /// This is NOT a zero-copy operation. The slice memory could be on the stack and
+    /// thus we can't forget it.
+    pub fn copy_array<const N: usize>(array: [u8; N]) -> Self {
+        Self::Owned(Vec::from(array))
+    }
 }
 
 impl AsRef<[u8]> for LanceBuffer {
@@ -133,6 +350,8 @@ impl Deref for LanceBuffer {
     }
 }
 
+// All `From` implementations are zero-copy
+
 impl From<Vec<u8>> for LanceBuffer {
     fn from(buffer: Vec<u8>) -> Self {
         Self::Owned(buffer)
@@ -144,3 +363,96 @@ impl From<Buffer> for LanceBuffer {
         Self::Borrowed(buffer)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use arrow_buffer::Buffer;
+
+    use super::LanceBuffer;
+
+    #[test]
+    fn test_eq() {
+        let buf = LanceBuffer::Borrowed(Buffer::from_vec(vec![1_u8, 2, 3]));
+        let buf2 = LanceBuffer::Owned(vec![1, 2, 3]);
+        assert_eq!(buf, buf2);
+    }
+
+    #[test]
+    fn test_reinterpret_vec() {
+        let vec = vec![1_u32, 2, 3];
+        let mut buf = LanceBuffer::reinterpret_vec(vec);
+
+        let mut expected = Vec::with_capacity(12);
+        expected.extend_from_slice(&1_u32.to_ne_bytes());
+        expected.extend_from_slice(&2_u32.to_ne_bytes());
+        expected.extend_from_slice(&3_u32.to_ne_bytes());
+        let expected = LanceBuffer::Owned(expected);
+
+        assert_eq!(expected, buf);
+        assert_eq!(buf.borrow_to_typed_slice::<u32>().as_ref(), vec![1, 2, 3]);
+    }
+
+    #[test]
+    fn test_concat() {
+        let buf1 = LanceBuffer::Owned(vec![1_u8, 2, 3]);
+        let buf2 = LanceBuffer::Owned(vec![4_u8, 5, 6]);
+        let buf3 = LanceBuffer::Owned(vec![7_u8, 8, 9]);
+
+        let expected = LanceBuffer::Owned(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
+        assert_eq!(
+            expected,
+            LanceBuffer::concat_into_one(vec![buf1, buf2, buf3])
+        );
+
+        let empty = LanceBuffer::empty();
+        assert_eq!(
+            LanceBuffer::empty(),
+            LanceBuffer::concat_into_one(vec![empty])
+        );
+
+        let expected = LanceBuffer::Owned(vec![1, 2, 3]);
+        assert_eq!(
+            expected,
+            LanceBuffer::concat_into_one(vec![expected.deep_copy(), LanceBuffer::empty()])
+        );
+    }
+
+    #[test]
+    fn test_zip() {
+        let buf1 = LanceBuffer::Owned(vec![1_u8, 2, 3]);
+        let buf2 = LanceBuffer::reinterpret_vec(vec![1_u16, 2, 3]);
+        let buf3 = LanceBuffer::reinterpret_vec(vec![1_u32, 2, 3]);
+
+        let zipped = LanceBuffer::zip_into_one(vec![(buf1, 8), (buf2, 16), (buf3, 32)], 3).unwrap();
+
+        assert_eq!(zipped.len(), 21);
+
+        let mut expected = Vec::with_capacity(21);
+        for i in 1..4 {
+            expected.push(i as u8);
+            expected.extend_from_slice(&(i as u16).to_ne_bytes());
+            expected.extend_from_slice(&(i as u32).to_ne_bytes());
+        }
+        let expected = LanceBuffer::Owned(expected);
+
+        assert_eq!(expected, zipped);
+    }
+
+    #[test]
+    fn test_zip_invalid_input() {
+        // Bit widths that are not a whole number of bytes are rejected
+        let buf = LanceBuffer::Owned(vec![1_u8, 2, 3]);
+        assert!(LanceBuffer::zip_into_one(vec![(buf, 4)], 3).is_err());
+
+        // A buffer holding fewer than `num_values` values is rejected
+        let short = LanceBuffer::Owned(vec![1_u8, 2]);
+        let long = LanceBuffer::Owned(vec![1_u8, 2, 3]);
+        assert!(LanceBuffer::zip_into_one(vec![(short, 8), (long, 8)], 3).is_err());
+    }
+
+    #[test]
+    fn test_hex() {
+        let buf = LanceBuffer::Owned(vec![1, 2, 15, 20]);
+        assert_eq!("01020F14", buf.as_hex());
+    }
+}