Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 7 additions & 69 deletions encodings/fastlanes/src/bitpacking/array/bitpack_decompress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,12 @@ use vortex_array::ToCanonical;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::builders::{ArrayBuilder, PrimitiveBuilder, UninitRange};
use vortex_array::patches::Patches;
use vortex_array::validity::Validity;
use vortex_array::vtable::ValidityHelper;
use vortex_buffer::BufferMut;
use vortex_dtype::{
IntegerPType, NativePType, UnsignedPType, match_each_integer_ptype,
match_each_unsigned_integer_ptype,
IntegerPType, NativePType, match_each_integer_ptype, match_each_unsigned_integer_ptype,
};
use vortex_error::{VortexExpect, vortex_panic};
use vortex_mask::{Mask, MaskMut};
use vortex_mask::Mask;
use vortex_scalar::Scalar;
use vortex_vector::primitive::{PVectorMut, PrimitiveVectorMut};

Expand Down Expand Up @@ -48,76 +45,17 @@ pub fn unpack_to_pvector<P: BitPacked>(array: &BitPackedArray) -> PVectorMut<P>

// TODO(connor): Implement a fused version of patching instead.
if let Some(patches) = array.patches() {
let patch_indices = patches.indices().to_primitive();
let patch_values = patches.values().to_primitive();
let patches_validity = patch_values.validity();
let patch_offset = patches.offset();

let patch_values_slice = patch_values.as_slice::<P>();
match_each_unsigned_integer_ptype!(patch_indices.ptype(), |I| {
let patch_indices_slice = patch_indices.as_slice::<I>();

// SAFETY:
// - `Patches` invariant guarantees indices are sorted and within array bounds.
// - `patch_indices` and `patch_values` have equal length (from `Patches` invariant).
// - `elements` and `validity` have equal length (both are `len` from the array).
// - All patch indices are valid after offset adjustment (guaranteed by `Patches`).
unsafe {
apply_patches_inner(
&mut elements,
&mut validity,
patch_indices_slice,
patch_offset,
patch_values_slice,
patches_validity,
)
};
});
// SAFETY:
// - `Patches` invariant guarantees indices are sorted and within array bounds.
// - `elements` and `validity` have equal length (both are `len` from the array).
// - All patch indices are valid after offset adjustment (guaranteed by `Patches`).
unsafe { patches.apply_to_buffer(&mut elements, &mut validity) };
}

// SAFETY: `elements` and `validity` have the same length.
unsafe { PVectorMut::new_unchecked(elements, validity) }
}

/// # Safety
///
/// - All indices in `patch_indices` after subtracting `patch_offset` must be valid indices
/// into both `buffer` and `validity`.
/// - `patch_indices` must be sorted in ascending order.
/// - `patch_indices` and `patch_values` must have the same length.
/// - `buffer` and `validity` must have the same length.
unsafe fn apply_patches_inner<P, I>(
buffer: &mut [P],
validity: &mut MaskMut,
patch_indices: &[I],
patch_offset: usize,
patch_values: &[P],
patches_validity: &Validity,
) where
P: NativePType,
I: UnsignedPType,
{
debug_assert!(!patch_indices.is_empty());
debug_assert_eq!(patch_indices.len(), patch_values.len());
debug_assert_eq!(buffer.len(), validity.len());
debug_assert!(patch_indices.is_sorted());
debug_assert!(patch_indices.last().vortex_expect("can't be empty").as_() <= validity.len());

match patches_validity {
Validity::NonNullable | Validity::AllValid => {
for (&i, &value) in patch_indices.iter().zip_eq(patch_values) {
let index = i.as_() - patch_offset;

// SAFETY: `index` is valid because caller guarantees all patch indices are within
// bounds after offset adjustment.
unsafe { validity.set_unchecked(index) };
buffer[index] = value;
}
}
_ => vortex_panic!("BitPackedArray somehow had nullable patch values"),
}
}

pub fn unpack_array(array: &BitPackedArray) -> PrimitiveArray {
match_each_integer_ptype!(array.ptype(), |P| { unpack_primitive_array::<P>(array) })
}
Expand Down
112 changes: 111 additions & 1 deletion vortex-array/src/patches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ use vortex_dtype::{
use vortex_error::{
VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err, vortex_panic,
};
use vortex_mask::{AllOr, Mask};
use vortex_mask::{AllOr, Mask, MaskMut};
use vortex_scalar::{PValue, Scalar};
use vortex_utils::aliases::hash_map::HashMap;

use crate::arrays::PrimitiveArray;
use crate::compute::{cast, filter, is_sorted, take};
use crate::search_sorted::{SearchResult, SearchSorted, SearchSortedSide};
use crate::validity::Validity;
use crate::vtable::ValidityHelper;
use crate::{Array, ArrayRef, IntoArray, ToCanonical};

Expand Down Expand Up @@ -797,6 +798,43 @@ impl Patches {
}))
}

