Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions arrow-row/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ impl Codec {
Ok(Self::Dictionary(converter, owned))
}
d if !d.is_nested() => Ok(Self::Stateless),
DataType::List(f) | DataType::LargeList(f) => {
DataType::List(f) | DataType::LargeList(f) | DataType::FixedSizeList(f, _) => {
// The encoded contents will be inverted if descending is set to true
// As such we set `descending` to false and negate nulls first if it
// it set to true
Expand Down Expand Up @@ -450,6 +450,7 @@ impl Codec {
let values = match array.data_type() {
DataType::List(_) => as_list_array(array).values(),
DataType::LargeList(_) => as_large_list_array(array).values(),
DataType::FixedSizeList(_, _) => as_fixed_size_list_array(array).values(),
_ => unreachable!(),
};
let rows = converter.convert_columns(&[values.clone()])?;
Expand Down Expand Up @@ -536,9 +537,10 @@ impl RowConverter {
fn supports_datatype(d: &DataType) -> bool {
match d {
_ if !d.is_nested() => true,
DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => {
Self::supports_datatype(f.data_type())
}
DataType::List(f)
| DataType::LargeList(f)
| DataType::FixedSizeList(f, _)
| DataType::Map(f, _) => Self::supports_datatype(f.data_type()),
DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())),
_ => false,
}
Expand Down Expand Up @@ -1244,6 +1246,11 @@ fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec<usize> {
DataType::LargeList(_) => {
list::compute_lengths(&mut lengths, rows, as_large_list_array(array))
}
DataType::FixedSizeList(_, _) => list::compute_lengths_fixed_size_list(
&mut lengths,
rows,
as_fixed_size_list_array(array),
),
_ => unreachable!(),
},
}
Expand Down Expand Up @@ -1340,6 +1347,13 @@ fn encode_column(
DataType::LargeList(_) => {
list::encode(data, offsets, rows, opts, as_large_list_array(column))
}
DataType::FixedSizeList(_, _) => list::encode_fixed_size_list(
data,
offsets,
rows,
opts,
as_fixed_size_list_array(column),
),
_ => unreachable!(),
},
}
Expand Down Expand Up @@ -1425,6 +1439,13 @@ unsafe fn decode_column(
DataType::LargeList(_) => {
Arc::new(list::decode::<i64>(converter, rows, field, validate_utf8)?)
}
DataType::FixedSizeList(_, size) => Arc::new(list::decode_fixed_size_list(
converter,
rows,
field,
*size,
validate_utf8,
)?),
_ => unreachable!(),
},
};
Expand Down
132 changes: 131 additions & 1 deletion arrow-row/src/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::{null_sentinel, RowConverter, Rows, SortField};
use arrow_array::{Array, GenericListArray, OffsetSizeTrait};
use arrow_array::{Array, FixedSizeListArray, GenericListArray, OffsetSizeTrait};
use arrow_buffer::{Buffer, MutableBuffer};
use arrow_data::ArrayDataBuilder;
use arrow_schema::{ArrowError, SortOptions};
Expand Down Expand Up @@ -184,3 +184,133 @@ pub unsafe fn decode<O: OffsetSizeTrait>(

Ok(GenericListArray::from(unsafe { builder.build_unchecked() }))
}

/// Computes the length of each encoded [`Rows`] for a FixedSizeListArray
pub fn compute_lengths_fixed_size_list(
lengths: &mut [usize],
rows: &Rows,
array: &FixedSizeListArray,
) {
let value_length = array.value_length() as usize;
lengths.iter_mut().enumerate().for_each(|(idx, length)| {
let range = array.is_valid(idx).then_some({
let start = idx * value_length;
start..start + value_length
});
*length += encoded_len(rows, range);
});
}

/// Encodes the provided `FixedSizeListArray` to `out` with the provided `SortOptions`
///
/// `rows` should contain the encoded child elements
pub fn encode_fixed_size_list(
data: &mut [u8],
offsets: &mut [usize],
rows: &Rows,
opts: SortOptions,
array: &FixedSizeListArray,
) {
let value_length = array.value_length() as usize;
offsets
.iter_mut()
.skip(1)
.enumerate()
.for_each(|(idx, offset)| {
let range = array.is_valid(idx).then_some({
let start = idx * value_length;
start..start + value_length
});
let out = &mut data[*offset..];
*offset += encode_one(out, rows, range, opts)
});
}

/// Decodes a FixedSizeListArray from `rows` with the provided `options`
///
/// # Safety
///
/// `rows` must contain valid data for the provided `converter`
pub unsafe fn decode_fixed_size_list(
converter: &RowConverter,
rows: &mut [&[u8]],
field: &SortField,
_size: i32,
validate_utf8: bool,
) -> Result<FixedSizeListArray, ArrowError> {
let opts = field.options;

let mut values_bytes = 0;
let mut total_values = 0;

// First pass: count total values and bytes needed
for row in rows.iter_mut() {
let mut row_offset = 0;
let mut values_in_row = 0;
loop {
let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
values_bytes += x.len();
});
if decoded <= 1 {
break;
}
row_offset += decoded;
values_in_row += 1;
}
total_values += values_in_row;
}

let mut null_count = 0;
let nulls = MutableBuffer::collect_bool(rows.len(), |x| {
let valid = rows[x][0] != null_sentinel(opts);
null_count += !valid as usize;
valid
});

let mut values_offsets = Vec::with_capacity(total_values);
let mut values_bytes = Vec::with_capacity(values_bytes);

for row in rows.iter_mut() {
let mut row_offset = 0;
loop {
let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
values_bytes.extend_from_slice(x)
});
row_offset += decoded;
if decoded <= 1 {
break;
}
values_offsets.push(values_bytes.len());
}
*row = &row[row_offset..];
}

if opts.descending {
values_bytes.iter_mut().for_each(|o| *o = !*o);
}

let mut last_value_offset = 0;
let mut child_rows: Vec<_> = values_offsets
.into_iter()
.map(|offset| {
let v = &values_bytes[last_value_offset..offset];
last_value_offset = offset;
v
})
.collect();

let child = converter.convert_raw(&mut child_rows, validate_utf8)?;
assert_eq!(child.len(), 1);

let child_data = child[0].to_data();

let builder = ArrayDataBuilder::new(field.data_type.clone())
.len(rows.len())
.null_count(null_count)
.null_bit_buffer(Some(nulls.into()))
.add_child_data(child_data);

Ok(FixedSizeListArray::from(unsafe {
builder.build_unchecked()
}))
}
Loading