From ac450779cc43ccb5959cce017ad46e81c1cf41ec Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 19 Aug 2022 12:31:02 +0100 Subject: [PATCH 1/2] Remove DecimalByteArrayConvert (#2480) --- parquet/src/arrow/array_reader/builder.rs | 17 ++------------ parquet/src/arrow/array_reader/byte_array.rs | 23 ++++++++++++++++--- parquet/src/arrow/buffer/bit_util.rs | 11 +++++++++ parquet/src/arrow/buffer/converter.rs | 24 ++------------------ 4 files changed, 35 insertions(+), 40 deletions(-) diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index 84e833ac45e1..3917c3d5a747 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -26,7 +26,7 @@ use crate::arrow::array_reader::{ PrimitiveArrayReader, RowGroupCollection, StructArrayReader, }; use crate::arrow::buffer::converter::{ - DecimalArrayConverter, DecimalByteArrayConvert, DecimalFixedLengthByteArrayConverter, + DecimalArrayConverter, DecimalFixedLengthByteArrayConverter, FixedLenBinaryConverter, FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter, IntervalDayTimeConverter, IntervalYearMonthArrayConverter, IntervalYearMonthConverter, @@ -35,7 +35,7 @@ use crate::arrow::schema::{convert_schema, ParquetField, ParquetFieldType}; use crate::arrow::ProjectionMask; use crate::basic::Type as PhysicalType; use crate::data_type::{ - BoolType, ByteArrayType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type, + BoolType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type, Int64Type, Int96Type, }; use crate::errors::Result; @@ -217,19 +217,6 @@ fn build_primitive_reader( Some(DataType::Dictionary(_, _)) => { make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type) } - Some(DataType::Decimal128(precision, scale)) => { - // read decimal data from parquet binary physical type - let convert = DecimalByteArrayConvert::new(DecimalArrayConverter::new( - precision as i32, - scale as i32, - )); - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - DecimalByteArrayConvert, - >::new( - page_iterator, column_desc, convert, arrow_type - )?)) - } _ => make_byte_array_reader(page_iterator, column_desc, arrow_type), }, PhysicalType::FIXED_LEN_BYTE_ARRAY => match field.arrow_type { diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index 172aeb96d6d1..2fde0fdbfaf3 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -16,6 +16,7 @@ // under the License. use crate::arrow::array_reader::{read_records, skip_records, ArrayReader}; +use crate::arrow::buffer::bit_util::sign_extend_be; use crate::arrow::buffer::offset_buffer::OffsetBuffer; use crate::arrow::record_reader::buffer::ScalarValue; use crate::arrow::record_reader::GenericRecordReader; @@ -31,11 +32,12 @@ use crate::encodings::{ use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; use crate::util::memory::ByteBufferPtr; -use arrow::array::{ArrayRef, OffsetSizeTrait}; +use arrow::array::{Array, ArrayRef, BinaryArray, Decimal128Array, OffsetSizeTrait}; use arrow::buffer::Buffer; use arrow::datatypes::DataType as ArrowType; use std::any::Any; use std::ops::Range; +use std::sync::Arc; /// Returns an [`ArrayReader`] that decodes the provided byte array column pub fn make_byte_array_reader( @@ -52,7 +54,7 @@ pub fn make_byte_array_reader( }; match data_type { - ArrowType::Binary | ArrowType::Utf8 => { + ArrowType::Binary | ArrowType::Utf8 | ArrowType::Decimal128(_, _) => { let reader = GenericRecordReader::new(column_desc); Ok(Box::new(ByteArrayReader::::new( pages, data_type, reader, @@ -119,7 +121,22 @@ impl ArrayReader for ByteArrayReader { self.rep_levels_buffer = self.record_reader.consume_rep_levels(); self.record_reader.reset(); - Ok(buffer.into_array(null_buffer, self.data_type.clone())) + let array = match self.data_type { + ArrowType::Decimal128(p, s) => { + let array = buffer.into_array(null_buffer, ArrowType::Binary); + let binary = array.as_any().downcast_ref::().unwrap(); + let decimal = binary + .iter() + .map(|opt| Some(i128::from_be_bytes(sign_extend_be(opt?)))) + .collect::() + .with_precision_and_scale(p, s)?; + + Arc::new(decimal) + } + _ => buffer.into_array(null_buffer, self.data_type.clone()), + }; + + Ok(array) } fn skip_records(&mut self, num_records: usize) -> Result { diff --git a/parquet/src/arrow/buffer/bit_util.rs b/parquet/src/arrow/buffer/bit_util.rs index 192ab4b72163..04704237c458 100644 --- a/parquet/src/arrow/buffer/bit_util.rs +++ b/parquet/src/arrow/buffer/bit_util.rs @@ -51,6 +51,17 @@ pub fn iter_set_bits_rev(bytes: &[u8]) -> impl Iterator + '_ { }) } +/// Performs big endian sign extension +pub fn sign_extend_be(b: &[u8]) -> [u8; N] { + assert!(b.len() <= N, "Array too large, expected less than {}", N); + let is_negative = (b[0] & 128u8) == 128u8; + let mut result = if is_negative { [255u8; N] } else { [0u8; N] }; + for (d, s) in result.iter_mut().skip(N - b.len()).zip(b) { + *d = *s; + } + result +} + #[cfg(test)] mod tests { use super::*; diff --git a/parquet/src/arrow/buffer/converter.rs b/parquet/src/arrow/buffer/converter.rs index aeca548bde72..0b99f7ce36e1 100644 --- a/parquet/src/arrow/buffer/converter.rs +++ b/parquet/src/arrow/buffer/converter.rs @@ -26,6 +26,7 @@ use std::sync::Arc; use crate::errors::Result; use std::marker::PhantomData; +use crate::arrow::buffer::bit_util::sign_extend_be; #[cfg(test)] use arrow::array::{StringArray, StringBuilder}; @@ -92,31 +93,13 @@ impl Converter>, Decimal128Array> } } -impl Converter>, Decimal128Array> for DecimalArrayConverter { - fn convert(&self, source: Vec>) -> Result { - let array = source - .into_iter() - .map(|array| array.map(|array| from_bytes_to_i128(array.data()))) - .collect::() - .with_precision_and_scale(self.precision as usize, self.scale as usize)?; - - Ok(array) - } -} - // Convert the bytes array to i128. // The endian of the input bytes array must be big-endian. fn from_bytes_to_i128(b: &[u8]) -> i128 { - assert!(b.len() <= 16, "Decimal128Array supports only up to size 16"); - let first_bit = b[0] & 128u8 == 128u8; - let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] }; - for (i, v) in b.iter().enumerate() { - result[i + (16 - b.len())] = *v; - } // The bytes array are from parquet file and must be the big-endian. // The endian is defined by parquet format, and the reference document // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66 - i128::from_be_bytes(result) + i128::from_be_bytes(sign_extend_be(b)) } /// An Arrow Interval converter, which reads the first 4 bytes of a Parquet interval, @@ -237,9 +220,6 @@ pub type DecimalFixedLengthByteArrayConverter = ArrayRefConverter< DecimalArrayConverter, >; -pub type DecimalByteArrayConvert = - ArrayRefConverter>, Decimal128Array, DecimalArrayConverter>; - pub struct ArrayRefConverter { _source: PhantomData, _array: PhantomData, From 95ecb3c4a265db56df411af77e1aaf1502fac6a7 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 19 Aug 2022 12:39:33 +0100 Subject: [PATCH 2/2] Clippy --- parquet/src/arrow/buffer/converter.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/buffer/converter.rs b/parquet/src/arrow/buffer/converter.rs index 0b99f7ce36e1..1ade4c6fd58d 100644 --- a/parquet/src/arrow/buffer/converter.rs +++ b/parquet/src/arrow/buffer/converter.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::data_type::{ByteArray, FixedLenByteArray, Int96}; +use crate::data_type::{FixedLenByteArray, Int96}; use arrow::array::{ Array, ArrayRef, Decimal128Array, FixedSizeBinaryArray, FixedSizeBinaryBuilder, IntervalDayTimeArray, IntervalDayTimeBuilder, IntervalYearMonthArray, @@ -27,6 +27,9 @@ use crate::errors::Result; use std::marker::PhantomData; use crate::arrow::buffer::bit_util::sign_extend_be; +#[cfg(test)] +use crate::data_type::ByteArray; + #[cfg(test)] use arrow::array::{StringArray, StringBuilder};