diff --git a/encodings/alp/benches/alp_compress.rs b/encodings/alp/benches/alp_compress.rs index 2c5aa407efa..55643556d55 100644 --- a/encodings/alp/benches/alp_compress.rs +++ b/encodings/alp/benches/alp_compress.rs @@ -6,11 +6,11 @@ use divan::Bencher; use rand::rngs::StdRng; use rand::{Rng, SeedableRng as _}; -use vortex_alp::{ALPFloat, ALPRDFloat, RDEncoder, alp_encode}; +use vortex_alp::{ALPFloat, ALPRDFloat, RDEncoder, alp_encode, decompress}; use vortex_array::arrays::PrimitiveArray; use vortex_array::compute::warm_up_vtables; use vortex_array::validity::Validity; -use vortex_buffer::buffer; +use vortex_buffer::{Buffer, buffer}; use vortex_dtype::NativePType; fn main() { @@ -84,10 +84,15 @@ fn decompress_alp(bencher: Bencher, args: (usize, f64 Validity::NonNullable }; let values = values.freeze(); - let array = alp_encode(&PrimitiveArray::new(values, validity), None).unwrap(); bencher - .with_inputs(|| array.clone()) - .bench_values(|array| array.to_canonical()); + .with_inputs(|| { + alp_encode( + &PrimitiveArray::new(Buffer::copy_from(&values), validity.clone()), + None, + ) + .unwrap() + }) + .bench_values(decompress); } #[divan::bench(types = [f32, f64], args = [10_000, 100_000])] diff --git a/encodings/alp/src/alp/array.rs b/encodings/alp/src/alp/array.rs index 71f20824384..2da2fc05a16 100644 --- a/encodings/alp/src/alp/array.rs +++ b/encodings/alp/src/alp/array.rs @@ -282,6 +282,6 @@ impl ArrayVTable for ALPVTable { impl CanonicalVTable for ALPVTable { fn canonicalize(array: &ALPArray) -> Canonical { - Canonical::Primitive(decompress(array)) + Canonical::Primitive(decompress(array.clone())) } } diff --git a/encodings/alp/src/alp/compress.rs b/encodings/alp/src/alp/compress.rs index e6d502384c2..fa6083ab50f 100644 --- a/encodings/alp/src/alp/compress.rs +++ b/encodings/alp/src/alp/compress.rs @@ -1,14 +1,16 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::mem::transmute; + use itertools::Itertools; -use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::{PrimitiveArray, patch_chunk}; use vortex_array::patches::Patches; use vortex_array::validity::Validity; use vortex_array::vtable::ValidityHelper; use vortex_array::{ArrayRef, IntoArray, ToCanonical}; use vortex_buffer::{Buffer, BufferMut}; -use vortex_dtype::PType; +use vortex_dtype::{PType, match_each_unsigned_integer_ptype}; use vortex_error::{VortexResult, vortex_bail}; use vortex_mask::Mask; @@ -118,20 +120,113 @@ where Ok((exponents, encoded_array, patches)) } -pub fn decompress(array: &ALPArray) -> PrimitiveArray { +/// Decompresses an ALP-encoded array. +/// +/// # Arguments +/// +/// * `array` - The ALP-encoded array to decompress +/// +/// # Returns +/// +/// A `PrimitiveArray` containing the decompressed floating-point values with all patches applied. +pub fn decompress(array: ALPArray) -> PrimitiveArray { + let patches = array.patches().cloned(); + if let Some(patches) = patches + && let Some(chunk_offsets) = patches.chunk_offsets() + { + return decompress_chunked(array, &patches, &chunk_offsets.as_ref().to_primitive()); + } + + decompress_unchunked(array) +} + +/// Decompresses an ALP-encoded array in 1024-element chunks. +/// +/// Decoding and applying patches is done in chunks of 1024 elements for better L1 cache locality. +/// +/// # Arguments +/// +/// * `array` - The ALP-encoded array to decompress +/// * `patches` - The patches containing exceptional values and their positions +/// * `patches_chunk_offsets` - Offsets into the patches array for each chunk +/// +/// # Returns +/// +/// A `PrimitiveArray` containing the decompressed values with all patches applied. +#[allow(clippy::cognitive_complexity)] +pub fn decompress_chunked( + array: ALPArray, + patches: &Patches, + patches_chunk_offsets: &PrimitiveArray, +) -> PrimitiveArray { + let encoded = array.encoded().to_primitive(); + let validity = encoded.validity().clone(); + + let patches_indices = patches.indices().as_ref().to_primitive(); + let patches_values = patches.values().as_ref().to_primitive(); + let ptype = array.dtype().as_ptype(); + let array_len = array.len(); + let exponents = array.exponents(); + let patches_offset = patches.offset(); + + // We need to drop ALPArray here in case converting encoded buffer into primitive didn't create a copy. In that case + // both alp_encoded and array will hold a reference to the buffer we want to mutate. + drop(array); + + match_each_alp_float_ptype!(ptype, |T| { + let patches_values = patches_values.as_slice::(); + let mut alp_buffer = encoded.into_buffer_mut(); + match_each_unsigned_integer_ptype!(patches_chunk_offsets.ptype(), |C| { + let patches_chunk_offsets = patches_chunk_offsets.as_slice::(); + match_each_unsigned_integer_ptype!(patches_indices.ptype(), |I| { + let patches_indices = patches_indices.as_slice::(); + + for (chunk_idx, chunk_start) in (0..array_len).step_by(1024).enumerate() { + let chunk_end = (chunk_start + 1024).min(array_len); + let chunk_slice = &mut alp_buffer.as_mut_slice()[chunk_start..chunk_end]; + ::decode_slice_inplace(chunk_slice, exponents); + + let decoded_chunk: &mut [T] = unsafe { transmute(chunk_slice) }; + patch_chunk( + decoded_chunk, + patches_indices, + patches_values, + patches_offset, + patches_chunk_offsets, + chunk_idx, + ); + } + + let decoded_buffer: BufferMut = unsafe { transmute(alp_buffer) }; + PrimitiveArray::new::(decoded_buffer.freeze(), validity) + }) + }) + }) +} + +/// Decompresses an ALP-encoded array without chunk offsets. +/// +/// This function decodes the complete array at once and then applies any patches after. +fn decompress_unchunked(array: ALPArray) -> PrimitiveArray { + let patches = array.patches().cloned(); let encoded = array.encoded().to_primitive(); let validity = encoded.validity().clone(); + let exponents = array.exponents(); let ptype = array.dtype().as_ptype(); + // We need to drop ALPArray here in case converting encoded buffer into primitive didn't create a copy. In that case + // both alp_encoded and array will hold a reference to the buffer we want to mutate. + drop(array); + let decoded = match_each_alp_float_ptype!(ptype, |T| { PrimitiveArray::new::( - ::decode_buffer(encoded.into_buffer_mut(), array.exponents()), + ::decode_buffer(encoded.into_buffer_mut(), exponents), validity, ) }); - if let Some(patches) = array.patches() { - decoded.patch(patches) + if let Some(patches) = patches { + decoded.patch(&patches) } else { decoded } @@ -158,7 +253,7 @@ mod tests { assert_arrays_eq!(encoded.encoded(), expected_encoded); assert_eq!(encoded.exponents(), Exponents { e: 9, f: 6 }); - let decoded = decompress(&encoded); + let decoded = decompress(encoded); assert_arrays_eq!(decoded, array); } @@ -171,7 +266,7 @@ mod tests { assert_arrays_eq!(encoded.encoded(), expected_encoded); assert_eq!(encoded.exponents(), Exponents { e: 9, f: 6 }); - let decoded = decompress(&encoded); + let decoded = decompress(encoded); let expected = PrimitiveArray::from_option_iter(vec![None, Some(1.234f32), None]); assert_arrays_eq!(decoded, expected); } @@ -187,7 +282,7 @@ mod tests { assert_arrays_eq!(encoded.encoded(), expected_encoded); assert_eq!(encoded.exponents(), Exponents { e: 16, f: 13 }); - let decoded = decompress(&encoded); + let decoded = decompress(encoded); let expected_decoded = PrimitiveArray::new(values, Validity::NonNullable); assert_arrays_eq!(decoded, expected_decoded); } @@ -204,7 +299,7 @@ mod tests { assert_arrays_eq!(encoded.encoded(), expected_encoded); assert_eq!(encoded.exponents(), Exponents { e: 16, f: 13 }); - let decoded = decompress(&encoded); + let decoded = decompress(encoded); assert_arrays_eq!(decoded, array); } @@ -223,9 +318,9 @@ mod tests { assert_eq!(encoded.exponents(), Exponents { e: 16, f: 13 }); - assert_arrays_eq!(&encoded, array); + assert_arrays_eq!(encoded, array); - let _decoded = decompress(&encoded); + let _decoded = decompress(encoded); } #[test] @@ -362,4 +457,173 @@ mod tests { let expected_values = PrimitiveArray::from_iter(vec![PI, E]); assert_arrays_eq!(patch_values, expected_values); } + + #[test] + fn test_slice_half_chunk_f32_roundtrip() { + // Create 1024 elements, encode, slice to first 512, then decode + let values = vec![1.234f32; 1024]; + let original = PrimitiveArray::new(Buffer::from(values), Validity::NonNullable); + let encoded = alp_encode(&original, None).unwrap(); + + let sliced_alp = encoded.slice(512..1024); + let decoded = sliced_alp.to_primitive(); + + let expected_slice = original.slice(512..1024).to_primitive(); + assert_eq!(expected_slice.as_slice::(), decoded.as_slice::()); + } + + #[test] + fn test_slice_half_chunk_f64_roundtrip() { + let values = vec![5.678f64; 1024]; + let original = PrimitiveArray::new(Buffer::from(values), Validity::NonNullable); + let encoded = alp_encode(&original, None).unwrap(); + + let sliced_alp = encoded.slice(512..1024); + let decoded = sliced_alp.to_primitive(); + + let expected_slice = original.slice(512..1024).to_primitive(); + assert_eq!(expected_slice.as_slice::(), decoded.as_slice::()); + } + + #[test] + fn test_slice_half_chunk_with_patches_roundtrip() { + let mut values = vec![1.0f64; 1024]; + values[100] = PI; + values[200] = E; + values[600] = 42.42; + + let original = PrimitiveArray::new(Buffer::from(values), Validity::NonNullable); + let encoded = alp_encode(&original, None).unwrap(); + + let sliced_alp = encoded.slice(512..1024); + let decoded = sliced_alp.to_primitive(); + + let expected_slice = original.slice(512..1024).to_primitive(); + assert_eq!(expected_slice.as_slice::(), decoded.as_slice::()); + assert!(encoded.patches().is_some()); + } + + #[test] + fn test_slice_half_chunk_nullable_roundtrip() { + let values = (0..1024) + .map(|i| if i % 3 == 0 { None } else { Some(2.5f32) }) + .collect::>(); + + let original = PrimitiveArray::from_option_iter(values); + let encoded = alp_encode(&original, None).unwrap(); + + let sliced_alp = encoded.slice(512..1024); + let decoded = sliced_alp.to_primitive(); + + let expected_slice = original.slice(512..1024); + assert_arrays_eq!(decoded, expected_slice); + } + + #[test] + fn test_large_f32_array_uniform_values() { + let size = 10_000; + let array = PrimitiveArray::new(buffer![42.125f32; size], Validity::NonNullable); + let encoded = alp_encode(&array, None).unwrap(); + + assert!(encoded.patches().is_none()); + let decoded = decompress(encoded); + assert_eq!(array.as_slice::(), decoded.as_slice::()); + } + + #[test] + fn test_large_f64_array_uniform_values() { + let size = 50_000; + let array = PrimitiveArray::new(buffer![123.456789f64; size], Validity::NonNullable); + let encoded = alp_encode(&array, None).unwrap(); + + assert!(encoded.patches().is_none()); + let decoded = decompress(encoded); + assert_eq!(array.as_slice::(), decoded.as_slice::()); + } + + #[test] + fn test_large_f32_array_with_patches() { + let size = 5_000; + let mut values = vec![1.5f32; size]; + values[100] = std::f32::consts::PI; + values[1500] = std::f32::consts::E; + values[3000] = f32::NEG_INFINITY; + values[4500] = f32::INFINITY; + + let array = PrimitiveArray::new(Buffer::from(values.clone()), Validity::NonNullable); + let encoded = alp_encode(&array, None).unwrap(); + + assert!(encoded.patches().is_some()); + let decoded = decompress(encoded); + assert_eq!(values.as_slice(), decoded.as_slice::()); + } + + #[test] + fn test_large_f64_array_with_patches() { + let size = 8_000; + let mut values = vec![2.2184f64; size]; + values[0] = PI; + values[1000] = E; + values[2000] = f64::NAN; + values[3000] = f64::INFINITY; + values[4000] = f64::NEG_INFINITY; + values[5000] = 0.0; + values[6000] = -0.0; + values[7000] = 999.999999999; + + let array = PrimitiveArray::new(Buffer::from(values.clone()), Validity::NonNullable); + let encoded = alp_encode(&array, None).unwrap(); + + assert!(encoded.patches().is_some()); + let decoded = decompress(encoded); + + for idx in 0..size { + let decoded_val = decoded.as_slice::()[idx]; + let original_val = values[idx]; + assert!( + decoded_val.is_eq(original_val), + "At index {idx}: Expected {original_val} but got {decoded_val}" + ); + } + } + + #[test] + fn test_large_nullable_array() { + let size = 12_000; + let values: Vec> = (0..size) + .map(|i| { + if i % 7 == 0 { + None + } else { + Some((i as f32) * 0.1) + } + }) + .collect(); + + let array = PrimitiveArray::from_option_iter(values); + let encoded = alp_encode(&array, None).unwrap(); + let decoded = decompress(encoded); + + assert_arrays_eq!(decoded, array); + } + + #[test] + fn test_large_mixed_validity_with_patches() { + let size = 6_000; + let mut values = vec![10.125f64; size]; + + values[500] = PI; + values[1500] = E; + values[2500] = f64::INFINITY; + values[3500] = f64::NEG_INFINITY; + values[4500] = f64::NAN; + + let validity = Validity::from_iter((0..size).map(|i| !matches!(i, 500 | 2500))); + + let array = PrimitiveArray::new(Buffer::from(values), validity); + let encoded = alp_encode(&array, None).unwrap(); + let decoded = decompress(encoded); + + assert_arrays_eq!(decoded, array); + } } diff --git a/encodings/alp/src/alp/mod.rs b/encodings/alp/src/alp/mod.rs index 6aecf49bf20..b1d91771848 100644 --- a/encodings/alp/src/alp/mod.rs +++ b/encodings/alp/src/alp/mod.rs @@ -2,7 +2,7 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::fmt::{Display, Formatter}; -use std::mem::size_of; +use std::mem::{size_of, transmute, transmute_copy}; use itertools::Itertools; use num_traits::{CheckedSub, Float, PrimInt, ToPrimitive}; @@ -196,6 +196,16 @@ pub trait ALPFloat: private::Sealed + Float + Display + NativePType { encoded.map_each_in_place(move |encoded| Self::decode_single(encoded, exponents)) } + fn decode_slice_inplace(encoded: &mut [Self::ALPInt], exponents: Exponents) { + let decoded: &mut [Self] = unsafe { transmute(encoded) }; + decoded.iter_mut().for_each(|v| { + *v = Self::decode_single( + unsafe { transmute_copy::(v) }, + exponents, + ) + }) + } + #[inline(always)] fn decode_single(encoded: Self::ALPInt, exponents: Exponents) -> Self { Self::from_int(encoded) * Self::F10[exponents.f as usize] * Self::IF10[exponents.e as usize] diff --git a/vortex-array/src/arrays/primitive/array/mod.rs b/vortex-array/src/arrays/primitive/array/mod.rs index ea7d43c1905..15546532b19 100644 --- a/vortex-array/src/arrays/primitive/array/mod.rs +++ b/vortex-array/src/arrays/primitive/array/mod.rs @@ -18,6 +18,8 @@ mod conversion; mod patch; mod top_value; +pub use patch::patch_chunk; + /// A primitive array that stores [native types][vortex_dtype::NativePType] in a contiguous buffer /// of memory, along with an optional validity child. /// diff --git a/vortex-array/src/arrays/primitive/array/patch.rs b/vortex-array/src/arrays/primitive/array/patch.rs index f957797943c..d264eeee339 100644 --- a/vortex-array/src/arrays/primitive/array/patch.rs +++ b/vortex-array/src/arrays/primitive/array/patch.rs @@ -1,7 +1,9 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use vortex_dtype::{IntegerPType, NativePType, match_each_integer_ptype, match_each_native_ptype}; +use vortex_dtype::{ + IntegerPType, NativePType, UnsignedPType, match_each_integer_ptype, match_each_native_ptype, +}; use crate::ToCanonical; use crate::arrays::PrimitiveArray; @@ -55,6 +57,45 @@ impl PrimitiveArray { } } +/// Patches a chunk of decoded values. +/// +/// # Arguments +/// +/// * `decoded_values` - Mutable slice of decoded values to be patched +/// * `patches_indices` - Indices indicating which positions to patch +/// * `patches_values` - Values to apply at the patched indices +/// * `patches_offset` - Offset to subtract from patch indices +/// * `chunk_offsets_slice` - Slice containing offsets for each chunk +/// * `chunk_idx` - Index of the chunk to patch +#[inline] +pub fn patch_chunk( + decoded_values: &mut [T], + patches_indices: &[I], + patches_values: &[T], + patches_offset: usize, + chunk_offsets_slice: &[C], + chunk_idx: usize, +) where + T: NativePType, + I: UnsignedPType, + C: UnsignedPType, +{ + let patches_start_idx = chunk_offsets_slice[chunk_idx].as_(); + let patches_end_idx = if chunk_idx + 1 < chunk_offsets_slice.len() { + chunk_offsets_slice[chunk_idx + 1].as_() + } else { + patches_indices.len() + }; + + let chunk_start = chunk_idx * 1024; + for patches_idx in patches_start_idx..patches_end_idx { + let patched_value = patches_values[patches_idx]; + let absolute_index: usize = patches_indices[patches_idx].as_() - patches_offset; + let chunk_relative_index = absolute_index - chunk_start; + decoded_values[chunk_relative_index] = patched_value; + } +} + #[cfg(test)] mod tests { use vortex_buffer::buffer; diff --git a/vortex-array/src/arrays/primitive/mod.rs b/vortex-array/src/arrays/primitive/mod.rs index bd2359bef09..e1746e12113 100644 --- a/vortex-array/src/arrays/primitive/mod.rs +++ b/vortex-array/src/arrays/primitive/mod.rs @@ -2,7 +2,7 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors mod array; -pub use array::PrimitiveArray; +pub use array::{PrimitiveArray, patch_chunk}; mod compute; pub use compute::{IS_CONST_LANE_WIDTH, compute_is_constant};