15 changes: 10 additions & 5 deletions encodings/alp/benches/alp_compress.rs
@@ -6,11 +6,11 @@
use divan::Bencher;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng as _};
use vortex_alp::{ALPFloat, ALPRDFloat, RDEncoder, alp_encode};
use vortex_alp::{ALPFloat, ALPRDFloat, RDEncoder, alp_encode, decompress};
use vortex_array::arrays::PrimitiveArray;
use vortex_array::compute::warm_up_vtables;
use vortex_array::validity::Validity;
use vortex_buffer::buffer;
use vortex_buffer::{Buffer, buffer};
use vortex_dtype::NativePType;

fn main() {
@@ -84,10 +84,15 @@ fn decompress_alp<T: ALPFloat + NativePType>(bencher: Bencher, args: (usize, f64
Validity::NonNullable
};
let values = values.freeze();
let array = alp_encode(&PrimitiveArray::new(values, validity), None).unwrap();
bencher
.with_inputs(|| array.clone())
.bench_values(|array| array.to_canonical());
.with_inputs(|| {
alp_encode(
&PrimitiveArray::new(Buffer::copy_from(&values), validity.clone()),
None,
)
.unwrap()
})
.bench_values(decompress);
}

#[divan::bench(types = [f32, f64], args = [10_000, 100_000])]
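Because `decompress` now takes the `ALPArray` by value, the benchmark builds a fresh encoded input for every sample: `with_inputs` does the untimed setup and `bench_values` times only the consuming call. A minimal, self-contained sketch of that pattern follows (it reuses the existing `divan` harness in this file; the fixed 1024-value input is illustrative, unlike the random data and configurable null fraction used above):

```rust
use divan::Bencher;
use vortex_alp::{alp_encode, decompress};
use vortex_array::arrays::PrimitiveArray;
use vortex_array::validity::Validity;
use vortex_buffer::buffer;

#[divan::bench]
fn decompress_small(bencher: Bencher) {
    bencher
        // Untimed per-sample setup: build and ALP-encode a fresh input,
        // since `decompress` consumes its argument.
        .with_inputs(|| {
            let values = PrimitiveArray::new(buffer![1.234f64; 1024], Validity::NonNullable);
            alp_encode(&values, None).unwrap()
        })
        // Timed: decompress by value.
        .bench_values(decompress);
}
```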
2 changes: 1 addition & 1 deletion encodings/alp/src/alp/array.rs
@@ -282,6 +282,6 @@ impl ArrayVTable<ALPVTable> for ALPVTable {

impl CanonicalVTable<ALPVTable> for ALPVTable {
fn canonicalize(array: &ALPArray) -> Canonical {
Canonical::Primitive(decompress(array))
Canonical::Primitive(decompress(array.clone()))
}
}
288 changes: 276 additions & 12 deletions encodings/alp/src/alp/compress.rs
@@ -1,14 +1,16 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::mem::transmute;

use itertools::Itertools;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::{PrimitiveArray, patch_chunk};
use vortex_array::patches::Patches;
use vortex_array::validity::Validity;
use vortex_array::vtable::ValidityHelper;
use vortex_array::{ArrayRef, IntoArray, ToCanonical};
use vortex_buffer::{Buffer, BufferMut};
use vortex_dtype::PType;
use vortex_dtype::{PType, match_each_unsigned_integer_ptype};
use vortex_error::{VortexResult, vortex_bail};
use vortex_mask::Mask;

@@ -118,20 +120,113 @@ where
Ok((exponents, encoded_array, patches))
}

pub fn decompress(array: &ALPArray) -> PrimitiveArray {
/// Decompresses an ALP-encoded array.
///
/// # Arguments
///
/// * `array` - The ALP-encoded array to decompress
///
/// # Returns
///
/// A `PrimitiveArray` containing the decompressed floating-point values with all patches applied.
pub fn decompress(array: ALPArray) -> PrimitiveArray {
let patches = array.patches().cloned();
if let Some(patches) = patches
&& let Some(chunk_offsets) = patches.chunk_offsets()
{
return decompress_chunked(array, &patches, &chunk_offsets.as_ref().to_primitive());
}

decompress_unchunked(array)
}
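For reference, a minimal roundtrip through the by-value API, mirroring the roundtrip tests later in this file (uniform values, non-nullable validity):

```rust
use vortex_alp::{alp_encode, decompress};
use vortex_array::arrays::PrimitiveArray;
use vortex_array::validity::Validity;
use vortex_buffer::buffer;

fn roundtrip_example() {
    let original = PrimitiveArray::new(buffer![1.234f32; 1024], Validity::NonNullable);
    let encoded = alp_encode(&original, None).unwrap();
    // `decompress` consumes the ALP array so it can decode the encoded buffer
    // in place when that buffer is not shared.
    let decoded = decompress(encoded);
    assert_eq!(original.as_slice::<f32>(), decoded.as_slice::<f32>());
}
```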

/// Decompresses an ALP-encoded array in 1024-element chunks.
///
/// Decoding and patch application are performed in chunks of 1024 elements for better L1 cache locality.
///
/// # Arguments
///
/// * `array` - The ALP-encoded array to decompress
/// * `patches` - The patches containing exceptional values and their positions
/// * `patches_chunk_offsets` - Offsets into the patches array for each chunk
///
/// # Returns
///
/// A `PrimitiveArray` containing the decompressed values with all patches applied.
#[allow(clippy::cognitive_complexity)]
pub fn decompress_chunked(
array: ALPArray,
patches: &Patches,
patches_chunk_offsets: &PrimitiveArray,
) -> PrimitiveArray {
let encoded = array.encoded().to_primitive();
let validity = encoded.validity().clone();

let patches_indices = patches.indices().as_ref().to_primitive();
let patches_values = patches.values().as_ref().to_primitive();
let ptype = array.dtype().as_ptype();
let array_len = array.len();
let exponents = array.exponents();
let patches_offset = patches.offset();

// We need to drop the ALPArray here in case converting the encoded array into a primitive array didn't create a copy.
// In that case both `encoded` and `array` would hold a reference to the buffer we want to mutate.
drop(array);

match_each_alp_float_ptype!(ptype, |T| {
let patches_values = patches_values.as_slice::<T>();
let mut alp_buffer = encoded.into_buffer_mut();
match_each_unsigned_integer_ptype!(patches_chunk_offsets.ptype(), |C| {
let patches_chunk_offsets = patches_chunk_offsets.as_slice::<C>();
match_each_unsigned_integer_ptype!(patches_indices.ptype(), |I| {
let patches_indices = patches_indices.as_slice::<I>();

for (chunk_idx, chunk_start) in (0..array_len).step_by(1024).enumerate() {
let chunk_end = (chunk_start + 1024).min(array_len);
let chunk_slice = &mut alp_buffer.as_mut_slice()[chunk_start..chunk_end];
<T>::decode_slice_inplace(chunk_slice, exponents);

// SAFETY: `decode_slice_inplace` has just rewritten this chunk's encoded integers as `T`
// bit patterns, and the encoded integer type has the same size and alignment as `T`.
let decoded_chunk: &mut [T] = unsafe { transmute(chunk_slice) };
patch_chunk(
decoded_chunk,
patches_indices,
patches_values,
patches_offset,
patches_chunk_offsets,
chunk_idx,
);
}

// SAFETY: every chunk has been decoded in place above, so the buffer now holds valid
// `T` values, and the element types have identical size and alignment.
let decoded_buffer: BufferMut<T> = unsafe { transmute(alp_buffer) };
PrimitiveArray::new::<T>(decoded_buffer.freeze(), validity)
})
})
})
}
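To make the chunk-offset convention concrete, here is a self-contained sketch of the per-chunk patch loop in isolation (the in-place decode step is elided). It assumes `chunk_offsets` stores one starting offset per 1024-element chunk plus a trailing end offset into the patch index/value slices; the actual `Patches` and `patch_chunk` APIs may represent these offsets differently:

```rust
/// Apply patches chunk by chunk so each 1024-element chunk is touched while it is
/// still hot in cache. `patch_indices[i]` is the absolute position of the exceptional
/// value `patch_values[i]`, and `chunk_offsets[c]..chunk_offsets[c + 1]` selects the
/// patches that fall into chunk `c`.
fn patch_in_chunks(
    decoded: &mut [f64],
    patch_indices: &[u64],
    patch_values: &[f64],
    chunk_offsets: &[u32],
) {
    const CHUNK: usize = 1024;
    for chunk_idx in 0..decoded.len().div_ceil(CHUNK) {
        // ... decode `decoded[chunk_idx * CHUNK ..]` in place here ...
        let lo = chunk_offsets[chunk_idx] as usize;
        let hi = chunk_offsets[chunk_idx + 1] as usize;
        for (idx, value) in patch_indices[lo..hi].iter().zip(&patch_values[lo..hi]) {
            decoded[*idx as usize] = *value;
        }
    }
}
```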

/// Decompresses an ALP-encoded array without chunk offsets.
///
/// This function decodes the entire array in one pass and then applies any patches.
fn decompress_unchunked(array: ALPArray) -> PrimitiveArray {
let patches = array.patches().cloned();
let encoded = array.encoded().to_primitive();
let validity = encoded.validity().clone();
let exponents = array.exponents();
let ptype = array.dtype().as_ptype();

// We need to drop the ALPArray here in case converting the encoded array into a primitive array didn't create a copy.
// In that case both `encoded` and `array` would hold a reference to the buffer we want to mutate.
drop(array);

let decoded = match_each_alp_float_ptype!(ptype, |T| {
PrimitiveArray::new::<T>(
<T>::decode_buffer(encoded.into_buffer_mut(), array.exponents()),
<T>::decode_buffer(encoded.into_buffer_mut(), exponents),
validity,
)
});

if let Some(patches) = array.patches() {
decoded.patch(patches)
if let Some(patches) = patches {
decoded.patch(&patches)
} else {
decoded
}
@@ -158,7 +253,7 @@ mod tests {
assert_arrays_eq!(encoded.encoded(), expected_encoded);
assert_eq!(encoded.exponents(), Exponents { e: 9, f: 6 });

let decoded = decompress(&encoded);
let decoded = decompress(encoded);
assert_arrays_eq!(decoded, array);
}

@@ -171,7 +266,7 @@
assert_arrays_eq!(encoded.encoded(), expected_encoded);
assert_eq!(encoded.exponents(), Exponents { e: 9, f: 6 });

let decoded = decompress(&encoded);
let decoded = decompress(encoded);
let expected = PrimitiveArray::from_option_iter(vec![None, Some(1.234f32), None]);
assert_arrays_eq!(decoded, expected);
}
@@ -187,7 +282,7 @@
assert_arrays_eq!(encoded.encoded(), expected_encoded);
assert_eq!(encoded.exponents(), Exponents { e: 16, f: 13 });

let decoded = decompress(&encoded);
let decoded = decompress(encoded);
let expected_decoded = PrimitiveArray::new(values, Validity::NonNullable);
assert_arrays_eq!(decoded, expected_decoded);
}
@@ -204,7 +299,7 @@
assert_arrays_eq!(encoded.encoded(), expected_encoded);
assert_eq!(encoded.exponents(), Exponents { e: 16, f: 13 });

let decoded = decompress(&encoded);
let decoded = decompress(encoded);
assert_arrays_eq!(decoded, array);
}

@@ -223,9 +318,9 @@

assert_eq!(encoded.exponents(), Exponents { e: 16, f: 13 });

assert_arrays_eq!(&encoded, array);
assert_arrays_eq!(encoded, array);

let _decoded = decompress(&encoded);
let _decoded = decompress(encoded);
}

#[test]
@@ -362,4 +457,173 @@ mod tests {
let expected_values = PrimitiveArray::from_iter(vec![PI, E]);
assert_arrays_eq!(patch_values, expected_values);
}

#[test]
fn test_slice_half_chunk_f32_roundtrip() {
// Create 1024 elements, encode, slice to the second half (512..1024), then decode
let values = vec![1.234f32; 1024];
let original = PrimitiveArray::new(Buffer::from(values), Validity::NonNullable);
let encoded = alp_encode(&original, None).unwrap();

let sliced_alp = encoded.slice(512..1024);
let decoded = sliced_alp.to_primitive();

let expected_slice = original.slice(512..1024).to_primitive();
assert_eq!(expected_slice.as_slice::<f32>(), decoded.as_slice::<f32>());
}

#[test]
fn test_slice_half_chunk_f64_roundtrip() {
let values = vec![5.678f64; 1024];
let original = PrimitiveArray::new(Buffer::from(values), Validity::NonNullable);
let encoded = alp_encode(&original, None).unwrap();

let sliced_alp = encoded.slice(512..1024);
let decoded = sliced_alp.to_primitive();

let expected_slice = original.slice(512..1024).to_primitive();
assert_eq!(expected_slice.as_slice::<f64>(), decoded.as_slice::<f64>());
}

#[test]
fn test_slice_half_chunk_with_patches_roundtrip() {
let mut values = vec![1.0f64; 1024];
values[100] = PI;
values[200] = E;
values[600] = 42.42;

let original = PrimitiveArray::new(Buffer::from(values), Validity::NonNullable);
let encoded = alp_encode(&original, None).unwrap();

let sliced_alp = encoded.slice(512..1024);
let decoded = sliced_alp.to_primitive();

let expected_slice = original.slice(512..1024).to_primitive();
assert_eq!(expected_slice.as_slice::<f64>(), decoded.as_slice::<f64>());
assert!(encoded.patches().is_some());
}

#[test]
fn test_slice_half_chunk_nullable_roundtrip() {
let values = (0..1024)
.map(|i| if i % 3 == 0 { None } else { Some(2.5f32) })
.collect::<Vec<_>>();

let original = PrimitiveArray::from_option_iter(values);
let encoded = alp_encode(&original, None).unwrap();

let sliced_alp = encoded.slice(512..1024);
let decoded = sliced_alp.to_primitive();

let expected_slice = original.slice(512..1024);
assert_arrays_eq!(decoded, expected_slice);
}

#[test]
fn test_large_f32_array_uniform_values() {
let size = 10_000;
let array = PrimitiveArray::new(buffer![42.125f32; size], Validity::NonNullable);
let encoded = alp_encode(&array, None).unwrap();

assert!(encoded.patches().is_none());
let decoded = decompress(encoded);
assert_eq!(array.as_slice::<f32>(), decoded.as_slice::<f32>());
}

#[test]
fn test_large_f64_array_uniform_values() {
let size = 50_000;
let array = PrimitiveArray::new(buffer![123.456789f64; size], Validity::NonNullable);
let encoded = alp_encode(&array, None).unwrap();

assert!(encoded.patches().is_none());
let decoded = decompress(encoded);
assert_eq!(array.as_slice::<f64>(), decoded.as_slice::<f64>());
}

#[test]
fn test_large_f32_array_with_patches() {
let size = 5_000;
let mut values = vec![1.5f32; size];
values[100] = std::f32::consts::PI;
values[1500] = std::f32::consts::E;
values[3000] = f32::NEG_INFINITY;
values[4500] = f32::INFINITY;

let array = PrimitiveArray::new(Buffer::from(values.clone()), Validity::NonNullable);
let encoded = alp_encode(&array, None).unwrap();

assert!(encoded.patches().is_some());
let decoded = decompress(encoded);
assert_eq!(values.as_slice(), decoded.as_slice::<f32>());
}

#[test]
fn test_large_f64_array_with_patches() {
let size = 8_000;
let mut values = vec![2.2184f64; size];
values[0] = PI;
values[1000] = E;
values[2000] = f64::NAN;
values[3000] = f64::INFINITY;
values[4000] = f64::NEG_INFINITY;
values[5000] = 0.0;
values[6000] = -0.0;
values[7000] = 999.999999999;

let array = PrimitiveArray::new(Buffer::from(values.clone()), Validity::NonNullable);
let encoded = alp_encode(&array, None).unwrap();

assert!(encoded.patches().is_some());
let decoded = decompress(encoded);

for idx in 0..size {
let decoded_val = decoded.as_slice::<f64>()[idx];
let original_val = values[idx];
assert!(
decoded_val.is_eq(original_val),
"At index {idx}: Expected {original_val} but got {decoded_val}"
);
}
}

#[test]
fn test_large_nullable_array() {
let size = 12_000;
let values: Vec<Option<f32>> = (0..size)
.map(|i| {
if i % 7 == 0 {
None
} else {
Some((i as f32) * 0.1)
}
})
.collect();

let array = PrimitiveArray::from_option_iter(values);
let encoded = alp_encode(&array, None).unwrap();
let decoded = decompress(encoded);

assert_arrays_eq!(decoded, array);
}

#[test]
fn test_large_mixed_validity_with_patches() {
let size = 6_000;
let mut values = vec![10.125f64; size];

values[500] = PI;
values[1500] = E;
values[2500] = f64::INFINITY;
values[3500] = f64::NEG_INFINITY;
values[4500] = f64::NAN;

let validity = Validity::from_iter((0..size).map(|i| !matches!(i, 500 | 2500)));

let array = PrimitiveArray::new(Buffer::from(values), validity);
let encoded = alp_encode(&array, None).unwrap();
let decoded = decompress(encoded);

assert_arrays_eq!(decoded, array);
}
}