diff --git a/arrow/src/alloc/mod.rs b/arrow/src/alloc/mod.rs index 88ab8187727e..418bc95fd2e8 100644 --- a/arrow/src/alloc/mod.rs +++ b/arrow/src/alloc/mod.rs @@ -19,8 +19,11 @@ //! regions, cache and allocation alignments. use std::alloc::{handle_alloc_error, Layout}; +use std::fmt::{Debug, Formatter}; use std::mem::size_of; +use std::panic::RefUnwindSafe; use std::ptr::NonNull; +use std::sync::Arc; mod alignment; mod types; @@ -121,3 +124,32 @@ pub unsafe fn reallocate( handle_alloc_error(Layout::from_size_align_unchecked(new_size, ALIGNMENT)) }) } + +/// The owner of an allocation. +/// The trait implementation is responsible for dropping the allocations once no more references exist. +pub trait Allocation: RefUnwindSafe {} + +impl<T: RefUnwindSafe> Allocation for T {} + +/// Mode of deallocating memory regions +pub(crate) enum Deallocation { + /// An allocation of the given capacity that needs to be deallocated using arrow's cache aligned allocator. + /// See [allocate_aligned] and [free_aligned]. + Arrow(usize), + /// An allocation from an external source like the FFI interface or a Rust Vec.
+ /// Deallocation will happen by dropping the [Allocation] once no more references exist. + Custom(Arc<dyn Allocation>), +} + +impl Debug for Deallocation { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + match self { + Deallocation::Arrow(capacity) => { + write!(f, "Deallocation::Arrow {{ capacity: {} }}", capacity) + } + Deallocation::Custom(_) => { + write!(f, "Deallocation::Custom {{ capacity: unknown }}") + } + } + } +} diff --git a/arrow/src/array/data.rs b/arrow/src/array/data.rs index 551bf536cb33..8622c0a2ce60 100644 --- a/arrow/src/array/data.rs +++ b/arrow/src/array/data.rs @@ -1459,10 +1459,11 @@ impl ArrayDataBuilder { #[cfg(test)] mod tests { use super::*; + use std::ptr::NonNull; use crate::array::{ - Array, BooleanBuilder, Int32Array, Int32Builder, Int64Array, StringArray, - StructBuilder, UInt64Array, + make_array, Array, BooleanBuilder, Int32Array, Int32Builder, Int64Array, + StringArray, StructBuilder, UInt64Array, }; use crate::buffer::Buffer; use crate::datatypes::Field; @@ -2594,4 +2595,52 @@ mod tests { assert_eq!(&struct_array_slice, &cloned); } + + #[test] + fn test_string_data_from_foreign() { + let mut strings = "foobarfoobar".to_owned(); + let mut offsets = vec![0_i32, 0, 3, 6, 12]; + let mut bitmap = vec![0b1110_u8]; + + let strings_buffer = unsafe { + Buffer::from_custom_allocation( + NonNull::new_unchecked(strings.as_mut_ptr()), + strings.len(), + Arc::new(strings), + ) + }; + let offsets_buffer = unsafe { + Buffer::from_custom_allocation( + NonNull::new_unchecked(offsets.as_mut_ptr() as *mut u8), + offsets.len() * std::mem::size_of::<i32>(), + Arc::new(offsets), + ) + }; + let null_buffer = unsafe { + Buffer::from_custom_allocation( + NonNull::new_unchecked(bitmap.as_mut_ptr()), + bitmap.len(), + Arc::new(bitmap), + ) + }; + + let data = ArrayData::try_new( + DataType::Utf8, + 4, + None, + Some(null_buffer), + 0, + vec![offsets_buffer, strings_buffer], + vec![], + ) + .unwrap(); + + let array = make_array(data); + let array = array.as_any().downcast_ref::<StringArray>().unwrap(); + + let expected = +
StringArray::from(vec![None, Some("foo"), Some("bar"), Some("foobar")]); + + assert_eq!(array, &expected); + } } diff --git a/arrow/src/buffer/immutable.rs b/arrow/src/buffer/immutable.rs index b918c0d4429f..c34ea101bb3b 100644 --- a/arrow/src/buffer/immutable.rs +++ b/arrow/src/buffer/immutable.rs @@ -21,12 +21,10 @@ use std::ptr::NonNull; use std::sync::Arc; use std::{convert::AsRef, usize}; +use crate::alloc::{Allocation, Deallocation}; +use crate::ffi::FFI_ArrowArray; use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk}; -use crate::{ - bytes::{Bytes, Deallocation}, - datatypes::ArrowNativeType, - ffi, -}; +use crate::{bytes::Bytes, datatypes::ArrowNativeType}; use super::ops::bitwise_unary_op_helper; use super::MutableBuffer; @@ -76,7 +74,7 @@ impl Buffer { /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed. pub unsafe fn from_raw_parts(ptr: NonNull<u8>, len: usize, capacity: usize) -> Self { assert!(len <= capacity); - Buffer::build_with_arguments(ptr, len, Deallocation::Native(capacity)) + Buffer::build_with_arguments(ptr, len, Deallocation::Arrow(capacity)) } /// Creates a buffer from an existing memory region (must already be byte-aligned), this @@ -86,18 +84,41 @@ impl Buffer { /// /// * `ptr` - Pointer to raw parts /// * `len` - Length of raw parts in **bytes** - /// * `data` - An [ffi::FFI_ArrowArray] with the data + /// * `data` - An [crate::ffi::FFI_ArrowArray] with the data /// /// # Safety /// /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` /// bytes and that the foreign deallocator frees the region. + #[deprecated( + note = "use from_custom_allocation instead which makes it clearer that the allocation is in fact owned" + )] pub unsafe fn from_unowned( ptr: NonNull<u8>, len: usize, - data: Arc<ffi::FFI_ArrowArray>, + data: Arc<FFI_ArrowArray>, + ) -> Self { + Self::from_custom_allocation(ptr, len, data) + } + + /// Creates a buffer from an existing memory region.
Ownership of the memory is tracked via reference counting + /// and the memory will be freed using the `drop` method of [crate::alloc::Allocation] when the reference count reaches zero. + /// + /// # Arguments + /// + /// * `ptr` - Pointer to raw parts + /// * `len` - Length of raw parts in **bytes** + /// * `owner` - A [crate::alloc::Allocation] which is responsible for freeing that data + /// + /// # Safety + /// + /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` bytes + pub unsafe fn from_custom_allocation( + ptr: NonNull<u8>, + len: usize, + owner: Arc<dyn Allocation>, ) -> Self { - Buffer::build_with_arguments(ptr, len, Deallocation::Foreign(data)) + Buffer::build_with_arguments(ptr, len, Deallocation::Custom(owner)) } /// Auxiliary method to create a new Buffer @@ -321,6 +342,7 @@ impl<T: ArrowNativeType> FromIterator<T> for Buffer { #[cfg(test)] mod tests { + use std::panic::{RefUnwindSafe, UnwindSafe}; use std::thread; use super::*; @@ -533,4 +555,30 @@ mod tests { Buffer::from(&[0b01101101, 0b10101010]).count_set_bits_offset(7, 9) ); } + + #[test] + fn test_unwind_safe() { + fn assert_unwind_safe<T: RefUnwindSafe + UnwindSafe>() {} + assert_unwind_safe::<Buffer>() + } + + #[test] + fn test_from_foreign_vec() { + let mut vector = vec![1_i32, 2, 3, 4, 5]; + let buffer = unsafe { + Buffer::from_custom_allocation( + NonNull::new_unchecked(vector.as_mut_ptr() as *mut u8), + vector.len() * std::mem::size_of::<i32>(), + Arc::new(vector), + ) + }; + + let slice = unsafe { buffer.typed_data::<i32>() }; + assert_eq!(slice, &[1, 2, 3, 4, 5]); + + let buffer = buffer.slice(std::mem::size_of::<i32>()); + + let slice = unsafe { buffer.typed_data::<i32>() }; + assert_eq!(slice, &[2, 3, 4, 5]); + } }
use super::Buffer; +use crate::alloc::Deallocation; use crate::{ alloc, - bytes::{Bytes, Deallocation}, + bytes::Bytes, datatypes::{ArrowNativeType, ToByteSlice}, util::bit_util, }; @@ -266,7 +267,7 @@ impl MutableBuffer { #[inline] pub(super) fn into_buffer(self) -> Buffer { let bytes = unsafe { - Bytes::new(self.data, self.len, Deallocation::Native(self.capacity)) + Bytes::new(self.data, self.len, Deallocation::Arrow(self.capacity)) }; std::mem::forget(self); Buffer::from_bytes(bytes) diff --git a/arrow/src/bytes.rs b/arrow/src/bytes.rs index bc92b90576f6..7b57552e60f6 100644 --- a/arrow/src/bytes.rs +++ b/arrow/src/bytes.rs @@ -21,31 +21,10 @@ use core::slice; use std::ptr::NonNull; -use std::sync::Arc; use std::{fmt::Debug, fmt::Formatter}; -use crate::{alloc, ffi}; - -/// Mode of deallocating memory regions -pub enum Deallocation { - /// Native deallocation, using Rust deallocator with Arrow-specific memory alignment - Native(usize), - /// Foreign interface, via a callback - Foreign(Arc<ffi::FFI_ArrowArray>), -} - -impl Debug for Deallocation { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - match self { - Deallocation::Native(capacity) => { - write!(f, "Deallocation::Native {{ capacity: {} }}", capacity) - } - Deallocation::Foreign(_) => { - write!(f, "Deallocation::Foreign {{ capacity: unknown }}") - } - } - } -} +use crate::alloc; +use crate::alloc::Deallocation; /// A continuous, fixed-size, immutable memory region that knows how to de-allocate itself. /// This structs' API is inspired by the `bytes::Bytes`, but it is not limited to using rust's /// /// In the most common case, this buffer is allocated using [`allocate_aligned`](crate::alloc::allocate_aligned) /// and deallocated accordingly [`free_aligned`](crate::alloc::free_aligned). -/// When the region is allocated by an foreign allocator, [Deallocation::Foreign], this calls the -/// foreign deallocator to deallocate the region when it is no longer needed.
+/// +/// When the region is allocated by a different allocator, [Deallocation::Custom], this calls the +/// custom deallocator to deallocate the region when it is no longer needed. pub struct Bytes { /// The raw pointer to be beginning of the region ptr: NonNull<u8>, /// the length of the region len: usize, ... @@ -80,7 +60,7 @@ impl Bytes { /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed. #[inline] - pub unsafe fn new( + pub(crate) unsafe fn new( ptr: std::ptr::NonNull<u8>, len: usize, deallocation: Deallocation, @@ -113,10 +93,10 @@ pub fn capacity(&self) -> usize { match self.deallocation { - Deallocation::Native(capacity) => capacity, + Deallocation::Arrow(capacity) => capacity, // we cannot determine this in general, // and thus we state that this is externally-owned memory - Deallocation::Foreign(_) => 0, + Deallocation::Custom(_) => 0, } } } @@ -125,11 +105,11 @@ impl Drop for Bytes { #[inline] fn drop(&mut self) { match &self.deallocation { - Deallocation::Native(capacity) => { + Deallocation::Arrow(capacity) => { unsafe { alloc::free_aligned::<u8>(self.ptr, *capacity) }; } - // foreign interface knows how to deallocate itself. - Deallocation::Foreign(_) => (), + // The automatic drop implementation will free the memory once the reference count reaches zero + Deallocation::Custom(_allocation) => (), } } } diff --git a/arrow/src/ffi.rs b/arrow/src/ffi.rs index 72f69d846dfe..a6ccfa02a5dd 100644 --- a/arrow/src/ffi.rs +++ b/arrow/src/ffi.rs @@ -538,7 +538,8 @@ unsafe fn create_buffer( assert!(index < array.n_buffers as usize); let ptr = *buffers.add(index); - NonNull::new(ptr as *mut u8).map(|ptr| Buffer::from_unowned(ptr, len, owner)) + NonNull::new(ptr as *mut u8) + .map(|ptr| Buffer::from_custom_allocation(ptr, len, owner)) } fn create_child(