diff --git a/protos/encodings_v2_1.proto b/protos/encodings_v2_1.proto
index 2e79ed40216..83c8c771227 100644
--- a/protos/encodings_v2_1.proto
+++ b/protos/encodings_v2_1.proto
@@ -147,13 +147,25 @@ message FullZipLayout {
   repeated RepDefLayer layers = 8;
 }
 
-// A layout used for pages where all values are null
+// A layout used for pages where all (visible) values are the same scalar value, or all values are null.
 //
-// There may be buffers of repetition and definition information
-// if required in order to interpret what kind of nulls are present
-message AllNullLayout {
+// This generalizes the prior AllNullLayout semantics for file_version >= 2.2.
+//
+// There may be buffers of repetition and definition information if required in order
+// to interpret what kind of nulls are present / which items are visible.
+message ConstantLayout {
   // The meaning of each repdef layer, used to interpret repdef buffers correctly
   repeated RepDefLayer layers = 5;
+
+  // Inline fixed-width scalar value bytes.
+  //
+  // This MUST only be used for types where a single non-null element is represented by a single
+  // fixed-width Arrow value buffer (i.e. no offsets buffer, no child data).
+  //
+  // Constraints:
+  // - MUST be absent for an all-null page
+  // - MUST be <= 32 bytes if present
+  optional bytes inline_value = 6;
 }
 
 // A layout where large binary data is encoded externally and only
@@ -176,8 +188,8 @@ message PageLayout {
   oneof layout {
     // A layout used for pages where the data is small
     MiniBlockLayout mini_block_layout = 1;
-    // A layout used for pages where all values are null
-    AllNullLayout all_null_layout = 2;
+    // A layout used for pages where all (visible) values are the same scalar value or null.
+    ConstantLayout constant_layout = 2;
     // A layout used for pages where the data is large
     FullZipLayout full_zip_layout = 3;
     // A layout where large binary data is encoded externally
diff --git a/rust/lance-arrow/src/lib.rs b/rust/lance-arrow/src/lib.rs
index aa332ae1f5b..0a61a407395 100644
--- a/rust/lance-arrow/src/lib.rs
+++ b/rust/lance-arrow/src/lib.rs
@@ -34,6 +34,7 @@ pub mod cast;
 pub mod json;
 pub mod list;
 pub mod memory;
+pub mod scalar;
 pub mod r#struct;
 
 /// Arrow extension metadata key for extension name
diff --git a/rust/lance-arrow/src/scalar.rs b/rust/lance-arrow/src/scalar.rs
new file mode 100644
index 00000000000..e475b0561a7
--- /dev/null
+++ b/rust/lance-arrow/src/scalar.rs
@@ -0,0 +1,264 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+use arrow_array::{make_array, ArrayRef};
+use arrow_buffer::Buffer;
+use arrow_data::{transform::MutableArrayData, ArrayDataBuilder};
+use arrow_schema::{ArrowError, DataType};
+
+use crate::DataTypeExt;
+
+type Result<T> = std::result::Result<T, ArrowError>;
+
+pub const INLINE_VALUE_MAX_BYTES: usize = 32;
+
+pub fn extract_scalar_value(array: &ArrayRef, idx: usize) -> Result<ArrayRef> {
+    if idx >= array.len() {
+        return Err(ArrowError::InvalidArgumentError(
+            "Scalar index out of bounds".to_string(),
+        ));
+    }
+
+    let data = array.to_data();
+    let mut mutable = MutableArrayData::new(vec![&data], /*use_nulls=*/ true, 1);
+    mutable.extend(0, idx, idx + 1);
+    Ok(make_array(mutable.freeze()))
+}
+
+fn read_u32(buf: &[u8], offset: &mut usize) -> Result<u32> {
+    if *offset + 4 > buf.len() {
+        return Err(ArrowError::InvalidArgumentError(
+            "Invalid scalar value buffer: unexpected EOF".to_string(),
+        ));
+    }
+    let bytes = [
+        buf[*offset],
+        buf[*offset + 1],
+        buf[*offset + 2],
+        buf[*offset + 3],
+    ];
+    *offset += 4;
+    Ok(u32::from_le_bytes(bytes))
+}
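// Illustrative worked example of the value-buffer framing used by
// encode_scalar_value_buffer below (a sketch, not normative; the code is authoritative).
// A Utf8 scalar "hello" has two Arrow value buffers: the i32 offsets [0, 5] and the
// data bytes b"hello", so the encoded buffer is 25 bytes:
//
//   02 00 00 00                  num_buffers = 2
//   08 00 00 00   05 00 00 00    buffer lengths (offsets = 8 bytes, data = 5 bytes)
//   00 00 00 00   05 00 00 00    offsets buffer [0, 5] (little-endian i32)
//   68 65 6c 6c 6f               data buffer "hello"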
+
+fn read_bytes<'a>(buf: &'a [u8], offset: &mut usize, len: usize) -> Result<&'a [u8]> {
+    if *offset + len > buf.len() {
+        return Err(ArrowError::InvalidArgumentError(
+            "Invalid scalar value buffer: unexpected EOF".to_string(),
+        ));
+    }
+    let slice = &buf[*offset..*offset + len];
+    *offset += len;
+    Ok(slice)
+}
+
+fn write_u32(out: &mut Vec<u8>, v: u32) {
+    out.extend_from_slice(&v.to_le_bytes());
+}
+
+fn write_bytes(out: &mut Vec<u8>, bytes: &[u8]) {
+    out.extend_from_slice(bytes);
+}
+
+pub fn encode_scalar_value_buffer(scalar: &ArrayRef) -> Result<Vec<u8>> {
+    if scalar.len() != 1 || scalar.null_count() != 0 {
+        return Err(ArrowError::InvalidArgumentError(
+            "Scalar value buffer must be a single non-null value".to_string(),
+        ));
+    }
+    let data = scalar.to_data();
+    if data.offset() != 0 {
+        return Err(ArrowError::InvalidArgumentError(
+            "Scalar value buffer must have offset=0".to_string(),
+        ));
+    }
+    if !data.child_data().is_empty() {
+        return Err(ArrowError::InvalidArgumentError(
+            "Scalar value buffer does not support nested types".to_string(),
+        ));
+    }
+
+    // Minimal format (RFC): store the Arrow value buffers for a length-1 array.
+    // Null bitmap and child data are intentionally not supported here.
+    //
+    // | u32 num_buffers |
+    // | u32 buffer_0_len | ... | u32 buffer_{n-1}_len |
+    // | buffer_0 bytes | ... | buffer_{n-1} bytes |
+    let mut out = Vec::with_capacity(128);
+    let buffers = data.buffers();
+    write_u32(&mut out, buffers.len() as u32);
+    for b in buffers {
+        write_u32(&mut out, b.len() as u32);
+    }
+    for b in buffers {
+        write_bytes(&mut out, b.as_slice());
+    }
+    Ok(out)
+}
+
+pub fn decode_scalar_from_value_buffer(
+    data_type: &DataType,
+    value_buffer: &[u8],
+) -> Result<ArrayRef> {
+    if matches!(
+        data_type,
+        DataType::Struct(_) | DataType::FixedSizeList(_, _)
+    ) {
+        return Err(ArrowError::InvalidArgumentError(format!(
+            "Scalar value buffer does not support nested data type {:?}",
+            data_type
+        )));
+    }
+
+    let mut offset = 0;
+    let num_buffers = read_u32(value_buffer, &mut offset)? as usize;
+    let buffer_lens = (0..num_buffers)
+        .map(|_| read_u32(value_buffer, &mut offset).map(|l| l as usize))
+        .collect::<Result<Vec<_>>>()?;
+
+    let mut buffers = Vec::with_capacity(num_buffers);
+    for len in buffer_lens {
+        let bytes = read_bytes(value_buffer, &mut offset, len)?;
+        buffers.push(Buffer::from_vec(bytes.to_vec()));
+    }
+
+    if offset != value_buffer.len() {
+        return Err(ArrowError::InvalidArgumentError(
+            "Invalid scalar value buffer: trailing bytes".to_string(),
+        ));
+    }
+
+    let mut builder = ArrayDataBuilder::new(data_type.clone())
+        .len(1)
+        .null_count(0);
+    for b in buffers {
+        builder = builder.add_buffer(b);
+    }
+    Ok(make_array(builder.build()?))
+}
+
+pub fn decode_scalar_from_inline_value(
+    data_type: &DataType,
+    inline_value: &[u8],
+) -> Result<ArrayRef> {
+    let byte_width = data_type.byte_width_opt().ok_or_else(|| {
+        ArrowError::InvalidArgumentError(format!(
+            "Inline constant is not supported for non-fixed-stride data type {:?}",
+            data_type
+        ))
+    })?;
+
+    if inline_value.len() != byte_width {
+        return Err(ArrowError::InvalidArgumentError(format!(
+            "Inline constant length mismatch for {:?}: expected {} bytes but got {}",
+            data_type,
+            byte_width,
+            inline_value.len()
+        )));
+    }
+
+    let data = ArrayDataBuilder::new(data_type.clone())
+        .len(1)
+        .null_count(0)
+        .add_buffer(Buffer::from_vec(inline_value.to_vec()))
+        .build()?;
+    Ok(make_array(data))
+}
+
+pub fn try_inline_value(scalar: &ArrayRef) -> Option<Vec<u8>> {
+    if scalar.null_count() != 0 || scalar.len() != 1 {
+        return None;
+    }
+    let data = scalar.to_data();
+    if !data.child_data().is_empty() {
+        return None;
+    }
+    if data.buffers().len() != 1 {
+        return None;
+    }
+    let bytes = data.buffers()[0].as_slice();
+    if bytes.len() > INLINE_VALUE_MAX_BYTES {
+        return None;
+    }
+    Some(bytes.to_vec())
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use arrow_array::{cast::AsArray, FixedSizeBinaryArray, Int32Array, StringArray};
+
+    use super::*;
+
+    #[test]
+    fn test_extract_scalar_value() {
+        let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
+        let scalar = extract_scalar_value(&array, 2).unwrap();
+        assert_eq!(scalar.len(), 1);
+        assert_eq!(
+            scalar
+                .as_primitive::<arrow_array::types::Int32Type>()
+                .value(0),
+            3
+        );
+    }
+
+    #[test]
+    fn test_scalar_value_buffer_utf8_round_trip() {
+        let scalar: ArrayRef = Arc::new(StringArray::from(vec!["hello"]));
+        let buf = encode_scalar_value_buffer(&scalar).unwrap();
+        let decoded = decode_scalar_from_value_buffer(&DataType::Utf8, &buf).unwrap();
+        assert_eq!(decoded.len(), 1);
+        assert_eq!(decoded.null_count(), 0);
+        assert_eq!(decoded.as_string::<i32>().value(0), "hello");
+    }
+
+    #[test]
+    fn test_scalar_value_buffer_fixed_size_binary_round_trip() {
+        let val = vec![0xABu8; 33];
+        let scalar: ArrayRef = Arc::new(
+            FixedSizeBinaryArray::try_from_sparse_iter_with_size(
+                std::iter::once(Some(val.as_slice())),
+                33,
+            )
+            .unwrap(),
+        );
+        let buf = encode_scalar_value_buffer(&scalar).unwrap();
+        let decoded =
+            decode_scalar_from_value_buffer(&DataType::FixedSizeBinary(33), &buf).unwrap();
+        assert_eq!(decoded.len(), 1);
+        assert_eq!(decoded.as_fixed_size_binary().value(0), val.as_slice());
+    }
+
+    #[test]
+    fn test_scalar_value_buffer_rejects_nested_type() {
+        let field = Arc::new(arrow_schema::Field::new("item", DataType::Int32, false));
+        let list: ArrayRef = Arc::new(arrow_array::FixedSizeListArray::new(
+            field,
+            2,
+            Arc::new(Int32Array::from(vec![1, 2])),
+            None,
+        ));
+        let scalar = list.slice(0, 1);
+        assert!(encode_scalar_value_buffer(&scalar).is_err());
+    }
+
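    // Minimal sketch (illustrative, not part of the patch's test suite): an inline
    // round trip for a fixed-width type, assuming try_inline_value and
    // decode_scalar_from_inline_value behave as defined above.
    #[test]
    fn test_inline_value_int32_round_trip_sketch() {
        let scalar: ArrayRef = Arc::new(Int32Array::from(vec![7]));
        // A 4-byte Int32 is well under INLINE_VALUE_MAX_BYTES, so it should inline
        let inline = try_inline_value(&scalar).expect("Int32 scalar should inline");
        assert_eq!(inline.len(), 4);
        let decoded = decode_scalar_from_inline_value(&DataType::Int32, &inline).unwrap();
        assert_eq!(
            decoded
                .as_primitive::<arrow_array::types::Int32Type>()
                .value(0),
            7
        );
    }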
+    #[test]
+    fn test_decode_scalar_from_value_buffer_rejects_nested_type() {
+        let buf = Vec::<u8>::new();
+        let res =
+            decode_scalar_from_value_buffer(&DataType::Struct(arrow_schema::Fields::empty()), &buf);
+        assert!(res.is_err());
+    }
+
+    #[test]
+    fn test_decode_scalar_from_value_buffer_trailing_bytes() {
+        // num_buffers = 0, plus an extra byte
+        let mut bytes = Vec::new();
+        bytes.extend_from_slice(&0u32.to_le_bytes());
+        bytes.push(1);
+        let res = decode_scalar_from_value_buffer(&DataType::Int32, &bytes);
+        assert!(res.is_err());
+    }
+}
diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs
index c633c9b93fa..fc78aee288e 100644
--- a/rust/lance-encoding/src/encodings/logical/primitive.rs
+++ b/rust/lance-encoding/src/encodings/logical/primitive.rs
@@ -24,12 +24,13 @@ use crate::{
     },
 };
 use arrow_array::{cast::AsArray, make_array, types::UInt64Type, Array, ArrayRef, PrimitiveArray};
-use arrow_buffer::{BooleanBuffer, NullBuffer, ScalarBuffer};
+use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer};
 use arrow_schema::{DataType, Field as ArrowField};
 use bytes::Bytes;
 use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt, TryStreamExt};
 use itertools::Itertools;
 use lance_arrow::deepcopy::deep_copy_nulls;
+use lance_arrow::DataTypeExt;
 use lance_core::{
     cache::{CacheKey, Context, DeepSizeOf},
     error::{Error, LanceOptionExt},
@@ -85,6 +86,7 @@ use crate::{
 };
 
 pub mod blob;
+pub mod constant;
 pub mod dict;
 pub mod fullzip;
 pub mod miniblock;
@@ -3071,13 +3073,23 @@ impl StructuralPrimitiveFieldScheduler {
                 scheduler.enable_cache = cache_repetition_index;
                 Box::new(scheduler)
             }
-            Layout::AllNullLayout(all_null) => {
-                let def_meaning = all_null
+            Layout::ConstantLayout(constant_layout) => {
+                let def_meaning = constant_layout
                     .layers
                     .iter()
                     .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
                     .collect::<Vec<_>>();
-                if def_meaning.len() == 1
+                let has_scalar_value = constant_layout.inline_value.is_some()
+                    || page_info.buffer_offsets_and_sizes.len() == 1
+                    || page_info.buffer_offsets_and_sizes.len() == 3;
+                if has_scalar_value {
+                    Box::new(constant::ConstantPageScheduler::try_new(
+                        page_info.buffer_offsets_and_sizes.clone(),
+                        constant_layout.inline_value.clone(),
+                        target_field.data_type(),
+                        def_meaning.into(),
+                    )?)
as Box + } else if def_meaning.len() == 1 && def_meaning[0] == DefinitionInterpretation::NullableItem { Box::new(SimpleAllNullScheduler::default()) as Box @@ -3811,7 +3823,8 @@ impl PrimitiveStructuralEncoder { num_rows: u64, row_number: u64, ) -> Result { - let description = ProtobufUtils21::simple_all_null_layout(); + let description = + ProtobufUtils21::constant_layout(&[DefinitionInterpretation::NullableItem], None); Ok(EncodedPage { column_idx, data: vec![], @@ -3826,12 +3839,10 @@ impl PrimitiveStructuralEncoder { // different kinds of null) fn encode_complex_all_null( column_idx: u32, - repdefs: Vec, + repdef: crate::repdef::SerializedRepDefs, row_number: u64, num_rows: u64, ) -> Result { - let repdef = RepDefBuilder::serialize(repdefs); - // TODO: Actually compress repdef let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() { LanceBuffer::reinterpret_slice(rep.clone()) @@ -3845,7 +3856,7 @@ impl PrimitiveStructuralEncoder { LanceBuffer::empty() }; - let description = ProtobufUtils21::all_null_layout(&repdef.def_meaning); + let description = ProtobufUtils21::constant_layout(&repdef.def_meaning, None); Ok(EncodedPage { column_idx, data: vec![rep_bytes, def_bytes], @@ -3855,20 +3866,204 @@ impl PrimitiveStructuralEncoder { }) } + fn leaf_validity( + repdef: &crate::repdef::SerializedRepDefs, + num_values: usize, + ) -> Result> { + let rep = repdef + .repetition_levels + .as_ref() + .map(|rep| rep.as_ref().to_vec()); + let def = repdef + .definition_levels + .as_ref() + .map(|def| def.as_ref().to_vec()); + let mut unraveler = RepDefUnraveler::new( + rep, + def, + repdef.def_meaning.clone().into(), + num_values as u64, + ); + if unraveler.is_all_valid() { + return Ok(None); + } + let mut validity = BooleanBufferBuilder::new(num_values); + unraveler.unravel_validity(&mut validity); + Ok(Some(validity.finish())) + } + + fn is_constant_values( + arrays: &[ArrayRef], + scalar: &ArrayRef, + validity: Option<&BooleanBuffer>, + ) -> Result { + debug_assert_eq!(scalar.len(), 1); + debug_assert_eq!(scalar.null_count(), 0); + + match scalar.data_type() { + DataType::Boolean => { + let mut global_idx = 0usize; + let scalar_val = scalar.as_boolean().value(0); + for arr in arrays { + let bool_arr = arr.as_boolean(); + for i in 0..arr.len() { + let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true); + global_idx += 1; + if !is_valid { + continue; + } + if bool_arr.value(i) != scalar_val { + return Ok(false); + } + } + } + Ok(true) + } + DataType::Utf8 => Self::is_constant_utf8::(arrays, scalar, validity), + DataType::LargeUtf8 => Self::is_constant_utf8::(arrays, scalar, validity), + DataType::Binary => Self::is_constant_binary::(arrays, scalar, validity), + DataType::LargeBinary => Self::is_constant_binary::(arrays, scalar, validity), + data_type => { + let mut global_idx = 0usize; + let Some(byte_width) = data_type.byte_width_opt() else { + return Ok(false); + }; + let scalar_data = scalar.to_data(); + if scalar_data.buffers().len() != 1 || !scalar_data.child_data().is_empty() { + return Ok(false); + } + let scalar_bytes = scalar_data.buffers()[0].as_slice(); + if scalar_bytes.len() != byte_width { + return Ok(false); + } + + for arr in arrays { + let data = arr.to_data(); + if data.buffers().is_empty() { + return Ok(false); + } + let buf = data.buffers()[0].as_slice(); + let base = data.offset(); + for i in 0..arr.len() { + let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true); + global_idx += 1; + if !is_valid { + continue; + } + let start = (base + i) 
* byte_width; + if buf[start..start + byte_width] != scalar_bytes[..] { + return Ok(false); + } + } + } + Ok(true) + } + } + } + + fn is_constant_utf8( + arrays: &[ArrayRef], + scalar: &ArrayRef, + validity: Option<&BooleanBuffer>, + ) -> Result { + debug_assert_eq!(scalar.len(), 1); + let scalar_val = scalar.as_string::().value(0).as_bytes(); + let mut global_idx = 0usize; + for arr in arrays { + let str_arr = arr.as_string::(); + for i in 0..arr.len() { + let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true); + global_idx += 1; + if !is_valid { + continue; + } + if str_arr.value(i).as_bytes() != scalar_val { + return Ok(false); + } + } + } + Ok(true) + } + + fn is_constant_binary( + arrays: &[ArrayRef], + scalar: &ArrayRef, + validity: Option<&BooleanBuffer>, + ) -> Result { + debug_assert_eq!(scalar.len(), 1); + let scalar_val = scalar.as_binary::().value(0); + let mut global_idx = 0usize; + for arr in arrays { + let bin_arr = arr.as_binary::(); + for i in 0..arr.len() { + let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true); + global_idx += 1; + if !is_valid { + continue; + } + if bin_arr.value(i) != scalar_val { + return Ok(false); + } + } + } + Ok(true) + } + + fn find_constant_scalar( + arrays: &[ArrayRef], + validity: Option<&BooleanBuffer>, + ) -> Result> { + if arrays.is_empty() { + return Ok(None); + } + + let global_scalar_idx = if let Some(validity) = validity { + let Some(idx) = (0..validity.len()).find(|&i| validity.value(i)) else { + return Ok(None); + }; + idx + } else { + 0 + }; + + let mut idx_remaining = global_scalar_idx; + let mut scalar_arr_idx = 0usize; + while scalar_arr_idx < arrays.len() { + let len = arrays[scalar_arr_idx].len(); + if idx_remaining < len { + break; + } + idx_remaining -= len; + scalar_arr_idx += 1; + } + + if scalar_arr_idx >= arrays.len() { + return Ok(None); + } + + let scalar = + lance_arrow::scalar::extract_scalar_value(&arrays[scalar_arr_idx], idx_remaining)?; + if scalar.null_count() != 0 { + return Ok(None); + } + if !Self::is_constant_values(arrays, &scalar, validity)? { + return Ok(None); + } + Ok(Some(scalar)) + } + #[allow(clippy::too_many_arguments)] fn encode_miniblock( column_idx: u32, field: &Field, compression_strategy: &dyn CompressionStrategy, data: DataBlock, - repdefs: Vec, + repdef: crate::repdef::SerializedRepDefs, row_number: u64, dictionary_data: Option, num_rows: u64, support_large_chunk: bool, ) -> Result { - let repdef = RepDefBuilder::serialize(repdefs); - if let DataBlock::AllNull(_null_block) = data { // We should not be using mini-block for all-null. There are other structural // encodings for that. @@ -4190,11 +4385,10 @@ impl PrimitiveStructuralEncoder { field: &Field, compression_strategy: &dyn CompressionStrategy, data: DataBlock, - repdefs: Vec, + repdef: crate::repdef::SerializedRepDefs, row_number: u64, num_lists: u64, ) -> Result { - let repdef = RepDefBuilder::serialize(repdefs); let max_rep = repdef .repetition_levels .as_ref() @@ -4424,29 +4618,33 @@ impl PrimitiveStructuralEncoder { let support_large_chunk = self.support_large_chunk; let version = self.version; let task = spawn_cpu(move || { - let num_values = arrays.iter().map(|arr| arr.len() as u64).sum(); + let num_values = arrays.iter().map(|arr| arr.len() as u64).sum(); + let is_simple_validity = repdefs.iter().all(|rd| rd.is_simple_validity()); + let has_repdef_info = repdefs.iter().any(|rd| !rd.is_empty()); + let repdef = RepDefBuilder::serialize(repdefs); if num_values == 0 { // We should not encode empty arrays. 
So if we get here that should mean that we // either have all empty lists or all null lists (or a mix). We still need to encode // the rep/def information but we can skip the data encoding. log::debug!("Encoding column {} with {} items ({} rows) using complex-null layout", column_idx, num_values, num_rows); - return Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows); + return Self::encode_complex_all_null(column_idx, repdef, row_number, num_rows); } - let num_nulls = arrays - .iter() - .map(|arr| arr.logical_nulls().map(|n| n.null_count()).unwrap_or(0) as u64) - .sum::(); - if num_values == num_nulls { - return if repdefs.iter().all(|rd| rd.is_simple_validity()) { + let leaf_validity = Self::leaf_validity(&repdef, num_values as usize)?; + let all_null = leaf_validity + .as_ref() + .map(|validity| validity.count_set_bits() == 0) + .unwrap_or(false); + + if all_null { + return if is_simple_validity { log::debug!( "Encoding column {} with {} items ({} rows) using simple-null layout", column_idx, num_values, num_rows ); - // Simple case, no rep/def and all nulls, we don't need to encode any data Self::encode_simple_all_null(column_idx, num_values, row_number) } else { log::debug!( @@ -4455,14 +4653,13 @@ impl PrimitiveStructuralEncoder { num_values, num_rows ); - // If we get here then we have definition levels and we need to store those - Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows) + Self::encode_complex_all_null(column_idx, repdef, row_number, num_rows) }; } if let DataType::Struct(fields) = &field.data_type() { if fields.is_empty() { - if repdefs.iter().any(|rd| !rd.is_empty()) { + if has_repdef_info { return Err(Error::InvalidInput { source: format!("Empty structs with rep/def information are not yet supported. The field {} is an empty struct that either has nulls or is in a list.", field.name).into(), location: location!() }); } // This is maybe a little confusing but the reader should never look at this anyways and it @@ -4473,6 +4670,25 @@ impl PrimitiveStructuralEncoder { let data_block = DataBlock::from_arrays(&arrays, num_values); + if version.resolve() >= LanceFileVersion::V2_2 { + if let Some(scalar) = Self::find_constant_scalar(&arrays, leaf_validity.as_ref())? 
+ { + log::debug!( + "Encoding column {} with {} items ({} rows) using constant layout", + column_idx, + num_values, + num_rows + ); + return constant::encode_constant_page( + column_idx, + scalar, + repdef, + row_number, + num_rows, + ); + } + } + let requires_full_zip_packed_struct = if let DataBlock::Struct(ref struct_data_block) = data_block { struct_data_block.has_variable_width_child() @@ -4491,7 +4707,7 @@ impl PrimitiveStructuralEncoder { &field, compression_strategy.as_ref(), data_block, - repdefs, + repdef, row_number, num_rows, ); @@ -4510,7 +4726,7 @@ impl PrimitiveStructuralEncoder { &field, compression_strategy.as_ref(), indices_data_block, - repdefs, + repdef, row_number, Some(dictionary_data_block), num_rows, @@ -4524,12 +4740,13 @@ impl PrimitiveStructuralEncoder { ); let (indices_data_block, dictionary_data_block) = dict::dictionary_encode(data_block); + Self::encode_miniblock( column_idx, &field, compression_strategy.as_ref(), indices_data_block, - repdefs, + repdef, row_number, Some(dictionary_data_block), num_rows, @@ -4546,7 +4763,7 @@ impl PrimitiveStructuralEncoder { &field, compression_strategy.as_ref(), data_block, - repdefs, + repdef, row_number, None, num_rows, @@ -4563,7 +4780,7 @@ impl PrimitiveStructuralEncoder { &field, compression_strategy.as_ref(), data_block, - repdefs, + repdef, row_number, num_rows, ) @@ -5980,4 +6197,268 @@ mod tests { assert!(!result, "Should not use dictionary encode based on size"); } + + async fn encode_first_page( + field: arrow_schema::Field, + array: ArrayRef, + version: LanceFileVersion, + ) -> crate::encoder::EncodedPage { + use crate::encoder::{ + default_encoding_strategy, ColumnIndexSequence, EncodingOptions, OutOfLineBuffers, + MIN_PAGE_BUFFER_ALIGNMENT, + }; + use crate::repdef::RepDefBuilder; + + let lance_field = lance_core::datatypes::Field::try_from(&field).unwrap(); + let encoding_strategy = default_encoding_strategy(version); + let mut column_index_seq = ColumnIndexSequence::default(); + let encoding_options = EncodingOptions { + cache_bytes_per_column: 1, + max_page_bytes: 32 * 1024 * 1024, + keep_original_array: true, + buffer_alignment: MIN_PAGE_BUFFER_ALIGNMENT, + version, + }; + + let mut encoder = encoding_strategy + .create_field_encoder( + encoding_strategy.as_ref(), + &lance_field, + &mut column_index_seq, + &encoding_options, + ) + .unwrap(); + + let mut external_buffers = OutOfLineBuffers::new(0, MIN_PAGE_BUFFER_ALIGNMENT); + let repdef = RepDefBuilder::default(); + let num_rows = array.len() as u64; + let mut pages = Vec::new(); + for task in encoder + .maybe_encode(array, &mut external_buffers, repdef, 0, num_rows) + .unwrap() + { + pages.push(task.await.unwrap()); + } + for task in encoder.flush(&mut external_buffers).unwrap() { + pages.push(task.await.unwrap()); + } + pages.into_iter().next().unwrap() + } + + #[tokio::test] + async fn test_constant_layout_out_of_line_fixed_size_binary_v2_2() { + use crate::format::pb21::page_layout::Layout; + + let val = vec![0xABu8; 33]; + let arr: ArrayRef = Arc::new( + arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size( + std::iter::repeat_n(Some(val.as_slice()), 256), + 33, + ) + .unwrap(), + ); + let field = arrow_schema::Field::new("c", DataType::FixedSizeBinary(33), true); + let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await; + + let PageEncoding::Structural(layout) = &page.description else { + panic!("Expected structural encoding"); + }; + let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else { + 
panic!("Expected constant layout in slot 2"); + }; + assert!(layout.inline_value.is_none()); + assert_eq!(page.data.len(), 1); + + let test_cases = TestCases::default() + .with_min_file_version(LanceFileVersion::V2_2) + .with_max_file_version(LanceFileVersion::V2_2) + .with_page_sizes(vec![4096]); + check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await; + } + + #[tokio::test] + async fn test_constant_layout_out_of_line_utf8_v2_2() { + use crate::format::pb21::page_layout::Layout; + + let arr: ArrayRef = Arc::new(arrow_array::StringArray::from_iter_values( + std::iter::repeat_n("hello", 512), + )); + let field = arrow_schema::Field::new("c", DataType::Utf8, true); + let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await; + + let PageEncoding::Structural(layout) = &page.description else { + panic!("Expected structural encoding"); + }; + let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else { + panic!("Expected constant layout in slot 2"); + }; + assert!(layout.inline_value.is_none()); + assert_eq!(page.data.len(), 1); + + let test_cases = TestCases::default() + .with_min_file_version(LanceFileVersion::V2_2) + .with_max_file_version(LanceFileVersion::V2_2) + .with_page_sizes(vec![4096]); + check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await; + } + + #[tokio::test] + async fn test_constant_layout_nullable_item_v2_2() { + use crate::format::pb21::page_layout::Layout; + + let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![ + Some(7), + None, + Some(7), + None, + Some(7), + ])); + let field = arrow_schema::Field::new("c", DataType::Int32, true); + let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await; + + let PageEncoding::Structural(layout) = &page.description else { + panic!("Expected structural encoding"); + }; + let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else { + panic!("Expected constant layout in slot 2"); + }; + assert!(layout.inline_value.is_some()); + assert_eq!(page.data.len(), 2); + + let test_cases = TestCases::default() + .with_min_file_version(LanceFileVersion::V2_2) + .with_max_file_version(LanceFileVersion::V2_2) + .with_page_sizes(vec![4096]); + check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await; + } + + #[tokio::test] + async fn test_constant_layout_list_repdef_v2_2() { + use crate::format::pb21::page_layout::Layout; + use arrow_array::builder::{Int32Builder, ListBuilder}; + + let mut builder = ListBuilder::new(Int32Builder::new()); + builder.values().append_value(7); + builder.values().append_null(); + builder.values().append_value(7); + builder.append(true); + + builder.append(true); + + builder.values().append_value(7); + builder.append(true); + + builder.append_null(); + + let arr: ArrayRef = Arc::new(builder.finish()); + let field = arrow_schema::Field::new( + "c", + DataType::List(Arc::new(arrow_schema::Field::new( + "item", + DataType::Int32, + true, + ))), + true, + ); + let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await; + + let PageEncoding::Structural(layout) = &page.description else { + panic!("Expected structural encoding"); + }; + let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else { + panic!("Expected constant layout in slot 2"); + }; + assert!(layout.inline_value.is_some()); + assert_eq!(page.data.len(), 2); + + let test_cases = TestCases::default() + .with_min_file_version(LanceFileVersion::V2_2) + 
.with_max_file_version(LanceFileVersion::V2_2) + .with_page_sizes(vec![4096]); + check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await; + } + + #[tokio::test] + async fn test_constant_layout_fixed_size_list_not_used_v2_2() { + use crate::format::pb21::page_layout::Layout; + use arrow_array::builder::{FixedSizeListBuilder, Int32Builder}; + + let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3); + for _ in 0..64 { + builder.values().append_value(1); + builder.values().append_null(); + builder.values().append_value(3); + builder.append(true); + } + let arr: ArrayRef = Arc::new(builder.finish()); + let field = arrow_schema::Field::new( + "c", + DataType::FixedSizeList( + Arc::new(arrow_schema::Field::new("item", DataType::Int32, true)), + 3, + ), + true, + ); + let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await; + + if let PageEncoding::Structural(layout) = &page.description { + assert!( + !matches!(layout.layout.as_ref().unwrap(), Layout::ConstantLayout(_)), + "FixedSizeList should not use constant layout yet" + ); + } + + let test_cases = TestCases::default() + .with_min_file_version(LanceFileVersion::V2_2) + .with_max_file_version(LanceFileVersion::V2_2) + .with_page_sizes(vec![4096]); + check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await; + } + + #[tokio::test] + async fn test_constant_layout_not_written_before_v2_2() { + use crate::format::pb21::page_layout::Layout; + + let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![7; 1024])); + let field = arrow_schema::Field::new("c", DataType::Int32, true); + let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_1).await; + + let PageEncoding::Structural(layout) = &page.description else { + return; + }; + assert!( + !matches!(layout.layout.as_ref().unwrap(), Layout::ConstantLayout(_)), + "Should not emit constant layout before v2.2" + ); + + let test_cases = TestCases::default() + .with_min_file_version(LanceFileVersion::V2_1) + .with_max_file_version(LanceFileVersion::V2_1) + .with_page_sizes(vec![4096]); + check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await; + } + + #[tokio::test] + async fn test_all_null_constant_layout_still_works_v2_2() { + use crate::format::pb21::page_layout::Layout; + + let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![None, None, None])); + let field = arrow_schema::Field::new("c", DataType::Int32, true); + let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await; + + let PageEncoding::Structural(layout) = &page.description else { + panic!("Expected structural encoding"); + }; + let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else { + panic!("Expected layout in slot 2"); + }; + assert!(layout.inline_value.is_none()); + assert_eq!(page.data.len(), 0); + + let test_cases = TestCases::default() + .with_min_file_version(LanceFileVersion::V2_2) + .with_max_file_version(LanceFileVersion::V2_2) + .with_page_sizes(vec![4096]); + check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await; + } } diff --git a/rust/lance-encoding/src/encodings/logical/primitive/constant.rs b/rust/lance-encoding/src/encodings/logical/primitive/constant.rs new file mode 100644 index 00000000000..822c90fcb3f --- /dev/null +++ b/rust/lance-encoding/src/encodings/logical/primitive/constant.rs @@ -0,0 +1,515 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use std::{any::Any, 
collections::VecDeque, ops::Range, sync::Arc}; + +use arrow_array::{new_empty_array, Array, ArrayRef}; +use arrow_buffer::ScalarBuffer; +use arrow_schema::DataType; +use bytes::Bytes; +use futures::future::BoxFuture; +use futures::FutureExt; +use snafu::location; + +use lance_core::{ + cache::{Context, DeepSizeOf}, + Error, Result, +}; + +use crate::{ + buffer::LanceBuffer, + decoder::PageEncoding, + encoder::EncodedPage, + encodings::logical::primitive::{CachedPageData, PageLoadTask}, + format::ProtobufUtils21, + repdef::{DefinitionInterpretation, RepDefUnraveler}, + EncodingsIo, +}; + +pub(crate) fn encode_constant_page( + column_idx: u32, + scalar: ArrayRef, + repdef: crate::repdef::SerializedRepDefs, + row_number: u64, + num_rows: u64, +) -> Result { + let inline_value = lance_arrow::scalar::try_inline_value(&scalar); + let value_buffer = if inline_value.is_some() { + None + } else { + Some(LanceBuffer::from( + lance_arrow::scalar::encode_scalar_value_buffer(&scalar)?, + )) + }; + + let description = ProtobufUtils21::constant_layout(&repdef.def_meaning, inline_value); + + let has_repdef = repdef.repetition_levels.is_some() || repdef.definition_levels.is_some(); + + let data = if !has_repdef { + value_buffer.into_iter().collect::>() + } else { + let rep_bytes = repdef + .repetition_levels + .as_ref() + .map(|rep| LanceBuffer::reinterpret_slice(rep.clone())) + .unwrap_or_else(LanceBuffer::empty); + let def_bytes = repdef + .definition_levels + .as_ref() + .map(|def| LanceBuffer::reinterpret_slice(def.clone())) + .unwrap_or_else(LanceBuffer::empty); + + match value_buffer { + Some(value_buffer) => vec![value_buffer, rep_bytes, def_bytes], + None => vec![rep_bytes, def_bytes], + } + }; + + Ok(EncodedPage { + column_idx, + data, + description: PageEncoding::Structural(description), + num_rows, + row_number, + }) +} + +#[derive(Debug)] +struct CachedConstantState { + scalar: ArrayRef, + rep: Option>, + def: Option>, +} + +impl DeepSizeOf for CachedConstantState { + fn deep_size_of_children(&self, _ctx: &mut Context) -> usize { + self.scalar.get_buffer_memory_size() + + self.rep.as_ref().map(|buf| buf.len() * 2).unwrap_or(0) + + self.def.as_ref().map(|buf| buf.len() * 2).unwrap_or(0) + } +} + +impl CachedPageData for CachedConstantState { + fn as_arc_any(self: Arc) -> Arc { + self + } +} + +#[derive(Debug, Clone)] +enum ScalarSource { + Inline(Vec), + ValueBuffer(usize), +} + +#[derive(Debug)] +pub struct ConstantPageScheduler { + buffer_offsets_and_sizes: Arc<[(u64, u64)]>, + scalar_source: ScalarSource, + rep_buf_idx: Option, + def_buf_idx: Option, + data_type: DataType, + def_meaning: Arc<[DefinitionInterpretation]>, + max_rep: u16, + max_visible_def: u16, + repdef: Option>, +} + +impl ConstantPageScheduler { + pub fn try_new( + buffer_offsets_and_sizes: Arc<[(u64, u64)]>, + inline_value: Option, + data_type: DataType, + def_meaning: Arc<[DefinitionInterpretation]>, + ) -> Result { + let max_rep = def_meaning.iter().filter(|d| d.is_list()).count() as u16; + let max_visible_def = def_meaning + .iter() + .filter(|d| !d.is_list()) + .map(|d| d.num_def_levels()) + .sum(); + + let (scalar_source, rep_buf_idx, def_buf_idx) = + match (inline_value, buffer_offsets_and_sizes.len()) { + (Some(inline), 0) => (ScalarSource::Inline(inline.to_vec()), None, None), + (Some(inline), 2) => (ScalarSource::Inline(inline.to_vec()), Some(0), Some(1)), + (None, 1) => (ScalarSource::ValueBuffer(0), None, None), + (None, 3) => (ScalarSource::ValueBuffer(0), Some(1), Some(2)), + (Some(_inline), 1) => { + return 
Err(Error::invalid_input( + format!( + "Invalid constant layout: inline_value present with {} buffers", + 1 + ), + location!(), + )); + } + (Some(_inline), 3) => { + return Err(Error::invalid_input( + "Invalid constant layout: inline_value present with 3 buffers", + location!(), + )); + } + (None, 0) => { + return Err(Error::invalid_input( + "Invalid constant layout: missing scalar source", + location!(), + )) + } + (None, 2) => { + return Err(Error::invalid_input( + "Invalid constant layout: ambiguous (2 buffers and no inline_value)", + location!(), + )) + } + (Some(_), n) => { + return Err(Error::invalid_input( + format!( + "Invalid constant layout: inline_value present with {} buffers", + n + ), + location!(), + )) + } + (None, n) => { + return Err(Error::invalid_input( + format!("Invalid constant layout: unexpected buffer count {}", n), + location!(), + )) + } + }; + + Ok(Self { + buffer_offsets_and_sizes, + scalar_source, + rep_buf_idx, + def_buf_idx, + data_type, + def_meaning, + max_rep, + max_visible_def, + repdef: None, + }) + } +} + +impl crate::encodings::logical::primitive::StructuralPageScheduler for ConstantPageScheduler { + fn initialize<'a>( + &'a mut self, + io: &Arc, + ) -> BoxFuture<'a, Result>> { + let rep_range = self + .rep_buf_idx + .and_then(|idx| self.buffer_offsets_and_sizes.get(idx).copied()) + .filter(|(_, len)| *len > 0) + .map(|(pos, len)| pos..pos + len); + + let def_range = self + .def_buf_idx + .and_then(|idx| self.buffer_offsets_and_sizes.get(idx).copied()) + .filter(|(_, len)| *len > 0) + .map(|(pos, len)| pos..pos + len); + + let scalar_range = match self.scalar_source { + ScalarSource::ValueBuffer(idx) => { + let (pos, len) = self.buffer_offsets_and_sizes[idx]; + Some(pos..pos + len) + } + ScalarSource::Inline(_) => None, + }; + + let mut reads = Vec::with_capacity(3); + if let Some(r) = scalar_range { + reads.push(r); + } + if let Some(r) = rep_range.clone() { + reads.push(r); + } + if let Some(r) = def_range.clone() { + reads.push(r); + } + + if reads.is_empty() { + let ScalarSource::Inline(inline) = &self.scalar_source else { + return std::future::ready(Err(Error::invalid_input( + "Invalid constant layout: missing scalar source", + location!(), + ))) + .boxed(); + }; + + let scalar = match lance_arrow::scalar::decode_scalar_from_inline_value( + &self.data_type, + inline.as_slice(), + ) { + Ok(s) => s, + Err(e) => return std::future::ready(Err(e.into())).boxed(), + }; + let cached = Arc::new(CachedConstantState { + scalar, + rep: None, + def: None, + }); + self.repdef = Some(cached.clone()); + return std::future::ready(Ok(cached as Arc)).boxed(); + } + + let data = io.submit_request(reads, 0); + let scalar_source = self.scalar_source.clone(); + let data_type = self.data_type.clone(); + async move { + let mut data_iter = data.await?.into_iter(); + + let scalar = match scalar_source { + ScalarSource::Inline(inline) => { + lance_arrow::scalar::decode_scalar_from_inline_value(&data_type, &inline)? + } + ScalarSource::ValueBuffer(_) => { + let bytes = data_iter.next().unwrap(); + let buf = LanceBuffer::from_bytes(bytes, 1); + lance_arrow::scalar::decode_scalar_from_value_buffer(&data_type, buf.as_ref())? 
+ } + }; + + let rep = rep_range.map(|_| { + let rep = data_iter.next().unwrap(); + let rep = LanceBuffer::from_bytes(rep, 2); + rep.borrow_to_typed_slice::() + }); + + let def = def_range.map(|_| { + let def = data_iter.next().unwrap(); + let def = LanceBuffer::from_bytes(def, 2); + def.borrow_to_typed_slice::() + }); + + let cached = Arc::new(CachedConstantState { scalar, rep, def }); + self.repdef = Some(cached.clone()); + Ok(cached as Arc) + } + .boxed() + } + + fn load(&mut self, data: &Arc) { + self.repdef = Some( + data.clone() + .as_arc_any() + .downcast::() + .unwrap(), + ); + } + + fn schedule_ranges( + &self, + ranges: &[Range], + _io: &Arc, + ) -> Result> { + let num_rows = ranges.iter().map(|r| r.end - r.start).sum::(); + let decoder = Box::new(ConstantPageDecoder { + ranges: VecDeque::from_iter(ranges.iter().cloned()), + scalar: self.repdef.as_ref().unwrap().scalar.clone(), + rep: self.repdef.as_ref().unwrap().rep.clone(), + def: self.repdef.as_ref().unwrap().def.clone(), + def_meaning: self.def_meaning.clone(), + max_rep: self.max_rep, + max_visible_def: self.max_visible_def, + cursor_row: 0, + cursor_level: 0, + num_rows, + }) + as Box; + Ok(vec![PageLoadTask { + decoder_fut: std::future::ready(Ok(decoder)).boxed(), + num_rows, + }]) + } +} + +#[derive(Debug)] +struct ConstantPageDecoder { + ranges: VecDeque>, + scalar: ArrayRef, + rep: Option>, + def: Option>, + def_meaning: Arc<[DefinitionInterpretation]>, + max_rep: u16, + max_visible_def: u16, + cursor_row: u64, + cursor_level: usize, + num_rows: u64, +} + +impl ConstantPageDecoder { + fn drain_ranges(&mut self, num_rows: u64) -> Vec> { + let mut rows_desired = num_rows; + let mut ranges = Vec::with_capacity(self.ranges.len()); + while rows_desired > 0 { + let front = self.ranges.front_mut().unwrap(); + let avail = front.end - front.start; + if avail > rows_desired { + ranges.push(front.start..front.start + rows_desired); + front.start += rows_desired; + rows_desired = 0; + } else { + ranges.push(self.ranges.pop_front().unwrap()); + rows_desired -= avail; + } + } + ranges + } + + fn take_row(&mut self) -> Result<(Range, u64)> { + let start = self.cursor_level; + let end = if let Some(rep) = &self.rep { + if start >= rep.len() { + return Err(Error::Internal { + message: "Invalid constant layout: repetition buffer too short".into(), + location: location!(), + }); + } + if rep[start] != self.max_rep { + return Err(Error::Internal { + message: "Invalid constant layout: row did not start at max_rep".into(), + location: location!(), + }); + } + let mut end = start + 1; + while end < rep.len() && rep[end] != self.max_rep { + end += 1; + } + end + } else { + start + 1 + }; + + let visible = if let Some(def) = &self.def { + def[start..end] + .iter() + .filter(|d| **d <= self.max_visible_def) + .count() as u64 + } else { + (end - start) as u64 + }; + + self.cursor_level = end; + self.cursor_row += 1; + Ok((start..end, visible)) + } + + fn skip_to_row(&mut self, target_row: u64) -> Result<()> { + while self.cursor_row < target_row { + self.take_row()?; + } + Ok(()) + } +} + +impl crate::encodings::logical::primitive::StructuralPageDecoder for ConstantPageDecoder { + fn drain(&mut self, num_rows: u64) -> Result> { + let drained_ranges = self.drain_ranges(num_rows); + + let mut level_slices: Vec> = Vec::new(); + let mut visible_items_total: u64 = 0; + + for range in drained_ranges { + self.skip_to_row(range.start)?; + for _ in range.start..range.end { + let (level_range, visible) = self.take_row()?; + visible_items_total += visible; 
+ if let Some(last) = level_slices.last_mut() { + if last.end == level_range.start { + last.end = level_range.end; + continue; + } + } + level_slices.push(level_range); + } + } + + Ok(Box::new(DecodeConstantTask { + scalar: self.scalar.clone(), + rep: self.rep.clone(), + def: self.def.clone(), + level_slices, + visible_items_total, + def_meaning: self.def_meaning.clone(), + max_visible_def: self.max_visible_def, + })) + } + + fn num_rows(&self) -> u64 { + self.num_rows + } +} + +#[derive(Debug)] +struct DecodeConstantTask { + scalar: ArrayRef, + rep: Option>, + def: Option>, + level_slices: Vec>, + visible_items_total: u64, + def_meaning: Arc<[DefinitionInterpretation]>, + max_visible_def: u16, +} + +impl DecodeConstantTask { + fn slice_levels( + levels: &Option>, + slices: &[Range], + ) -> Option> { + levels.as_ref().map(|levels| { + let total = slices.iter().map(|r| r.end - r.start).sum(); + let mut out = Vec::with_capacity(total); + for r in slices { + out.extend(levels[r.start..r.end].iter().copied()); + } + out + }) + } + + fn materialize_values(&self, num_values: u64) -> Result { + if num_values == 0 { + return Ok(new_empty_array(self.scalar.data_type())); + } + + if let DataType::Struct(fields) = self.scalar.data_type() { + if fields.is_empty() { + return Ok(Arc::new(arrow_array::StructArray::new_empty_fields( + num_values as usize, + None, + )) as ArrayRef); + } + } + + let indices = arrow_array::UInt64Array::from(vec![0u64; num_values as usize]); + Ok(arrow_select::take::take( + self.scalar.as_ref(), + &indices, + None, + )?) + } +} + +impl crate::decoder::DecodePageTask for DecodeConstantTask { + fn decode(self: Box) -> Result { + let rep = Self::slice_levels(&self.rep, &self.level_slices); + let def = Self::slice_levels(&self.def, &self.level_slices); + + let visible_items_total = if let Some(def) = &def { + def.iter().filter(|d| **d <= self.max_visible_def).count() as u64 + } else { + self.visible_items_total + }; + + let values = self.materialize_values(visible_items_total)?; + let data = crate::data::DataBlock::from_array(values); + let unraveler = + RepDefUnraveler::new(rep, def, self.def_meaning.clone(), visible_items_total); + + Ok(crate::decoder::DecodedPage { + data, + repdef: unraveler, + }) + } +} diff --git a/rust/lance-encoding/src/format.rs b/rust/lance-encoding/src/format.rs index 30b1220826e..7114f17e31f 100644 --- a/rust/lance-encoding/src/format.rs +++ b/rust/lance-encoding/src/format.rs @@ -662,26 +662,7 @@ macro_rules! impl_common_protobuf_utils { } } - pub fn all_null_layout( - def_meaning: &[DefinitionInterpretation], - ) -> crate::format::$module::PageLayout { - crate::format::$module::PageLayout { - layout: Some( - crate::format::$module::page_layout::Layout::AllNullLayout( - crate::format::$module::AllNullLayout { - layers: def_meaning - .iter() - .map(|&def| Self::def_inter_to_repdef_layer(def)) - .collect(), - }, - ), - ), - } - } - pub fn simple_all_null_layout() -> crate::format::$module::PageLayout { - Self::all_null_layout(&[DefinitionInterpretation::NullableItem]) - } } }; } @@ -689,6 +670,23 @@ macro_rules! 
impl_common_protobuf_utils { impl_common_protobuf_utils!(pb21, ProtobufUtils21); impl ProtobufUtils21 { + pub fn constant_layout( + def_meaning: &[DefinitionInterpretation], + inline_value: Option>, + ) -> crate::format::pb21::PageLayout { + crate::format::pb21::PageLayout { + layout: Some(crate::format::pb21::page_layout::Layout::ConstantLayout( + crate::format::pb21::ConstantLayout { + inline_value: inline_value.map(bytes::Bytes::from), + layers: def_meaning + .iter() + .map(|&def| Self::def_inter_to_repdef_layer(def)) + .collect(), + }, + )), + } + } + pub fn packed_struct( values: crate::format::pb21::CompressiveEncoding, bits_per_values: Vec, diff --git a/rust/lance-encoding/src/testing.rs b/rust/lance-encoding/src/testing.rs index 9b4cadfd8c7..e0a3c04a626 100644 --- a/rust/lance-encoding/src/testing.rs +++ b/rust/lance-encoding/src/testing.rs @@ -656,8 +656,8 @@ fn collect_page_encoding(layout: &PageLayout, actual_chain: &mut Vec) -> actual_chain.extend(chain); } } - Layout::AllNullLayout(_) => { - // No value encoding for all null + Layout::ConstantLayout(_) => { + // Constant layout does not describe a value encoding chain. } Layout::BlobLayout(blob) => { if let Some(inner_layout) = &blob.inner_layout { @@ -685,6 +685,19 @@ fn verify_page_encoding( match &page.description { PageEncoding::Structural(layout) => { collect_page_encoding(layout, &mut actual_chain)?; + + // All-null structural pages may legitimately contain no encodings to verify. + // This can happen even when compression is configured because there is no value data + // (and rep/def compression is not currently described in the page layout). + if actual_chain.is_empty() && page.data.is_empty() { + if let Some(crate::format::pb21::page_layout::Layout::ConstantLayout(cl)) = + layout.layout.as_ref() + { + if cl.inline_value.is_none() { + return Ok(()); + } + } + } } PageEncoding::Legacy(_) => { // We don't need to care about legacy.
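// Recap of the ConstantLayout page configurations implied by encode_constant_page,
// encode_simple_all_null / encode_complex_all_null, and ConstantPageScheduler::try_new
// above (an illustrative summary only; the code paths are authoritative):
//
//   inline_value | page data buffers      | meaning
//   -------------+------------------------+------------------------------------------------
//   present      | (none)                 | constant fixed-width scalar, no rep/def needed
//   present      | [rep, def]             | constant fixed-width scalar with rep/def levels
//   absent       | [value]                | constant scalar stored out of line, no rep/def
//   absent       | [value, rep, def]      | constant out-of-line scalar with rep/def levels
//   absent       | (none)                 | simple all-null page
//   absent       | [rep, def]             | complex all-null page (nulls described by rep/def)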