Skip to content

Commit

Permalink
feat(parquet): support for reading structs nested within lists (#1187)
Browse files Browse the repository at this point in the history
* feat(parquet): support for reading structs nested within lists

* fix: logical conflict

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
helgikrs and alamb authored Jan 18, 2022
1 parent 548c961 commit 4f1064e
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 1 deletion.
31 changes: 30 additions & 1 deletion parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,36 @@ fn remove_indices(
indices,
size
),
ArrowType::Struct(fields) => {
let struct_array = arr.as_any()
.downcast_ref::<StructArray>()
.expect("Array should be a struct");

// Recursively call `remove_indices` on each of the struct's fields
let new_columns = fields.into_iter()
.zip(struct_array.columns())
.map(|(field, column)| {
let dt = field.data_type().clone();
Ok((field,
remove_indices(column.clone(), dt, indices.clone())?))
})
.collect::<Result<Vec<_>>>()?;

if arr.data().null_count() == 0 {
// No nulls, nothing to do.
Ok(Arc::new(StructArray::from(new_columns)))
} else {
// Construct a new validity buffer by removing `indices` from the original validity
// map.
let mut valid = BooleanBufferBuilder::new(arr.len() - indices.len());
for idx in 0..arr.len() {
if !indices.contains(&idx) {
valid.append(!arr.is_null(idx));
}
}
Ok(Arc::new(StructArray::from((new_columns, valid.finish()))))
}
}
_ => Err(ParquetError::General(format!(
"ListArray of type List({:?}) is not supported by array_reader",
item_type
Expand Down Expand Up @@ -1562,7 +1592,6 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
match item_reader_type {
ArrowType::List(_)
| ArrowType::FixedSizeList(_, _)
| ArrowType::Struct(_)
| ArrowType::Dictionary(_, _) => Err(ArrowError(format!(
"reading List({:?}) into arrow not supported yet",
item_type
Expand Down
100 changes: 100 additions & 0 deletions parquet/src/arrow/arrow_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1623,4 +1623,104 @@ mod tests {
let stats = column.statistics().unwrap();
assert_eq!(stats.null_count(), 2);
}

#[test]
fn test_list_of_struct_roundtrip() {
    // Appends `value` (or a null when `None`) to the Int32 child builder at
    // position `column` of the struct builder.
    fn push_i32(structs: &mut StructBuilder, column: usize, value: Option<i32>) {
        let child = structs.field_builder::<Int32Builder>(column).unwrap();
        match value {
            Some(v) => child.append_value(v).unwrap(),
            None => child.append_null().unwrap(),
        }
    }

    // Appends one struct slot: child `a`, then child `b`, then the validity
    // bit of the struct itself.
    fn push_struct(
        structs: &mut StructBuilder,
        a: Option<i32>,
        b: Option<i32>,
        is_valid: bool,
    ) {
        push_i32(structs, 0, a);
        push_i32(structs, 1, b);
        structs.append(is_valid).unwrap();
    }

    // Schema: a list whose items are structs with two nullable Int32
    // children, `a` and `b`.
    let field_a = Field::new("a", DataType::Int32, true);
    let field_b = Field::new("b", DataType::Int32, true);

    let builder_a = Int32Builder::new(10);
    let builder_b = Int32Builder::new(10);

    let mut lists = ListBuilder::new(StructBuilder::new(
        vec![field_a, field_b],
        vec![Box::new(builder_a), Box::new(builder_b)],
    ));

    // The array under test, one list per entry:
    // [{a: 1, b: 2}], [], null, [null, null], [{a: null, b: 3}], [{a: 2, b: null}]

    // [{a: 1, b: 2}]
    push_struct(lists.values(), Some(1), Some(2), true);
    lists.append(true).unwrap();

    // [] (a valid list with no items)
    lists.append(true).unwrap();

    // null (the list itself is null)
    lists.append(false).unwrap();

    // [null, null] (a valid list holding two null structs)
    push_struct(lists.values(), None, None, false);
    push_struct(lists.values(), None, None, false);
    lists.append(true).unwrap();

    // [{a: null, b: 3}]
    push_struct(lists.values(), None, Some(3), true);
    lists.append(true).unwrap();

    // [{a: 2, b: null}]
    push_struct(lists.values(), Some(2), None, true);
    lists.append(true).unwrap();

    let array = Arc::new(lists.finish());

    one_column_roundtrip(array, true, Some(10));
}
}

0 comments on commit 4f1064e

Please sign in to comment.