Skip to content

Commit

Permalink
Preserve dictionary encoding from parquet (apache#171)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Jan 14, 2022
1 parent 93717bd commit 400d8c7
Show file tree
Hide file tree
Showing 7 changed files with 923 additions and 374 deletions.
87 changes: 34 additions & 53 deletions parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ use std::sync::Arc;
use std::vec::Vec;

use arrow::array::{
new_empty_array, Array, ArrayData, ArrayDataBuilder, ArrayRef, BinaryArray,
BinaryBuilder, BooleanArray, BooleanBufferBuilder, BooleanBuilder, DecimalBuilder,
FixedSizeBinaryArray, FixedSizeBinaryBuilder, GenericListArray, Int16BufferBuilder,
Int32Array, Int64Array, MapArray, OffsetSizeTrait, PrimitiveArray, PrimitiveBuilder,
Array, ArrayData, ArrayDataBuilder, ArrayRef, BinaryArray, BinaryBuilder,
BooleanArray, BooleanBufferBuilder, BooleanBuilder, DecimalBuilder, FixedSizeBinaryArray,
FixedSizeBinaryBuilder, GenericListArray, Int16BufferBuilder, Int32Array,
Int64Array, MapArray, new_empty_array, OffsetSizeTrait, PrimitiveArray, PrimitiveBuilder,
StringArray, StringBuilder, StructArray,
};
use arrow::buffer::{Buffer, MutableBuffer};
Expand All @@ -45,33 +45,34 @@ use arrow::datatypes::{
Time32MillisecondType as ArrowTime32MillisecondType,
Time32SecondType as ArrowTime32SecondType,
Time64MicrosecondType as ArrowTime64MicrosecondType,
Time64NanosecondType as ArrowTime64NanosecondType, TimeUnit as ArrowTimeUnit,
TimestampMicrosecondType as ArrowTimestampMicrosecondType,
Time64NanosecondType as ArrowTime64NanosecondType, TimestampMicrosecondType as ArrowTimestampMicrosecondType,
TimestampMillisecondType as ArrowTimestampMillisecondType,
TimestampNanosecondType as ArrowTimestampNanosecondType,
TimestampSecondType as ArrowTimestampSecondType, ToByteSlice,
TimestampSecondType as ArrowTimestampSecondType,
TimeUnit as ArrowTimeUnit, ToByteSlice,
UInt16Type as ArrowUInt16Type, UInt32Type as ArrowUInt32Type,
UInt64Type as ArrowUInt64Type, UInt8Type as ArrowUInt8Type,
};
use arrow::util::bit_util;
use byte_array::make_byte_array_reader;
use byte_array_dictionary::make_byte_array_dictionary_reader;

use crate::arrow::converter::{
BinaryArrayConverter, BinaryConverter, Converter, DecimalArrayConverter,
DecimalConverter, FixedLenBinaryConverter, FixedSizeArrayConverter,
Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter,
IntervalDayTimeConverter, IntervalYearMonthArrayConverter,
IntervalYearMonthConverter, Utf8ArrayConverter, Utf8Converter,
Converter, DecimalArrayConverter, DecimalConverter, FixedLenBinaryConverter,
FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter,
IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
IntervalYearMonthArrayConverter, IntervalYearMonthConverter,
};
use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
use crate::arrow::record_reader::{GenericRecordReader, RecordReader};
use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{ConvertedType, Repetition, Type as PhysicalType};
use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::column::reader::ColumnReaderImpl;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::data_type::{
BoolType, ByteArrayType, DataType, DoubleType, FixedLenByteArrayType, FloatType,
Int32Type, Int64Type, Int96Type,
BoolType, DataType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type,
Int64Type, Int96Type,
};
use crate::errors::{ParquetError, ParquetError::ArrowError, Result};
use crate::file::reader::{FilePageIterator, FileReader};
Expand All @@ -81,8 +82,9 @@ use crate::schema::types::{
use crate::schema::visitor::TypeVisitor;

mod byte_array;

pub use byte_array::ByteArrayReader;
mod byte_array_dictionary;
mod dictionary_buffer;
mod offset_buffer;

/// Array reader reads parquet data into arrow array.
pub trait ArrayReader {
Expand Down Expand Up @@ -270,7 +272,8 @@ where
.clone(),
};

let record_reader = RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);
let record_reader =
RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);

Ok(Self {
data_type,
Expand Down Expand Up @@ -1753,40 +1756,18 @@ impl<'a> ArrayReaderBuilder {
)?,
)),
PhysicalType::BYTE_ARRAY => match arrow_type {
// TODO: Replace with optimised dictionary reader (#171)
Some(ArrowType::Dictionary(_, _)) => {
match cur_type.get_basic_info().converted_type() {
ConvertedType::UTF8 => {
let converter = Utf8Converter::new(Utf8ArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
Utf8Converter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
}
_ => {
let converter = BinaryConverter::new(BinaryArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
BinaryConverter,
>::new(
page_iterator,
column_desc,
converter,
arrow_type,
)?))
}
}
}
_ => Ok(Box::new(ByteArrayReader::new(
Some(ArrowType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
page_iterator,
column_desc,
arrow_type,
)?)),
null_mask_only,
),
_ => make_byte_array_reader(
page_iterator,
column_desc,
arrow_type,
null_mask_only,
),
},
PhysicalType::FIXED_LEN_BYTE_ARRAY
if cur_type.get_basic_info().converted_type()
Expand Down Expand Up @@ -1974,8 +1955,8 @@ mod tests {
use std::collections::VecDeque;
use std::sync::Arc;

use rand::{Rng, thread_rng};
use rand::distributions::uniform::SampleUniform;
use rand::{thread_rng, Rng};

use arrow::array::{
Array, ArrayRef, LargeListArray, ListArray, PrimitiveArray, StringArray,
Expand All @@ -1994,15 +1975,15 @@ mod tests {
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::basic::{Encoding, Type as PhysicalType};
use crate::column::page::{Page, PageReader};
use crate::data_type::{ByteArray, DataType, Int32Type, Int64Type};
use crate::data_type::{ByteArray, ByteArrayType, DataType, Int32Type, Int64Type};
use crate::errors::Result;
use crate::file::reader::{FileReader, SerializedFileReader};
use crate::schema::parser::parse_message_type;
use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
use crate::util::test_common::{get_test_file, make_pages};
use crate::util::test_common::page_util::{
DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator,
};
use crate::util::test_common::{get_test_file, make_pages};

use super::*;

Expand Down
Loading

0 comments on commit 400d8c7

Please sign in to comment.