Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions rust/lance-encoding/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ arrow-select.workspace = true
bytes.workspace = true
futures.workspace = true
fsst.workspace = true
hex = "0.4.3"
log.workspace = true
num-traits.workspace = true
prost.workspace = true
Expand Down Expand Up @@ -52,3 +53,7 @@ pprof = { workspace = true }
[[bench]]
name = "decoder"
harness = false

[[bench]]
name = "buffer"
harness = false
68 changes: 68 additions & 0 deletions rust/lance-encoding/benches/buffer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
use lance_encoding::buffer::LanceBuffer;

const NUM_VALUES: &[usize] = &[1024 * 1024, 32 * 1024, 8 * 1024];

fn bench_zip(c: &mut Criterion) {
for num_values in NUM_VALUES {
let num_values = *num_values;
let mut group = c.benchmark_group(&format!("zip_{}Ki", num_values / 1024));

group.throughput(Throughput::Bytes((num_values * 6) as u64));

group.bench_function("2_4_zip_into_6", move |b| {
// Zip together a 2-byte-per-buffer and an 8-byte-per-buffer array, each with 1Mi items
let random_shorts: Vec<u8> =
(0..num_values * 2).map(|_| rand::random::<u8>()).collect();
let random_ints: Vec<u8> = (0..num_values * 4).map(|_| rand::random::<u8>()).collect();
let mut buffers = vec![
(LanceBuffer::Owned(random_shorts), 16),
(LanceBuffer::Owned(random_ints), 32),
];
let buffers = &mut buffers;

b.iter(move || {
let buffers = buffers
.iter_mut()
.map(|(buf, bits_per_value)| (buf.borrow_and_clone(), *bits_per_value))
.collect::<Vec<_>>();
black_box(LanceBuffer::zip_into_one(buffers, num_values as u64).unwrap());
})
});

group.bench_function("2_2_2_zip_into_6", move |b| {
// Zip together a 2-byte-per-buffer and an 8-byte-per-buffer array, each with 1Mi items
let random_shorts1: Vec<u8> =
(0..num_values * 2).map(|_| rand::random::<u8>()).collect();
let random_shorts2: Vec<u8> =
(0..num_values * 2).map(|_| rand::random::<u8>()).collect();
let random_shorts3: Vec<u8> =
(0..num_values * 2).map(|_| rand::random::<u8>()).collect();
let mut buffers = vec![
(LanceBuffer::Owned(random_shorts1), 16),
(LanceBuffer::Owned(random_shorts2), 16),
(LanceBuffer::Owned(random_shorts3), 16),
];
let buffers = &mut buffers;

b.iter(move || {
let buffers = buffers
.iter_mut()
.map(|(buf, bits_per_value)| (buf.borrow_and_clone(), *bits_per_value))
.collect::<Vec<_>>();
black_box(LanceBuffer::zip_into_one(buffers, num_values as u64).unwrap());
})
});
}
}

#[cfg(target_os = "linux")]
criterion_group!(
name=benches;
config = Criterion::default().significance_level(0.1).sample_size(10)
.with_profiler(pprof::criterion::PProfProfiler::new(100, pprof::criterion::Output::Flamegraph(None)));
targets = bench_zip);
criterion_main!(benches);
251 changes: 249 additions & 2 deletions rust/lance-encoding/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use std::{ops::Deref, ptr::NonNull, sync::Arc};

use arrow_buffer::Buffer;
use arrow_buffer::{ArrowNativeType, Buffer, ScalarBuffer};
use snafu::{location, Location};

use lance_core::{Error, Result};
Expand All @@ -22,12 +22,51 @@ use lance_core::{Error, Result};
///
/// If you need to clone a LanceBuffer you can use borrow_and_clone() which will make sure that the buffer
/// is in borrowed mode before cloning. This is a zero copy operation (but requires &mut self).
#[derive(Debug)]
pub enum LanceBuffer {
Borrowed(Buffer),
Owned(Vec<u8>),
}

// Compares equality of the buffers, ignoring owned / unowned status
impl PartialEq for LanceBuffer {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::Borrowed(l0), Self::Borrowed(r0)) => l0 == r0,
(Self::Owned(l0), Self::Owned(r0)) => l0 == r0,
(Self::Borrowed(l0), Self::Owned(r0)) => l0.as_slice() == r0.as_slice(),
(Self::Owned(l0), Self::Borrowed(r0)) => l0.as_slice() == r0.as_slice(),
}
}
}

impl Eq for LanceBuffer {}

