Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Reduce memory usage in Parquet->Arrow decimal column chunk conversion #751

Merged
merged 4 commits into from
Jan 13, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 53 additions & 39 deletions src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use std::{
collections::VecDeque,
convert::TryInto,
convert::TryFrom,
io::{Read, Seek},
sync::Arc,
};
Expand All @@ -30,9 +30,13 @@ pub use parquet2::{

use crate::{
array::{Array, DictionaryKey, NullArray, PrimitiveArray, StructArray},
bitmap::MutableBitmap,
datatypes::{DataType, Field, IntervalUnit, TimeUnit},
error::{ArrowError, Result},
io::parquet::read::nested_utils::{create_list, init_nested},
io::parquet::read::{
fixed_size_binary::extend_from_page,
nested_utils::{create_list, init_nested},
},
};

mod binary;
Expand Down Expand Up @@ -314,44 +318,54 @@ fn page_iter_to_array<I: FallibleStreamingIterator<Item = DataPage, Error = Parq
PhysicalType::Int64 => {
primitive::iter_to_array(iter, metadata, data_type, nested, |x: i64| x as i128)
}
PhysicalType::FixedLenByteArray(n) => {
if *n > 16 {
Err(ArrowError::NotYetImplemented(format!(
"Can't decode Decimal128 type from Fixed Size Byte Array of len {:?}",
n
)))
} else {
let zeros_padding = (0..(16 - *n)).map(|_| 0u8).collect::<Vec<_>>();
let ones_padding = (0..(16 - *n)).map(|_| !0u8).collect::<Vec<_>>();
fixed_size_binary::iter_to_array(
iter,
DataType::FixedSizeBinary(*n as usize),
metadata,
)
.map(|e| {
let a = e
.into_iter()
.map(|v| {
v.and_then(|v1| {
// Pad with the value of the MSB to correctly handle (two's complement) negative integers.
let msb_set = v1.first().unwrap_or(&0) >> 7 == 1;
let padding = if msb_set {
&ones_padding
} else {
&zeros_padding
};
[padding, v1]
.concat()
.try_into()
.map(i128::from_be_bytes)
.ok()
})
})
.collect::<Vec<_>>();
Box::new(PrimitiveArray::<i128>::from(a).to(data_type))
as Box<dyn Array>
})
&PhysicalType::FixedLenByteArray(n) if n > 16 => {
Err(ArrowError::NotYetImplemented(format!(
"Can't decode Decimal128 type from Fixed Size Byte Array of len {:?}",
n
)))
}
&PhysicalType::FixedLenByteArray(n) => {
let n = usize::try_from(n).unwrap();
let capacity = metadata.num_values() as usize;
let mut validity = MutableBitmap::with_capacity(capacity);
let mut byte_values = Vec::<u8>::new();
let mut i128_values = Vec::<i128>::with_capacity(capacity);

// Iterate through the fixed-size binary pages, converting each fixed-size
// value to an i128 and appending it to `i128_values`. This conversion requires
// fully materializing the compressed Parquet page into an uncompressed byte
// buffer (`byte_values`), so operating one page at a time uses less memory
// than operating on the entire chunk at once.
while let Some(page) = iter.next()? {
byte_values.clear();
byte_values.reserve(page.num_values() * n);

extend_from_page(
page,
n,
metadata.descriptor(),
&mut byte_values,
&mut validity,
)?;

debug_assert_eq!(byte_values.len() % n, 0);

for fixed_size_value in byte_values.as_slice().chunks_exact(n) {
// Copy the fixed-size byte value to the start of a 16-byte stack-allocated
// buffer, then use an arithmetic right shift to move it down into the
// low-order bytes while filling the MSBs with the sign bit, which accounts
// for the leading 1s in negative (two's complement) values.
let mut i128_bytes = [0u8; 16];
i128_bytes[..n].copy_from_slice(fixed_size_value);
i128_values.push(i128::from_be_bytes(i128_bytes) >> (128 - 8 * n));
}
}

Ok(Box::new(PrimitiveArray::<i128>::from_data(
data_type,
i128_values.into(),
Some(validity.into()),
)))
}
_ => unreachable!(),
},
Expand Down