Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove DecimalByteArrayConvert (#2480) #2522

Merged
merged 3 commits into from
Aug 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 2 additions & 14 deletions parquet/src/arrow/array_reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::arrow::array_reader::{
PrimitiveArrayReader, RowGroupCollection, StructArrayReader,
};
use crate::arrow::buffer::converter::{
DecimalArrayConverter, DecimalByteArrayConvert, DecimalFixedLengthByteArrayConverter,
DecimalArrayConverter, DecimalFixedLengthByteArrayConverter,
FixedLenBinaryConverter, FixedSizeArrayConverter, Int96ArrayConverter,
Int96Converter, IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
IntervalYearMonthArrayConverter, IntervalYearMonthConverter,
Expand All @@ -35,7 +35,7 @@ use crate::arrow::schema::{convert_schema, ParquetField, ParquetFieldType};
use crate::arrow::ProjectionMask;
use crate::basic::Type as PhysicalType;
use crate::data_type::{
BoolType, ByteArrayType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type,
BoolType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type,
Int64Type, Int96Type,
};
use crate::errors::Result;
Expand Down Expand Up @@ -217,18 +217,6 @@ fn build_primitive_reader(
Some(DataType::Dictionary(_, _)) => {
make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)
}
Some(DataType::Decimal128(precision, scale)) => {
// read decimal data from parquet binary physical type
let convert = DecimalByteArrayConvert::new(DecimalArrayConverter::new(
precision, scale,
));
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
DecimalByteArrayConvert,
>::new(
page_iterator, column_desc, convert, arrow_type
)?))
}
_ => make_byte_array_reader(page_iterator, column_desc, arrow_type),
},
PhysicalType::FIXED_LEN_BYTE_ARRAY => match field.arrow_type {
Expand Down
23 changes: 20 additions & 3 deletions parquet/src/arrow/array_reader/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
use crate::arrow::buffer::bit_util::sign_extend_be;
use crate::arrow::buffer::offset_buffer::OffsetBuffer;
use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
use crate::arrow::record_reader::buffer::ScalarValue;
Expand All @@ -29,11 +30,12 @@ use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use crate::util::memory::ByteBufferPtr;
use arrow::array::{ArrayRef, OffsetSizeTrait};
use arrow::array::{Array, ArrayRef, BinaryArray, Decimal128Array, OffsetSizeTrait};
use arrow::buffer::Buffer;
use arrow::datatypes::DataType as ArrowType;
use std::any::Any;
use std::ops::Range;
use std::sync::Arc;

/// Returns an [`ArrayReader`] that decodes the provided byte array column
pub fn make_byte_array_reader(
Expand All @@ -50,7 +52,7 @@ pub fn make_byte_array_reader(
};

match data_type {
ArrowType::Binary | ArrowType::Utf8 => {
ArrowType::Binary | ArrowType::Utf8 | ArrowType::Decimal128(_, _) => {
let reader = GenericRecordReader::new(column_desc);
Ok(Box::new(ByteArrayReader::<i32>::new(
pages, data_type, reader,
Expand Down Expand Up @@ -117,7 +119,22 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
self.rep_levels_buffer = self.record_reader.consume_rep_levels();
self.record_reader.reset();

Ok(buffer.into_array(null_buffer, self.data_type.clone()))
let array = match self.data_type {
ArrowType::Decimal128(p, s) => {
let array = buffer.into_array(null_buffer, ArrowType::Binary);
let binary = array.as_any().downcast_ref::<BinaryArray>().unwrap();
let decimal = binary
.iter()
.map(|opt| Some(i128::from_be_bytes(sign_extend_be(opt?))))
.collect::<Decimal128Array>()
.with_precision_and_scale(p, s)?;

Arc::new(decimal)
}
_ => buffer.into_array(null_buffer, self.data_type.clone()),
};

Ok(array)
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
Expand Down
11 changes: 11 additions & 0 deletions parquet/src/arrow/buffer/bit_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ pub fn iter_set_bits_rev(bytes: &[u8]) -> impl Iterator<Item = usize> + '_ {
})
}

/// Performs big endian sign extension
pub fn sign_extend_be<const N: usize>(b: &[u8]) -> [u8; N] {
assert!(b.len() <= N, "Array too large, expected less than {}", N);
let is_negative = (b[0] & 128u8) == 128u8;
let mut result = if is_negative { [255u8; N] } else { [0u8; N] };
for (d, s) in result.iter_mut().skip(N - b.len()).zip(b) {
*d = *s;
}
result
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
29 changes: 6 additions & 23 deletions parquet/src/arrow/buffer/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::data_type::{ByteArray, FixedLenByteArray, Int96};
use crate::data_type::{FixedLenByteArray, Int96};
use arrow::array::{
Array, ArrayRef, Decimal128Array, FixedSizeBinaryArray, FixedSizeBinaryBuilder,
IntervalDayTimeArray, IntervalDayTimeBuilder, IntervalYearMonthArray,
Expand All @@ -26,6 +26,10 @@ use std::sync::Arc;
use crate::errors::Result;
use std::marker::PhantomData;

use crate::arrow::buffer::bit_util::sign_extend_be;
#[cfg(test)]
use crate::data_type::ByteArray;

#[cfg(test)]
use arrow::array::{StringArray, StringBuilder};

Expand Down Expand Up @@ -93,31 +97,13 @@ impl Converter<Vec<Option<FixedLenByteArray>>, Decimal128Array>
}
}

impl Converter<Vec<Option<ByteArray>>, Decimal128Array> for DecimalArrayConverter {
fn convert(&self, source: Vec<Option<ByteArray>>) -> Result<Decimal128Array> {
let array = source
.into_iter()
.map(|array| array.map(|array| from_bytes_to_i128(array.data())))
.collect::<Decimal128Array>()
.with_precision_and_scale(self.precision, self.scale)?;

Ok(array)
}
}

// Convert the bytes array to i128.
// The endian of the input bytes array must be big-endian.
fn from_bytes_to_i128(b: &[u8]) -> i128 {
assert!(b.len() <= 16, "Decimal128Array supports only up to size 16");
let first_bit = b[0] & 128u8 == 128u8;
let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
for (i, v) in b.iter().enumerate() {
result[i + (16 - b.len())] = *v;
}
// The bytes array are from parquet file and must be the big-endian.
// The endian is defined by parquet format, and the reference document
// https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
i128::from_be_bytes(result)
i128::from_be_bytes(sign_extend_be(b))
}

/// An Arrow Interval converter, which reads the first 4 bytes of a Parquet interval,
Expand Down Expand Up @@ -238,9 +224,6 @@ pub type DecimalFixedLengthByteArrayConverter = ArrayRefConverter<
DecimalArrayConverter,
>;

pub type DecimalByteArrayConvert =
ArrayRefConverter<Vec<Option<ByteArray>>, Decimal128Array, DecimalArrayConverter>;

pub struct ArrayRefConverter<S, A, C> {
_source: PhantomData<S>,
_array: PhantomData<A>,
Expand Down