impl std::fmt::Debug for LanceBuffer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let preview = if self.len() > 10 {
format!("0x{}...", hex::encode_upper(&self[..10]))
} else {
format!("0x{}", hex::encode_upper(self.as_ref()))
};
match self {
Self::Borrowed(buffer) => write!(
f,
"LanceBuffer::Borrowed(bytes={} #bytes={})",
preview,
buffer.len()
),
Self::Owned(buffer) => {
write!(
f,
"LanceBuffer::Owned(bytes={} #bytes={})",
preview,
buffer.len()
)
}
}
}
}

impl LanceBuffer {
/// Convert into a mutable buffer. If this is a borrowed buffer, the data will be copied.
pub fn into_owned(self) -> Vec<u8> {
Expand All @@ -45,6 +84,26 @@ impl LanceBuffer {
}
}

/// Returns an owned buffer of the given size with all bits set to 0
pub fn all_unset(len: usize) -> Self {
Self::Owned(vec![0; len])
}

/// Returns an owned buffer of the given size with all bits set to 1
pub fn all_set(len: usize) -> Self {
Self::Owned(vec![0xff; len])
}

/// Creates an empty buffer
pub fn empty() -> Self {
Self::Owned(Vec::new())
}

/// Converts the buffer into a hex string
pub fn as_hex(&self) -> String {
hex::encode_upper(self)
}

/// Create a LanceBuffer from a bytes::Bytes object
///
/// The alignment must be specified (as `bytes_per_value`) since we want to make
Expand Down Expand Up @@ -114,6 +173,111 @@ impl LanceBuffer {
}),
}
}

/// Make an owned copy of the buffer (always does a copy of the data)
pub fn deep_copy(&self) -> Self {
match self {
Self::Borrowed(buffer) => Self::Owned(buffer.to_vec()),
Self::Owned(buffer) => Self::Owned(buffer.clone()),
}
}

/// Reinterprets a Vec<T> as a LanceBuffer
///
/// Note that this creates a borrowed buffer. It is not possible to safely
/// reinterpret a Vec<T> into a Vec<u8> in rust due to this constraint from
/// [`Vec::from_raw_parts`]:
///
/// > `T` needs to have the same alignment as what `ptr` was allocated with.
/// > (`T` having a less strict alignment is not sufficient, the alignment really
/// > needs to be equal to satisfy the [`dealloc`] requirement that memory must be
/// > allocated and deallocated with the same layout.)
///
/// However, we can safely reinterpret Vec<T> into &[u8] which is what happens here.
pub fn reinterpret_vec<T: ArrowNativeType>(vec: Vec<T>) -> Self {
Self::Borrowed(Buffer::from_vec(vec))
}

/// Reinterprets a LanceBuffer into a Vec<T>
///
/// Unfortunately, there is no way to do this safely in Rust without a copy, even if
/// the source is Vec<u8>.
pub fn borrow_to_typed_slice<T: ArrowNativeType>(&mut self) -> impl AsRef<[T]> {
ScalarBuffer::<T>::from(self.borrow_and_clone().into_buffer())
}

/// Concatenates multiple buffers into a single buffer, consuming the input buffers
///
/// If there is only one buffer, it will be returned as is
pub fn concat_into_one(buffers: Vec<Self>) -> Self {
if buffers.len() == 1 {
return buffers.into_iter().next().unwrap();
}

let mut total_len = 0;
for buffer in &buffers {
total_len += buffer.len();
}

let mut data = Vec::with_capacity(total_len);
for buffer in buffers {
data.extend_from_slice(buffer.as_ref());
}

Self::Owned(data)
}

/// Zips multiple buffers into a single buffer, consuming the input buffers
///
/// Unlike concat_into_one this "zips" the buffers, interleaving the values
pub fn zip_into_one(buffers: Vec<(Self, u64)>, num_values: u64) -> Result<Self> {
let bytes_per_value = buffers.iter().map(|(_, bits_per_value)| {
if bits_per_value % 8 == 0 {
Ok(bits_per_value / 8)
} else {
Err(Error::InvalidInput { source: format!("LanceBuffer::zip_into_one only supports full-byte buffers currently and received a buffer with {} bits per value", bits_per_value).into(), location: location!() })
}
}).collect::<Result<Vec<_>>>()?;
let total_bytes_per_value = bytes_per_value.iter().sum::<u64>();
let total_bytes = (total_bytes_per_value * num_values) as usize;

let mut zipped = vec![0_u8; total_bytes];
let mut buffer_ptrs = buffers
.iter()
.zip(bytes_per_value)
.map(|((buffer, _), bytes_per_value)| (buffer.as_ptr(), bytes_per_value as usize))
.collect::<Vec<_>>();

let mut zipped_ptr = zipped.as_mut_ptr();
unsafe {
let end = zipped_ptr.add(total_bytes);
while zipped_ptr < end {
for (buf, bytes_per_value) in buffer_ptrs.iter_mut() {
std::ptr::copy_nonoverlapping(*buf, zipped_ptr, *bytes_per_value);
zipped_ptr = zipped_ptr.add(*bytes_per_value);
*buf = buf.add(*bytes_per_value);
}
}
}

Ok(Self::Owned(zipped))
}

