Skip to content

Commit

Permalink
Preserve dictionary encoding when decoding parquet into Arrow arrays,…
Browse files Browse the repository at this point in the history
… 60x perf improvement (#171) (#1180)

* Preserve dictionary encoding from parquet (#171)

* Use OffsetBuffer::into_array for dictionary

* Fix and test handling of empty dictionaries

Don't panic if missing dictionary page

* Use ArrayRef instead of Arc<ArrayData>

* Update doc comments

* Add integration test

Tweak RecordReader buffering logic

* Add benchmark

* Set write batch size in parquet fuzz tests

Fix bug in column writer with small page sizes

* Fix test_dictionary_preservation

* Add batch_size comment
  • Loading branch information
tustvold authored Jan 24, 2022
1 parent 66e029e commit d801ac2
Show file tree
Hide file tree
Showing 14 changed files with 1,615 additions and 258 deletions.
135 changes: 128 additions & 7 deletions parquet/benches/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use arrow::array::Array;
use arrow::datatypes::DataType;
use criterion::{criterion_group, criterion_main, Criterion};
use parquet::util::{DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator};
use parquet::{
Expand All @@ -24,6 +26,7 @@ use parquet::{
data_type::{ByteArrayType, Int32Type},
schema::types::{ColumnDescPtr, SchemaDescPtr},
};
use rand::{rngs::StdRng, Rng, SeedableRng};
use std::{collections::VecDeque, sync::Arc};

fn build_test_schema() -> SchemaDescPtr {
Expand All @@ -47,9 +50,6 @@ const PAGES_PER_GROUP: usize = 2;
const VALUES_PER_PAGE: usize = 10_000;
const BATCH_SIZE: usize = 8192;

use arrow::array::Array;
use rand::{rngs::StdRng, Rng, SeedableRng};

pub fn seedable_rng() -> StdRng {
StdRng::seed_from_u64(42)
}
Expand Down Expand Up @@ -333,6 +333,46 @@ fn create_string_byte_array_reader(
make_byte_array_reader(Box::new(page_iterator), column_desc, None, true).unwrap()
}

fn create_string_byte_array_dictionary_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
use parquet::arrow::array_reader::make_byte_array_dictionary_reader;
let arrow_type =
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

make_byte_array_dictionary_reader(
Box::new(page_iterator),
column_desc,
Some(arrow_type),
true,
)
.unwrap()
}

fn create_complex_object_byte_array_dictionary_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
use parquet::arrow::array_reader::{
make_byte_array_dictionary_reader, ComplexObjectArrayReader,
};
use parquet::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
let arrow_type =
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

let converter = Utf8Converter::new(Utf8ArrayConverter {});
Box::new(
ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
Box::new(page_iterator),
column_desc,
converter,
Some(arrow_type),
)
.unwrap(),
)
}

fn add_benches(c: &mut Criterion) {
const EXPECTED_VALUE_COUNT: usize =
NUM_ROW_GROUPS * PAGES_PER_GROUP * VALUES_PER_PAGE;
Expand All @@ -344,10 +384,7 @@ fn add_benches(c: &mut Criterion) {
let mandatory_int32_column_desc = schema.column(0);
let optional_int32_column_desc = schema.column(1);
let mandatory_string_column_desc = schema.column(2);
// println!("mandatory_string_column_desc: {:?}", mandatory_string_column_desc);
let optional_string_column_desc = schema.column(3);
// println!("optional_string_column_desc: {:?}", optional_string_column_desc);

// primitive / int32 benchmarks
// =============================

Expand Down Expand Up @@ -726,7 +763,7 @@ fn add_benches(c: &mut Criterion) {

// string, dictionary encoded, half NULLs
let dictionary_string_half_null_data = build_dictionary_encoded_string_page_iterator(
schema,
schema.clone(),
optional_string_column_desc.clone(),
0.5,
);
Expand Down Expand Up @@ -758,6 +795,90 @@ fn add_benches(c: &mut Criterion) {
},
);

group.bench_function(
"read StringDictionary, dictionary encoded, mandatory, no NULLs - old",
|b| {
b.iter(|| {
let array_reader = create_complex_object_byte_array_dictionary_reader(
dictionary_string_no_null_data.clone(),
mandatory_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);

group.bench_function(
"read StringDictionary, dictionary encoded, mandatory, no NULLs - new",
|b| {
b.iter(|| {
let array_reader = create_string_byte_array_dictionary_reader(
dictionary_string_no_null_data.clone(),
mandatory_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);

group.bench_function(
"read StringDictionary, dictionary encoded, optional, no NULLs - old",
|b| {
b.iter(|| {
let array_reader = create_complex_object_byte_array_dictionary_reader(
dictionary_string_no_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);

group.bench_function(
"read StringDictionary, dictionary encoded, optional, no NULLs - new",
|b| {
b.iter(|| {
let array_reader = create_string_byte_array_dictionary_reader(
dictionary_string_no_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);

group.bench_function(
"read StringDictionary, dictionary encoded, optional, half NULLs - old",
|b| {
b.iter(|| {
let array_reader = create_complex_object_byte_array_dictionary_reader(
dictionary_string_half_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);

group.bench_function(
"read StringDictionary, dictionary encoded, optional, half NULLs - new",
|b| {
b.iter(|| {
let array_reader = create_string_byte_array_dictionary_reader(
dictionary_string_half_null_data.clone(),
optional_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);

group.finish();
}

Expand Down
68 changes: 26 additions & 42 deletions parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,10 @@ use arrow::datatypes::{
use arrow::util::bit_util;

use crate::arrow::converter::{
BinaryArrayConverter, BinaryConverter, Converter, DecimalArrayConverter,
DecimalConverter, FixedLenBinaryConverter, FixedSizeArrayConverter,
Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter,
IntervalDayTimeConverter, IntervalYearMonthArrayConverter,
IntervalYearMonthConverter, Utf8ArrayConverter, Utf8Converter,
Converter, DecimalArrayConverter, DecimalConverter, FixedLenBinaryConverter,
FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter,
IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
IntervalYearMonthArrayConverter, IntervalYearMonthConverter,
};
use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
use crate::arrow::record_reader::{GenericRecordReader, RecordReader};
Expand All @@ -70,8 +69,8 @@ use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::column::reader::ColumnReaderImpl;
use crate::data_type::{
BoolType, ByteArrayType, DataType, DoubleType, FixedLenByteArrayType, FloatType,
Int32Type, Int64Type, Int96Type,
BoolType, DataType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type,
Int64Type, Int96Type,
};
use crate::errors::{ParquetError, ParquetError::ArrowError, Result};
use crate::file::reader::{FilePageIterator, FileReader};
Expand All @@ -81,9 +80,15 @@ use crate::schema::types::{
use crate::schema::visitor::TypeVisitor;

mod byte_array;
mod byte_array_dictionary;
mod dictionary_buffer;
mod offset_buffer;

#[cfg(test)]
mod test_util;

pub use byte_array::make_byte_array_reader;
pub use byte_array_dictionary::make_byte_array_dictionary_reader;

/// Array reader reads parquet data into arrow array.
pub trait ArrayReader {
Expand Down Expand Up @@ -271,7 +276,8 @@ where
.clone(),
};

let record_reader = RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);
let record_reader =
RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);

Ok(Self {
data_type,
Expand Down Expand Up @@ -829,17 +835,18 @@ fn remove_indices(
size
),
ArrowType::Struct(fields) => {
let struct_array = arr.as_any()
let struct_array = arr
.as_any()
.downcast_ref::<StructArray>()
.expect("Array should be a struct");

// Recursively call remove indices on each of the structs fields
let new_columns = fields.into_iter()
let new_columns = fields
.into_iter()
.zip(struct_array.columns())
.map(|(field, column)| {
let dt = field.data_type().clone();
Ok((field,
remove_indices(column.clone(), dt, indices.clone())?))
Ok((field, remove_indices(column.clone(), dt, indices.clone())?))
})
.collect::<Result<Vec<_>>>()?;

Expand Down Expand Up @@ -1783,35 +1790,12 @@ impl<'a> ArrayReaderBuilder {
)?,
)),
PhysicalType::BYTE_ARRAY => match arrow_type {
// TODO: Replace with optimised dictionary reader (#171)
Some(ArrowType::Dictionary(_, _)) => {
match cur_type.get_basic_info().converted_type() {
ConvertedType::UTF8 => {
let converter = Utf8Converter::new(Utf8ArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
Utf8Converter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
}
_ => {
let converter = BinaryConverter::new(BinaryArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
BinaryConverter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
}
}
}
Some(ArrowType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
page_iterator,
column_desc,
arrow_type,
null_mask_only,
),
_ => make_byte_array_reader(
page_iterator,
column_desc,
Expand Down Expand Up @@ -2025,7 +2009,7 @@ mod tests {
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::basic::{Encoding, Type as PhysicalType};
use crate::column::page::{Page, PageReader};
use crate::data_type::{ByteArray, DataType, Int32Type, Int64Type};
use crate::data_type::{ByteArray, ByteArrayType, DataType, Int32Type, Int64Type};
use crate::errors::Result;
use crate::file::reader::{FileReader, SerializedFileReader};
use crate::schema::parser::parse_message_type;
Expand Down
Loading

0 comments on commit d801ac2

Please sign in to comment.