Skip to content

Commit

Permalink
Decouple buffer deallocation from ffi and allow creating buffers from…
Browse files Browse the repository at this point in the history
… rust vec (#1494)

* Decouple buffer deallocation from ffi and allow zero-copy buffer creation from rust vectors or strings

* Move allocation owner to alloc module

* Rename and comment Deallocation variants

* Fix doc link

* Explicitly assert that Buffer is UnwindSafe

* fix: doc comment

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
jhorstmann and alamb authored Apr 7, 2022
1 parent 497521f commit 688dd4c
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 45 deletions.
32 changes: 32 additions & 0 deletions arrow/src/alloc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
//! regions, cache and allocation alignments.
use std::alloc::{handle_alloc_error, Layout};
use std::fmt::{Debug, Formatter};
use std::mem::size_of;
use std::panic::RefUnwindSafe;
use std::ptr::NonNull;
use std::sync::Arc;

mod alignment;
mod types;
Expand Down Expand Up @@ -121,3 +124,32 @@ pub unsafe fn reallocate<T: NativeType>(
handle_alloc_error(Layout::from_size_align_unchecked(new_size, ALIGNMENT))
})
}

/// The owner of an allocation.
///
/// This is a marker trait without methods: a value implementing it only has to keep the
/// underlying memory alive for as long as the value itself exists.
/// The trait implementation is responsible for dropping the allocations once no more references exist.
///
/// The `RefUnwindSafe` supertrait is required of every owner; presumably this is what lets
/// types that hold an `Arc<dyn Allocation>` remain unwind safe (TODO confirm against `Buffer`).
pub trait Allocation: RefUnwindSafe {}

// Blanket implementation: any `RefUnwindSafe` type (e.g. a `Vec` or `String`) can be used
// as an allocation owner without writing an explicit impl.
impl<T: RefUnwindSafe> Allocation for T {}

/// Mode of deallocating memory regions
pub(crate) enum Deallocation {
/// An allocation of the given capacity that needs to be deallocated using arrow's cache aligned allocator.
/// See [allocate_aligned] and [free_aligned].
Arrow(usize),
/// An allocation from an external source like the FFI interface or a Rust Vec.
/// The wrapped owner keeps the memory alive; deallocation will happen when the last
/// reference to that owner is dropped.
Custom(Arc<dyn Allocation>),
}

impl Debug for Deallocation {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
Deallocation::Arrow(capacity) => {
write!(f, "Deallocation::Arrow {{ capacity: {} }}", capacity)
}
Deallocation::Custom(_) => {
write!(f, "Deallocation::Custom {{ capacity: unknown }}")
}
}
}
}
53 changes: 51 additions & 2 deletions arrow/src/array/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1459,10 +1459,11 @@ impl ArrayDataBuilder {
#[cfg(test)]
mod tests {
use super::*;
use std::ptr::NonNull;

use crate::array::{
Array, BooleanBuilder, Int32Array, Int32Builder, Int64Array, StringArray,
StructBuilder, UInt64Array,
make_array, Array, BooleanBuilder, Int32Array, Int32Builder, Int64Array,
StringArray, StructBuilder, UInt64Array,
};
use crate::buffer::Buffer;
use crate::datatypes::Field;
Expand Down Expand Up @@ -2594,4 +2595,52 @@ mod tests {

assert_eq!(&struct_array_slice, &cloned);
}

// Builds a nullable Utf8 array whose values, offsets and validity buffers are all backed by
// ordinary Rust allocations (a `String` and two `Vec`s) handed over without copying, and
// checks that it compares equal to a `StringArray` built the regular way.
#[test]
fn test_string_data_from_foreign() {
// Values buffer: the concatenated string data.
let mut strings = "foobarfoobar".to_owned();
// Offsets for 4 elements: "", "foo", "bar", "foobar".
let mut offsets = vec![0_i32, 0, 3, 6, 12];
// Validity bitmap: bit 0 is unset, so element 0 is null; elements 1..=3 are valid.
let mut bitmap = vec![0b1110_u8];

// SAFETY (applies to all three blocks below): the pointer and length are read from the
// collection before it is moved into the `Arc` (arguments are evaluated left to right).
// Moving a `String`/`Vec` does not move its heap buffer, so the pointer stays valid for
// `len` bytes for as long as the `Arc` owner is alive. `as_mut_ptr` of a `Vec`/`String`
// is never null, which justifies `new_unchecked`.
let strings_buffer = unsafe {
Buffer::from_custom_allocation(
NonNull::new_unchecked(strings.as_mut_ptr()),
strings.len(),
Arc::new(strings),
)
};
let offsets_buffer = unsafe {
Buffer::from_custom_allocation(
NonNull::new_unchecked(offsets.as_mut_ptr() as *mut u8),
// Length is given in bytes, not in number of `i32` elements.
offsets.len() * std::mem::size_of::<i32>(),
Arc::new(offsets),
)
};
let null_buffer = unsafe {
Buffer::from_custom_allocation(
NonNull::new_unchecked(bitmap.as_mut_ptr()),
bitmap.len(),
Arc::new(bitmap),
)
};

// Utf8 layout: 4 elements, with the offsets buffer first and the values buffer second.
let data = ArrayData::try_new(
DataType::Utf8,
4,
None,
Some(null_buffer),
0,
vec![offsets_buffer, strings_buffer],
vec![],
)
.unwrap();

let array = make_array(data);
let array = array.as_any().downcast_ref::<StringArray>().unwrap();

let expected =
StringArray::from(vec![None, Some("foo"), Some("bar"), Some("foobar")]);

assert_eq!(array, &expected);
}
}
66 changes: 57 additions & 9 deletions arrow/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@ use std::ptr::NonNull;
use std::sync::Arc;
use std::{convert::AsRef, usize};

use crate::alloc::{Allocation, Deallocation};
use crate::ffi::FFI_ArrowArray;
use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk};
use crate::{
bytes::{Bytes, Deallocation},
datatypes::ArrowNativeType,
ffi,
};
use crate::{bytes::Bytes, datatypes::ArrowNativeType};

