Skip to content

Commit

Permalink
Improve parquet reading performance for columns with nulls by preserv…
Browse files Browse the repository at this point in the history
…ing bitmask when possible (#1037) (#1054)

* Preserve bitmask (#1037)

* Remove now unnecessary box (#1061)

* Fix handling of empty bitmasks

* More docs

* Add nested nullability test case

* Add packed decoder test
  • Loading branch information
tustvold authored Jan 13, 2022
1 parent 4218c74 commit 231cf78
Show file tree
Hide file tree
Showing 8 changed files with 571 additions and 86 deletions.
5 changes: 5 additions & 0 deletions arrow/src/array/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,11 @@ impl BooleanBufferBuilder {
);
}

/// Returns the packed bits as a byte slice
///
/// The returned slice borrows from this builder's underlying buffer, so the
/// builder itself is left unchanged by this call
pub fn as_slice(&self) -> &[u8] {
self.buffer.as_slice()
}

#[inline]
pub fn finish(&mut self) -> Buffer {
let buf = std::mem::replace(&mut self.buffer, MutableBuffer::new(0));
Expand Down
9 changes: 7 additions & 2 deletions parquet/benches/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,13 @@ fn create_int32_primitive_array_reader(
column_desc: ColumnDescPtr,
) -> impl ArrayReader {
use parquet::arrow::array_reader::PrimitiveArrayReader;
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
.unwrap()
PrimitiveArrayReader::<Int32Type>::new_with_options(
Box::new(page_iterator),
column_desc,
None,
true,
)
.unwrap()
}

fn create_string_arrow_array_reader(
Expand Down
92 changes: 62 additions & 30 deletions parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,17 @@ where
pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
arrow_type: Option<ArrowType>,
) -> Result<Self> {
Self::new_with_options(pages, column_desc, arrow_type, false)
}

/// Construct primitive array reader with ability to only compute null mask and not
/// buffer level data
pub fn new_with_options(
pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
arrow_type: Option<ArrowType>,
null_mask_only: bool,
) -> Result<Self> {
// Check if Arrow type is specified, else create it from Parquet type
let data_type = match arrow_type {
Expand All @@ -256,7 +267,7 @@ where
.clone(),
};

let record_reader = RecordReader::<T>::new(column_desc.clone());
let record_reader = RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);

Ok(Self {
data_type,
Expand Down Expand Up @@ -1350,19 +1361,26 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
let mut new_context = context.clone();
new_context.path.append(vec![cur_type.name().to_string()]);

match cur_type.get_basic_info().repetition() {
let null_mask_only = match cur_type.get_basic_info().repetition() {
Repetition::REPEATED => {
new_context.def_level += 1;
new_context.rep_level += 1;
false
}
Repetition::OPTIONAL => {
new_context.def_level += 1;

// Can compute just the null mask when no parent contributes definition or repetition levels
context.def_level == 0 && context.rep_level == 0
}
_ => (),
}
_ => false,
};

let reader =
self.build_for_primitive_type_inner(cur_type.clone(), &new_context)?;
let reader = self.build_for_primitive_type_inner(
cur_type.clone(),
&new_context,
null_mask_only,
)?;

if cur_type.get_basic_info().repetition() == Repetition::REPEATED {
Err(ArrowError(
Expand Down Expand Up @@ -1641,6 +1659,7 @@ impl<'a> ArrayReaderBuilder {
&self,
cur_type: TypePtr,
context: &'a ArrayReaderBuilderContext,
null_mask_only: bool,
) -> Result<Box<dyn ArrayReader>> {
let column_desc = Arc::new(ColumnDescriptor::new(
cur_type.clone(),
Expand All @@ -1658,30 +1677,39 @@ impl<'a> ArrayReaderBuilder {
.map(|f| f.data_type().clone());

match cur_type.get_physical_type() {
PhysicalType::BOOLEAN => Ok(Box::new(PrimitiveArrayReader::<BoolType>::new(
page_iterator,
column_desc,
arrow_type,
)?)),
PhysicalType::BOOLEAN => Ok(Box::new(
PrimitiveArrayReader::<BoolType>::new_with_options(
page_iterator,
column_desc,
arrow_type,
null_mask_only,
)?,
)),
PhysicalType::INT32 => {
if let Some(ArrowType::Null) = arrow_type {
Ok(Box::new(NullArrayReader::<Int32Type>::new(
page_iterator,
column_desc,
)?))
} else {
Ok(Box::new(PrimitiveArrayReader::<Int32Type>::new(
page_iterator,
column_desc,
arrow_type,
)?))
Ok(Box::new(
PrimitiveArrayReader::<Int32Type>::new_with_options(
page_iterator,
column_desc,
arrow_type,
null_mask_only,
)?,
))
}
}
PhysicalType::INT64 => Ok(Box::new(PrimitiveArrayReader::<Int64Type>::new(
page_iterator,
column_desc,
arrow_type,
)?)),
PhysicalType::INT64 => Ok(Box::new(
PrimitiveArrayReader::<Int64Type>::new_with_options(
page_iterator,
column_desc,
arrow_type,
null_mask_only,
)?,
)),
PhysicalType::INT96 => {
// get the optional timezone information from arrow type
let timezone = arrow_type
Expand All @@ -1705,18 +1733,22 @@ impl<'a> ArrayReaderBuilder {
arrow_type,
)?))
}
PhysicalType::FLOAT => Ok(Box::new(PrimitiveArrayReader::<FloatType>::new(
page_iterator,
column_desc,
arrow_type,
)?)),
PhysicalType::DOUBLE => {
Ok(Box::new(PrimitiveArrayReader::<DoubleType>::new(
PhysicalType::FLOAT => Ok(Box::new(
PrimitiveArrayReader::<FloatType>::new_with_options(
page_iterator,
column_desc,
arrow_type,
)?))
}
null_mask_only,
)?,
)),
PhysicalType::DOUBLE => Ok(Box::new(
PrimitiveArrayReader::<DoubleType>::new_with_options(
page_iterator,
column_desc,
arrow_type,
null_mask_only,
)?,
)),
PhysicalType::BYTE_ARRAY => {
if cur_type.get_basic_info().converted_type() == ConvertedType::UTF8 {
if let Some(ArrowType::LargeUtf8) = arrow_type {
Expand Down
55 changes: 53 additions & 2 deletions parquet/src/arrow/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,11 @@ mod tests {
use crate::file::properties::{WriterProperties, WriterVersion};
use crate::file::reader::{FileReader, SerializedFileReader};
use crate::file::writer::{FileWriter, SerializedFileWriter};
use crate::schema::parser::parse_message_type;
use crate::schema::types::{Type, TypePtr};
use crate::util::test_common::{get_temp_filename, RandGen};
use crate::util::test_common::{get_temp_file, get_temp_filename, RandGen};
use arrow::array::*;
use arrow::datatypes::DataType as ArrowDataType;
use arrow::datatypes::{DataType as ArrowDataType, Field};
use arrow::record_batch::RecordBatchReader;
use rand::{thread_rng, RngCore};
use serde_json::json;
Expand Down Expand Up @@ -916,4 +917,54 @@ mod tests {
batch.unwrap();
}
}

// Regression test for a REQUIRED leaf nested inside an OPTIONAL group: the leaf's
// definition levels must still be decoded so that the nullability of the parent
// `group` struct can be reconstructed when reading into Arrow.
#[test]
fn test_nested_nullability() {
let message_type = "message nested {
OPTIONAL Group group {
REQUIRED INT32 leaf;
}
}";

let file = get_temp_file("nested_nullability.parquet", &[]);
let schema = Arc::new(parse_message_type(message_type).unwrap());

{
// Write using low-level parquet API (#1167)
let writer_props = Arc::new(WriterProperties::builder().build());
let mut writer = SerializedFileWriter::new(
file.try_clone().unwrap(),
schema,
writer_props,
)
.unwrap();

let mut row_group_writer = writer.next_row_group().unwrap();
let mut column_writer = row_group_writer.next_column().unwrap().unwrap();

// Two values accompanied by four definition levels `[0, 1, 0, 1]` and no
// repetition levels; the assertions at the end of the test expect this to
// read back as 4 rows, 2 of which are null
get_typed_column_writer_mut::<Int32Type>(&mut column_writer)
.write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
.unwrap();

row_group_writer.close_column(column_writer).unwrap();
writer.close_row_group(row_group_writer).unwrap();

writer.close().unwrap();
}

// Read the file back through the Arrow reader, selecting only column 0
let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
let mut batch = ParquetFileArrowReader::new(file_reader);
let reader = batch.get_record_reader_by_columns(vec![0], 1024).unwrap();

// The Arrow schema should be a nullable struct `group` wrapping a non-nullable
// Int32 field `leaf`
let expected_schema = arrow::datatypes::Schema::new(vec![Field::new(
"group",
ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)]),
true,
)]);

// Only the first batch is inspected: it must match the expected schema and
// report 4 rows with a null count of 2 on the `group` column
let batch = reader.into_iter().next().unwrap().unwrap();
assert_eq!(batch.schema().as_ref(), &expected_schema);
assert_eq!(batch.num_rows(), 4);
assert_eq!(batch.column(0).data().null_count(), 2);
}
}
37 changes: 30 additions & 7 deletions parquet/src/arrow/record_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,35 @@ where
V: ValuesBuffer + Default,
CV: ColumnValueDecoder<Slice = V::Slice>,
{
/// Create a new [`GenericRecordReader`]
pub fn new(desc: ColumnDescPtr) -> Self {
let def_levels =
(desc.max_def_level() > 0).then(|| DefinitionLevelBuffer::new(&desc));
Self::new_with_options(desc, false)
}

/// Create a new [`GenericRecordReader`] with the ability to only generate the bitmask
///
/// If `null_mask_only` is true only the null bitmask will be generated and
/// [`Self::consume_def_levels`] and [`Self::consume_rep_levels`] will always return `None`
///
/// It is insufficient to solely check that the max definition level is 1, as we
/// also need there to be no nullable parent array that will require decoded definition levels
///
/// In particular consider the case of:
///
/// ```ignore
/// message nested {
/// OPTIONAL Group group {
/// REQUIRED INT32 leaf;
/// }
/// }
/// ```
///
/// The maximum definition level of leaf is 1, however, we still need to decode the
/// definition levels so that the parent group can be constructed correctly
///
pub(crate) fn new_with_options(desc: ColumnDescPtr, null_mask_only: bool) -> Self {
let def_levels = (desc.max_def_level() > 0)
.then(|| DefinitionLevelBuffer::new(&desc, null_mask_only));

let rep_levels = (desc.max_rep_level() > 0).then(ScalarBuffer::new);

Expand Down Expand Up @@ -171,7 +197,7 @@ where
/// as record values, e.g. those from `self.num_values` to `self.values_written`.
pub fn consume_def_levels(&mut self) -> Result<Option<Buffer>> {
Ok(match self.def_levels.as_mut() {
Some(x) => Some(x.split_off(self.num_values)),
Some(x) => x.split_levels(self.num_values),
None => None,
})
}
Expand Down Expand Up @@ -221,10 +247,7 @@ where
.as_mut()
.map(|levels| levels.spare_capacity_mut(batch_size));

let def_levels = self
.def_levels
.as_mut()
.map(|levels| levels.spare_capacity_mut(batch_size));
let def_levels = self.def_levels.as_mut();

let values = self.records.spare_capacity_mut(batch_size);

Expand Down
5 changes: 5 additions & 0 deletions parquet/src/arrow/record_reader/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ impl<T: ScalarValue> ScalarBuffer<T> {
self.len == 0
}

/// Resizes this buffer so that it holds `len` elements of `T`
///
/// The underlying byte buffer is resized to `len * size_of::<T>()` bytes, with `0`
/// passed as the fill value, and the recorded element count is set to `len`
pub fn resize(&mut self, len: usize) {
    let byte_len = len * std::mem::size_of::<T>();
    self.buffer.resize(byte_len, 0);
    self.len = len;
}

#[inline]
pub fn as_slice(&self) -> &[T] {
let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::<T>() };
Expand Down
Loading

0 comments on commit 231cf78

Please sign in to comment.