Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fixed error in reading Struct<List<...>> from parquet (#1150)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Jul 9, 2022
1 parent ba45d55 commit 890853c
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 13 deletions.
29 changes: 29 additions & 0 deletions parquet_integration/write_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,15 @@ def case_struct() -> Tuple[dict, pa.Schema, str]:
"struct_nullable",
pa.struct(struct_fields),
),
pa.field(
"struct_struct_nullable",
pa.struct(
[
("f1", pa.struct(struct_fields)),
("f2", pa.bool_()),
]
),
),
]
)

Expand All @@ -248,6 +257,11 @@ def case_struct() -> Tuple[dict, pa.Schema, str]:
names=["f1", "f2"],
),
"struct_nullable": struct_nullable,
"struct_struct_nullable": pa.StructArray.from_arrays(
[struct, pa.array(boolean)],
names=["f1", "f2"],
mask=pa.array([False, False, True, False, False, False, False, False, False, False]),
),
},
schema,
f"struct_nullable_10.parquet",
Expand All @@ -257,15 +271,30 @@ def case_struct() -> Tuple[dict, pa.Schema, str]:
def case_nested_edge():
simple = [[0, 1]]
null = [None]

struct_list_nullable = pa.StructArray.from_arrays(
[pa.array([["a", "b", None, "c"]])],
fields=[
("f1", pa.list_(pa.utf8())),
],
)

