Skip to content

Commit

Permalink
Fix StructArrayReader handling nested lists (apache#1651)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed May 13, 2022
1 parent dd16ec9 commit 230eb38
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 75 deletions.
167 changes: 103 additions & 64 deletions parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,17 @@
// under the License.

use std::any::Any;
use std::cmp::{max, min};
use std::cmp::max;
use std::marker::PhantomData;
use std::mem::size_of;
use std::result::Result::Ok;
use std::sync::Arc;
use std::vec::Vec;

use arrow::array::{
Array, ArrayData, ArrayDataBuilder, ArrayRef, BooleanArray, BooleanBufferBuilder,
DecimalArray, Int16BufferBuilder, Int32Array, Int64Array, PrimitiveArray,
StructArray,
DecimalArray, Int32Array, Int64Array, PrimitiveArray, StructArray,
};
use arrow::buffer::{Buffer, MutableBuffer};
use arrow::buffer::Buffer;
use arrow::datatypes::{
ArrowPrimitiveType, BooleanType as ArrowBooleanType, DataType as ArrowType,
Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type,
Expand Down Expand Up @@ -655,8 +653,7 @@ pub struct StructArrayReader {
data_type: ArrowType,
struct_def_level: i16,
struct_rep_level: i16,
def_level_buffer: Option<Buffer>,
rep_level_buffer: Option<Buffer>,
nullable: bool,
}

impl StructArrayReader {
Expand All @@ -666,14 +663,14 @@ impl StructArrayReader {
children: Vec<Box<dyn ArrayReader>>,
def_level: i16,
rep_level: i16,
nullable: bool,
) -> Self {
Self {
data_type,
children,
struct_def_level: def_level,
struct_rep_level: rep_level,
def_level_buffer: None,
rep_level_buffer: None,
nullable,
}
}
}
Expand Down Expand Up @@ -708,8 +705,6 @@ impl ArrayReader for StructArrayReader {
/// ```
fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
if self.children.is_empty() {
self.def_level_buffer = None;
self.rep_level_buffer = None;
return Ok(Arc::new(StructArray::from(Vec::new())));
}

Expand Down Expand Up @@ -742,80 +737,59 @@ impl ArrayReader for StructArrayReader {
.collect::<Vec<ArrayData>>(),
);

if self.struct_def_level != 0 {
if self.nullable {
// calculate struct def level data
let buffer_size = children_array_len * size_of::<i16>();
let mut def_level_data_buffer = MutableBuffer::new(buffer_size);
def_level_data_buffer.resize(buffer_size, 0);

// Safety: the buffer is always treated as `u16` in the code below
let def_level_data = unsafe { def_level_data_buffer.typed_data_mut() };
// children should have consistent view of parent, only need to inspect first child
let def_levels = self.children[0]
.get_def_levels()
.expect("child with nullable parents must have definition level");

def_level_data
.iter_mut()
.for_each(|v| *v = self.struct_def_level);
// calculate bitmap for current array
let mut bitmap_builder = BooleanBufferBuilder::new(children_array_len);

for child in &self.children {
if let Some(current_child_def_levels) = child.get_def_levels() {
if current_child_def_levels.len() != children_array_len {
return Err(general_err!("Child array length are not equal!"));
} else {
for i in 0..children_array_len {
def_level_data[i] =
min(def_level_data[i], current_child_def_levels[i]);
match self.children[0].get_rep_levels() {
Some(rep_levels) => {
// Sanity check
assert_eq!(rep_levels.len(), def_levels.len());

for (rep_level, def_level) in rep_levels.iter().zip(def_levels) {
if rep_level > &self.struct_rep_level {
// Already handled by inner list - SKIP
continue;
}
bitmap_builder.append(*def_level >= self.struct_def_level)
}
}
None => {
for def_level in def_levels {
bitmap_builder.append(*def_level >= self.struct_def_level)
}
}
}

// calculate bitmap for current array
let mut bitmap_builder = BooleanBufferBuilder::new(children_array_len);
for def_level in def_level_data {
let not_null = *def_level >= self.struct_def_level;
bitmap_builder.append(not_null);
if bitmap_builder.len() != children_array_len {
return Err(general_err!("Failed to decode level data for struct array"));
}

array_data_builder =
array_data_builder.null_bit_buffer(bitmap_builder.finish());

self.def_level_buffer = Some(def_level_data_buffer.into());
}

let array_data = unsafe { array_data_builder.build_unchecked() };

if self.struct_rep_level != 0 {
// calculate struct rep level data, since struct doesn't add to repetition
// levels, here we just need to keep repetition levels of first array
// TODO: Verify that all children array reader has same repetition levels
let rep_level_data = self
.children
.first()
.ok_or_else(|| {
general_err!("Struct array reader should have at least one child!")
})?
.get_rep_levels()
.map(|data| -> Result<Buffer> {
let mut buffer = Int16BufferBuilder::new(children_array_len);
buffer.append_slice(data);
Ok(buffer.finish())
})
.transpose()?;

self.rep_level_buffer = rep_level_data;
}
Ok(Arc::new(StructArray::from(array_data)))
}

fn get_def_levels(&self) -> Option<&[i16]> {
self.def_level_buffer
.as_ref()
.map(|buf| unsafe { buf.typed_data() })
// Children definition levels should describe the same
// parent structure, so return first child's
self.children.first().and_then(|l| l.get_def_levels())
}

fn get_rep_levels(&self) -> Option<&[i16]> {
self.rep_level_buffer
.as_ref()
.map(|buf| unsafe { buf.typed_data() })
// Children definition levels should describe the same
// parent structure, so return first child's
self.children.first().and_then(|l| l.get_rep_levels())
}
}

Expand All @@ -828,7 +802,9 @@ mod tests {
use rand::{thread_rng, Rng};

use crate::arrow::array_reader::test_util::InMemoryArrayReader;
use arrow::array::{Array, ArrayRef, PrimitiveArray, StringArray, StructArray};
use arrow::array::{
Array, ArrayRef, ListArray, PrimitiveArray, StringArray, StructArray,
};
use arrow::datatypes::{
ArrowPrimitiveType, DataType as ArrowType, Date32Type as ArrowDate32, Field,
Int32Type as ArrowInt32, Int64Type as ArrowInt64,
Expand Down Expand Up @@ -1553,6 +1529,7 @@ mod tests {
vec![Box::new(array_reader_1), Box::new(array_reader_2)],
1,
1,
true,
);

let struct_array = struct_array_reader.next_batch(5).unwrap();
Expand All @@ -1566,12 +1543,74 @@ mod tests {
.collect::<Vec<bool>>()
);
assert_eq!(
Some(vec![0, 1, 1, 1, 1].as_slice()),
Some(vec![0, 1, 2, 3, 1].as_slice()),
struct_array_reader.get_def_levels()
);
assert_eq!(
Some(vec![0, 1, 1, 1, 1].as_slice()),
struct_array_reader.get_rep_levels()
);
}

#[test]
fn test_struct_array_reader_list() {
use arrow::datatypes::Int32Type;
// [
// {foo: [1, 2, null],
// {foo: []},
// {foo: null},
// null,
// ]

let expected_l =
Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
Some(vec![Some(1), Some(2), None]),
Some(vec![]),
None,
None,
]));

let nulls = Buffer::from([0b00000111]);
let struct_fields = vec![(
Field::new("foo", expected_l.data_type().clone(), true),
expected_l.clone() as ArrayRef,
)];
let expected = StructArray::from((struct_fields, nulls));

let array = Arc::new(Int32Array::from_iter(vec![
Some(1),
Some(2),
None,
None,
None,
None,
]));
let reader = InMemoryArrayReader::new(
ArrowType::Int32,
array,
Some(vec![4, 4, 3, 2, 1, 0]),
Some(vec![0, 1, 1, 0, 0, 0]),
);

let list_reader = ListArrayReader::<i32>::new(
Box::new(reader),
expected_l.data_type().clone(),
ArrowType::Int32,
3,
1,
true,
);

let mut struct_reader = StructArrayReader::new(
expected.data_type().clone(),
vec![Box::new(list_reader)],
1,
0,
true,
);

let actual = struct_reader.next_batch(1024).unwrap();
let actual = actual.as_any().downcast_ref::<StructArray>().unwrap();
assert_eq!(actual, &expected)
}
}
1 change: 1 addition & 0 deletions parquet/src/arrow/array_reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ fn build_struct_reader(
children_reader,
field.def_level,
field.rep_level,
field.nullable,
)) as _)
}

Expand Down
18 changes: 7 additions & 11 deletions parquet/src/arrow/array_reader/map_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use crate::arrow::array_reader::ArrayReader;
use crate::errors::ParquetError::ArrowError;
use crate::errors::{Result, ParquetError};
use crate::errors::{ParquetError, Result};
use arrow::array::{ArrayDataBuilder, ArrayRef, MapArray};
use arrow::buffer::{Buffer, MutableBuffer};
use arrow::datatypes::DataType as ArrowType;
Expand All @@ -33,8 +33,6 @@ pub struct MapArrayReader {
data_type: ArrowType,
map_def_level: i16,
map_rep_level: i16,
def_level_buffer: Option<Buffer>,
rep_level_buffer: Option<Buffer>,
}

impl MapArrayReader {
Expand All @@ -51,8 +49,6 @@ impl MapArrayReader {
data_type,
map_def_level: rep_level,
map_rep_level: def_level,
def_level_buffer: None,
rep_level_buffer: None,
}
}
}
Expand Down Expand Up @@ -154,15 +150,15 @@ impl ArrayReader for MapArrayReader {
}

fn get_def_levels(&self) -> Option<&[i16]> {
self.def_level_buffer
.as_ref()
.map(|buf| unsafe { buf.typed_data() })
// Children definition levels should describe the same parent structure,
// so return key_reader only
self.key_reader.get_def_levels()
}

fn get_rep_levels(&self) -> Option<&[i16]> {
self.rep_level_buffer
.as_ref()
.map(|buf| unsafe { buf.typed_data() })
// Children repetition levels should describe the same parent structure,
// so return key_reader only
self.key_reader.get_rep_levels()
}
}

Expand Down

0 comments on commit 230eb38

Please sign in to comment.