Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Simplified Bytes (#1099)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Jun 25, 2022
1 parent 10b57c3 commit 88f05bb
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 171 deletions.
159 changes: 58 additions & 101 deletions src/buffer/bytes.rs
Original file line number Diff line number Diff line change
@@ -1,161 +1,118 @@
//! This module contains an implementation of a contiguous immutable memory region that knows
//! how to de-allocate itself, [`Bytes`].

use std::{fmt::Debug, fmt::Formatter};
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut};
use std::panic::RefUnwindSafe;
use std::{ptr::NonNull, sync::Arc};

use super::foreign::MaybeForeign;
use crate::ffi;
use crate::types::NativeType;

/// Mode of deallocating memory regions
pub enum Deallocation {
/// Native deallocation, using Rust deallocator with Arrow-specific memory aligment
enum Allocation {
/// Native allocation
Native,
// Foreign interface, via a callback
Foreign(Arc<ffi::InternalArrowArray>),
// A foreign allocator and its ref count
Foreign(Arc<dyn RefUnwindSafe + Send + Sync>),
}

impl Debug for Deallocation {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
Deallocation::Native => {
write!(f, "Deallocation::Native")
}
Deallocation::Foreign(_) => {
write!(f, "Deallocation::Foreign {{ capacity: unknown }}")
}
}
}
}