/// Create a LanceBuffer from a slice
///
/// This is NOT a zero-copy operation. We can't even create a borrowed buffer because
/// we have no way of extending the lifetime of the slice.
pub fn copy_slice(slice: &[u8]) -> Self {
Self::Owned(slice.to_vec())
}

/// Create a LanceBuffer from an array (fixed-size slice)
///
/// This is NOT a zero-copy operation. The slice memory could be on the stack and
/// thus we can't forget it.
pub fn copy_array<const N: usize>(array: [u8; N]) -> Self {
Self::Owned(Vec::from(array))
}
}

impl AsRef<[u8]> for LanceBuffer {
Expand All @@ -133,6 +297,8 @@ impl Deref for LanceBuffer {
}
}

// All `From` implementations are zero-copy

impl From<Vec<u8>> for LanceBuffer {
fn from(buffer: Vec<u8>) -> Self {
Self::Owned(buffer)
Expand All @@ -144,3 +310,84 @@ impl From<Buffer> for LanceBuffer {
Self::Borrowed(buffer)
}
}

#[cfg(test)]
mod tests {
use arrow_buffer::Buffer;

use super::LanceBuffer;

#[test]
fn test_eq() {
let buf = LanceBuffer::Borrowed(Buffer::from_vec(vec![1_u8, 2, 3]));
let buf2 = LanceBuffer::Owned(vec![1, 2, 3]);
assert_eq!(buf, buf2);
}

#[test]
fn test_reinterpret_vec() {
let vec = vec![1_u32, 2, 3];
let mut buf = LanceBuffer::reinterpret_vec(vec);

let mut expected = Vec::with_capacity(12);
expected.extend_from_slice(&1_u32.to_ne_bytes());
expected.extend_from_slice(&2_u32.to_ne_bytes());
expected.extend_from_slice(&3_u32.to_ne_bytes());
let expected = LanceBuffer::Owned(expected);

assert_eq!(expected, buf);
assert_eq!(buf.borrow_to_typed_slice::<u32>().as_ref(), vec![1, 2, 3]);
}

#[test]
fn test_concat() {
let buf1 = LanceBuffer::Owned(vec![1_u8, 2, 3]);
let buf2 = LanceBuffer::Owned(vec![4_u8, 5, 6]);
let buf3 = LanceBuffer::Owned(vec![7_u8, 8, 9]);

let expected = LanceBuffer::Owned(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
assert_eq!(
expected,
LanceBuffer::concat_into_one(vec![buf1, buf2, buf3])
);

let empty = LanceBuffer::empty();
assert_eq!(
LanceBuffer::empty(),
LanceBuffer::concat_into_one(vec![empty])
);

let expected = LanceBuffer::Owned(vec![1, 2, 3]);
assert_eq!(
expected,
LanceBuffer::concat_into_one(vec![expected.deep_copy(), LanceBuffer::empty()])
);
}

#[test]
fn test_zip() {
let buf1 = LanceBuffer::Owned(vec![1_u8, 2, 3]);
let buf2 = LanceBuffer::reinterpret_vec(vec![1_u16, 2, 3]);
let buf3 = LanceBuffer::reinterpret_vec(vec![1_u32, 2, 3]);

let zipped = LanceBuffer::zip_into_one(vec![(buf1, 8), (buf2, 16), (buf3, 32)], 3).unwrap();

assert_eq!(zipped.len(), 21);

let mut expected = Vec::with_capacity(21);
for i in 1..4 {
expected.push(i as u8);
expected.extend_from_slice(&(i as u16).to_ne_bytes());
expected.extend_from_slice(&(i as u32).to_ne_bytes());
}
let expected = LanceBuffer::Owned(expected);

assert_eq!(expected, zipped);
}

#[test]
fn test_hex() {
let buf = LanceBuffer::Owned(vec![1, 2, 15, 20]);
assert_eq!("01020F14", buf.as_hex());
}
}