/// Apply patches to a mutable buffer and validity mask.
///
/// This method applies the patch values to the given buffer at the positions specified by the
/// patch indices. For non-null patch values, it updates the buffer and marks the position as
/// valid. For null patch values, it marks the position as invalid.
///
/// # Safety
///
/// - All patch indices after offset adjustment must be valid indices into the buffer.
/// - The buffer and validity mask must have the same length.
pub unsafe fn apply_to_buffer<P: NativePType>(&self, buffer: &mut [P], validity: &mut MaskMut) {
let patch_indices = self.indices.to_primitive();
let patch_values = self.values.to_primitive();
let patches_validity = patch_values.validity();

let patch_values_slice = patch_values.as_slice::<P>();
match_each_unsigned_integer_ptype!(patch_indices.ptype(), |I| {
let patch_indices_slice = patch_indices.as_slice::<I>();

// SAFETY:
// - `Patches` invariant guarantees indices are sorted and within array bounds.
// - `patch_indices` and `patch_values` have equal length (from `Patches` invariant).
// - `buffer` and `validity` have equal length (precondition).
// - All patch indices are valid after offset adjustment (precondition).
unsafe {
apply_patches_to_buffer_inner(
buffer,
validity,
patch_indices_slice,
self.offset,
patch_values_slice,
patches_validity,
);
}
});
}

pub fn map_values<F>(self, f: F) -> VortexResult<Self>
where
F: FnOnce(ArrayRef) -> VortexResult<ArrayRef>,
Expand All @@ -821,6 +859,78 @@ impl Patches {
}
}

/// Helper function to apply patches to a buffer.
///
/// # Safety
///
/// - All indices in `patch_indices` after subtracting `patch_offset` must be valid indices
/// into both `buffer` and `validity`.
/// - `patch_indices` must be sorted in ascending order.
/// - `patch_indices` and `patch_values` must have the same length.
/// - `buffer` and `validity` must have the same length.
unsafe fn apply_patches_to_buffer_inner<P, I>(
buffer: &mut [P],
validity: &mut MaskMut,
patch_indices: &[I],
patch_offset: usize,
patch_values: &[P],
patches_validity: &Validity,
) where
P: NativePType,
I: UnsignedPType,
{
debug_assert!(!patch_indices.is_empty());
debug_assert_eq!(patch_indices.len(), patch_values.len());
debug_assert_eq!(buffer.len(), validity.len());

match patches_validity {
Validity::NonNullable | Validity::AllValid => {
// All patch values are valid, apply them all.
for (&i, &value) in patch_indices.iter().zip_eq(patch_values) {
let index = i.as_() - patch_offset;

// SAFETY: `index` is valid because caller guarantees all patch indices are within
// bounds after offset adjustment.
unsafe {
validity.set_unchecked(index);
}
buffer[index] = value;
}
}
Validity::AllInvalid => {
// All patch values are null, just mark positions as invalid.
for &i in patch_indices {
let index = i.as_() - patch_offset;

// SAFETY: `index` is valid because caller guarantees all patch indices are within
// bounds after offset adjustment.
unsafe {
validity.unset_unchecked(index);
}
}
}
Validity::Array(array) => {
// Some patch values may be null, check each one.
let bool_array = array.to_bool();
let mask = bool_array.bit_buffer();
for (patch_idx, (&i, &value)) in patch_indices.iter().zip_eq(patch_values).enumerate() {
let index = i.as_() - patch_offset;

// SAFETY: `index` and `patch_idx` are valid because caller guarantees all patch
// indices are within bounds after offset adjustment.
unsafe {
if mask.value_unchecked(patch_idx) {
buffer[index] = value;
validity.set_unchecked(index);
} else {
validity.unset_unchecked(index);
}
}
}
}
}
}

fn take_map<I: NativePType + Hash + Eq + TryFrom<usize>, T: NativePType>(
indices: &[I],
take_indices: PrimitiveArray,
Expand Down
Loading