From 4cbad4d84afe0a391a2fead4aacc0696a31d7c9d Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Thu, 23 Feb 2023 12:53:52 +0000
Subject: [PATCH] Zero-copy Vec conversion (#3516) (#1176)

---
 arrow-array/src/array/list_array.rs  |   2 +
 arrow-buffer/src/alloc/mod.rs        |  14 ++-
 arrow-buffer/src/buffer/immutable.rs | 147 ++++++++++++++++++++++++++-
 arrow-buffer/src/buffer/mutable.rs   |  30 ++++--
 arrow-buffer/src/buffer/scalar.rs    |   9 ++
 arrow-buffer/src/bytes.rs            |  13 +--
 6 files changed, 190 insertions(+), 25 deletions(-)

diff --git a/arrow-array/src/array/list_array.rs b/arrow-array/src/array/list_array.rs
index 6b63269d1615..178139f810e7 100644
--- a/arrow-array/src/array/list_array.rs
+++ b/arrow-array/src/array/list_array.rs
@@ -829,6 +829,7 @@ mod tests {
 
     #[test]
     #[should_panic(expected = "memory is not aligned")]
+    #[allow(deprecated)]
     fn test_primitive_array_alignment() {
         let ptr = arrow_buffer::alloc::allocate_aligned(8);
         let buf = unsafe { Buffer::from_raw_parts(ptr, 8, 8) };
@@ -845,6 +846,7 @@ mod tests {
     // Different error messages, so skip for now
     // https://github.com/apache/arrow-rs/issues/1545
     #[cfg(not(feature = "force_validate"))]
+    #[allow(deprecated)]
     fn test_list_array_alignment() {
         let ptr = arrow_buffer::alloc::allocate_aligned(8);
         let buf = unsafe { Buffer::from_raw_parts(ptr, 8, 8) };

diff --git a/arrow-buffer/src/alloc/mod.rs b/arrow-buffer/src/alloc/mod.rs
index 1493d839f5ab..3dfe7848d3f9 100644
--- a/arrow-buffer/src/alloc/mod.rs
+++ b/arrow-buffer/src/alloc/mod.rs
@@ -45,6 +45,7 @@ fn dangling_ptr() -> NonNull<u8> {
 /// Allocates a cache-aligned memory region of `size` bytes with uninitialized values.
 /// This is more performant than using [allocate_aligned_zeroed] when all bytes will have
 /// an unknown or non-zero value and is semantically similar to `malloc`.
+#[deprecated(note = "Use Vec")]
 pub fn allocate_aligned(size: usize) -> NonNull<u8> {
     unsafe {
         if size == 0 {
@@ -60,6 +61,7 @@
 /// Allocates a cache-aligned memory region of `size` bytes with `0` on all of them.
 /// This is more performant than using [allocate_aligned] and setting all bytes to zero
 /// and is semantically similar to `calloc`.
+#[deprecated(note = "Use Vec")]
 pub fn allocate_aligned_zeroed(size: usize) -> NonNull<u8> {
     unsafe {
         if size == 0 {
@@ -80,6 +82,7 @@
 /// * ptr must denote a block of memory currently allocated via this allocator,
 ///
 /// * size must be the same size that was used to allocate that block of memory,
+#[deprecated(note = "Use Vec")]
 pub unsafe fn free_aligned(ptr: NonNull<u8>, size: usize) {
     if size != 0 {
         std::alloc::dealloc(
@@ -100,6 +103,8 @@
 ///
 /// * new_size, when rounded up to the nearest multiple of [ALIGNMENT], must not overflow (i.e.,
 /// the rounded value must be less than usize::MAX).
+#[deprecated(note = "Use Vec")]
+#[allow(deprecated)]
 pub unsafe fn reallocate(
     ptr: NonNull<u8>,
     old_size: usize,
@@ -132,9 +137,8 @@ impl<T: RefUnwindSafe + Send + Sync> Allocation for T {}
 
 /// Mode of deallocating memory regions
 pub(crate) enum Deallocation {
-    /// An allocation of the given capacity that needs to be deallocated using arrows's cache aligned allocator.
-    /// See [allocate_aligned] and [free_aligned].
-    Arrow(usize),
+    /// An allocation using [`std::alloc`]
+    Standard(Layout),
     /// An allocation from an external source like the FFI interface or a Rust Vec.
     /// Deallocation will happen
     Custom(Arc<dyn Allocation>),
@@ -143,8 +147,8 @@
 impl Debug for Deallocation {
     fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
         match self {
-            Deallocation::Arrow(capacity) => {
-                write!(f, "Deallocation::Arrow {{ capacity: {capacity} }}")
+            Deallocation::Standard(layout) => {
+                write!(f, "Deallocation::Standard {layout:?}")
             }
             Deallocation::Custom(_) => {
                 write!(f, "Deallocation::Custom {{ capacity: unknown }}")
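Note: the old `Deallocation::Arrow(usize)` variant recorded only a capacity, which tied every buffer to the crate's cache-aligned allocator. `Deallocation::Standard(Layout)` records both size and alignment, so memory from any `std::alloc` allocation (including one owned by a `Vec`) can be freed correctly. A minimal sketch of the `std::alloc` contract the new variant relies on, using only the standard library (this is illustrative, not arrow-rs code):

```rust
use std::alloc::{alloc, dealloc, Layout};

fn main() {
    // Layout::array captures both size and alignment: for 8 i64 values this
    // is size 64, align 8 - exactly what Deallocation::Standard must remember.
    let layout = Layout::array::<i64>(8).unwrap();
    unsafe {
        let ptr = alloc(layout);
        assert!(!ptr.is_null());
        // ... use the allocation ...
        // dealloc must receive the same layout the allocation was made with,
        // which is why the variant stores a Layout rather than a bare capacity.
        dealloc(ptr, layout);
    }
}
```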
diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs
index cbfba1e0540c..042992efd7c3 100644
--- a/arrow-buffer/src/buffer/immutable.rs
+++ b/arrow-buffer/src/buffer/immutable.rs
@@ -15,13 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::convert::AsRef;
+use std::alloc::Layout;
 use std::fmt::Debug;
 use std::iter::FromIterator;
 use std::ptr::NonNull;
 use std::sync::Arc;
 
-use crate::alloc::{Allocation, Deallocation};
+use crate::alloc::{Allocation, Deallocation, ALIGNMENT};
 use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk};
 use crate::{bytes::Bytes, native::ArrowNativeType};
 
@@ -42,6 +42,8 @@ pub struct Buffer {
     ptr: *const u8,
 
     /// Byte length of the buffer.
+    ///
+    /// Must be less than or equal to `data.len()`
     length: usize,
 }
 
@@ -69,6 +71,21 @@ impl Buffer {
         }
     }
 
+    /// Create a [`Buffer`] from the provided `Vec` without copying
+    #[inline]
+    pub fn from_vec<T: ArrowNativeType>(vec: Vec<T>) -> Self {
+        // Safety
+        // Vec::as_ptr guaranteed to not be null and ArrowNativeType are trivially transmutable
+        let ptr = unsafe { NonNull::new_unchecked(vec.as_ptr() as _) };
+        let len = vec.len() * std::mem::size_of::<T>();
+        // Safety
+        // Layout guaranteed to be valid
+        let layout = unsafe { Layout::array::<T>(vec.capacity()).unwrap_unchecked() };
+        std::mem::forget(vec);
+        let b = unsafe { Bytes::new(ptr, len, Deallocation::Standard(layout)) };
+        Self::from_bytes(b)
+    }
+
     /// Initializes a [Buffer] from a slice of items.
     pub fn from_slice_ref<U: ArrowNativeType, T: AsRef<[U]>>(items: T) -> Self {
         let slice = items.as_ref();
@@ -78,7 +95,7 @@ impl Buffer {
         buffer.into()
     }
 
-    /// Creates a buffer from an existing memory region (must already be byte-aligned), this
+    /// Creates a buffer from an existing aligned memory region (must already be byte-aligned), this
     /// `Buffer` will free this piece of memory when dropped.
     ///
     /// # Arguments
@@ -91,9 +108,11 @@ impl Buffer {
     ///
     /// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
     /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed.
+    #[deprecated(note = "Use From<Vec<T>>")]
    pub unsafe fn from_raw_parts(ptr: NonNull<u8>, len: usize, capacity: usize) -> Self {
         assert!(len <= capacity);
-        Buffer::build_with_arguments(ptr, len, Deallocation::Arrow(capacity))
+        let layout = Layout::from_size_align(capacity, ALIGNMENT).unwrap();
+        Buffer::build_with_arguments(ptr, len, Deallocation::Standard(layout))
     }
 
     /// Creates a buffer from an existing memory region. Ownership of the memory is tracked via reference counting
@@ -253,7 +272,8 @@ impl Buffer {
     }
 
     /// Returns `MutableBuffer` for mutating the buffer if this buffer is not shared.
-    /// Returns `Err` if this is shared or its allocation is from an external source.
+    /// Returns `Err` if this is shared or its allocation is from an external source or
+    /// it is not allocated with alignment [`ALIGNMENT`]
     pub fn into_mutable(self) -> Result<MutableBuffer, Self> {
         let ptr = self.ptr;
         let length = self.length;
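Note: with `from_vec`, a `Vec<T>` moves into a `Buffer` without copying; the pointer is reused and the `Vec`'s layout is remembered for deallocation. A short usage sketch, assuming this patch is applied (`Buffer::as_ptr` and `Buffer::len` are pre-existing accessors):

```rust
use arrow_buffer::Buffer;

fn main() {
    let vec: Vec<i32> = vec![1, 2, 3, 4];
    let ptr = vec.as_ptr() as *const u8;

    // Adopts the Vec's allocation rather than memcpy-ing it
    let buffer = Buffer::from_vec(vec);
    assert_eq!(buffer.as_ptr(), ptr);
    assert_eq!(buffer.len(), 4 * std::mem::size_of::<i32>());
}
```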
@@ -269,6 +289,43 @@ impl Buffer {
             length,
         })
     }
+
+    /// Returns `Vec<T>` for mutating the buffer if this buffer is not offset and was
+    /// allocated with the correct layout for `Vec<T>`
+    pub fn into_vec<T: ArrowNativeType>(self) -> Result<Vec<T>, Self> {
+        let layout = match self.data.deallocation() {
+            Deallocation::Standard(l) => l,
+            _ => return Err(self), // Custom allocation
+        };
+
+        if self.ptr != self.data.as_ptr() {
+            return Err(self); // Data is offset
+        }
+
+        let v_capacity = layout.size() / std::mem::size_of::<T>();
+        match Layout::array::<T>(v_capacity) {
+            Ok(expected) if layout == &expected => {}
+            _ => return Err(self), // Incorrect layout
+        }
+
+        let length = self.length;
+        let ptr = self.ptr;
+        let v_len = self.length / std::mem::size_of::<T>();
+
+        Arc::try_unwrap(self.data)
+            .map(|bytes| unsafe {
+                let ptr = bytes.ptr().as_ptr() as _;
+                std::mem::forget(bytes);
+                // Safety
+                // Verified that bytes layout matches that of Vec
+                Vec::from_raw_parts(ptr, v_len, v_capacity)
+            })
+            .map_err(|bytes| Buffer {
+                data: bytes,
+                ptr,
+                length,
+            })
+    }
 }
 
 /// Creating a `Buffer` instance by copying the memory from a `AsRef<[u8]>` into a newly
@@ -378,6 +435,7 @@ impl FromIterator<bool> for Buffer {
 
 #[cfg(test)]
 mod tests {
+    use crate::i256;
     use std::panic::{RefUnwindSafe, UnwindSafe};
     use std::thread;
 
@@ -632,4 +690,83 @@ mod tests {
         let buffer = Buffer::from(MutableBuffer::from_len_zeroed(12));
         buffer.slice_with_length(2, usize::MAX);
     }
+
+    #[test]
+    fn test_vec_interop() {
+        // Test empty vec
+        let a: Vec<i128> = Vec::new();
+        let b = Buffer::from_vec(a);
+        b.into_vec::<i128>().unwrap();
+
+        // Test vec with capacity
+        let a: Vec<i128> = Vec::with_capacity(20);
+        let b = Buffer::from_vec(a);
+        let back = b.into_vec::<i128>().unwrap();
+        assert_eq!(back.len(), 0);
+        assert_eq!(back.capacity(), 20);
+
+        // Test vec with values
+        let mut a: Vec<i128> = Vec::with_capacity(3);
+        a.extend_from_slice(&[1, 2, 3]);
+        let b = Buffer::from_vec(a);
+        let back = b.into_vec::<i128>().unwrap();
+        assert_eq!(back.len(), 3);
+        assert_eq!(back.capacity(), 3);
+
+        // Test vec with values and spare capacity
+        let mut a: Vec<i128> = Vec::with_capacity(20);
+        a.extend_from_slice(&[1, 4, 7, 8, 9, 3, 6]);
+        let b = Buffer::from_vec(a);
+        let back = b.into_vec::<i128>().unwrap();
+        assert_eq!(back.len(), 7);
+        assert_eq!(back.capacity(), 20);
+
+        // Test incorrect alignment
+        let a: Vec<i128> = Vec::new();
+        let b = Buffer::from_vec(a);
+        let b = b.into_vec::<i32>().unwrap_err();
+        b.into_vec::<u8>().unwrap_err();
+
+        // Test convert between types with same alignment
+        // This is an implementation quirk, but isn't harmful
+        // as ArrowNativeType are trivially transmutable
+        let a: Vec<i64> = vec![1, 2, 3, 4];
+        let b = Buffer::from_vec(a);
+        let back = b.into_vec::<u64>().unwrap();
+        assert_eq!(back.len(), 4);
+        assert_eq!(back.capacity(), 4);
+
+        // i256 has the same layout as i128 so this is valid
+        let mut b: Vec<i128> = Vec::with_capacity(4);
+        b.extend_from_slice(&[1, 2, 3, 4]);
+        let b = Buffer::from_vec(b);
+        let back = b.into_vec::<i256>().unwrap();
+        assert_eq!(back.len(), 2);
+        assert_eq!(back.capacity(), 2);
+
+        // Invalid layout
+        let b: Vec<i128> = vec![1, 2, 3];
+        let b = Buffer::from_vec(b);
+        b.into_vec::<i256>().unwrap_err();
+
+        // Invalid layout
+        let mut b: Vec<i128> = Vec::with_capacity(5);
+        b.extend_from_slice(&[1, 2, 3, 4]);
+        let b = Buffer::from_vec(b);
+        b.into_vec::<i256>().unwrap_err();
+
+        // Truncates length
+        // This is an implementation quirk, but isn't harmful
+        let mut b: Vec<i128> = Vec::with_capacity(4);
+        b.extend_from_slice(&[1, 2, 3]);
+        let b = Buffer::from_vec(b);
+        let back = b.into_vec::<i256>().unwrap();
+        assert_eq!(back.len(), 1);
+        assert_eq!(back.capacity(), 2);
+
+        // Cannot use aligned allocation
+        let b = Buffer::from(MutableBuffer::new(10));
+        let b = b.into_vec::<u8>().unwrap_err();
+        b.into_vec::<u64>().unwrap_err();
+    }
 }
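Note: `into_vec` is the reverse direction and is fallible: the buffer must hold the only reference, must start at the beginning of its allocation, and the stored `Layout` must equal `Layout::array::<T>` for the derived capacity. On failure the buffer is handed back unchanged in the `Err` variant. A sketch of both outcomes, assuming this patch is applied (`Buffer::slice` takes a byte offset and is pre-existing):

```rust
use arrow_buffer::Buffer;

fn main() {
    // Round trip: uniquely owned, not offset, layout matches Vec<i64>
    let buffer = Buffer::from_vec(vec![0i64, 1, 2, 3]);
    let values = buffer.into_vec::<i64>().unwrap();
    assert_eq!(values, vec![0, 1, 2, 3]);

    // A slice is offset into its allocation, so conversion fails and
    // the Err variant returns the buffer for continued use.
    let buffer = Buffer::from_vec(vec![0i64, 1, 2, 3]).slice(8);
    let buffer = buffer.into_vec::<i64>().unwrap_err();
    assert_eq!(buffer.len(), 24); // 32 bytes minus the 8-byte offset
}
```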
diff --git a/arrow-buffer/src/buffer/mutable.rs b/arrow-buffer/src/buffer/mutable.rs
index 2e6e2f1d7b08..35218eb80376 100644
--- a/arrow-buffer/src/buffer/mutable.rs
+++ b/arrow-buffer/src/buffer/mutable.rs
@@ -16,23 +16,28 @@
 // under the License.
 
 use super::Buffer;
-use crate::alloc::Deallocation;
+use crate::alloc::{Deallocation, ALIGNMENT};
 use crate::{
     alloc,
     bytes::Bytes,
     native::{ArrowNativeType, ToByteSlice},
     util::bit_util,
 };
+use std::alloc::Layout;
 use std::mem;
 use std::ptr::NonNull;
 
 /// A [`MutableBuffer`] is Arrow's interface to build a [`Buffer`] out of items or slices of items.
+///
 /// [`Buffer`]s created from [`MutableBuffer`] (via `into`) are guaranteed to have its pointer aligned
 /// along cache lines and in multiple of 64 bytes.
+///
 /// Use [MutableBuffer::push] to insert an item, [MutableBuffer::extend_from_slice]
 /// to insert many items, and `into` to convert it to [`Buffer`].
 ///
-/// For a safe, strongly typed API consider using `arrow::array::BufferBuilder`
+/// For a safe, strongly typed API consider using `Vec`
+///
+/// Note: this may be deprecated in a future release ([#1176](https://github.com/apache/arrow-rs/issues/1176))
 ///
 /// # Example
 ///
@@ -62,6 +67,7 @@ impl MutableBuffer {
 
     /// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity`.
     #[inline]
+    #[allow(deprecated)]
     pub fn with_capacity(capacity: usize) -> Self {
         let capacity = bit_util::round_upto_multiple_of_64(capacity);
         let ptr = alloc::allocate_aligned(capacity);
@@ -83,6 +89,7 @@ impl MutableBuffer {
     /// let data = buffer.as_slice_mut();
     /// assert_eq!(data[126], 0u8);
     /// ```
+    #[allow(deprecated)]
     pub fn from_len_zeroed(len: usize) -> Self {
         let new_capacity = bit_util::round_upto_multiple_of_64(len);
         let ptr = alloc::allocate_aligned_zeroed(new_capacity);
@@ -95,12 +102,14 @@ impl MutableBuffer {
 
     /// Allocates a new [MutableBuffer] from given `Bytes`.
     pub(crate) fn from_bytes(bytes: Bytes) -> Result<Self, Bytes> {
-        if !matches!(bytes.deallocation(), Deallocation::Arrow(_)) {
-            return Err(bytes);
-        }
+        let capacity = match bytes.deallocation() {
+            Deallocation::Standard(layout) if layout.align() == ALIGNMENT => {
+                layout.size()
+            }
+            _ => return Err(bytes),
+        };
 
         let len = bytes.len();
-        let capacity = bytes.capacity();
         let ptr = bytes.ptr();
         mem::forget(bytes);
 
@@ -224,6 +233,7 @@ impl MutableBuffer {
     /// buffer.shrink_to_fit();
     /// assert!(buffer.capacity() >= 64 && buffer.capacity() < 128);
     /// ```
+    #[allow(deprecated)]
     pub fn shrink_to_fit(&mut self) {
         let new_capacity = bit_util::round_upto_multiple_of_64(self.len);
         if new_capacity < self.capacity {
@@ -300,9 +310,9 @@ impl MutableBuffer {
 
     #[inline]
     pub(super) fn into_buffer(self) -> Buffer {
-        let bytes = unsafe {
-            Bytes::new(self.data, self.len, Deallocation::Arrow(self.capacity))
-        };
+        let layout = Layout::from_size_align(self.capacity, ALIGNMENT).unwrap();
+        let bytes =
+            unsafe { Bytes::new(self.data, self.len, Deallocation::Standard(layout)) };
         std::mem::forget(self);
         Buffer::from_bytes(bytes)
     }
@@ -448,6 +458,7 @@ impl MutableBuffer {
 /// # Safety
 /// `ptr` must be allocated for `old_capacity`.
 #[cold]
+#[allow(deprecated)]
 unsafe fn reallocate(
     ptr: NonNull<u8>,
     old_capacity: usize,
@@ -630,6 +641,7 @@ impl std::ops::DerefMut for MutableBuffer {
 }
 
 impl Drop for MutableBuffer {
+    #[allow(deprecated)]
     fn drop(&mut self) {
         unsafe { alloc::free_aligned(self.data, self.capacity) };
     }
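Note: `MutableBuffer` still allocates with 64-byte alignment ([`ALIGNMENT`]), so `from_bytes` now checks the stored layout's alignment before reclaiming memory: only buffers backed by such an allocation can become mutable again, while `Vec`-adopted buffers are rejected. A sketch of the two cases, assuming this patch is applied and relying on `into_mutable` routing through `from_bytes` as the diff suggests:

```rust
use arrow_buffer::{Buffer, MutableBuffer};

fn main() {
    // Allocated with the 64-byte aligned layout: can round-trip
    let buffer: Buffer = MutableBuffer::from_len_zeroed(64).into();
    assert!(buffer.into_mutable().is_ok());

    // Adopted from a Vec<u8>: the recorded layout has alignment 1,
    // not ALIGNMENT, so into_mutable refuses it.
    let buffer = Buffer::from_vec(vec![0u8; 64]);
    assert!(buffer.into_mutable().is_err());
}
```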
diff --git a/arrow-buffer/src/buffer/scalar.rs b/arrow-buffer/src/buffer/scalar.rs
index e688e52fea5c..01a64633f532 100644
--- a/arrow-buffer/src/buffer/scalar.rs
+++ b/arrow-buffer/src/buffer/scalar.rs
@@ -90,6 +90,15 @@ impl<T: ArrowNativeType> From<Buffer> for ScalarBuffer<T> {
     }
 }
 
+impl<T: ArrowNativeType> From<Vec<T>> for ScalarBuffer<T> {
+    fn from(value: Vec<T>) -> Self {
+        Self {
+            buffer: Buffer::from_vec(value),
+            phantom: Default::default(),
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
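Note: the new `From<Vec<T>>` impl extends the same zero-copy construction to the strongly typed `ScalarBuffer<T>`. A brief usage sketch, assuming this patch is applied and that `ScalarBuffer` derefs to `[T]` (pre-existing behaviour, not part of this diff):

```rust
use arrow_buffer::ScalarBuffer;

fn main() {
    // The Vec's allocation is adopted via Buffer::from_vec
    let values: ScalarBuffer<i32> = vec![1, 2, 3, 4].into();
    assert_eq!(values.len(), 4);
    assert_eq!(values[0], 1);
}
```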
diff --git a/arrow-buffer/src/bytes.rs b/arrow-buffer/src/bytes.rs
index 3320dfc261c7..2820fda781e6 100644
--- a/arrow-buffer/src/bytes.rs
+++ b/arrow-buffer/src/bytes.rs
@@ -23,10 +23,10 @@ use core::slice;
 use std::ptr::NonNull;
 use std::{fmt::Debug, fmt::Formatter};
 
-use crate::alloc;
 use crate::alloc::Deallocation;
 
 /// A continuous, fixed-size, immutable memory region that knows how to de-allocate itself.
+///
 /// This structs' API is inspired by the `bytes::Bytes`, but it is not limited to using rust's
 /// global allocator nor u8 alignment.
 ///
@@ -53,7 +53,7 @@ impl Bytes {
     ///
     /// * `ptr` - Pointer to raw parts
     /// * `len` - Length of raw parts in **bytes**
-    /// * `capacity` - Total allocated memory for the pointer `ptr`, in **bytes**
+    /// * `deallocation` - Type of allocation
     ///
     /// # Safety
     ///
@@ -93,7 +93,7 @@ impl Bytes {
 
     pub fn capacity(&self) -> usize {
         match self.deallocation {
-            Deallocation::Arrow(capacity) => capacity,
+            Deallocation::Standard(layout) => layout.size(),
             // we cannot determine this in general,
             // and thus we state that this is externally-owned memory
             Deallocation::Custom(_) => 0,
@@ -115,9 +115,10 @@ impl Drop for Bytes {
     #[inline]
     fn drop(&mut self) {
         match &self.deallocation {
-            Deallocation::Arrow(capacity) => {
-                unsafe { alloc::free_aligned(self.ptr, *capacity) };
-            }
+            Deallocation::Standard(layout) => match layout.size() {
+                0 => {} // Nothing to do
+                _ => unsafe { std::alloc::dealloc(self.ptr.as_ptr(), *layout) },
+            },
             // The automatic drop implementation will free the memory once the reference count reaches zero
             Deallocation::Custom(_allocation) => (),
         }
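Note: the `Drop` impl special-cases `layout.size() == 0` because an empty `Vec` never allocates; the pointer `from_vec` captured is a dangling sentinel and must not reach `std::alloc::dealloc`. A standard-library-only sketch of why (illustrative, not arrow-rs code):

```rust
use std::alloc::Layout;

fn main() {
    // An empty Vec has no heap allocation; its pointer is a dangling,
    // well-aligned sentinel, and capacity is 0.
    let v: Vec<i64> = Vec::new();
    assert_eq!(v.capacity(), 0);

    // The corresponding layout is zero-sized: there is nothing to free,
    // so the `0 => {}` arm in Bytes::drop is the only sound choice.
    let layout = Layout::array::<i64>(v.capacity()).unwrap();
    assert_eq!(layout.size(), 0);
}
```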