Skip to content

Commit

Permalink
Zero-copy Vec conversion (apache#3516) (apache#1176)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Feb 23, 2023
1 parent 0373a9d commit 4cbad4d
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 25 deletions.
2 changes: 2 additions & 0 deletions arrow-array/src/array/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,7 @@ mod tests {

#[test]
#[should_panic(expected = "memory is not aligned")]
#[allow(deprecated)]
fn test_primitive_array_alignment() {
let ptr = arrow_buffer::alloc::allocate_aligned(8);
let buf = unsafe { Buffer::from_raw_parts(ptr, 8, 8) };
Expand All @@ -845,6 +846,7 @@ mod tests {
// Different error messages, so skip for now
// https://github.com/apache/arrow-rs/issues/1545
#[cfg(not(feature = "force_validate"))]
#[allow(deprecated)]
fn test_list_array_alignment() {
let ptr = arrow_buffer::alloc::allocate_aligned(8);
let buf = unsafe { Buffer::from_raw_parts(ptr, 8, 8) };
Expand Down
14 changes: 9 additions & 5 deletions arrow-buffer/src/alloc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ fn dangling_ptr() -> NonNull<u8> {
/// Allocates a cache-aligned memory region of `size` bytes with uninitialized values.
/// This is more performant than using [allocate_aligned_zeroed] when all bytes will have
/// an unknown or non-zero value and is semantically similar to `malloc`.
#[deprecated(note = "Use Vec")]
pub fn allocate_aligned(size: usize) -> NonNull<u8> {
unsafe {
if size == 0 {
Expand All @@ -60,6 +61,7 @@ pub fn allocate_aligned(size: usize) -> NonNull<u8> {
/// Allocates a cache-aligned memory region of `size` bytes with `0` on all of them.
/// This is more performant than using [allocate_aligned] and setting all bytes to zero
/// and is semantically similar to `calloc`.
#[deprecated(note = "Use Vec")]
pub fn allocate_aligned_zeroed(size: usize) -> NonNull<u8> {
unsafe {
if size == 0 {
Expand All @@ -80,6 +82,7 @@ pub fn allocate_aligned_zeroed(size: usize) -> NonNull<u8> {
/// * ptr must denote a block of memory currently allocated via this allocator,
///
/// * size must be the same size that was used to allocate that block of memory,
#[deprecated(note = "Use Vec")]
pub unsafe fn free_aligned(ptr: NonNull<u8>, size: usize) {
if size != 0 {
std::alloc::dealloc(
Expand All @@ -100,6 +103,8 @@ pub unsafe fn free_aligned(ptr: NonNull<u8>, size: usize) {
///
/// * new_size, when rounded up to the nearest multiple of [ALIGNMENT], must not overflow (i.e.,
/// the rounded value must be less than usize::MAX).
#[deprecated(note = "Use Vec")]
#[allow(deprecated)]
pub unsafe fn reallocate(
ptr: NonNull<u8>,
old_size: usize,
Expand Down Expand Up @@ -132,9 +137,8 @@ impl<T: RefUnwindSafe + Send + Sync> Allocation for T {}

/// Mode of deallocating memory regions
pub(crate) enum Deallocation {
/// An allocation of the given capacity that needs to be deallocated using arrows's cache aligned allocator.
/// See [allocate_aligned] and [free_aligned].
Arrow(usize),
/// An allocation using [`std::alloc`]
Standard(Layout),
/// An allocation from an external source like the FFI interface or a Rust Vec.
/// Deallocation will happen
Custom(Arc<dyn Allocation>),
Expand All @@ -143,8 +147,8 @@ pub(crate) enum Deallocation {
impl Debug for Deallocation {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
Deallocation::Arrow(capacity) => {
write!(f, "Deallocation::Arrow {{ capacity: {capacity} }}")
Deallocation::Standard(layout) => {
write!(f, "Deallocation::Standard {layout:?}")
}
Deallocation::Custom(_) => {
write!(f, "Deallocation::Custom {{ capacity: unknown }}")
Expand Down
147 changes: 142 additions & 5 deletions arrow-buffer/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use std::convert::AsRef;
use std::alloc::Layout;
use std::fmt::Debug;
use std::iter::FromIterator;
use std::ptr::NonNull;
use std::sync::Arc;

use crate::alloc::{Allocation, Deallocation};
use crate::alloc::{Allocation, Deallocation, ALIGNMENT};
use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk};
use crate::{bytes::Bytes, native::ArrowNativeType};

Expand All @@ -42,6 +42,8 @@ pub struct Buffer {
ptr: *const u8,

/// Byte length of the buffer.
///
/// Must be less than or equal to `data.len()`
length: usize,
}

Expand Down Expand Up @@ -69,6 +71,21 @@ impl Buffer {
}
}

/// Create a [`Buffer`] from the provided `Vec` without copying
#[inline]
pub fn from_vec<T: ArrowNativeType>(vec: Vec<T>) -> Self {
// Safety
// Vec::as_ptr guaranteed to not be null and ArrowNativeType are trivially transmutable
let ptr = unsafe { NonNull::new_unchecked(vec.as_ptr() as _) };
let len = vec.len() * std::mem::size_of::<T>();
// Safety
// Layout guaranteed to be valid
let layout = unsafe { Layout::array::<T>(vec.capacity()).unwrap_unchecked() };
std::mem::forget(vec);
let b = unsafe { Bytes::new(ptr, len, Deallocation::Standard(layout)) };
Self::from_bytes(b)
}

/// Initializes a [Buffer] from a slice of items.
pub fn from_slice_ref<U: ArrowNativeType, T: AsRef<[U]>>(items: T) -> Self {
let slice = items.as_ref();
Expand All @@ -78,7 +95,7 @@ impl Buffer {
buffer.into()
}

/// Creates a buffer from an existing memory region (must already be byte-aligned), this
/// Creates a buffer from an existing aligned memory region (must already be byte-aligned), this
/// `Buffer` will free this piece of memory when dropped.
///
/// # Arguments
Expand All @@ -91,9 +108,11 @@ impl Buffer {
///
/// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
/// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed.
#[deprecated(note = "Use From<Vec<T>>")]
pub unsafe fn from_raw_parts(ptr: NonNull<u8>, len: usize, capacity: usize) -> Self {
assert!(len <= capacity);
Buffer::build_with_arguments(ptr, len, Deallocation::Arrow(capacity))
let layout = Layout::from_size_align(capacity, ALIGNMENT).unwrap();
Buffer::build_with_arguments(ptr, len, Deallocation::Standard(layout))
}

/// Creates a buffer from an existing memory region. Ownership of the memory is tracked via reference counting
Expand Down Expand Up @@ -253,7 +272,8 @@ impl Buffer {
}

/// Returns `MutableBuffer` for mutating the buffer if this buffer is not shared.
/// Returns `Err` if this is shared or its allocation is from an external source.
/// Returns `Err` if this is shared or its allocation is from an external source or
/// it is not allocated with alignment [`ALIGNMENT`]
pub fn into_mutable(self) -> Result<MutableBuffer, Self> {
let ptr = self.ptr;
let length = self.length;
Expand All @@ -269,6 +289,43 @@ impl Buffer {
length,
})
}

/// Returns `Vec` for mutating the buffer if this buffer is not offset and was
/// allocated with the correct layout for `Vec<T>`
pub fn into_vec<T: ArrowNativeType>(self) -> Result<Vec<T>, Self> {
let layout = match self.data.deallocation() {
Deallocation::Standard(l) => l,
_ => return Err(self), // Custom allocation
};

if self.ptr != self.data.as_ptr() {
return Err(self); // Data is offset
}

let v_capacity = layout.size() / std::mem::size_of::<T>();
match Layout::array::<T>(v_capacity) {
Ok(expected) if layout == &expected => {}
_ => return Err(self), // Incorrect layout
}

let length = self.length;
let ptr = self.ptr;
let v_len = self.length / std::mem::size_of::<T>();

Arc::try_unwrap(self.data)
.map(|bytes| unsafe {
let ptr = bytes.ptr().as_ptr() as _;
std::mem::forget(bytes);
// Safety
// Verified that bytes layout matches that of Vec
Vec::from_raw_parts(ptr, v_len, v_capacity)
})
.map_err(|bytes| Buffer {
data: bytes,
ptr,
length,
})
}
}

/// Creating a `Buffer` instance by copying the memory from a `AsRef<[u8]>` into a newly
Expand Down Expand Up @@ -378,6 +435,7 @@ impl<T: ArrowNativeType> FromIterator<T> for Buffer {

#[cfg(test)]
mod tests {
use crate::i256;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::thread;

Expand Down Expand Up @@ -632,4 +690,83 @@ mod tests {
let buffer = Buffer::from(MutableBuffer::from_len_zeroed(12));
buffer.slice_with_length(2, usize::MAX);
}

#[test]
fn test_vec_interop() {
// Test empty vec
let a: Vec<i128> = Vec::new();
let b = Buffer::from_vec(a);
b.into_vec::<i128>().unwrap();

// Test vec with capacity
let a: Vec<i128> = Vec::with_capacity(20);
let b = Buffer::from_vec(a);
let back = b.into_vec::<i128>().unwrap();
assert_eq!(back.len(), 0);
assert_eq!(back.capacity(), 20);

// Test vec with values
let mut a: Vec<i128> = Vec::with_capacity(3);
a.extend_from_slice(&[1, 2, 3]);
let b = Buffer::from_vec(a);
let back = b.into_vec::<i128>().unwrap();
assert_eq!(back.len(), 3);
assert_eq!(back.capacity(), 3);

// Test vec with values and spare capacity
let mut a: Vec<i128> = Vec::with_capacity(20);
a.extend_from_slice(&[1, 4, 7, 8, 9, 3, 6]);
let b = Buffer::from_vec(a);
let back = b.into_vec::<i128>().unwrap();
assert_eq!(back.len(), 7);
assert_eq!(back.capacity(), 20);

// Test incorrect alignment
let a: Vec<i128> = Vec::new();
let b = Buffer::from_vec(a);
let b = b.into_vec::<i32>().unwrap_err();
b.into_vec::<i8>().unwrap_err();

// Test convert between types with same alignment
// This is an implementation quirk, but isn't harmful
// as ArrowNativeType are trivially transmutable
let a: Vec<i64> = vec![1, 2, 3, 4];
let b = Buffer::from_vec(a);
let back = b.into_vec::<u64>().unwrap();
assert_eq!(back.len(), 4);
assert_eq!(back.capacity(), 4);

// i256 has the same layout as i128 so this is valid
let mut b: Vec<i128> = Vec::with_capacity(4);
b.extend_from_slice(&[1, 2, 3, 4]);
let b = Buffer::from_vec(b);
let back = b.into_vec::<i256>().unwrap();
assert_eq!(back.len(), 2);
assert_eq!(back.capacity(), 2);

// Invalid layout
let b: Vec<i128> = vec![1, 2, 3];
let b = Buffer::from_vec(b);
b.into_vec::<i256>().unwrap_err();

// Invalid layout
let mut b: Vec<i128> = Vec::with_capacity(5);
b.extend_from_slice(&[1, 2, 3, 4]);
let b = Buffer::from_vec(b);
b.into_vec::<i256>().unwrap_err();

// Truncates length
// This is an implementation quirk, but isn't harmful
let mut b: Vec<i128> = Vec::with_capacity(4);
b.extend_from_slice(&[1, 2, 3]);
let b = Buffer::from_vec(b);
let back = b.into_vec::<i256>().unwrap();
assert_eq!(back.len(), 1);
assert_eq!(back.capacity(), 2);

// Cannot use aligned allocation
let b = Buffer::from(MutableBuffer::new(10));
let b = b.into_vec::<u8>().unwrap_err();
b.into_vec::<u64>().unwrap_err();
}
}
30 changes: 21 additions & 9 deletions arrow-buffer/src/buffer/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,28 @@
// under the License.

use super::Buffer;
use crate::alloc::Deallocation;
use crate::alloc::{Deallocation, ALIGNMENT};
use crate::{
alloc,
bytes::Bytes,
native::{ArrowNativeType, ToByteSlice},
util::bit_util,
};
use std::alloc::Layout;
use std::mem;
use std::ptr::NonNull;

/// A [`MutableBuffer`] is Arrow's interface to build a [`Buffer`] out of items or slices of items.
///
/// [`Buffer`]s created from [`MutableBuffer`] (via `into`) are guaranteed to have its pointer aligned
/// along cache lines and in multiple of 64 bytes.
///
/// Use [MutableBuffer::push] to insert an item, [MutableBuffer::extend_from_slice]
/// to insert many items, and `into` to convert it to [`Buffer`].
///
/// For a safe, strongly typed API consider using `arrow::array::BufferBuilder`
/// For a safe, strongly typed API consider using `Vec`
///
/// Note: this may be deprecated in a future release (#1176)[https://github.com/apache/arrow-rs/issues/1176]
///
/// # Example
///
Expand Down Expand Up @@ -62,6 +67,7 @@ impl MutableBuffer {

/// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity`.
#[inline]
#[allow(deprecated)]
pub fn with_capacity(capacity: usize) -> Self {
let capacity = bit_util::round_upto_multiple_of_64(capacity);
let ptr = alloc::allocate_aligned(capacity);
Expand All @@ -83,6 +89,7 @@ impl MutableBuffer {
/// let data = buffer.as_slice_mut();
/// assert_eq!(data[126], 0u8);
/// ```
#[allow(deprecated)]
pub fn from_len_zeroed(len: usize) -> Self {
let new_capacity = bit_util::round_upto_multiple_of_64(len);
let ptr = alloc::allocate_aligned_zeroed(new_capacity);
Expand All @@ -95,12 +102,14 @@ impl MutableBuffer {

/// Allocates a new [MutableBuffer] from given `Bytes`.
pub(crate) fn from_bytes(bytes: Bytes) -> Result<Self, Bytes> {
if !matches!(bytes.deallocation(), Deallocation::Arrow(_)) {
return Err(bytes);
}
let capacity = match bytes.deallocation() {
Deallocation::Standard(layout) if layout.align() == ALIGNMENT => {
layout.size()
}
_ => return Err(bytes),
};

let len = bytes.len();
let capacity = bytes.capacity();
let ptr = bytes.ptr();
mem::forget(bytes);

Expand Down Expand Up @@ -224,6 +233,7 @@ impl MutableBuffer {
/// buffer.shrink_to_fit();
/// assert!(buffer.capacity() >= 64 && buffer.capacity() < 128);
/// ```
#[allow(deprecated)]
pub fn shrink_to_fit(&mut self) {
let new_capacity = bit_util::round_upto_multiple_of_64(self.len);
if new_capacity < self.capacity {
Expand Down Expand Up @@ -300,9 +310,9 @@ impl MutableBuffer {

#[inline]
pub(super) fn into_buffer(self) -> Buffer {
let bytes = unsafe {
Bytes::new(self.data, self.len, Deallocation::Arrow(self.capacity))
};
let layout = Layout::from_size_align(self.capacity, ALIGNMENT).unwrap();
let bytes =
unsafe { Bytes::new(self.data, self.len, Deallocation::Standard(layout)) };
std::mem::forget(self);
Buffer::from_bytes(bytes)
}
Expand Down Expand Up @@ -448,6 +458,7 @@ impl MutableBuffer {
/// # Safety
/// `ptr` must be allocated for `old_capacity`.
#[cold]
#[allow(deprecated)]
unsafe fn reallocate(
ptr: NonNull<u8>,
old_capacity: usize,
Expand Down Expand Up @@ -630,6 +641,7 @@ impl std::ops::DerefMut for MutableBuffer {
}

impl Drop for MutableBuffer {
#[allow(deprecated)]
fn drop(&mut self) {
unsafe { alloc::free_aligned(self.data, self.capacity) };
}
Expand Down
Loading

0 comments on commit 4cbad4d

Please sign in to comment.