Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preserve dictionary encoding when decoding parquet into Arrow arrays, 60x perf improvement (#171) #1180

Merged
merged 10 commits into from
Jan 24, 2022
135 changes: 128 additions & 7 deletions parquet/benches/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use arrow::array::Array;
use arrow::datatypes::DataType;
use criterion::{criterion_group, criterion_main, Criterion};
use parquet::util::{DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator};
use parquet::{
Expand All @@ -24,6 +26,7 @@ use parquet::{
data_type::{ByteArrayType, Int32Type},
schema::types::{ColumnDescPtr, SchemaDescPtr},
};
use rand::{rngs::StdRng, Rng, SeedableRng};
use std::{collections::VecDeque, sync::Arc};

fn build_test_schema() -> SchemaDescPtr {
Expand All @@ -47,9 +50,6 @@ const PAGES_PER_GROUP: usize = 2;
const VALUES_PER_PAGE: usize = 10_000;
const BATCH_SIZE: usize = 8192;

use arrow::array::Array;
use rand::{rngs::StdRng, Rng, SeedableRng};

/// Returns a deterministically seeded RNG so that every benchmark run
/// generates identical test data, keeping results comparable across runs.
pub fn seedable_rng() -> StdRng {
    const BENCH_SEED: u64 = 42;
    StdRng::seed_from_u64(BENCH_SEED)
}
Expand Down Expand Up @@ -333,6 +333,46 @@ fn create_string_byte_array_reader(
make_byte_array_reader(Box::new(page_iterator), column_desc, None, true).unwrap()
}

/// Builds the new dictionary-preserving byte-array reader ("new" path in the
/// benchmarks) for a UTF-8 string column, producing an Arrow
/// `Dictionary<Int32, Utf8>` typed array reader.
fn create_string_byte_array_dictionary_reader(
    page_iterator: impl PageIterator + 'static,
    column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
    use parquet::arrow::array_reader::make_byte_array_dictionary_reader;

    let key_type = Box::new(DataType::Int32);
    let value_type = Box::new(DataType::Utf8);
    let dictionary_type = DataType::Dictionary(key_type, value_type);

    make_byte_array_dictionary_reader(
        Box::new(page_iterator),
        column_desc,
        Some(dictionary_type),
        true,
    )
    .unwrap()
}

/// Builds the legacy ("old" path in the benchmarks) dictionary reader for a
/// UTF-8 string column via `ComplexObjectArrayReader`, used as the baseline
/// against the new dictionary-preserving reader.
///
/// Fix: the original imported `make_byte_array_dictionary_reader` here but
/// never called it (only the complex-object path is exercised), producing an
/// unused-import warning; the dead import is removed.
fn create_complex_object_byte_array_dictionary_reader(
    page_iterator: impl PageIterator + 'static,
    column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
    use parquet::arrow::array_reader::ComplexObjectArrayReader;
    use parquet::arrow::converter::{Utf8ArrayConverter, Utf8Converter};

    // Target the same Dictionary<Int32, Utf8> Arrow type as the new reader so
    // the two benchmark paths are directly comparable.
    let arrow_type =
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

    let converter = Utf8Converter::new(Utf8ArrayConverter {});
    Box::new(
        ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
            Box::new(page_iterator),
            column_desc,
            converter,
            Some(arrow_type),
        )
        .unwrap(),
    )
}

fn add_benches(c: &mut Criterion) {
const EXPECTED_VALUE_COUNT: usize =
NUM_ROW_GROUPS * PAGES_PER_GROUP * VALUES_PER_PAGE;
Expand All @@ -344,10 +384,7 @@ fn add_benches(c: &mut Criterion) {
let mandatory_int32_column_desc = schema.column(0);
let optional_int32_column_desc = schema.column(1);
let mandatory_string_column_desc = schema.column(2);
// println!("mandatory_string_column_desc: {:?}", mandatory_string_column_desc);
let optional_string_column_desc = schema.column(3);
// println!("optional_string_column_desc: {:?}", optional_string_column_desc);

// primitive / int32 benchmarks
// =============================

Expand Down Expand Up @@ -726,7 +763,7 @@ fn add_benches(c: &mut Criterion) {

// string, dictionary encoded, half NULLs
let dictionary_string_half_null_data = build_dictionary_encoded_string_page_iterator(
schema,
schema.clone(),
optional_string_column_desc.clone(),
0.5,
);
Expand Down Expand Up @@ -758,6 +795,90 @@ fn add_benches(c: &mut Criterion) {
},
);

group.bench_function(
"read StringDictionary, dictionary encoded, mandatory, no NULLs - old",
|b| {
b.iter(|| {
let array_reader = create_complex_object_byte_array_dictionary_reader(
dictionary_string_no_null_data.clone(),
mandatory_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);

group.bench_function(
"read StringDictionary, dictionary encoded, mandatory, no NULLs - new",
|b| {
b.iter(|| {
let array_reader = create_string_byte_array_dictionary_reader(
dictionary_string_no_null_data.clone(),
mandatory_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);

group.bench_function(
"read StringDictionary, dictionary encoded, optional, no NULLs - old",
|b| {
b.iter(|| {
let array_reader = create_complex_object_byte_array_dictionary_reader(
dictionary_string_no_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);

group.bench_function(
"read StringDictionary, dictionary encoded, optional, no NULLs - new",
|b| {
b.iter(|| {
let array_reader = create_string_byte_array_dictionary_reader(
dictionary_string_no_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);

group.bench_function(
"read StringDictionary, dictionary encoded, optional, half NULLs - old",
|b| {
b.iter(|| {
let array_reader = create_complex_object_byte_array_dictionary_reader(
dictionary_string_half_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);

group.bench_function(
"read StringDictionary, dictionary encoded, optional, half NULLs - new",
|b| {
b.iter(|| {
let array_reader = create_string_byte_array_dictionary_reader(
dictionary_string_half_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);

group.finish();
}

Expand Down
68 changes: 26 additions & 42 deletions parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,10 @@ use arrow::datatypes::{
use arrow::util::bit_util;

use crate::arrow::converter::{
BinaryArrayConverter, BinaryConverter, Converter, DecimalArrayConverter,
DecimalConverter, FixedLenBinaryConverter, FixedSizeArrayConverter,
Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter,
IntervalDayTimeConverter, IntervalYearMonthArrayConverter,
IntervalYearMonthConverter, Utf8ArrayConverter, Utf8Converter,
Converter, DecimalArrayConverter, DecimalConverter, FixedLenBinaryConverter,
FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter,
IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
IntervalYearMonthArrayConverter, IntervalYearMonthConverter,
};
use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
use crate::arrow::record_reader::{GenericRecordReader, RecordReader};
Expand All @@ -70,8 +69,8 @@ use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::column::reader::ColumnReaderImpl;
use crate::data_type::{
BoolType, ByteArrayType, DataType, DoubleType, FixedLenByteArrayType, FloatType,
Int32Type, Int64Type, Int96Type,
BoolType, DataType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type,
Int64Type, Int96Type,
};
use crate::errors::{ParquetError, ParquetError::ArrowError, Result};
use crate::file::reader::{FilePageIterator, FileReader};
Expand All @@ -81,9 +80,15 @@ use crate::schema::types::{
use crate::schema::visitor::TypeVisitor;

mod byte_array;
mod byte_array_dictionary;
mod dictionary_buffer;
mod offset_buffer;

#[cfg(test)]
tustvold marked this conversation as resolved.
Show resolved Hide resolved
mod test_util;

pub use byte_array::make_byte_array_reader;
pub use byte_array_dictionary::make_byte_array_dictionary_reader;

/// Array reader reads parquet data into arrow array.
pub trait ArrayReader {
Expand Down Expand Up @@ -271,7 +276,8 @@ where
.clone(),
};

let record_reader = RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);
let record_reader =
RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);

Ok(Self {
data_type,
Expand Down Expand Up @@ -829,17 +835,18 @@ fn remove_indices(
size
),
ArrowType::Struct(fields) => {
let struct_array = arr.as_any()
let struct_array = arr
.as_any()
.downcast_ref::<StructArray>()
.expect("Array should be a struct");

// Recursively call remove indices on each of the structs fields
let new_columns = fields.into_iter()
let new_columns = fields
.into_iter()
.zip(struct_array.columns())
.map(|(field, column)| {
let dt = field.data_type().clone();
Ok((field,
remove_indices(column.clone(), dt, indices.clone())?))
Ok((field, remove_indices(column.clone(), dt, indices.clone())?))
})
.collect::<Result<Vec<_>>>()?;

Expand Down Expand Up @@ -1783,35 +1790,12 @@ impl<'a> ArrayReaderBuilder {
)?,
)),
PhysicalType::BYTE_ARRAY => match arrow_type {
// TODO: Replace with optimised dictionary reader (#171)
Some(ArrowType::Dictionary(_, _)) => {
match cur_type.get_basic_info().converted_type() {
ConvertedType::UTF8 => {
let converter = Utf8Converter::new(Utf8ArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
Utf8Converter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
}
_ => {
let converter = BinaryConverter::new(BinaryArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
BinaryConverter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
}
}
}
Some(ArrowType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
page_iterator,
column_desc,
arrow_type,
null_mask_only,
),
_ => make_byte_array_reader(
page_iterator,
column_desc,
Expand Down Expand Up @@ -2025,7 +2009,7 @@ mod tests {
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::basic::{Encoding, Type as PhysicalType};
use crate::column::page::{Page, PageReader};
use crate::data_type::{ByteArray, DataType, Int32Type, Int64Type};
use crate::data_type::{ByteArray, ByteArrayType, DataType, Int32Type, Int64Type};
use crate::errors::Result;
use crate::file::reader::{FileReader, SerializedFileReader};
use crate::schema::parser::parse_message_type;
Expand Down
Loading