Skip to content

Commit

Permalink
Optimized ByteArrayReader (#1040)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Jan 12, 2022
1 parent 94d66ad commit 578ca91
Show file tree
Hide file tree
Showing 5 changed files with 756 additions and 84 deletions.
27 changes: 10 additions & 17 deletions parquet/benches/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,20 +319,13 @@ fn create_string_arrow_array_reader(
ArrowArrayReader::try_new(page_iterator, column_desc, converter, None).unwrap()
}

fn create_string_complex_array_reader(
fn create_string_byte_array_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
) -> impl ArrayReader {
use parquet::arrow::array_reader::ComplexObjectArrayReader;
use parquet::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
let converter = Utf8Converter::new(Utf8ArrayConverter {});
ComplexObjectArrayReader::<parquet::data_type::ByteArrayType, Utf8Converter>::new(
Box::new(page_iterator),
column_desc,
converter,
None,
)
.unwrap()
use parquet::arrow::array_reader::ByteArrayReader;
ByteArrayReader::new_with_options(Box::new(page_iterator), column_desc, None, true)
.unwrap()
}

fn add_benches(c: &mut Criterion) {
Expand Down Expand Up @@ -568,7 +561,7 @@ fn add_benches(c: &mut Criterion) {
"read StringArray, plain encoded, mandatory, no NULLs - old",
|b| {
b.iter(|| {
let array_reader = create_string_complex_array_reader(
let array_reader = create_string_byte_array_reader(
plain_string_no_null_data.clone(),
mandatory_string_column_desc.clone(),
);
Expand Down Expand Up @@ -601,7 +594,7 @@ fn add_benches(c: &mut Criterion) {
"read StringArray, plain encoded, optional, no NULLs - old",
|b| {
b.iter(|| {
let array_reader = create_string_complex_array_reader(
let array_reader = create_string_byte_array_reader(
plain_string_no_null_data.clone(),
optional_string_column_desc.clone(),
);
Expand Down Expand Up @@ -635,7 +628,7 @@ fn add_benches(c: &mut Criterion) {
"read StringArray, plain encoded, optional, half NULLs - old",
|b| {
b.iter(|| {
let array_reader = create_string_complex_array_reader(
let array_reader = create_string_byte_array_reader(
plain_string_half_null_data.clone(),
optional_string_column_desc.clone(),
);
Expand Down Expand Up @@ -669,7 +662,7 @@ fn add_benches(c: &mut Criterion) {
"read StringArray, dictionary encoded, mandatory, no NULLs - old",
|b| {
b.iter(|| {
let array_reader = create_string_complex_array_reader(
let array_reader = create_string_byte_array_reader(
dictionary_string_no_null_data.clone(),
mandatory_string_column_desc.clone(),
);
Expand Down Expand Up @@ -702,7 +695,7 @@ fn add_benches(c: &mut Criterion) {
"read StringArray, dictionary encoded, optional, no NULLs - old",
|b| {
b.iter(|| {
let array_reader = create_string_complex_array_reader(
let array_reader = create_string_byte_array_reader(
dictionary_string_no_null_data.clone(),
optional_string_column_desc.clone(),
);
Expand Down Expand Up @@ -736,7 +729,7 @@ fn add_benches(c: &mut Criterion) {
"read StringArray, dictionary encoded, optional, half NULLs - old",
|b| {
b.iter(|| {
let array_reader = create_string_complex_array_reader(
let array_reader = create_string_byte_array_reader(
dictionary_string_half_null_data.clone(),
optional_string_column_desc.clone(),
);
Expand Down
90 changes: 39 additions & 51 deletions parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ use crate::arrow::converter::{
DecimalConverter, FixedLenBinaryConverter, FixedSizeArrayConverter,
Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter,
IntervalDayTimeConverter, IntervalYearMonthArrayConverter,
IntervalYearMonthConverter, LargeBinaryArrayConverter, LargeBinaryConverter,
LargeUtf8ArrayConverter, LargeUtf8Converter,
IntervalYearMonthConverter, Utf8ArrayConverter, Utf8Converter,
};
use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
use crate::arrow::record_reader::{GenericRecordReader, RecordReader};
Expand All @@ -81,6 +80,10 @@ use crate::schema::types::{
};
use crate::schema::visitor::TypeVisitor;

mod byte_array;

pub use byte_array::ByteArrayReader;

/// Array reader reads parquet data into arrow array.
pub trait ArrayReader {
fn as_any(&self) -> &dyn Any;
Expand Down Expand Up @@ -1749,57 +1752,42 @@ impl<'a> ArrayReaderBuilder {
null_mask_only,
)?,
)),
PhysicalType::BYTE_ARRAY => {
if cur_type.get_basic_info().converted_type() == ConvertedType::UTF8 {
if let Some(ArrowType::LargeUtf8) = arrow_type {
let converter =
LargeUtf8Converter::new(LargeUtf8ArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
LargeUtf8Converter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
} else {
use crate::arrow::arrow_array_reader::{
ArrowArrayReader, StringArrayConverter,
};
let converter = StringArrayConverter::new();
Ok(Box::new(ArrowArrayReader::try_new(
*page_iterator,
column_desc,
converter,
arrow_type,
)?))
PhysicalType::BYTE_ARRAY => match arrow_type {
// TODO: Replace with optimised dictionary reader (#171)
Some(ArrowType::Dictionary(_, _)) => {
match cur_type.get_basic_info().converted_type() {
ConvertedType::UTF8 => {
let converter = Utf8Converter::new(Utf8ArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
Utf8Converter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
}
_ => {
let converter = BinaryConverter::new(BinaryArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
BinaryConverter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
}
}
} else if let Some(ArrowType::LargeBinary) = arrow_type {
let converter =
LargeBinaryConverter::new(LargeBinaryArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
LargeBinaryConverter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
} else {
let converter = BinaryConverter::new(BinaryArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
BinaryConverter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
}
}
_ => Ok(Box::new(ByteArrayReader::new(
page_iterator,
column_desc,
arrow_type,
)?)),
},
PhysicalType::FIXED_LEN_BYTE_ARRAY
if cur_type.get_basic_info().converted_type()
== ConvertedType::DECIMAL =>
Expand Down
Loading

0 comments on commit 578ca91

Please sign in to comment.