use super::ops::bitwise_unary_op_helper;
use super::MutableBuffer;
Expand Down Expand Up @@ -76,7 +74,7 @@ impl Buffer {
/// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed.
pub unsafe fn from_raw_parts(ptr: NonNull<u8>, len: usize, capacity: usize) -> Self {
assert!(len <= capacity);
Buffer::build_with_arguments(ptr, len, Deallocation::Native(capacity))
Buffer::build_with_arguments(ptr, len, Deallocation::Arrow(capacity))
}

/// Creates a buffer from an existing memory region (must already be byte-aligned), this
Expand All @@ -86,18 +84,41 @@ impl Buffer {
///
/// * `ptr` - Pointer to raw parts
/// * `len` - Length of raw parts in **bytes**
/// * `data` - An [ffi::FFI_ArrowArray] with the data
/// * `data` - An [crate::ffi::FFI_ArrowArray] with the data
///
/// # Safety
///
/// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
/// bytes and that the foreign deallocator frees the region.
#[deprecated(
note = "use from_custom_allocation instead which makes it clearer that the allocation is in fact owned"
)]
pub unsafe fn from_unowned(
ptr: NonNull<u8>,
len: usize,
data: Arc<ffi::FFI_ArrowArray>,
data: Arc<FFI_ArrowArray>,
) -> Self {
Self::from_custom_allocation(ptr, len, data)
}

/// Creates a buffer from an existing memory region. Ownership of the memory is tracked via reference counting
/// and the memory will be freed by dropping the [crate::alloc::Allocation] owner when the reference count reaches zero.
///
/// # Arguments
///
/// * `ptr` - Pointer to raw parts
/// * `len` - Length of raw parts in **bytes**
/// * `owner` - A [crate::alloc::Allocation] which is responsible for freeing that data
///
/// # Safety
///
/// This function is unsafe as there is no guarantee that the given pointer is valid for `len` bytes
pub unsafe fn from_custom_allocation(
ptr: NonNull<u8>,
len: usize,
owner: Arc<dyn Allocation>,
) -> Self {
Buffer::build_with_arguments(ptr, len, Deallocation::Foreign(data))
Buffer::build_with_arguments(ptr, len, Deallocation::Custom(owner))
}

/// Auxiliary method to create a new Buffer
Expand Down Expand Up @@ -321,6 +342,7 @@ impl<T: ArrowNativeType> FromIterator<T> for Buffer {

#[cfg(test)]
mod tests {
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::thread;

use super::*;
Expand Down Expand Up @@ -533,4 +555,30 @@ mod tests {
Buffer::from(&[0b01101101, 0b10101010]).count_set_bits_offset(7, 9)
);
}

/// Compile-time check that `Buffer` is both `UnwindSafe` and `RefUnwindSafe`.
#[test]
fn test_unwind_safe() {
    // Only instantiable for types satisfying both bounds; the body is intentionally empty.
    fn require_unwind_safe<B>()
    where
        B: RefUnwindSafe + UnwindSafe,
    {
    }

    require_unwind_safe::<Buffer>();
}

// Wraps a `Vec<i32>` in a `Buffer` without copying and checks that the data can be read
// back, both for the whole buffer and for a slice starting one element in.
#[test]
fn test_from_foreign_vec() {
let mut vector = vec![1_i32, 2, 3, 4, 5];
// SAFETY: the pointer and byte length are read from `vector` before it is moved into the
// `Arc` (arguments are evaluated left to right). Moving a `Vec` does not move its heap
// buffer, so the pointer stays valid for that many bytes while the `Arc` owner is alive.
// A `Vec`'s pointer is never null, which justifies `new_unchecked`.
let buffer = unsafe {
Buffer::from_custom_allocation(
NonNull::new_unchecked(vector.as_mut_ptr() as *mut u8),
// Length is given in bytes, not in number of elements.
vector.len() * std::mem::size_of::<i32>(),
Arc::new(vector),
)
};

// NOTE(review): `typed_data` is unsafe; presumably it requires the bytes to be valid,
// suitably aligned `i32` values, which holds here as they come from a `Vec<i32>` — confirm.
let slice = unsafe { buffer.typed_data::<i32>() };
assert_eq!(slice, &[1, 2, 3, 4, 5]);

// The slice offset is in bytes: skip exactly one `i32`.
let buffer = buffer.slice(std::mem::size_of::<i32>());

let slice = unsafe { buffer.typed_data::<i32>() };
assert_eq!(slice, &[2, 3, 4, 5]);
}
}
5 changes: 3 additions & 2 deletions arrow/src/buffer/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
// under the License.