fields = [
pa.field("simple", pa.list_(pa.int64())),
pa.field("null", pa.list_(pa.field("item", pa.int64(), True))),
pa.field(
"struct_list_nullable",
pa.struct([
("f1", pa.list_(pa.utf8())),
]),
)
]
schema = pa.schema(fields)
return (
{
"simple": simple,
"null": null,
"struct_list_nullable": struct_list_nullable,
},
schema,
f"nested_edge_nullable_10.parquet",
Expand Down
2 changes: 1 addition & 1 deletion src/array/struct_/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl StructArray {
if a_len != len {
Err(Error::oos(format!(
"The children DataTypes of a StructArray must equal the children data types.
However, the values {index} has a length of {a_len}, which is different from values 0, {len}."
However, the values at index {index} have a length of {a_len}, which is different from values at index 0, {len}."
)))
} else {
Ok(())
Expand Down
19 changes: 13 additions & 6 deletions src/io/parquet/read/deserialize/nested_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,9 +411,10 @@ fn extend_offsets2<'a, D: NestedDecoder<'a>>(
cum_sum[i + 1] = cum_sum[i] + delta;
}

let mut is_required = vec![false; nested.len()];
for (depth, nest) in nested.iter().enumerate().take(nested.len() - 1) {
is_required[depth + 1] = nest.is_required() && nest.is_nullable()
let mut cum_rep = vec![0u32; nested.len() + 1];
for (i, nest) in nested.iter().enumerate() {
let delta = nest.is_repeated() as u32;
cum_rep[i + 1] = cum_rep[i] + delta;
}

let max_depth = nested.len() - 1;
Expand All @@ -424,15 +425,21 @@ fn extend_offsets2<'a, D: NestedDecoder<'a>>(
rows += 1;
}

for (depth, (nest, &is_required)) in nested.iter_mut().zip(is_required.iter()).enumerate() {
let right_level = depth as u32 >= rep && def >= cum_sum[depth];
let mut is_required = false;
for (depth, nest) in nested.iter_mut().enumerate() {
let right_level = rep <= cum_rep[depth] && def >= cum_sum[depth];
if is_required || right_level {
let is_valid = nest.is_nullable() && def != cum_sum[depth];
let is_valid = nest.is_nullable() && def > cum_sum[depth];
let length = values_count[depth];
nest.push(length, is_valid);
if depth > 0 {
values_count[depth - 1] = nest.len() as i64;
};
if nest.is_required() && !is_valid {
is_required = true;
} else {
is_required = false
};

if depth == max_depth {
// the leaf / primitive
Expand Down
124 changes: 118 additions & 6 deletions tests/it/io/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,21 @@ pub fn pyarrow_nested_edge(column: &str) -> Box<dyn Array> {
let array: ListArray<i32> = a.into();
Box::new(array)
}
"struct_list_nullable" => {
// [["a", "b", None, "c"]]
let a = ListArray::<i32>::new(
DataType::List(Box::new(Field::new("item", DataType::Utf8, true))),
vec![0, 4].into(),
Utf8Array::<i32>::from([Some("a"), Some("b"), None, Some("c")]).boxed(),
None,
);
StructArray::new(
DataType::Struct(vec![Field::new("f1", a.data_type().clone(), true)]),
vec![a.boxed()],
None,
)
.boxed()
}
_ => todo!(),
}
}
Expand Down Expand Up @@ -675,6 +690,17 @@ pub fn pyarrow_nested_edge_statistics(column: &str) -> Statistics {
)
};

let new_struct = |arrays: Vec<Box<dyn Array>>, names: Vec<String>| {
let fields = names
.into_iter()
.zip(arrays.iter())
.map(|(n, a)| Field::new(n, a.data_type().clone(), true))
.collect();
StructArray::new(DataType::Struct(fields), arrays, None)
};

let names = vec!["f1".to_string()];

match column {
"simple" => Statistics {
distinct_count: Count::List(new_list(UInt64Array::from([None]).boxed())),
Expand All @@ -688,6 +714,24 @@ pub fn pyarrow_nested_edge_statistics(column: &str) -> Statistics {
min_value: new_list(Box::new(Int64Array::from([None]))).boxed(),
max_value: new_list(Box::new(Int64Array::from([None]))).boxed(),
},
"struct_list_nullable" => Statistics {
distinct_count: Count::Struct(new_struct(
vec![new_list(Box::new(UInt64Array::from([None]))).boxed()],
names.clone(),
)),
null_count: Count::Struct(new_struct(
vec![new_list(Box::new(UInt64Array::from([Some(1)]))).boxed()],
names.clone(),
)),
min_value: Box::new(new_struct(
vec![new_list(Box::new(Utf8Array::<i32>::from_slice(["a"]))).boxed()],
names.clone(),
)),
max_value: Box::new(new_struct(
vec![new_list(Box::new(Utf8Array::<i32>::from_slice(["c"]))).boxed()],
names,
)),
},
_ => unreachable!(),
}
}
Expand Down Expand Up @@ -721,6 +765,8 @@ pub fn pyarrow_struct(column: &str) -> Box<dyn Array> {
];
let string = Utf8Array::<i32>::from(string).boxed();

let mask = [true, true, false, true, true, true, true, true, true, true];

let fields = vec![
Field::new("f1", DataType::Utf8, true),
Field::new("f2", DataType::Boolean, true),
Expand All @@ -729,12 +775,7 @@ pub fn pyarrow_struct(column: &str) -> Box<dyn Array> {
"struct" => StructArray::new(DataType::Struct(fields), vec![string, boolean], None).boxed(),
"struct_nullable" => {
let values = vec![string, boolean];
StructArray::new(
DataType::Struct(fields),
values,
Some([true, true, false, true, true, true, true, true, true, true].into()),
)
.boxed()
StructArray::new(DataType::Struct(fields), values, Some(mask.into())).boxed()
}
"struct_struct" => {
let struct_ = pyarrow_struct("struct");
Expand All @@ -747,6 +788,17 @@ pub fn pyarrow_struct(column: &str) -> Box<dyn Array> {
None,
))
}
"struct_struct_nullable" => {
let struct_ = pyarrow_struct("struct");
Box::new(StructArray::new(
DataType::Struct(vec![
Field::new("f1", DataType::Struct(fields), true),
Field::new("f2", DataType::Boolean, true),
]),
vec![struct_, boolean],
Some(mask.into()),
))
}
_ => todo!(),
}
}
Expand Down Expand Up @@ -854,6 +906,66 @@ pub fn pyarrow_struct_statistics(column: &str) -> Statistics {
)
.boxed(),
},
"struct_struct_nullable" => Statistics {
distinct_count: Count::Struct(new_struct(
vec![
new_struct(
vec![
Box::new(UInt64Array::from([None])),
Box::new(UInt64Array::from([None])),
],
names.clone(),
)
.boxed(),
UInt64Array::from([None]).boxed(),
],
names.clone(),
)),
null_count: Count::Struct(new_struct(
vec![
new_struct(
vec![
Box::new(UInt64Array::from([Some(5)])),
Box::new(UInt64Array::from([Some(5)])),
],
names.clone(),
)
.boxed(),
UInt64Array::from([Some(5)]).boxed(),
],
names.clone(),
)),
min_value: new_struct(
vec![
new_struct(
vec![
Utf8Array::<i32>::from_slice([""]).boxed(),
BooleanArray::from_slice([false]).boxed(),
],
names.clone(),
)
.boxed(),
BooleanArray::from_slice([false]).boxed(),
],
names.clone(),
)
.boxed(),
max_value: new_struct(
vec![
new_struct(
vec![
Utf8Array::<i32>::from_slice(["def"]).boxed(),
BooleanArray::from_slice([true]).boxed(),
],
names.clone(),
)
.boxed(),
BooleanArray::from_slice([true]).boxed(),
],
names,
)
.boxed(),
},
_ => todo!(),
}
}
Expand Down
10 changes: 10 additions & 0 deletions tests/it/io/parquet/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,11 @@ fn v1_struct_optional_optional() -> Result<()> {
test_pyarrow_integration("struct_nullable", 1, "struct", false, false, None)
}

#[test]
fn v1_struct_struct_optional() -> Result<()> {
test_pyarrow_integration("struct_struct_nullable", 1, "struct", false, false, None)
}

#[test]
fn v1_nested_edge_1() -> Result<()> {
test_pyarrow_integration("simple", 1, "nested_edge", false, false, None)
Expand All @@ -472,6 +477,11 @@ fn v1_nested_edge_2() -> Result<()> {
test_pyarrow_integration("null", 1, "nested_edge", false, false, None)
}

#[test]
fn v1_nested_edge_3() -> Result<()> {
test_pyarrow_integration("struct_list_nullable", 1, "nested_edge", false, false, None)
}

#[test]
fn v1_map() -> Result<()> {
test_pyarrow_integration("map", 1, "map", false, true, None)
Expand Down

0 comments on commit 890853c

Please sign in to comment.