From db7291742cf76d33290edf5cd03935004a3b964e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Horstmann?= Date: Mon, 28 Mar 2022 09:00:47 +0200 Subject: [PATCH 1/6] Decouple buffer deallocation from ffi and allow zero-copy buffer creation from rust vectors or strings --- arrow/src/array/data.rs | 53 +++++++++++++++++++++++++++++++++-- arrow/src/buffer/immutable.rs | 30 ++++++++++++++++---- arrow/src/bytes.rs | 11 ++++++-- arrow/src/ffi.rs | 2 +- 4 files changed, 86 insertions(+), 10 deletions(-) diff --git a/arrow/src/array/data.rs b/arrow/src/array/data.rs index 551bf536cb33..f947a457c607 100644 --- a/arrow/src/array/data.rs +++ b/arrow/src/array/data.rs @@ -1459,10 +1459,11 @@ impl ArrayDataBuilder { #[cfg(test)] mod tests { use super::*; + use std::ptr::NonNull; use crate::array::{ - Array, BooleanBuilder, Int32Array, Int32Builder, Int64Array, StringArray, - StructBuilder, UInt64Array, + make_array, Array, BooleanBuilder, Int32Array, Int32Builder, Int64Array, + StringArray, StructBuilder, UInt64Array, }; use crate::buffer::Buffer; use crate::datatypes::Field; @@ -2594,4 +2595,52 @@ mod tests { assert_eq!(&struct_array_slice, &cloned); } + + #[test] + fn test_string_data_from_foreign() { + let mut strings = "foobarfoobar".to_owned(); + let mut offsets = vec![0_i32, 0, 3, 6, 12]; + let mut bitmap = vec![0b1110_u8]; + + let strings_buffer = unsafe { + Buffer::from_foreign( + NonNull::new_unchecked(strings.as_mut_ptr()), + strings.len(), + Arc::new(strings), + ) + }; + let offsets_buffer = unsafe { + Buffer::from_foreign( + NonNull::new_unchecked(offsets.as_mut_ptr() as *mut u8), + offsets.len() * std::mem::size_of::(), + Arc::new(offsets), + ) + }; + let null_buffer = unsafe { + Buffer::from_foreign( + NonNull::new_unchecked(bitmap.as_mut_ptr()), + bitmap.len(), + Arc::new(bitmap), + ) + }; + + let data = ArrayData::try_new( + DataType::Utf8, + 4, + None, + Some(null_buffer), + 0, + vec![offsets_buffer, strings_buffer], + vec![], + ) + .unwrap(); + + let array 
= make_array(data); + let array = array.as_any().downcast_ref::().unwrap(); + + let expected = + StringArray::from(vec![None, Some("foo"), Some("bar"), Some("foobar")]); + + assert_eq!(array, &expected); + } } diff --git a/arrow/src/buffer/immutable.rs b/arrow/src/buffer/immutable.rs index b918c0d4429f..a363e4002723 100644 --- a/arrow/src/buffer/immutable.rs +++ b/arrow/src/buffer/immutable.rs @@ -21,11 +21,11 @@ use std::ptr::NonNull; use std::sync::Arc; use std::{convert::AsRef, usize}; +use crate::bytes::Allocation; use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk}; use crate::{ bytes::{Bytes, Deallocation}, datatypes::ArrowNativeType, - ffi, }; use super::ops::bitwise_unary_op_helper; @@ -86,18 +86,18 @@ impl Buffer { /// /// * `ptr` - Pointer to raw parts /// * `len` - Length of raw parts in **bytes** - /// * `data` - An [ffi::FFI_ArrowArray] with the data + /// * `owner` - A [bytes::Owner] which is responsible for freeing that data /// /// # Safety /// /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` /// bytes and that the foreign deallocator frees the region. 
- pub unsafe fn from_unowned( + pub unsafe fn from_foreign( ptr: NonNull, len: usize, - data: Arc, + owner: Arc, ) -> Self { - Buffer::build_with_arguments(ptr, len, Deallocation::Foreign(data)) + Buffer::build_with_arguments(ptr, len, Deallocation::Foreign(owner)) } /// Auxiliary method to create a new Buffer @@ -533,4 +533,24 @@ mod tests { Buffer::from(&[0b01101101, 0b10101010]).count_set_bits_offset(7, 9) ); } + + #[test] + fn test_from_foreign_vec() { + let mut vector = vec![1_i32, 2, 3, 4, 5]; + let buffer = unsafe { + Buffer::from_foreign( + NonNull::new_unchecked(vector.as_mut_ptr() as *mut u8), + vector.len() * std::mem::size_of::(), + Arc::new(vector), + ) + }; + + let slice = unsafe { buffer.typed_data::() }; + assert_eq!(slice, &[1, 2, 3, 4, 5]); + + let buffer = buffer.slice(std::mem::size_of::()); + + let slice = unsafe { buffer.typed_data::() }; + assert_eq!(slice, &[2, 3, 4, 5]); + } } diff --git a/arrow/src/bytes.rs b/arrow/src/bytes.rs index bc92b90576f6..1b72f663f60c 100644 --- a/arrow/src/bytes.rs +++ b/arrow/src/bytes.rs @@ -20,18 +20,25 @@ //! Note that this is a low-level functionality of this crate. use core::slice; +use std::panic::RefUnwindSafe; use std::ptr::NonNull; use std::sync::Arc; use std::{fmt::Debug, fmt::Formatter}; -use crate::{alloc, ffi}; +use crate::alloc; + +/// The owner of an allocation, that is not natively allocated. +/// The trait implementation is responsible for dropping the allocations once no more references exist. 
+pub trait Allocation: RefUnwindSafe {} + +impl Allocation for T {} /// Mode of deallocating memory regions pub enum Deallocation { /// Native deallocation, using Rust deallocator with Arrow-specific memory alignment Native(usize), /// Foreign interface, via a callback - Foreign(Arc), + Foreign(Arc), } impl Debug for Deallocation { diff --git a/arrow/src/ffi.rs b/arrow/src/ffi.rs index 5fb1cce4eb57..180bbed58b5f 100644 --- a/arrow/src/ffi.rs +++ b/arrow/src/ffi.rs @@ -538,7 +538,7 @@ unsafe fn create_buffer( assert!(index < array.n_buffers as usize); let ptr = *buffers.add(index); - NonNull::new(ptr as *mut u8).map(|ptr| Buffer::from_unowned(ptr, len, owner)) + NonNull::new(ptr as *mut u8).map(|ptr| Buffer::from_foreign(ptr, len, owner)) } fn create_child( From 2cc581a217a4e16b36e0c403f1f39ad03aafe83b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Horstmann?= Date: Tue, 29 Mar 2022 10:06:44 +0200 Subject: [PATCH 2/6] Move allocation owner to alloc module --- arrow/src/alloc/mod.rs | 30 ++++++++++++++++++++++++++++++ arrow/src/buffer/immutable.rs | 9 +++------ arrow/src/buffer/mutable.rs | 3 ++- arrow/src/bytes.rs | 30 +----------------------------- 4 files changed, 36 insertions(+), 36 deletions(-) diff --git a/arrow/src/alloc/mod.rs b/arrow/src/alloc/mod.rs index 88ab8187727e..fc59f658862f 100644 --- a/arrow/src/alloc/mod.rs +++ b/arrow/src/alloc/mod.rs @@ -19,8 +19,11 @@ //! regions, cache and allocation alignments. use std::alloc::{handle_alloc_error, Layout}; +use std::fmt::{Debug, Formatter}; use std::mem::size_of; +use std::panic::RefUnwindSafe; use std::ptr::NonNull; +use std::sync::Arc; mod alignment; mod types; @@ -121,3 +124,30 @@ pub unsafe fn reallocate( handle_alloc_error(Layout::from_size_align_unchecked(new_size, ALIGNMENT)) }) } + +/// The owner of an allocation, that is not natively allocated. +/// The trait implementation is responsible for dropping the allocations once no more references exist. 
+pub trait Allocation: RefUnwindSafe {} + +impl Allocation for T {} + +/// Mode of deallocating memory regions +pub enum Deallocation { + /// Native deallocation, using Rust deallocator with Arrow-specific memory alignment + Native(usize), + /// Foreign interface, via a callback + Foreign(Arc), +} + +impl Debug for Deallocation { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + match self { + Deallocation::Native(capacity) => { + write!(f, "Deallocation::Native {{ capacity: {} }}", capacity) + } + Deallocation::Foreign(_) => { + write!(f, "Deallocation::Foreign {{ capacity: unknown }}") + } + } + } +} diff --git a/arrow/src/buffer/immutable.rs b/arrow/src/buffer/immutable.rs index a363e4002723..76fa90b8155b 100644 --- a/arrow/src/buffer/immutable.rs +++ b/arrow/src/buffer/immutable.rs @@ -21,12 +21,9 @@ use std::ptr::NonNull; use std::sync::Arc; use std::{convert::AsRef, usize}; -use crate::bytes::Allocation; +use crate::alloc::{Allocation, Deallocation}; use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk}; -use crate::{ - bytes::{Bytes, Deallocation}, - datatypes::ArrowNativeType, -}; +use crate::{bytes::Bytes, datatypes::ArrowNativeType}; use super::ops::bitwise_unary_op_helper; use super::MutableBuffer; @@ -86,7 +83,7 @@ impl Buffer { /// /// * `ptr` - Pointer to raw parts /// * `len` - Length of raw parts in **bytes** - /// * `owner` - A [bytes::Owner] which is responsible for freeing that data + /// * `owner` - A [crate::alloc::Allocation] which is responsible for freeing that data /// /// # Safety /// diff --git a/arrow/src/buffer/mutable.rs b/arrow/src/buffer/mutable.rs index b38d16aaacbc..b1ea32231e11 100644 --- a/arrow/src/buffer/mutable.rs +++ b/arrow/src/buffer/mutable.rs @@ -16,9 +16,10 @@ // under the License. 
use super::Buffer; +use crate::alloc::Deallocation; use crate::{ alloc, - bytes::{Bytes, Deallocation}, + bytes::Bytes, datatypes::{ArrowNativeType, ToByteSlice}, util::bit_util, }; diff --git a/arrow/src/bytes.rs b/arrow/src/bytes.rs index 1b72f663f60c..8f04b017692e 100644 --- a/arrow/src/bytes.rs +++ b/arrow/src/bytes.rs @@ -20,39 +20,11 @@ //! Note that this is a low-level functionality of this crate. use core::slice; -use std::panic::RefUnwindSafe; use std::ptr::NonNull; -use std::sync::Arc; use std::{fmt::Debug, fmt::Formatter}; use crate::alloc; - -/// The owner of an allocation, that is not natively allocated. -/// The trait implementation is responsible for dropping the allocations once no more references exist. -pub trait Allocation: RefUnwindSafe {} - -impl Allocation for T {} - -/// Mode of deallocating memory regions -pub enum Deallocation { - /// Native deallocation, using Rust deallocator with Arrow-specific memory alignment - Native(usize), - /// Foreign interface, via a callback - Foreign(Arc), -} - -impl Debug for Deallocation { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - match self { - Deallocation::Native(capacity) => { - write!(f, "Deallocation::Native {{ capacity: {} }}", capacity) - } - Deallocation::Foreign(_) => { - write!(f, "Deallocation::Foreign {{ capacity: unknown }}") - } - } - } -} +use crate::alloc::Deallocation; /// A continuous, fixed-size, immutable memory region that knows how to de-allocate itself. 
/// This structs' API is inspired by the `bytes::Bytes`, but it is not limited to using rust's From 55ddb828449fc7017789ecec2daeab2c1ce35fde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Horstmann?= Date: Wed, 30 Mar 2022 12:10:06 +0200 Subject: [PATCH 3/6] Rename and comment Deallocation variants --- arrow/src/alloc/mod.rs | 22 ++++++++++++---------- arrow/src/array/data.rs | 6 +++--- arrow/src/buffer/immutable.rs | 34 +++++++++++++++++++++++++++++----- arrow/src/buffer/mutable.rs | 2 +- arrow/src/bytes.rs | 12 ++++++------ arrow/src/ffi.rs | 3 ++- 6 files changed, 53 insertions(+), 26 deletions(-) diff --git a/arrow/src/alloc/mod.rs b/arrow/src/alloc/mod.rs index fc59f658862f..418bc95fd2e8 100644 --- a/arrow/src/alloc/mod.rs +++ b/arrow/src/alloc/mod.rs @@ -125,28 +125,30 @@ pub unsafe fn reallocate( }) } -/// The owner of an allocation, that is not natively allocated. +/// The owner of an allocation. /// The trait implementation is responsible for dropping the allocations once no more references exist. pub trait Allocation: RefUnwindSafe {} impl<T: RefUnwindSafe> Allocation for T {} /// Mode of deallocating memory regions -pub enum Deallocation { - /// Native deallocation, using Rust deallocator with Arrow-specific memory alignment - Native(usize), - /// Foreign interface, via a callback - Foreign(Arc<dyn Allocation>), +pub(crate) enum Deallocation { + /// An allocation of the given capacity that needs to be deallocated using arrow's cache aligned allocator. + /// See [allocate_aligned] and [free_aligned]. + Arrow(usize), + /// An allocation from an external source like the FFI interface or a Rust Vec.
+ /// Deallocation will happen when the owning `Arc` is dropped, i.e. once its reference count reaches zero. + Custom(Arc<dyn Allocation>), } impl Debug for Deallocation { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { match self { - Deallocation::Native(capacity) => { - write!(f, "Deallocation::Native {{ capacity: {} }}", capacity) + Deallocation::Arrow(capacity) => { + write!(f, "Deallocation::Arrow {{ capacity: {} }}", capacity) } - Deallocation::Foreign(_) => { - write!(f, "Deallocation::Foreign {{ capacity: unknown }}") + Deallocation::Custom(_) => { + write!(f, "Deallocation::Custom {{ capacity: unknown }}") } } } diff --git a/arrow/src/array/data.rs b/arrow/src/array/data.rs index f947a457c607..8622c0a2ce60 100644 --- a/arrow/src/array/data.rs +++ b/arrow/src/array/data.rs @@ -2603,21 +2603,21 @@ mod tests { let mut bitmap = vec![0b1110_u8]; let strings_buffer = unsafe { - Buffer::from_foreign( + Buffer::from_custom_allocation( NonNull::new_unchecked(strings.as_mut_ptr()), strings.len(), Arc::new(strings), ) }; let offsets_buffer = unsafe { - Buffer::from_foreign( + Buffer::from_custom_allocation( NonNull::new_unchecked(offsets.as_mut_ptr() as *mut u8), offsets.len() * std::mem::size_of::<i32>(), Arc::new(offsets), ) }; let null_buffer = unsafe { - Buffer::from_foreign( + Buffer::from_custom_allocation( NonNull::new_unchecked(bitmap.as_mut_ptr()), bitmap.len(), Arc::new(bitmap), diff --git a/arrow/src/buffer/immutable.rs b/arrow/src/buffer/immutable.rs index 76fa90b8155b..1597f70e99f6 100644 --- a/arrow/src/buffer/immutable.rs +++ b/arrow/src/buffer/immutable.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use std::{convert::AsRef, usize}; use crate::alloc::{Allocation, Deallocation}; +use crate::ffi::FFI_ArrowArray; use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk}; use crate::{bytes::Bytes, datatypes::ArrowNativeType}; @@ -73,7 +74,7 @@ impl Buffer { /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed.
pub unsafe fn from_raw_parts(ptr: NonNull, len: usize, capacity: usize) -> Self { assert!(len <= capacity); - Buffer::build_with_arguments(ptr, len, Deallocation::Native(capacity)) + Buffer::build_with_arguments(ptr, len, Deallocation::Arrow(capacity)) } /// Creates a buffer from an existing memory region (must already be byte-aligned), this @@ -83,18 +84,41 @@ impl Buffer { /// /// * `ptr` - Pointer to raw parts /// * `len` - Length of raw parts in **bytes** - /// * `owner` - A [crate::alloc::Allocation] which is responsible for freeing that data + /// * `data` - An [ffi::FFI_ArrowArray] with the data /// /// # Safety /// /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` /// bytes and that the foreign deallocator frees the region. - pub unsafe fn from_foreign( + #[deprecated( + note = "use from_custom_allocation instead which makes it clearer that the allocation is in fact owned" + )] + pub unsafe fn from_unowned( + ptr: NonNull, + len: usize, + data: Arc, + ) -> Self { + Self::from_custom_allocation(ptr, len, data) + } + + /// Creates a buffer from an existing memory region. Ownership of the memory is tracked via reference counting + /// and the memory will be freed using the `drop` method of [crate::alloc::Allocation] when the reference count reaches zero. 
+ /// + /// # Arguments + /// + /// * `ptr` - Pointer to raw parts + /// * `len` - Length of raw parts in **bytes** + /// * `owner` - A [crate::alloc::Allocation] which is responsible for freeing that data + /// + /// # Safety + /// + /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` bytes + pub unsafe fn from_custom_allocation( ptr: NonNull, len: usize, owner: Arc, ) -> Self { - Buffer::build_with_arguments(ptr, len, Deallocation::Foreign(owner)) + Buffer::build_with_arguments(ptr, len, Deallocation::Custom(owner)) } /// Auxiliary method to create a new Buffer @@ -535,7 +559,7 @@ mod tests { fn test_from_foreign_vec() { let mut vector = vec![1_i32, 2, 3, 4, 5]; let buffer = unsafe { - Buffer::from_foreign( + Buffer::from_custom_allocation( NonNull::new_unchecked(vector.as_mut_ptr() as *mut u8), vector.len() * std::mem::size_of::(), Arc::new(vector), diff --git a/arrow/src/buffer/mutable.rs b/arrow/src/buffer/mutable.rs index b1ea32231e11..4a5166d0c2e8 100644 --- a/arrow/src/buffer/mutable.rs +++ b/arrow/src/buffer/mutable.rs @@ -267,7 +267,7 @@ impl MutableBuffer { #[inline] pub(super) fn into_buffer(self) -> Buffer { let bytes = unsafe { - Bytes::new(self.data, self.len, Deallocation::Native(self.capacity)) + Bytes::new(self.data, self.len, Deallocation::Arrow(self.capacity)) }; std::mem::forget(self); Buffer::from_bytes(bytes) diff --git a/arrow/src/bytes.rs b/arrow/src/bytes.rs index 8f04b017692e..df8c79b1f624 100644 --- a/arrow/src/bytes.rs +++ b/arrow/src/bytes.rs @@ -59,7 +59,7 @@ impl Bytes { /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed. 
#[inline] - pub unsafe fn new( + pub(crate) unsafe fn new( ptr: std::ptr::NonNull, len: usize, deallocation: Deallocation, @@ -92,10 +92,10 @@ impl Bytes { pub fn capacity(&self) -> usize { match self.deallocation { - Deallocation::Native(capacity) => capacity, + Deallocation::Arrow(capacity) => capacity, // we cannot determine this in general, // and thus we state that this is externally-owned memory - Deallocation::Foreign(_) => 0, + Deallocation::Custom(_) => 0, } } } @@ -104,11 +104,11 @@ impl Drop for Bytes { #[inline] fn drop(&mut self) { match &self.deallocation { - Deallocation::Native(capacity) => { + Deallocation::Arrow(capacity) => { unsafe { alloc::free_aligned::(self.ptr, *capacity) }; } - // foreign interface knows how to deallocate itself. - Deallocation::Foreign(_) => (), + // The automatic drop implementation will free the memory once the reference count reaches zero + Deallocation::Custom(_allocation) => (), } } } diff --git a/arrow/src/ffi.rs b/arrow/src/ffi.rs index 180bbed58b5f..f808bae1e63b 100644 --- a/arrow/src/ffi.rs +++ b/arrow/src/ffi.rs @@ -538,7 +538,8 @@ unsafe fn create_buffer( assert!(index < array.n_buffers as usize); let ptr = *buffers.add(index); - NonNull::new(ptr as *mut u8).map(|ptr| Buffer::from_foreign(ptr, len, owner)) + NonNull::new(ptr as *mut u8) + .map(|ptr| Buffer::from_custom_allocation(ptr, len, owner)) } fn create_child( From a9195665fa926ec5dd1cb50a9a251400056e7f4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Horstmann?= Date: Wed, 30 Mar 2022 16:38:31 +0200 Subject: [PATCH 4/6] Fix doc link --- arrow/src/buffer/immutable.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow/src/buffer/immutable.rs b/arrow/src/buffer/immutable.rs index 1597f70e99f6..c79cf8a1f5ed 100644 --- a/arrow/src/buffer/immutable.rs +++ b/arrow/src/buffer/immutable.rs @@ -84,7 +84,7 @@ impl Buffer { /// /// * `ptr` - Pointer to raw parts /// * `len` - Length of raw parts in **bytes** - /// * `data` - An 
[ffi::FFI_ArrowArray] with the data + /// * `data` - An [crate::ffi::FFI_ArrowArray] with the data /// /// # Safety /// From 246c3e54229d78d976664d78c188bfa466ee7aa1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Horstmann?= Date: Wed, 30 Mar 2022 16:48:20 +0200 Subject: [PATCH 5/6] Explicitly assert that Buffer is UnwindSafe --- arrow/src/buffer/immutable.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/arrow/src/buffer/immutable.rs b/arrow/src/buffer/immutable.rs index c79cf8a1f5ed..c34ea101bb3b 100644 --- a/arrow/src/buffer/immutable.rs +++ b/arrow/src/buffer/immutable.rs @@ -342,6 +342,7 @@ impl FromIterator for Buffer { #[cfg(test)] mod tests { + use std::panic::{RefUnwindSafe, UnwindSafe}; use std::thread; use super::*; @@ -555,6 +556,12 @@ mod tests { ); } + #[test] + fn test_unwind_safe() { + fn assert_unwind_safe<T: RefUnwindSafe + UnwindSafe>() {} + assert_unwind_safe::<Buffer>() + } + #[test] fn test_from_foreign_vec() { let mut vector = vec![1_i32, 2, 3, 4, 5]; From e6c6c07dee89d4d567b73966aec85bb6bbaeb115 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 7 Apr 2022 15:53:09 -0400 Subject: [PATCH 6/6] fix: doc comment --- arrow/src/bytes.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/arrow/src/bytes.rs b/arrow/src/bytes.rs index df8c79b1f624..7b57552e60f6 100644 --- a/arrow/src/bytes.rs +++ b/arrow/src/bytes.rs @@ -32,8 +32,9 @@ use crate::alloc::Deallocation; /// /// In the most common case, this buffer is allocated using [`allocate_aligned`](crate::alloc::allocate_aligned) /// and deallocated accordingly [`free_aligned`](crate::alloc::free_aligned). -/// When the region is allocated by an foreign allocator, [Deallocation::Foreign], this calls the -/// foreign deallocator to deallocate the region when it is no longer needed. +/// +/// When the region is allocated by a different allocator, [Deallocation::Custom], this calls the +/// custom deallocator to deallocate the region when it is no longer needed.
pub struct Bytes { /// The raw pointer to be beginning of the region ptr: NonNull<u8>,