Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions protos/encodings_v2_1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,25 @@ message FullZipLayout {
repeated RepDefLayer layers = 8;
}

// A layout used for pages where all values are null
// A layout used for pages where all (visible) values are the same scalar value.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does "visible" mean here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Except NULL which is invisible, maybe I should just use non-null values?

//
// There may be buffers of repetition and definition information
// if required in order to interpret what kind of nulls are present
message AllNullLayout {
// This generalizes the prior AllNullLayout semantics for file_version >= 2.2.
//
// There may be buffers of repetition and definition information if required in order
// to interpret what kind of nulls are present / which items are visible.
message ConstantLayout {
// The meaning of each repdef layer, used to interpret repdef buffers correctly
repeated RepDefLayer layers = 5;

// Inline fixed-width scalar value bytes.
//
// This MUST only be used for types where a single non-null element is represented by a single
// fixed-width Arrow value buffer (i.e. no offsets buffer, no child data).
Comment on lines +162 to +163
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have a single value that is represented by multiple buffers couldn't we concatenate them with a header that gives us information on how to disassemble them? Or maybe we have a different approach for that case?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not support that in this PR. One reason is that it would make the encode/decode logic quite complex. Another reason is that I suspect such data is unlikely to share the same constant value.

//
// Constraints:
// - MUST be absent for an all-null page
// - MUST be <= 32 bytes if present
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? If it is larger than 32 bytes do we put it elsewhere?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if it exceeds 32B, we will place it in a dedicated buffer instead of in the metadata.

The intention here is to avoid bloating our metadata too much. The size is set to the largest fixed data type we support (256B), though I am open to adjusting it.

optional bytes inline_value = 6;
}

// A layout where large binary data is encoded externally and only
Expand All @@ -176,8 +188,8 @@ message PageLayout {
oneof layout {
// A layout used for pages where the data is small
MiniBlockLayout mini_block_layout = 1;
// A layout used for pages where all values are null
AllNullLayout all_null_layout = 2;
// A layout used for pages where all (visible) values are the same scalar value or null.
ConstantLayout constant_layout = 2;
// A layout used for pages where the data is large
FullZipLayout full_zip_layout = 3;
// A layout where large binary data is encoded externally
Expand Down
1 change: 1 addition & 0 deletions rust/lance-arrow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub mod cast;
pub mod json;
pub mod list;
pub mod memory;
pub mod scalar;
pub mod r#struct;

/// Arrow extension metadata key for extension name
Expand Down
264 changes: 264 additions & 0 deletions rust/lance-arrow/src/scalar.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use arrow_array::{make_array, ArrayRef};
use arrow_buffer::Buffer;
use arrow_data::{transform::MutableArrayData, ArrayDataBuilder};
use arrow_schema::{ArrowError, DataType};

use crate::DataTypeExt;

type Result<T> = std::result::Result<T, ArrowError>;

pub const INLINE_VALUE_MAX_BYTES: usize = 32;

pub fn extract_scalar_value(array: &ArrayRef, idx: usize) -> Result<ArrayRef> {
if idx >= array.len() {
return Err(ArrowError::InvalidArgumentError(
"Scalar index out of bounds".to_string(),
));
}

let data = array.to_data();
let mut mutable = MutableArrayData::new(vec![&data], /*use_nulls=*/ true, 1);
mutable.extend(0, idx, idx + 1);
Ok(make_array(mutable.freeze()))
}

fn read_u32(buf: &[u8], offset: &mut usize) -> Result<u32> {
if *offset + 4 > buf.len() {
return Err(ArrowError::InvalidArgumentError(
"Invalid scalar value buffer: unexpected EOF".to_string(),
));
}
let bytes = [
buf[*offset],
buf[*offset + 1],
buf[*offset + 2],
buf[*offset + 3],
];
*offset += 4;
Ok(u32::from_le_bytes(bytes))
}

fn read_bytes<'a>(buf: &'a [u8], offset: &mut usize, len: usize) -> Result<&'a [u8]> {
if *offset + len > buf.len() {
return Err(ArrowError::InvalidArgumentError(
"Invalid scalar value buffer: unexpected EOF".to_string(),
));
}
let slice = &buf[*offset..*offset + len];
*offset += len;
Ok(slice)
}

fn write_u32(out: &mut Vec<u8>, v: u32) {
out.extend_from_slice(&v.to_le_bytes());
}

fn write_bytes(out: &mut Vec<u8>, bytes: &[u8]) {
out.extend_from_slice(bytes);
}

pub fn encode_scalar_value_buffer(scalar: &ArrayRef) -> Result<Vec<u8>> {
if scalar.len() != 1 || scalar.null_count() != 0 {
return Err(ArrowError::InvalidArgumentError(
"Scalar value buffer must be a single non-null value".to_string(),
));
}
let data = scalar.to_data();
if data.offset() != 0 {
return Err(ArrowError::InvalidArgumentError(
"Scalar value buffer must have offset=0".to_string(),
));
}
if !data.child_data().is_empty() {
return Err(ArrowError::InvalidArgumentError(
"Scalar value buffer does not support nested types".to_string(),
));
}

// Minimal format (RFC): store the Arrow value buffers for a length-1 array.
// Null bitmap and child data are intentionally not supported here.
//
// | u32 num_buffers |
// | u32 buffer_0_len | ... | u32 buffer_{n-1}_len |
// | buffer_0 bytes | ... | buffer_{n-1} bytes |
let mut out = Vec::with_capacity(128);
let buffers = data.buffers();
write_u32(&mut out, buffers.len() as u32);
for b in buffers {
write_u32(&mut out, b.len() as u32);
}
for b in buffers {
write_bytes(&mut out, b.as_slice());
}
Ok(out)
}

pub fn decode_scalar_from_value_buffer(
data_type: &DataType,
value_buffer: &[u8],
) -> Result<ArrayRef> {
if matches!(
data_type,
DataType::Struct(_) | DataType::FixedSizeList(_, _)
) {
return Err(ArrowError::InvalidArgumentError(format!(
"Scalar value buffer does not support nested data type {:?}",
data_type
)));
}

let mut offset = 0;
let num_buffers = read_u32(value_buffer, &mut offset)? as usize;
let buffer_lens = (0..num_buffers)
.map(|_| read_u32(value_buffer, &mut offset).map(|l| l as usize))
.collect::<Result<Vec<_>>>()?;

let mut buffers = Vec::with_capacity(num_buffers);
for len in buffer_lens {
let bytes = read_bytes(value_buffer, &mut offset, len)?;
buffers.push(Buffer::from_vec(bytes.to_vec()));
}

if offset != value_buffer.len() {
return Err(ArrowError::InvalidArgumentError(
"Invalid scalar value buffer: trailing bytes".to_string(),
));
}

let mut builder = ArrayDataBuilder::new(data_type.clone())
.len(1)
.null_count(0);
for b in buffers {
builder = builder.add_buffer(b);
}
Ok(make_array(builder.build()?))
}

pub fn decode_scalar_from_inline_value(
data_type: &DataType,
inline_value: &[u8],
) -> Result<ArrayRef> {
let byte_width = data_type.byte_width_opt().ok_or_else(|| {
ArrowError::InvalidArgumentError(format!(
"Inline constant is not supported for non-fixed-stride data type {:?}",
data_type
))
})?;

if inline_value.len() != byte_width {
return Err(ArrowError::InvalidArgumentError(format!(
"Inline constant length mismatch for {:?}: expected {} bytes but got {}",
data_type,
byte_width,
inline_value.len()
)));
}

let data = ArrayDataBuilder::new(data_type.clone())
.len(1)
.null_count(0)
.add_buffer(Buffer::from_vec(inline_value.to_vec()))
.build()?;
Ok(make_array(data))
}

pub fn try_inline_value(scalar: &ArrayRef) -> Option<Vec<u8>> {
if scalar.null_count() != 0 || scalar.len() != 1 {
return None;
}
let data = scalar.to_data();
if !data.child_data().is_empty() {
return None;
}
if data.buffers().len() != 1 {
return None;
}
let bytes = data.buffers()[0].as_slice();
if bytes.len() > INLINE_VALUE_MAX_BYTES {
return None;
}
Some(bytes.to_vec())
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use arrow_array::{cast::AsArray, FixedSizeBinaryArray, Int32Array, StringArray};

use super::*;

#[test]
fn test_extract_scalar_value() {
let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
let scalar = extract_scalar_value(&array, 2).unwrap();
assert_eq!(scalar.len(), 1);
assert_eq!(
scalar
.as_primitive::<arrow_array::types::Int32Type>()
.value(0),
3
);
}

#[test]
fn test_scalar_value_buffer_utf8_round_trip() {
let scalar: ArrayRef = Arc::new(StringArray::from(vec!["hello"]));
let buf = encode_scalar_value_buffer(&scalar).unwrap();
let decoded = decode_scalar_from_value_buffer(&DataType::Utf8, &buf).unwrap();
assert_eq!(decoded.len(), 1);
assert_eq!(decoded.null_count(), 0);
assert_eq!(decoded.as_string::<i32>().value(0), "hello");
}

#[test]
fn test_scalar_value_buffer_fixed_size_binary_round_trip() {
let val = vec![0xABu8; 33];
let scalar: ArrayRef = Arc::new(
FixedSizeBinaryArray::try_from_sparse_iter_with_size(
std::iter::once(Some(val.as_slice())),
33,
)
.unwrap(),
);
let buf = encode_scalar_value_buffer(&scalar).unwrap();
let decoded =
decode_scalar_from_value_buffer(&DataType::FixedSizeBinary(33), &buf).unwrap();
assert_eq!(decoded.len(), 1);
assert_eq!(decoded.as_fixed_size_binary().value(0), val.as_slice());
}

#[test]
fn test_scalar_value_buffer_rejects_nested_type() {
let field = Arc::new(arrow_schema::Field::new("item", DataType::Int32, false));
let list: ArrayRef = Arc::new(arrow_array::FixedSizeListArray::new(
field,
2,
Arc::new(Int32Array::from(vec![1, 2])),
None,
));
let scalar = list.slice(0, 1);
assert!(encode_scalar_value_buffer(&scalar).is_err());
}

#[test]
fn test_decode_scalar_from_value_buffer_rejects_nested_type() {
let buf = Vec::<u8>::new();
let res =
decode_scalar_from_value_buffer(&DataType::Struct(arrow_schema::Fields::empty()), &buf);
assert!(res.is_err());
}

#[test]
fn test_decode_scalar_from_value_buffer_trailing_bytes() {
// num_buffers = 0, plus an extra byte
let mut bytes = Vec::new();
bytes.extend_from_slice(&0u32.to_le_bytes());
bytes.push(1);
let res = decode_scalar_from_value_buffer(&DataType::Int32, &bytes);
assert!(res.is_err());
}
}
Loading
Loading