Skip to content

Commit

Permalink
Support reading/writing parquet for the FixedSizeList type
Browse files Browse the repository at this point in the history
Signed-off-by: yah01 <yah2er0ne@outlook.com>
  • Loading branch information
yah01 committed May 16, 2023
1 parent 0190408 commit 7a5cb2f
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 5 deletions.
2 changes: 2 additions & 0 deletions arrow-array/src/array/fixed_size_list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ impl FixedSizeListArray {
}
}

// impl OffsetSizeTrait

impl From<ArrayData> for FixedSizeListArray {
fn from(data: ArrayData) -> Self {
let value_length = match data.data_type() {
Expand Down
7 changes: 7 additions & 0 deletions parquet/src/arrow/array_reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ fn build_reader(
DataType::Map(_, _) => build_map_reader(field, mask, row_groups),
DataType::Struct(_) => build_struct_reader(field, mask, row_groups),
DataType::List(_) => build_list_reader(field, mask, false, row_groups),
DataType::FixedSizeList(_, _) => {
build_list_reader(field, mask, false, row_groups)
}
DataType::LargeList(_) => build_list_reader(field, mask, true, row_groups),
d => unimplemented!("reading group type {} not implemented", d),
},
Expand Down Expand Up @@ -137,6 +140,10 @@ fn build_list_reader(
DataType::List(f) => {
DataType::List(Arc::new(f.as_ref().clone().with_data_type(item_type)))
}
DataType::FixedSizeList(f, value_length) => DataType::FixedSizeList(
Arc::new(f.as_ref().clone().with_data_type(item_type)),
*value_length,
),
DataType::LargeList(f) => DataType::LargeList(Arc::new(
f.as_ref().clone().with_data_type(item_type),
)),
Expand Down
10 changes: 8 additions & 2 deletions parquet/src/arrow/array_reader/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use crate::arrow::array_reader::ArrayReader;
use crate::errors::ParquetError;
use crate::errors::Result;
use arrow_array::FixedSizeListArray;
use arrow_array::{
builder::BooleanBufferBuilder, new_empty_array, Array, ArrayRef, GenericListArray,
OffsetSizeTrait,
Expand Down Expand Up @@ -227,8 +228,13 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {

let list_data = unsafe { data_builder.build_unchecked() };

let result_array = GenericListArray::<OffsetSize>::from(list_data);
Ok(Arc::new(result_array))
let result_array: ArrayRef = match *list_data.data_type() {
ArrowType::FixedSizeList(_, _) => {
Arc::new(FixedSizeListArray::from(list_data))
}
_ => Arc::new(GenericListArray::<OffsetSize>::from(list_data)),
};
Ok(result_array)
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
Expand Down
9 changes: 8 additions & 1 deletion parquet/src/arrow/arrow_writer/levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
use crate::errors::{ParquetError, Result};
use arrow_array::cast::AsArray;
use arrow_array::{Array, ArrayRef, OffsetSizeTrait, StructArray};
use arrow_array::{Array, ArrayRef, FixedSizeListArray, OffsetSizeTrait, StructArray};
use arrow_buffer::NullBuffer;
use arrow_schema::{DataType, Field};
use std::ops::Range;
Expand Down Expand Up @@ -143,6 +143,7 @@ impl LevelInfoBuilder {
Ok(Self::Struct(children, ctx))
}
DataType::List(child)
| DataType::FixedSizeList(child, _)
| DataType::LargeList(child)
| DataType::Map(child, _) => {
let def_level = match field.is_nullable() {
Expand Down Expand Up @@ -194,6 +195,12 @@ impl LevelInfoBuilder {
range,
)
}
DataType::FixedSizeList(_, _) => {
let array = array.as_any().downcast_ref::<FixedSizeListArray>().unwrap();
let offsets: Vec<_> =
(0..=array.len()).map(|i| array.value_offset(i)).collect();
self.write_list(&offsets, array.nulls(), array.values(), range)
}
DataType::LargeList(_) => {
let array = array.as_list::<i64>();
self.write_list(
Expand Down
12 changes: 10 additions & 2 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::types::{Decimal128Type, Int32Type, Int64Type, UInt32Type, UInt64Type};
use arrow_array::{types, Array, ArrayRef, RecordBatch, RecordBatchWriter};
use arrow_array::{types, Array, ArrayRef, RecordBatch, RecordBatchWriter, FixedSizeListArray};
use arrow_schema::{ArrowError, DataType as ArrowDataType, IntervalUnit, SchemaRef};

use super::schema::{
Expand Down Expand Up @@ -310,6 +310,14 @@ fn write_leaves<W: Write>(
write_leaves(row_group_writer, &arrays, levels)?;
Ok(())
}
ArrowDataType::FixedSizeList(_, _) => {
let arrays: Vec<_> = arrays.iter().map(|array|{
array.as_any().downcast_ref::<FixedSizeListArray>().unwrap().values().clone()
}).collect();

write_leaves(row_group_writer, &arrays, levels)?;
Ok(())
}
ArrowDataType::LargeList(_) => {
let arrays: Vec<_> = arrays.iter().map(|array|{
array.as_list::<i64>().values().clone()
Expand Down Expand Up @@ -375,7 +383,7 @@ fn write_leaves<W: Write>(
ArrowDataType::Float16 => Err(ParquetError::ArrowError(
"Float16 arrays not supported".to_string(),
)),
ArrowDataType::FixedSizeList(_, _) | ArrowDataType::Union(_, _) | ArrowDataType::RunEndEncoded(_, _) => {
ArrowDataType::Union(_, _) | ArrowDataType::RunEndEncoded(_, _) => {
Err(ParquetError::NYI(
format!(
"Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"
Expand Down

0 comments on commit 7a5cb2f

Please sign in to comment.