Skip to content

Commit

Permalink
Optimized ByteArrayReader (apache#1040)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Dec 21, 2021
1 parent 47d17ca commit 56c3bdc
Show file tree
Hide file tree
Showing 6 changed files with 753 additions and 89 deletions.
27 changes: 10 additions & 17 deletions parquet/benches/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,20 +319,13 @@ fn create_string_arrow_array_reader(
ArrowArrayReader::try_new(page_iterator, column_desc, converter, None).unwrap()
}

fn create_string_complex_array_reader(
fn create_string_byte_array_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
) -> impl ArrayReader {
use parquet::arrow::array_reader::ComplexObjectArrayReader;
use parquet::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
let converter = Utf8Converter::new(Utf8ArrayConverter {});
ComplexObjectArrayReader::<parquet::data_type::ByteArrayType, Utf8Converter>::new(
Box::new(page_iterator),
column_desc,
converter,
None,
)
.unwrap()
use parquet::arrow::array_reader::ByteArrayReader;
ByteArrayReader::new_with_options(Box::new(page_iterator), column_desc, None, true)
.unwrap()
}

fn add_benches(c: &mut Criterion) {
Expand Down Expand Up @@ -568,7 +561,7 @@ fn add_benches(c: &mut Criterion) {
"read StringArray, plain encoded, mandatory, no NULLs - old",
|b| {
b.iter(|| {
let array_reader = create_string_complex_array_reader(
let array_reader = create_string_byte_array_reader(
plain_string_no_null_data.clone(),
mandatory_string_column_desc.clone(),
);
Expand Down Expand Up @@ -601,7 +594,7 @@ fn add_benches(c: &mut Criterion) {
"read StringArray, plain encoded, optional, no NULLs - old",
|b| {
b.iter(|| {
let array_reader = create_string_complex_array_reader(
let array_reader = create_string_byte_array_reader(
plain_string_no_null_data.clone(),
optional_string_column_desc.clone(),
);
Expand Down Expand Up @@ -635,7 +628,7 @@ fn add_benches(c: &mut Criterion) {
"read StringArray, plain encoded, optional, half NULLs - old",
|b| {
b.iter(|| {
let array_reader = create_string_complex_array_reader(
let array_reader = create_string_byte_array_reader(
plain_string_half_null_data.clone(),
optional_string_column_desc.clone(),
);
Expand Down Expand Up @@ -669,7 +662,7 @@ fn add_benches(c: &mut Criterion) {
"read StringArray, dictionary encoded, mandatory, no NULLs - old",
|b| {
b.iter(|| {
let array_reader = create_string_complex_array_reader(
let array_reader = create_string_byte_array_reader(
dictionary_string_no_null_data.clone(),
mandatory_string_column_desc.clone(),
);
Expand Down Expand Up @@ -702,7 +695,7 @@ fn add_benches(c: &mut Criterion) {
"read StringArray, dictionary encoded, optional, no NULLs - old",
|b| {
b.iter(|| {
let array_reader = create_string_complex_array_reader(
let array_reader = create_string_byte_array_reader(
dictionary_string_no_null_data.clone(),
optional_string_column_desc.clone(),
);
Expand Down Expand Up @@ -736,7 +729,7 @@ fn add_benches(c: &mut Criterion) {
"read StringArray, dictionary encoded, optional, half NULLs - old",
|b| {
b.iter(|| {
let array_reader = create_string_complex_array_reader(
let array_reader = create_string_byte_array_reader(
dictionary_string_half_null_data.clone(),
optional_string_column_desc.clone(),
);
Expand Down
104 changes: 49 additions & 55 deletions parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,14 @@ use crate::arrow::converter::{
DecimalConverter, FixedLenBinaryConverter, FixedSizeArrayConverter,
Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter,
IntervalDayTimeConverter, IntervalYearMonthArrayConverter,
IntervalYearMonthConverter, LargeBinaryArrayConverter, LargeBinaryConverter,
LargeUtf8ArrayConverter, LargeUtf8Converter,
IntervalYearMonthConverter, Utf8ArrayConverter, Utf8Converter,
};
use crate::arrow::record_reader::RecordReader;
use crate::arrow::record_reader::buffer::{RecordBuffer, ValueBuffer};
use crate::arrow::record_reader::{GenericRecordReader, RecordReader};
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{ConvertedType, Repetition, Type as PhysicalType};
use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::column::reader::ColumnReaderImpl;
use crate::data_type::{
BoolType, ByteArrayType, DataType, DoubleType, FixedLenByteArrayType, FloatType,
Expand All @@ -79,6 +80,10 @@ use crate::schema::types::{
use crate::schema::visitor::TypeVisitor;
use std::any::Any;

mod byte_array;

pub use byte_array::ByteArrayReader;

/// Array reader reads parquet data into arrow array.
pub trait ArrayReader {
fn as_any(&self) -> &dyn Any;
Expand All @@ -104,11 +109,15 @@ pub trait ArrayReader {
///
/// Returns the number of records read, which can be less than batch_size if
/// pages is exhausted.
fn read_records<T: DataType>(
record_reader: &mut RecordReader<T>,
fn read_records<V, CV>(
record_reader: &mut GenericRecordReader<V, CV>,
pages: &mut dyn PageIterator,
batch_size: usize,
) -> Result<usize> {
) -> Result<usize>
where
V: RecordBuffer + ValueBuffer + Default,
CV: ColumnValueDecoder<Writer = V::Writer>,
{
let mut records_read = 0usize;
while records_read < batch_size {
let records_to_read = batch_size - records_read;
Expand Down Expand Up @@ -1706,57 +1715,42 @@ impl<'a> ArrayReaderBuilder {
null_mask_only,
)?,
)),
PhysicalType::BYTE_ARRAY => {
if cur_type.get_basic_info().converted_type() == ConvertedType::UTF8 {
if let Some(ArrowType::LargeUtf8) = arrow_type {
let converter =
LargeUtf8Converter::new(LargeUtf8ArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
LargeUtf8Converter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
} else {
use crate::arrow::arrow_array_reader::{
ArrowArrayReader, StringArrayConverter,
};
let converter = StringArrayConverter::new();
Ok(Box::new(ArrowArrayReader::try_new(
*page_iterator,
column_desc,
converter,
arrow_type,
)?))
PhysicalType::BYTE_ARRAY => match arrow_type {
// TODO: Replace with optimised dictionary reader (#171)
Some(ArrowType::Dictionary(_, _)) => {
match cur_type.get_basic_info().converted_type() {
ConvertedType::UTF8 => {
let converter = Utf8Converter::new(Utf8ArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
Utf8Converter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
}
_ => {
let converter = BinaryConverter::new(BinaryArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
BinaryConverter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
}
}
} else if let Some(ArrowType::LargeBinary) = arrow_type {
let converter =
LargeBinaryConverter::new(LargeBinaryArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
LargeBinaryConverter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
} else {
let converter = BinaryConverter::new(BinaryArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
BinaryConverter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
}
}
_ => Ok(Box::new(ByteArrayReader::new(
page_iterator,
column_desc,
arrow_type,
)?)),
},
PhysicalType::FIXED_LEN_BYTE_ARRAY
if cur_type.get_basic_info().converted_type()
== ConvertedType::DECIMAL =>
Expand Down
Loading

0 comments on commit 56c3bdc

Please sign in to comment.