From f775c2a67f90ddfe2c098f87d13db932399a424d Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Tue, 26 Jul 2022 12:42:07 +0000 Subject: [PATCH] Fixed error --- src/array/struct_/mod.rs | 2 +- .../parquet/read/deserialize/nested_utils.rs | 22 ++++++++++--------- src/io/parquet/read/deserialize/utils.rs | 22 +++++++++---------- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/src/array/struct_/mod.rs b/src/array/struct_/mod.rs index 2c9383c1ec1..a7c88b6fb19 100644 --- a/src/array/struct_/mod.rs +++ b/src/array/struct_/mod.rs @@ -81,7 +81,7 @@ impl StructArray { .try_for_each(|(index, a_len)| { if a_len != len { Err(Error::oos(format!( - "The children DataTypes of a StructArray must equal the children data types. + "The children must have an equal number of values. However, the values at index {index} have a length of {a_len}, which is different from values at index 0, {len}." ))) } else { diff --git a/src/io/parquet/read/deserialize/nested_utils.rs b/src/io/parquet/read/deserialize/nested_utils.rs index 627c1dc3a45..760cedd737e 100644 --- a/src/io/parquet/read/deserialize/nested_utils.rs +++ b/src/io/parquet/read/deserialize/nested_utils.rs @@ -348,20 +348,18 @@ pub(super) fn extend<'a, D: NestedDecoder<'a>>( let mut page = NestedPage::try_new(page)?; let capacity = chunk_size.unwrap_or(0); - let chunk_size = chunk_size.map(|x| x.min(*remaining)).unwrap_or(*remaining); + // chunk_size = None, remaining = 44 => chunk_size = 44 + let chunk_size = chunk_size.unwrap_or(usize::MAX); let (mut nested, mut decoded) = if let Some((nested, decoded)) = items.pop_back() { - *remaining += nested.len(); (nested, decoded) } else { // there is no state => initialize it (init_nested(init, capacity), decoder.with_capacity(0)) }; + let existing = nested.len(); - // e.g. chunk = 10, remaining = 100, decoded = 2 => 8.min(100) = 8 - // e.g. chunk = 100, remaining = 100, decoded = 0 => 100.min(100) = 100 - // e.g. chunk = 10, remaining = 2, decoded = 2 => 8.min(2) = 2 - let additional = (chunk_size - nested.len()).min(*remaining); + let additional = (chunk_size - existing).min(*remaining); // extend the current state extend_offsets2( @@ -372,7 +370,7 @@ pub(super) fn extend<'a, D: NestedDecoder<'a>>( decoder, additional, ); - *remaining -= nested.len(); + *remaining -= nested.len() - existing; items.push_back((nested, decoded)); while page.len() > 0 && *remaining > 0 { @@ -480,8 +478,10 @@ where { // front[a1, a2, a3, ...]back if items.len() > 1 { - let (nested, decoded) = items.pop_front().unwrap(); - return MaybeNext::Some(Ok((nested, decoded))); + return MaybeNext::Some(Ok(items.pop_front().unwrap())); + } + if (items.len() == 1) && items.front().unwrap().0.len() == chunk_size.unwrap_or(usize::MAX) { + return MaybeNext::Some(Ok(items.pop_front().unwrap())); } if *remaining == 0 { return match items.pop_front() { @@ -506,7 +506,9 @@ where Err(e) => return MaybeNext::Some(Err(e)), }; - if items.front().unwrap().0.len() < chunk_size.unwrap_or(0) { + if (items.len() == 1) + && items.front().unwrap().0.len() < chunk_size.unwrap_or(usize::MAX) + { MaybeNext::More } else { MaybeNext::Some(Ok(items.pop_front().unwrap())) diff --git a/src/io/parquet/read/deserialize/utils.rs b/src/io/parquet/read/deserialize/utils.rs index 5ff3d269537..eaae1644df8 100644 --- a/src/io/parquet/read/deserialize/utils.rs +++ b/src/io/parquet/read/deserialize/utils.rs @@ -376,25 +376,20 @@ pub(super) fn extend_from_new_page<'a, T: Decoder<'a>>( decoder: &T, ) { let capacity = chunk_size.unwrap_or(0); - let chunk_size = chunk_size.map(|x| x.min(*remaining)).unwrap_or(*remaining); + let chunk_size = chunk_size.unwrap_or(usize::MAX); let mut decoded = if let Some(decoded) = items.pop_back() { - *remaining += decoded.len(); decoded } else { // there is no state => initialize it decoder.with_capacity(capacity) }; + let existing = decoded.len(); - // e.g. chunk = 10, remaining = 100, decoded = 2 => 8.min(100) = 8 - // e.g. chunk = 100, remaining = 100, decoded = 0 => 100.min(100) = 100 - // e.g. chunk = 10, remaining = 2, decoded = 2 => 8.min(2) = 2 - let additional = (chunk_size - decoded.len()).min(*remaining); + let additional = (chunk_size - existing).min(*remaining); - // extend the current state decoder.extend_from_state(&mut page, &mut decoded, additional); - - *remaining -= decoded.len(); + *remaining -= decoded.len() - existing; items.push_back(decoded); while page.len() > 0 && *remaining > 0 { @@ -424,8 +419,10 @@ pub(super) fn next<'a, I: DataPages, D: Decoder<'a>>( ) -> MaybeNext> { // front[a1, a2, a3, ...]back if items.len() > 1 { - let item = items.pop_front().unwrap(); - return MaybeNext::Some(Ok(item)); + return MaybeNext::Some(Ok(items.pop_front().unwrap())); + } + if (items.len() == 1) && items.front().unwrap().len() == chunk_size.unwrap_or(usize::MAX) { + return MaybeNext::Some(Ok(items.pop_front().unwrap())); } if *remaining == 0 { return match items.pop_front() { @@ -445,7 +442,8 @@ pub(super) fn next<'a, I: DataPages, D: Decoder<'a>>( extend_from_new_page(page, chunk_size, items, remaining, decoder); - if (items.len() == 1) && items.front().unwrap().len() < chunk_size.unwrap_or(0) { + if (items.len() == 1) && items.front().unwrap().len() < chunk_size.unwrap_or(usize::MAX) + { MaybeNext::More } else { let decoded = items.pop_front().unwrap();