diff --git a/encodings/fastlanes/src/bitpacking/array/bitpack_decompress.rs b/encodings/fastlanes/src/bitpacking/array/bitpack_decompress.rs index c56519e80b0..f8316f54fbd 100644 --- a/encodings/fastlanes/src/bitpacking/array/bitpack_decompress.rs +++ b/encodings/fastlanes/src/bitpacking/array/bitpack_decompress.rs @@ -7,15 +7,12 @@ use vortex_array::ToCanonical; use vortex_array::arrays::PrimitiveArray; use vortex_array::builders::{ArrayBuilder, PrimitiveBuilder, UninitRange}; use vortex_array::patches::Patches; -use vortex_array::validity::Validity; -use vortex_array::vtable::ValidityHelper; use vortex_buffer::BufferMut; use vortex_dtype::{ - IntegerPType, NativePType, UnsignedPType, match_each_integer_ptype, - match_each_unsigned_integer_ptype, + IntegerPType, NativePType, match_each_integer_ptype, match_each_unsigned_integer_ptype, }; use vortex_error::{VortexExpect, vortex_panic}; -use vortex_mask::{Mask, MaskMut}; +use vortex_mask::Mask; use vortex_scalar::Scalar; use vortex_vector::primitive::{PVectorMut, PrimitiveVectorMut}; @@ -48,76 +45,17 @@ pub fn unpack_to_pvector(array: &BitPackedArray) -> PVectorMut

// TODO(connor): Implement a fused version of patching instead. if let Some(patches) = array.patches() { - let patch_indices = patches.indices().to_primitive(); - let patch_values = patches.values().to_primitive(); - let patches_validity = patch_values.validity(); - let patch_offset = patches.offset(); - - let patch_values_slice = patch_values.as_slice::

(); - match_each_unsigned_integer_ptype!(patch_indices.ptype(), |I| { - let patch_indices_slice = patch_indices.as_slice::(); - - // SAFETY: - // - `Patches` invariant guarantees indices are sorted and within array bounds. - // - `patch_indices` and `patch_values` have equal length (from `Patches` invariant). - // - `elements` and `validity` have equal length (both are `len` from the array). - // - All patch indices are valid after offset adjustment (guaranteed by `Patches`). - unsafe { - apply_patches_inner( - &mut elements, - &mut validity, - patch_indices_slice, - patch_offset, - patch_values_slice, - patches_validity, - ) - }; - }); + // SAFETY: + // - `Patches` invariant guarantees indices are sorted and within array bounds. + // - `elements` and `validity` have equal length (both are `len` from the array). + // - All patch indices are valid after offset adjustment (guaranteed by `Patches`). + unsafe { patches.apply_to_buffer(&mut elements, &mut validity) }; } // SAFETY: `elements` and `validity` have the same length. unsafe { PVectorMut::new_unchecked(elements, validity) } } -/// # Safety -/// -/// - All indices in `patch_indices` after subtracting `patch_offset` must be valid indices -/// into both `buffer` and `validity`. -/// - `patch_indices` must be sorted in ascending order. -/// - `patch_indices` and `patch_values` must have the same length. -/// - `buffer` and `validity` must have the same length. -unsafe fn apply_patches_inner( - buffer: &mut [P], - validity: &mut MaskMut, - patch_indices: &[I], - patch_offset: usize, - patch_values: &[P], - patches_validity: &Validity, -) where - P: NativePType, - I: UnsignedPType, -{ - debug_assert!(!patch_indices.is_empty()); - debug_assert_eq!(patch_indices.len(), patch_values.len()); - debug_assert_eq!(buffer.len(), validity.len()); - debug_assert!(patch_indices.is_sorted()); - debug_assert!(patch_indices.last().vortex_expect("can't be empty").as_() <= validity.len()); - - match patches_validity { - Validity::NonNullable | Validity::AllValid => { - for (&i, &value) in patch_indices.iter().zip_eq(patch_values) { - let index = i.as_() - patch_offset; - - // SAFETY: `index` is valid because caller guarantees all patch indices are within - // bounds after offset adjustment. - unsafe { validity.set_unchecked(index) }; - buffer[index] = value; - } - } - _ => vortex_panic!("BitPackedArray somehow had nullable patch values"), - } -} - pub fn unpack_array(array: &BitPackedArray) -> PrimitiveArray { match_each_integer_ptype!(array.ptype(), |P| { unpack_primitive_array::

(array) }) } diff --git a/vortex-array/src/patches.rs b/vortex-array/src/patches.rs index a72251cbc8b..1de409f0a8c 100644 --- a/vortex-array/src/patches.rs +++ b/vortex-array/src/patches.rs @@ -17,13 +17,14 @@ use vortex_dtype::{ use vortex_error::{ VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err, vortex_panic, }; -use vortex_mask::{AllOr, Mask}; +use vortex_mask::{AllOr, Mask, MaskMut}; use vortex_scalar::{PValue, Scalar}; use vortex_utils::aliases::hash_map::HashMap; use crate::arrays::PrimitiveArray; use crate::compute::{cast, filter, is_sorted, take}; use crate::search_sorted::{SearchResult, SearchSorted, SearchSortedSide}; +use crate::validity::Validity; use crate::vtable::ValidityHelper; use crate::{Array, ArrayRef, IntoArray, ToCanonical}; @@ -797,6 +798,43 @@ impl Patches { })) } + /// Apply patches to a mutable buffer and validity mask. + /// + /// This method applies the patch values to the given buffer at the positions specified by the + /// patch indices. For non-null patch values, it updates the buffer and marks the position as + /// valid. For null patch values, it marks the position as invalid. + /// + /// # Safety + /// + /// - All patch indices after offset adjustment must be valid indices into the buffer. + /// - The buffer and validity mask must have the same length. + pub unsafe fn apply_to_buffer(&self, buffer: &mut [P], validity: &mut MaskMut) { + let patch_indices = self.indices.to_primitive(); + let patch_values = self.values.to_primitive(); + let patches_validity = patch_values.validity(); + + let patch_values_slice = patch_values.as_slice::