use super::Buffer;
use crate::alloc::Deallocation;
use crate::{
alloc,
bytes::{Bytes, Deallocation},
bytes::Bytes,
datatypes::{ArrowNativeType, ToByteSlice},
util::bit_util,
};
Expand Down Expand Up @@ -266,7 +267,7 @@ impl MutableBuffer {
#[inline]
pub(super) fn into_buffer(self) -> Buffer {
let bytes = unsafe {
Bytes::new(self.data, self.len, Deallocation::Native(self.capacity))
Bytes::new(self.data, self.len, Deallocation::Arrow(self.capacity))
};
std::mem::forget(self);
Buffer::from_bytes(bytes)
Expand Down
42 changes: 11 additions & 31 deletions arrow/src/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,40 +21,20 @@
use core::slice;
use std::ptr::NonNull;
use std::sync::Arc;
use std::{fmt::Debug, fmt::Formatter};

use crate::{alloc, ffi};

/// Mode of deallocating memory regions
pub enum Deallocation {
/// Native deallocation, using Rust deallocator with Arrow-specific memory alignment
Native(usize),
/// Foreign interface, via a callback
Foreign(Arc<ffi::FFI_ArrowArray>),
}

impl Debug for Deallocation {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
Deallocation::Native(capacity) => {
write!(f, "Deallocation::Native {{ capacity: {} }}", capacity)
}
Deallocation::Foreign(_) => {
write!(f, "Deallocation::Foreign {{ capacity: unknown }}")
}
}
}
}
use crate::alloc;
use crate::alloc::Deallocation;

/// A continuous, fixed-size, immutable memory region that knows how to de-allocate itself.
/// This struct's API is inspired by `bytes::Bytes`, but it is not limited to using Rust's
/// global allocator nor u8 alignment.
///
/// In the most common case, this buffer is allocated using [`allocate_aligned`](crate::alloc::allocate_aligned)
/// and deallocated accordingly [`free_aligned`](crate::alloc::free_aligned).
/// When the region is allocated by an foreign allocator, [Deallocation::Foreign], this calls the
/// foreign deallocator to deallocate the region when it is no longer needed.
///
/// When the region is allocated by a different allocator, [Deallocation::Custom], this calls the
/// custom deallocator to deallocate the region when it is no longer needed.
pub struct Bytes {
/// The raw pointer to the beginning of the region
ptr: NonNull<u8>,
Expand All @@ -80,7 +60,7 @@ impl Bytes {
/// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
/// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed.
#[inline]
pub unsafe fn new(
pub(crate) unsafe fn new(
ptr: std::ptr::NonNull<u8>,
len: usize,
deallocation: Deallocation,
Expand Down Expand Up @@ -113,10 +93,10 @@ impl Bytes {

pub fn capacity(&self) -> usize {
match self.deallocation {
Deallocation::Native(capacity) => capacity,
Deallocation::Arrow(capacity) => capacity,
// we cannot determine this in general,
// and thus we state that this is externally-owned memory
Deallocation::Foreign(_) => 0,
Deallocation::Custom(_) => 0,
}
}
}
Expand All @@ -125,11 +105,11 @@ impl Drop for Bytes {
#[inline]
fn drop(&mut self) {
match &self.deallocation {
Deallocation::Native(capacity) => {
Deallocation::Arrow(capacity) => {
unsafe { alloc::free_aligned::<u8>(self.ptr, *capacity) };
}
// foreign interface knows how to deallocate itself.
Deallocation::Foreign(_) => (),
// The automatic drop implementation will free the memory once the reference count reaches zero
Deallocation::Custom(_allocation) => (),
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion arrow/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,8 @@ unsafe fn create_buffer(
assert!(index < array.n_buffers as usize);
let ptr = *buffers.add(index);

NonNull::new(ptr as *mut u8).map(|ptr| Buffer::from_unowned(ptr, len, owner))
NonNull::new(ptr as *mut u8)
.map(|ptr| Buffer::from_custom_allocation(ptr, len, owner))
}

fn create_child(
Expand Down

0 comments on commit 688dd4c

Please sign in to comment.