diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs index d0fad12210db..5e525e4b7bdf 100644 --- a/arrow-row/src/lib.rs +++ b/arrow-row/src/lib.rs @@ -396,7 +396,7 @@ impl Codec { Ok(Self::Dictionary(converter, owned)) } d if !d.is_nested() => Ok(Self::Stateless), - DataType::List(f) | DataType::LargeList(f) => { + DataType::List(f) | DataType::LargeList(f) | DataType::FixedSizeList(f, _) => { // The encoded contents will be inverted if descending is set to true // As such we set `descending` to false and negate nulls first if it // it set to true @@ -450,6 +450,7 @@ impl Codec { let values = match array.data_type() { DataType::List(_) => as_list_array(array).values(), DataType::LargeList(_) => as_large_list_array(array).values(), + DataType::FixedSizeList(_, _) => as_fixed_size_list_array(array).values(), _ => unreachable!(), }; let rows = converter.convert_columns(&[values.clone()])?; @@ -536,9 +537,10 @@ impl RowConverter { fn supports_datatype(d: &DataType) -> bool { match d { _ if !d.is_nested() => true, - DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => { - Self::supports_datatype(f.data_type()) - } + DataType::List(f) + | DataType::LargeList(f) + | DataType::FixedSizeList(f, _) + | DataType::Map(f, _) => Self::supports_datatype(f.data_type()), DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())), _ => false, } @@ -1244,6 +1246,11 @@ fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec { DataType::LargeList(_) => { list::compute_lengths(&mut lengths, rows, as_large_list_array(array)) } + DataType::FixedSizeList(_, _) => list::compute_lengths_fixed_size_list( + &mut lengths, + rows, + as_fixed_size_list_array(array), + ), _ => unreachable!(), }, } @@ -1340,6 +1347,13 @@ fn encode_column( DataType::LargeList(_) => { list::encode(data, offsets, rows, opts, as_large_list_array(column)) } + DataType::FixedSizeList(_, _) => list::encode_fixed_size_list( + data, + offsets, + rows, + opts, + as_fixed_size_list_array(column), + ), _ => unreachable!(), }, } @@ -1425,6 +1439,13 @@ unsafe fn decode_column( DataType::LargeList(_) => { Arc::new(list::decode::(converter, rows, field, validate_utf8)?) } + DataType::FixedSizeList(_, size) => Arc::new(list::decode_fixed_size_list( + converter, + rows, + field, + *size, + validate_utf8, + )?), _ => unreachable!(), }, }; diff --git a/arrow-row/src/list.rs b/arrow-row/src/list.rs index 46cd0f3d3d81..7c007887478e 100644 --- a/arrow-row/src/list.rs +++ b/arrow-row/src/list.rs @@ -16,7 +16,7 @@ // under the License. use crate::{null_sentinel, RowConverter, Rows, SortField}; -use arrow_array::{Array, GenericListArray, OffsetSizeTrait}; +use arrow_array::{Array, FixedSizeListArray, GenericListArray, OffsetSizeTrait}; use arrow_buffer::{Buffer, MutableBuffer}; use arrow_data::ArrayDataBuilder; use arrow_schema::{ArrowError, SortOptions}; @@ -184,3 +184,133 @@ pub unsafe fn decode( Ok(GenericListArray::from(unsafe { builder.build_unchecked() })) } + +/// Computes the length of each encoded [`Rows`] for a FixedSizeListArray +pub fn compute_lengths_fixed_size_list( + lengths: &mut [usize], + rows: &Rows, + array: &FixedSizeListArray, +) { + let value_length = array.value_length() as usize; + lengths.iter_mut().enumerate().for_each(|(idx, length)| { + let range = array.is_valid(idx).then_some({ + let start = idx * value_length; + start..start + value_length + }); + *length += encoded_len(rows, range); + }); +} + +/// Encodes the provided `FixedSizeListArray` to `out` with the provided `SortOptions` +/// +/// `rows` should contain the encoded child elements +pub fn encode_fixed_size_list( + data: &mut [u8], + offsets: &mut [usize], + rows: &Rows, + opts: SortOptions, + array: &FixedSizeListArray, +) { + let value_length = array.value_length() as usize; + offsets + .iter_mut() + .skip(1) + .enumerate() + .for_each(|(idx, offset)| { + let range = array.is_valid(idx).then_some({ + let start = idx * value_length; + start..start + value_length + }); + let out = &mut data[*offset..]; + *offset += encode_one(out, rows, range, opts) + }); +} + +/// Decodes a FixedSizeListArray from `rows` with the provided `options` +/// +/// # Safety +/// +/// `rows` must contain valid data for the provided `converter` +pub unsafe fn decode_fixed_size_list( + converter: &RowConverter, + rows: &mut [&[u8]], + field: &SortField, + _size: i32, + validate_utf8: bool, +) -> Result { + let opts = field.options; + + let mut values_bytes = 0; + let mut total_values = 0; + + // First pass: count total values and bytes needed + for row in rows.iter_mut() { + let mut row_offset = 0; + let mut values_in_row = 0; + loop { + let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| { + values_bytes += x.len(); + }); + if decoded <= 1 { + break; + } + row_offset += decoded; + values_in_row += 1; + } + total_values += values_in_row; + } + + let mut null_count = 0; + let nulls = MutableBuffer::collect_bool(rows.len(), |x| { + let valid = rows[x][0] != null_sentinel(opts); + null_count += !valid as usize; + valid + }); + + let mut values_offsets = Vec::with_capacity(total_values); + let mut values_bytes = Vec::with_capacity(values_bytes); + + for row in rows.iter_mut() { + let mut row_offset = 0; + loop { + let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| { + values_bytes.extend_from_slice(x) + }); + row_offset += decoded; + if decoded <= 1 { + break; + } + values_offsets.push(values_bytes.len()); + } + *row = &row[row_offset..]; + } + + if opts.descending { + values_bytes.iter_mut().for_each(|o| *o = !*o); + } + + let mut last_value_offset = 0; + let mut child_rows: Vec<_> = values_offsets + .into_iter() + .map(|offset| { + let v = &values_bytes[last_value_offset..offset]; + last_value_offset = offset; + v + }) + .collect(); + + let child = converter.convert_raw(&mut child_rows, validate_utf8)?; + assert_eq!(child.len(), 1); + + let child_data = child[0].to_data(); + + let builder = ArrayDataBuilder::new(field.data_type.clone()) + .len(rows.len()) + .null_count(null_count) + .null_bit_buffer(Some(nulls.into())) + .add_child_data(child_data); + + Ok(FixedSizeListArray::from(unsafe { + builder.build_unchecked() + })) +}