Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fixed error
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jul 26, 2022
1 parent 5c745f4 commit f775c2a
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 23 deletions.
2 changes: 1 addition & 1 deletion src/array/struct_/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl StructArray {
.try_for_each(|(index, a_len)| {
if a_len != len {
Err(Error::oos(format!(
"The children DataTypes of a StructArray must equal the children data types.
"The children must have an equal number of values.
However, the values at index {index} have a length of {a_len}, which is different from values at index 0, {len}."
)))
} else {
Expand Down
22 changes: 12 additions & 10 deletions src/io/parquet/read/deserialize/nested_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,20 +348,18 @@ pub(super) fn extend<'a, D: NestedDecoder<'a>>(
let mut page = NestedPage::try_new(page)?;

let capacity = chunk_size.unwrap_or(0);
let chunk_size = chunk_size.map(|x| x.min(*remaining)).unwrap_or(*remaining);
// chunk_size = None, remaining = 44 => chunk_size = 44
let chunk_size = chunk_size.unwrap_or(usize::MAX);

let (mut nested, mut decoded) = if let Some((nested, decoded)) = items.pop_back() {
*remaining += nested.len();
(nested, decoded)
} else {
// there is no state => initialize it
(init_nested(init, capacity), decoder.with_capacity(0))
};
let existing = nested.len();

// e.g. chunk = 10, remaining = 100, decoded = 2 => 8.min(100) = 8
// e.g. chunk = 100, remaining = 100, decoded = 0 => 100.min(100) = 100
// e.g. chunk = 10, remaining = 2, decoded = 2 => 8.min(2) = 2
let additional = (chunk_size - nested.len()).min(*remaining);
let additional = (chunk_size - existing).min(*remaining);

// extend the current state
extend_offsets2(
Expand All @@ -372,7 +370,7 @@ pub(super) fn extend<'a, D: NestedDecoder<'a>>(
decoder,
additional,
);
*remaining -= nested.len();
*remaining -= nested.len() - existing;
items.push_back((nested, decoded));

while page.len() > 0 && *remaining > 0 {
Expand Down Expand Up @@ -480,8 +478,10 @@ where
{
// front[a1, a2, a3, ...]back
if items.len() > 1 {
let (nested, decoded) = items.pop_front().unwrap();
return MaybeNext::Some(Ok((nested, decoded)));
return MaybeNext::Some(Ok(items.pop_front().unwrap()));
}
if (items.len() == 1) && items.front().unwrap().0.len() == chunk_size.unwrap_or(usize::MAX) {
return MaybeNext::Some(Ok(items.pop_front().unwrap()));
}
if *remaining == 0 {
return match items.pop_front() {
Expand All @@ -506,7 +506,9 @@ where
Err(e) => return MaybeNext::Some(Err(e)),
};

if items.front().unwrap().0.len() < chunk_size.unwrap_or(0) {
if (items.len() == 1)
&& items.front().unwrap().0.len() < chunk_size.unwrap_or(usize::MAX)
{
MaybeNext::More
} else {
MaybeNext::Some(Ok(items.pop_front().unwrap()))
Expand Down
22 changes: 10 additions & 12 deletions src/io/parquet/read/deserialize/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,25 +376,20 @@ pub(super) fn extend_from_new_page<'a, T: Decoder<'a>>(
decoder: &T,
) {
let capacity = chunk_size.unwrap_or(0);
let chunk_size = chunk_size.map(|x| x.min(*remaining)).unwrap_or(*remaining);
let chunk_size = chunk_size.unwrap_or(usize::MAX);

let mut decoded = if let Some(decoded) = items.pop_back() {
*remaining += decoded.len();
decoded
} else {
// there is no state => initialize it
decoder.with_capacity(capacity)
};
let existing = decoded.len();

// e.g. chunk = 10, remaining = 100, decoded = 2 => 8.min(100) = 8
// e.g. chunk = 100, remaining = 100, decoded = 0 => 100.min(100) = 100
// e.g. chunk = 10, remaining = 2, decoded = 2 => 8.min(2) = 2
let additional = (chunk_size - decoded.len()).min(*remaining);
let additional = (chunk_size - existing).min(*remaining);

// extend the current state
decoder.extend_from_state(&mut page, &mut decoded, additional);

*remaining -= decoded.len();
*remaining -= decoded.len() - existing;
items.push_back(decoded);

while page.len() > 0 && *remaining > 0 {
Expand Down Expand Up @@ -424,8 +419,10 @@ pub(super) fn next<'a, I: DataPages, D: Decoder<'a>>(
) -> MaybeNext<Result<D::DecodedState, Error>> {
// front[a1, a2, a3, ...]back
if items.len() > 1 {
let item = items.pop_front().unwrap();
return MaybeNext::Some(Ok(item));
return MaybeNext::Some(Ok(items.pop_front().unwrap()));
}
if (items.len() == 1) && items.front().unwrap().len() == chunk_size.unwrap_or(usize::MAX) {
return MaybeNext::Some(Ok(items.pop_front().unwrap()));
}
if *remaining == 0 {
return match items.pop_front() {
Expand All @@ -445,7 +442,8 @@ pub(super) fn next<'a, I: DataPages, D: Decoder<'a>>(

extend_from_new_page(page, chunk_size, items, remaining, decoder);

if (items.len() == 1) && items.front().unwrap().len() < chunk_size.unwrap_or(0) {
if (items.len() == 1) && items.front().unwrap().len() < chunk_size.unwrap_or(usize::MAX)
{
MaybeNext::More
} else {
let decoded = items.pop_front().unwrap();
Expand Down

0 comments on commit f775c2a

Please sign in to comment.