(); + match_each_unsigned_integer_ptype!(patch_indices.ptype(), |I| { + let patch_indices_slice = patch_indices.as_slice::(); + + // SAFETY: + // - `Patches` invariant guarantees indices are sorted and within array bounds. + // - `patch_indices` and `patch_values` have equal length (from `Patches` invariant). + // - `buffer` and `validity` have equal length (precondition). + // - All patch indices are valid after offset adjustment (precondition). + unsafe { + apply_patches_to_buffer_inner( + buffer, + validity, + patch_indices_slice, + self.offset, + patch_values_slice, + patches_validity, + ); + } + }); + } + pub fn map_values(self, f: F) -> VortexResult where F: FnOnce(ArrayRef) -> VortexResult, @@ -821,6 +859,78 @@ impl Patches { } } +/// Helper function to apply patches to a buffer. +/// +/// # Safety +/// +/// - All indices in `patch_indices` after subtracting `patch_offset` must be valid indices +/// into both `buffer` and `validity`. +/// - `patch_indices` must be sorted in ascending order. +/// - `patch_indices` and `patch_values` must have the same length. +/// - `buffer` and `validity` must have the same length. +unsafe fn apply_patches_to_buffer_inner( + buffer: &mut [P], + validity: &mut MaskMut, + patch_indices: &[I], + patch_offset: usize, + patch_values: &[P], + patches_validity: &Validity, +) where + P: NativePType, + I: UnsignedPType, +{ + debug_assert!(!patch_indices.is_empty()); + debug_assert_eq!(patch_indices.len(), patch_values.len()); + debug_assert_eq!(buffer.len(), validity.len()); + + match patches_validity { + Validity::NonNullable | Validity::AllValid => { + // All patch values are valid, apply them all. + for (&i, &value) in patch_indices.iter().zip_eq(patch_values) { + let index = i.as_() - patch_offset; + + // SAFETY: `index` is valid because caller guarantees all patch indices are within + // bounds after offset adjustment. + unsafe { + validity.set_unchecked(index); + } + buffer[index] = value; + } + } + Validity::AllInvalid => { + // All patch values are null, just mark positions as invalid. + for &i in patch_indices { + let index = i.as_() - patch_offset; + + // SAFETY: `index` is valid because caller guarantees all patch indices are within + // bounds after offset adjustment. + unsafe { + validity.unset_unchecked(index); + } + } + } + Validity::Array(array) => { + // Some patch values may be null, check each one. + let bool_array = array.to_bool(); + let mask = bool_array.bit_buffer(); + for (patch_idx, (&i, &value)) in patch_indices.iter().zip_eq(patch_values).enumerate() { + let index = i.as_() - patch_offset; + + // SAFETY: `index` and `patch_idx` are valid because caller guarantees all patch + // indices are within bounds after offset adjustment. + unsafe { + if mask.value_unchecked(patch_idx) { + buffer[index] = value; + validity.set_unchecked(index); + } else { + validity.unset_unchecked(index); + } + } + } + } + } +} + fn take_map, T: NativePType>( indices: &[I], take_indices: PrimitiveArray,