/// A continuous, fixed-size, immutable memory region that knows how to de-allocate itself.
/// A continuous memory region that may be allocated externally.
///
/// In the most common case, this buffer is allocated using Rust's native allocator.
/// However, it may also be allocated by a foreign allocator, [Deallocation::Foreign].
pub struct Bytes<T: NativeType> {
/// inner data
data: MaybeForeign<T>,
/// how to deallocate this region
deallocation: Deallocation,
/// In the most common case, this is created from [`Vec`].
/// However, this region can also be allocated by a foreign allocator.
pub struct Bytes<T> {
/// An implementation using an `enum` of a `Vec` or a foreign pointer is not used
/// because `deref` is at least 50% more expensive than the deref of a `Vec`.
data: ManuallyDrop<Vec<T>>,
/// the region was allocated
allocation: Allocation,
}

impl<T: NativeType> Bytes<T> {
/// Takes ownership of an allocated memory region,
///
/// # Arguments
///
/// * `ptr` - Pointer to raw parts
/// * `len` - Length of raw parts in **bytes**
/// * `capacity` - Total allocated memory for the pointer `ptr`, in **bytes**
///
impl<T> Bytes<T> {
/// Takes ownership of an allocated memory region `[ptr, ptr+len[`,
/// # Safety
///
/// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
/// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed.
///
/// # Panics
///
/// This function panics if the give deallocation not is `Deallocation::Foreign`
/// This function is safe iff:
/// * the region is properly allocated in that a slice can be safely built from it.
/// * the region is immutable.
/// # Implementation
/// This function leaks iff `owner` does not deallocate the region when dropped.
#[inline]
pub unsafe fn from_ffi(
pub unsafe fn from_owned(
ptr: std::ptr::NonNull<T>,
len: usize,
deallocation: Deallocation,
owner: Arc<dyn RefUnwindSafe + Send + Sync>,
) -> Self {
assert!(matches!(deallocation, Deallocation::Foreign(_)));
// This line is technically outside the assumptions of `Vec::from_raw_parts`, since
// `ptr` was not allocated by `Vec`. However, one of the invariants of this struct
// is that we do not expose this region as a `Vec`; we only use `Vec` on it to provide
// immutable access to the region (via `Vec::deref` to `&[T]`).
// MIRI does not complain, which seems to agree with the line of thought.
let data = Vec::from_raw_parts(ptr.as_ptr(), len, len);
let data = MaybeForeign::new(data);

Self { data, deallocation }
}
let data = ManuallyDrop::new(data);

#[inline]
fn as_slice(&self) -> &[T] {
self
Self {
data,
allocation: Allocation::Foreign(owner),
}
}

/// The length of the region
#[inline]
pub fn len(&self) -> usize {
self.data.len()
}

/// The pointer to the region
#[inline]
pub fn ptr(&self) -> NonNull<T> {
debug_assert!(!self.data.as_ptr().is_null());
unsafe { NonNull::new_unchecked(self.data.as_ptr() as *mut T) }
}

/// Returns a mutable reference to the internal [`Vec<T>`] if it is natively allocated.
/// Returns `None` if allocated by a foreign interface.
/// Returns a `Some` mutable reference of [`Vec<T>`] iff this was initialized
/// from a [`Vec<T>`] and `None` otherwise.
#[inline]
pub fn get_vec(&mut self) -> Option<&mut Vec<T>> {
match &self.deallocation {
Deallocation::Foreign(_) => None,
// Safety:
// The allocation is native so we can share the vec
Deallocation::Native => Some(unsafe { self.data.mut_vec() }),
match &self.allocation {
Allocation::Foreign(_) => None,
Allocation::Native => Some(self.data.deref_mut()),
}
}
}

impl<T: NativeType> Drop for Bytes<T> {
impl<T> Drop for Bytes<T> {
#[inline]
fn drop(&mut self) {
match self.deallocation {
// a foreign interface knows how to deallocate itself
Deallocation::Foreign(_) => {}
Deallocation::Native => {
// Safety:
// the allocation is native, so we can safely drop
unsafe { self.data.drop_local() }
match self.allocation {
Allocation::Foreign(_) => {
// The ref count of the foreign is reduced by one
// we can't deallocate `Vec` here since the region was allocated by
// a foreign allocator
}
Allocation::Native => {
let data = std::mem::take(&mut self.data);
let _ = ManuallyDrop::into_inner(data);
}
}
}
}

impl<T: NativeType> std::ops::Deref for Bytes<T> {
impl<T> std::ops::Deref for Bytes<T> {
type Target = [T];

#[inline]
fn deref(&self) -> &[T] {
&self.data
}
}

impl<T: NativeType> PartialEq for Bytes<T> {
impl<T: PartialEq> PartialEq for Bytes<T> {
#[inline]
fn eq(&self, other: &Bytes<T>) -> bool {
self.as_slice() == other.as_slice()
self.deref() == other.deref()
}
}

impl<T: NativeType> Debug for Bytes<T> {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(
f,
"Bytes {{ ptr: {:?}, len: {}, data: ",
self.data.as_ptr(),
self.len(),
)?;

f.debug_list().entries(self.iter()).finish()?;

write!(f, " }}")
}
}

impl<T: NativeType> From<Vec<T>> for Bytes<T> {
impl<T> From<Vec<T>> for Bytes<T> {
#[inline]
fn from(data: Vec<T>) -> Self {
let data = MaybeForeign::new(data);
Self {
data,
deallocation: Deallocation::Native,
data: ManuallyDrop::new(data),
allocation: Allocation::Native,
}
}
}

// This is sound because `Bytes` is an immutable container
unsafe impl<T: NativeType> Send for Bytes<T> {}
unsafe impl<T: NativeType> Sync for Bytes<T> {}
55 changes: 0 additions & 55 deletions src/buffer/foreign.rs

This file was deleted.

1 change: 0 additions & 1 deletion src/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,5 @@
mod immutable;

pub(crate) mod bytes;
mod foreign;

pub use immutable::Buffer;
24 changes: 10 additions & 14 deletions src/ffi/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ use std::{ptr::NonNull, sync::Arc};
use crate::{
array::*,
bitmap::{utils::bytes_for, Bitmap},
buffer::{
bytes::{Bytes, Deallocation},
Buffer,
},
buffer::{bytes::Bytes, Buffer},
datatypes::{DataType, PhysicalType},
error::{Error, Result},
ffi::schema::get_child,
Expand Down Expand Up @@ -181,7 +178,7 @@ impl ArrowArray {
unsafe fn create_buffer<T: NativeType>(
array: &ArrowArray,
data_type: &DataType,
deallocation: Deallocation,
owner: Arc<InternalArrowArray>,
index: usize,
) -> Result<Buffer<T>> {
if array.buffers.is_null() {
Expand All @@ -197,7 +194,7 @@ unsafe fn create_buffer<T: NativeType>(
let len = buffer_len(array, data_type, index)?;
let offset = buffer_offset(array, data_type, index);
let bytes = ptr
.map(|ptr| Bytes::from_ffi(ptr, len, deallocation))
.map(|ptr| Bytes::from_owned(ptr, len, owner))
.ok_or_else(|| Error::OutOfSpec(format!("The buffer at position {} is null", index)))?;

Ok(Buffer::from_bytes(bytes).slice(offset, len - offset))
Expand All @@ -212,7 +209,7 @@ unsafe fn create_buffer<T: NativeType>(
/// This function assumes that `ceil(self.length * bits, 8)` is the size of the buffer
unsafe fn create_bitmap(
array: &ArrowArray,
deallocation: Deallocation,
owner: Arc<InternalArrowArray>,
index: usize,
) -> Result<Bitmap> {
if array.buffers.is_null() {
Expand All @@ -228,7 +225,7 @@ unsafe fn create_bitmap(
let bytes_len = bytes_for(offset + len);
let ptr = NonNull::new(ptr as *mut u8);
let bytes = ptr
.map(|ptr| Bytes::from_ffi(ptr, bytes_len, deallocation))
.map(|ptr| Bytes::from_owned(ptr, bytes_len, owner))
.ok_or_else(|| {
Error::OutOfSpec(format!(
"The buffer {} is a null pointer and cannot be interpreted as a bitmap",
Expand Down Expand Up @@ -344,8 +341,8 @@ fn create_dictionary(
}

pub trait ArrowArrayRef: std::fmt::Debug {
fn deallocation(&self) -> Deallocation {
Deallocation::Foreign(self.parent().clone())
fn owner(&self) -> Arc<InternalArrowArray> {
self.parent().clone()
}

/// returns the null bit buffer.
Expand All @@ -358,23 +355,22 @@ pub trait ArrowArrayRef: std::fmt::Debug {
if self.array().null_count() == 0 {
Ok(None)
} else {
create_bitmap(self.array(), self.deallocation(), 0).map(Some)
create_bitmap(self.array(), self.owner(), 0).map(Some)
}
}

/// # Safety
/// The caller must guarantee that the buffer `index` corresponds to a bitmap.
/// This function assumes that the bitmap created from FFI is valid; this is impossible to prove.
unsafe fn buffer<T: NativeType>(&self, index: usize) -> Result<Buffer<T>> {
create_buffer::<T>(self.array(), self.data_type(), self.deallocation(), index)
create_buffer::<T>(self.array(), self.data_type(), self.owner(), index)
}

/// # Safety
/// The caller must guarantee that the buffer `index` corresponds to a bitmap.
/// This function assumes that the bitmap created from FFI is valid; this is impossible to prove.
unsafe fn bitmap(&self, index: usize) -> Result<Bitmap> {
// +1 to ignore null bitmap
create_bitmap(self.array(), self.deallocation(), index)
create_bitmap(self.array(), self.owner(), index)
}

/// # Safety
Expand Down

0 comments on commit 88f05bb

Please sign in to comment.