Skip to content

Commit

Permalink
Fix null struct and list roundtrip (#270)
Browse files Browse the repository at this point in the history
* fix null struct and list inconsistencies in writer

* fix list reader null and empty slot calculation

* remove stray TODOs
  • Loading branch information
nevi-me authored May 11, 2021
1 parent 510f02f commit 8226219
Show file tree
Hide file tree
Showing 3 changed files with 265 additions and 314 deletions.
95 changes: 48 additions & 47 deletions parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,8 @@ pub struct ListArrayReader<OffsetSize: OffsetSizeTrait> {
item_type: ArrowType,
list_def_level: i16,
list_rep_level: i16,
list_empty_def_level: i16,
list_null_def_level: i16,
def_level_buffer: Option<Buffer>,
rep_level_buffer: Option<Buffer>,
_marker: PhantomData<OffsetSize>,
Expand All @@ -628,13 +630,17 @@ impl<OffsetSize: OffsetSizeTrait> ListArrayReader<OffsetSize> {
item_type: ArrowType,
def_level: i16,
rep_level: i16,
list_null_def_level: i16,
list_empty_def_level: i16,
) -> Self {
Self {
item_reader,
data_type,
item_type,
list_def_level: def_level,
list_rep_level: rep_level,
list_null_def_level,
list_empty_def_level,
def_level_buffer: None,
rep_level_buffer: None,
_marker: PhantomData,
Expand Down Expand Up @@ -843,61 +849,49 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
// Where n is the max definition level of the list's parent.
// If a Parquet schema's only leaf is the list, then n = 0.

// TODO: ARROW-10391 - add a test case with a non-nullable child, check if max is 3
let list_field_type = match self.get_data_type() {
ArrowType::List(field)
| ArrowType::FixedSizeList(field, _)
| ArrowType::LargeList(field) => field,
_ => {
// Panic: this is safe as we only write lists from list datatypes
unreachable!()
}
};
let max_list_def_range = if list_field_type.is_nullable() { 3 } else { 2 };
let max_list_definition = *(def_levels.iter().max().unwrap());
// TODO: ARROW-10391 - Find a reliable way of validating deeply-nested lists
// debug_assert!(
// max_list_definition >= max_list_def_range,
// "List definition max less than range"
// );
let list_null_def = max_list_definition - max_list_def_range;
let list_empty_def = max_list_definition - 1;
let mut null_list_indices: Vec<usize> = Vec::new();
for i in 0..def_levels.len() {
if def_levels[i] == list_null_def {
null_list_indices.push(i);
}
}
// If the list index is at empty definition, the child slot is null
let null_list_indices: Vec<usize> = def_levels
.iter()
.enumerate()
.filter_map(|(index, def)| {
if *def <= self.list_empty_def_level {
Some(index)
} else {
None
}
})
.collect();
let batch_values = match null_list_indices.len() {
0 => next_batch_array.clone(),
_ => remove_indices(next_batch_array.clone(), item_type, null_list_indices)?,
};

// null list has def_level = 0
// empty list has def_level = 1
// null item in a list has def_level = 2
// non-null item has def_level = 3
// first item in each list has rep_level = 0, subsequent items have rep_level = 1

let mut offsets: Vec<OffsetSize> = Vec::new();
let mut cur_offset = OffsetSize::zero();
for i in 0..rep_levels.len() {
if rep_levels[i] == 0 {
offsets.push(cur_offset)
def_levels.iter().zip(rep_levels).for_each(|(d, r)| {
if *r == 0 || d == &self.list_empty_def_level {
offsets.push(cur_offset);
}
if def_levels[i] >= list_empty_def {
if d > &self.list_empty_def_level {
cur_offset += OffsetSize::one();
}
}
});
offsets.push(cur_offset);

let num_bytes = bit_util::ceil(offsets.len(), 8);
let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false);
// TODO: A useful optimization is to use the null count to fill with
// 0 or null, to reduce individual bits set in a loop.
// To favour dense data, set every slot to true, then unset
let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
let null_slice = null_buf.as_slice_mut();
let mut list_index = 0;
for i in 0..rep_levels.len() {
if rep_levels[i] == 0 && def_levels[i] != 0 {
bit_util::set_bit(null_slice, list_index);
// If the level is lower than empty, then the slot is null.
// When a list is non-nullable, its empty level = null level,
// so this automatically factors that in.
if rep_levels[i] == 0 && def_levels[i] < self.list_empty_def_level {
bit_util::unset_bit(null_slice, list_index);
}
if rep_levels[i] == 0 {
list_index += 1;
Expand Down Expand Up @@ -1282,16 +1276,15 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
let mut new_context = context.clone();

new_context.path.append(vec![list_type.name().to_string()]);
// We need to know at what definition a list or its child is null
let list_null_def = new_context.def_level;
let mut list_empty_def = new_context.def_level;

match list_type.get_basic_info().repetition() {
Repetition::REPEATED => {
new_context.def_level += 1;
new_context.rep_level += 1;
}
Repetition::OPTIONAL => {
new_context.def_level += 1;
}
_ => (),
// If the list's root is nullable
if let Repetition::OPTIONAL = list_type.get_basic_info().repetition() {
new_context.def_level += 1;
// current level is nullable, increment to get level for empty list slot
list_empty_def += 1;
}

match list_child.get_basic_info().repetition() {
Expand Down Expand Up @@ -1350,13 +1343,17 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
item_reader_type,
new_context.def_level,
new_context.rep_level,
list_null_def,
list_empty_def,
)),
ArrowType::LargeList(_) => Box::new(ListArrayReader::<i64>::new(
item_reader,
arrow_type,
item_reader_type,
new_context.def_level,
new_context.rep_level,
list_null_def,
list_empty_def,
)),

_ => {
Expand Down Expand Up @@ -2468,6 +2465,8 @@ mod tests {
ArrowType::Int32,
1,
1,
0,
1,
);

let next_batch = list_array_reader.next_batch(1024).unwrap();
Expand Down Expand Up @@ -2522,6 +2521,8 @@ mod tests {
ArrowType::Int32,
1,
1,
0,
1,
);

let next_batch = list_array_reader.next_batch(1024).unwrap();
Expand Down
54 changes: 29 additions & 25 deletions parquet/src/arrow/arrow_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
let batch_level = LevelInfo::new_from_batch(batch);
let mut row_group_writer = self.writer.next_row_group()?;
for (array, field) in batch.columns().iter().zip(batch.schema().fields()) {
let mut levels = batch_level.calculate_array_levels(array, field, false);
let mut levels = batch_level.calculate_array_levels(array, field);
// Reverse levels as we pop() them when writing arrays
levels.reverse();
write_leaves(&mut row_group_writer, array, &mut levels)?;
Expand Down Expand Up @@ -793,25 +793,29 @@ mod tests {
let struct_field_g = Field::new(
"g",
DataType::List(Box::new(Field::new("item", DataType::Int16, true))),
false,
);
let struct_field_h = Field::new(
"h",
DataType::List(Box::new(Field::new("item", DataType::Int16, false))),
true,
);
let struct_field_e = Field::new(
"e",
DataType::Struct(vec![struct_field_f.clone(), struct_field_g.clone()]),
true,
DataType::Struct(vec![
struct_field_f.clone(),
struct_field_g.clone(),
struct_field_h.clone(),
]),
false,
);
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, true),
// Note: when the below struct is set to non-nullable, this test fails,
// but the output data written is correct.
// Interestingly, pyarrow will read it correctly, but pyspark fails to.
// This might be a compatibility quirk between arrow and parquet.
// We have opened https://github.com/apache/arrow-rs/issues/245 to investigate
Field::new(
"c",
DataType::Struct(vec![struct_field_d.clone(), struct_field_e.clone()]),
true,
false,
),
]);

Expand All @@ -831,15 +835,23 @@ mod tests {
// Construct a list array from the above two
let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
.len(5)
.add_buffer(g_value_offsets)
.add_buffer(g_value_offsets.clone())
.add_child_data(g_value.data().clone())
// .null_bit_buffer(Buffer::from(vec![0b00011011])) // TODO: add to test after resolving other issues
.build();
let g = ListArray::from(g_list_data);
// The difference between g and h is that h has a null bitmap
let h_list_data = ArrayData::builder(struct_field_h.data_type().clone())
.len(5)
.add_buffer(g_value_offsets)
.add_child_data(g_value.data().clone())
.null_bit_buffer(Buffer::from(vec![0b00011011]))
.build();
let h = ListArray::from(h_list_data);

let e = StructArray::from(vec![
(struct_field_f, Arc::new(f) as ArrayRef),
(struct_field_g, Arc::new(g) as ArrayRef),
(struct_field_h, Arc::new(h) as ArrayRef),
]);

let c = StructArray::from(vec![
Expand All @@ -860,14 +872,10 @@ mod tests {
#[test]
fn arrow_writer_complex_mixed() {
// This test was added while investigating https://github.com/apache/arrow-rs/issues/244.
// Only writing the "offset_field" column works when "some_nested_object" is non-null.
// This indicates that a non-null struct should not have a null child (with null values).
// One observation is that spark doesn't consider the parent struct's nullness,
// and so, we should investigate the impact of always treating structs as null.
// See https://github.com/apache/arrow-rs/issues/245.
// It was subsequently fixed while investigating https://github.com/apache/arrow-rs/issues/245.

// define schema
let offset_field = Field::new("offset", DataType::Int32, true);
let offset_field = Field::new("offset", DataType::Int32, false);
let partition_field = Field::new("partition", DataType::Int64, true);
let topic_field = Field::new("topic", DataType::Utf8, true);
let schema = Schema::new(vec![Field::new(
Expand All @@ -877,7 +885,7 @@ mod tests {
partition_field.clone(),
topic_field.clone(),
]),
true,
false,
)]);

// create some data
Expand Down Expand Up @@ -970,14 +978,10 @@ mod tests {
let schema = Schema::new(vec![field_a.clone()]);

// create data
// When the null buffer of the struct is created, this test fails.
// It appears that the nullness of the struct is ignored when the
// struct is read back.
// See https://github.com/apache/arrow-rs/issues/245
let c = Int32Array::from(vec![1, 2, 3, 4, 5, 6]);
let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
.len(6)
// .null_bit_buffer(Buffer::from(vec![0b00100111]))
.null_bit_buffer(Buffer::from(vec![0b00100111]))
.add_child_data(c.data().clone())
.build();
let b = StructArray::from(b_data);
Expand All @@ -989,7 +993,7 @@ mod tests {
let a = StructArray::from(a_data);

assert_eq!(a.null_count(), 0);
assert_eq!(a.column(0).null_count(), 0);
assert_eq!(a.column(0).null_count(), 2);

// build a record batch
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();
Expand Down Expand Up @@ -1362,7 +1366,7 @@ mod tests {
let a_list_data = ArrayData::builder(DataType::List(Box::new(Field::new(
"item",
DataType::Int32,
true, // TODO: why does this fail when false? Is it related to logical nulls?
false,
))))
.len(5)
.add_buffer(a_value_offsets)
Expand Down
Loading

0 comments on commit 8226219

Please sign in to comment.