Skip to content

Commit

Permalink
Merge pull request #1 from westonpace/jleibs/update_arrow_datafusion
Browse files Browse the repository at this point in the history
Clean up and centralize bytes::Bytes -> arrow_buffer::Buffer routines
  • Loading branch information
jleibs authored Nov 26, 2024
2 parents 08aa39b + e587921 commit 63fec42
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 15 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/lance-arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ arrow-data = { workspace = true }
arrow-cast = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
bytes = { workspace = true }
half = { workspace = true }
num-traits = { workspace = true }
rand.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-arrow/src/deepcopy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use arrow_buffer::{Buffer, NullBuffer};
use arrow_data::ArrayData;

/// Returns a new `Buffer` backed by a freshly allocated copy of `buffer`'s bytes.
///
/// `Buffer::from(&[u8])` already copies the slice into a new allocation, so no
/// intermediate `Vec` is needed.
pub fn deep_copy_buffer(buffer: &Buffer) -> Buffer {
    Buffer::from(buffer.as_slice())
}

fn deep_copy_nulls(nulls: &NullBuffer) -> Buffer {
Expand Down
65 changes: 64 additions & 1 deletion rust/lance-arrow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
//!
//! To improve Arrow-RS ergonomic
use std::collections::HashMap;
use std::sync::Arc;
use std::{collections::HashMap, ptr::NonNull};

use arrow_array::{
cast::AsArray, Array, ArrayRef, ArrowNumericType, FixedSizeBinaryArray, FixedSizeListArray,
GenericListArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, StructArray, UInt32Array,
UInt8Array,
};
use arrow_buffer::MutableBuffer;
use arrow_data::ArrayDataBuilder;
use arrow_schema::{ArrowError, DataType, Field, FieldRef, Fields, IntervalUnit, Schema};
use arrow_select::{interleave::interleave, take::take};
Expand Down Expand Up @@ -654,6 +655,68 @@ pub fn interleave_batches(
RecordBatch::try_new(schema, columns)
}

pub trait BufferExt {
    /// Create an `arrow_buffer::Buffer` from a `bytes::Bytes` object
    ///
    /// The alignment must be specified (as `bytes_per_value`) since we want to make
    /// sure we can safely reinterpret the buffer.
    ///
    /// If the buffer is properly aligned this will be zero-copy. If not, a copy
    /// will be made and an owned buffer returned.
    ///
    /// If `bytes_per_value` is not a power of two, then we assume the buffer is
    /// never going to be reinterpreted into another type and we can safely
    /// ignore the alignment.
    ///
    /// Yes, the method name is odd. It's because there is already a `from_bytes`
    /// which converts from `arrow_buffer::bytes::Bytes` (not `bytes::Bytes`)
    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self;

    /// Allocates a new properly aligned arrow buffer and copies `bytes` into it
    ///
    /// `size_bytes` can be larger than `bytes` and, if so, the trailing bytes will
    /// be zeroed out.
    ///
    /// # Panics
    ///
    /// Panics if `size_bytes` is less than `bytes.len()`
    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self;
}

/// Returns true if `n` is a (nonzero) power of two.
///
/// The naive `n & (n - 1) == 0` check underflows (panicking in debug builds)
/// and wrongly accepts 0 in release builds; `u64::is_power_of_two` handles
/// the zero case correctly.
fn is_pwr_two(n: u64) -> bool {
    n.is_power_of_two()
}

impl BufferExt for arrow_buffer::Buffer {
    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self {
        // A zero or non-power-of-two `bytes_per_value` means the buffer will
        // never be reinterpreted as a wider type, so alignment is irrelevant
        // and we can always zero-copy.  The explicit zero guard matters:
        // `align_offset(0)` would panic (alignment must be a power of two).
        let alignment_matters = bytes_per_value != 0 && is_pwr_two(bytes_per_value);
        if alignment_matters && bytes.as_ptr().align_offset(bytes_per_value as usize) != 0 {
            // The original buffer is not aligned, cannot zero-copy
            let size_bytes = bytes.len();
            Self::copy_bytes_bytes(bytes, size_bytes)
        } else {
            // The original buffer is aligned (or alignment doesn't matter),
            // wrap it without copying.
            //
            // SAFETY: the pointer is valid for `bytes.len()` bytes, and the
            // `Arc<bytes::Bytes>` passed as the custom allocation keeps the
            // backing memory alive for the lifetime of the returned Buffer.
            unsafe {
                Self::from_custom_allocation(
                    NonNull::new(bytes.as_ptr() as _).expect("should be a valid pointer"),
                    bytes.len(),
                    Arc::new(bytes),
                )
            }
        }
    }

    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self {
        assert!(
            size_bytes >= bytes.len(),
            "size_bytes ({}) must not be smaller than bytes.len() ({})",
            size_bytes,
            bytes.len()
        );
        // MutableBuffer allocations are aligned for any Arrow type, so the
        // copy can be safely reinterpreted later.
        let mut buf = MutableBuffer::with_capacity(size_bytes);
        // Bulk copy + zero-fill instead of per-byte iterator extends.
        buf.extend_from_slice(&bytes);
        buf.extend_zeros(size_bytes - bytes.len());
        Self::from(buf)
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
3 changes: 2 additions & 1 deletion rust/lance-io/src/encodings/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use arrow_schema::DataType;
use async_trait::async_trait;
use bytes::Bytes;
use futures::{StreamExt, TryStreamExt};
use lance_arrow::BufferExt;
use snafu::{location, Location};
use tokio::io::AsyncWriteExt;

Expand Down Expand Up @@ -224,7 +225,7 @@ impl<'a, T: ByteArrayType> BinaryDecoder<'a, T> {
.null_bit_buffer(null_buf);
}

let buf = Buffer::from_vec(bytes.into());
let buf = Buffer::from_bytes_bytes(bytes, /*bytes_per_value=*/ 1);
let array_data = data_builder
.add_buffer(offset_data.buffers()[0].clone())
.add_buffer(buf)
Expand Down
16 changes: 4 additions & 12 deletions rust/lance-io/src/encodings/plain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
//! it stores the array directly in the file. It offers O(1) read access.
use std::ops::{Range, RangeFrom, RangeFull, RangeTo};
use std::ptr::NonNull;
use std::slice::from_raw_parts;
use std::sync::Arc;

Expand Down Expand Up @@ -204,21 +203,14 @@ pub fn bytes_to_array(
let min_buffer_size = len_plus_offset.saturating_mul(*byte_width);

// alignment or size isn't right -- just make a copy
if (bytes.len() < min_buffer_size) || (bytes.as_ptr().align_offset(*alignment) != 0) {
Buffer::from_vec(bytes.into())
if bytes.len() < min_buffer_size {
Buffer::copy_bytes_bytes(bytes, min_buffer_size)
} else {
// SAFETY: the alignment is correct we can make this conversion
unsafe {
Buffer::from_custom_allocation(
NonNull::new(bytes.as_ptr() as _).expect("should be a valid pointer"),
bytes.len(),
Arc::new(bytes),
)
}
Buffer::from_bytes_bytes(bytes, *alignment as u64)
}
} else {
// cases we don't handle, just copy
Buffer::from_vec(bytes.into())
Buffer::from_slice_ref(bytes)
};

let array_data = ArrayDataBuilder::new(data_type.clone())
Expand Down

0 comments on commit 63fec42

Please sign in to comment.