Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve parquet reading performance for columns with nulls by preserving bitmask when possible (#1037) #1054

Merged
merged 6 commits into from
Jan 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions arrow/src/array/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,11 @@ impl BooleanBufferBuilder {
);
}

/// Returns the packed bits of this builder as a byte slice
///
/// Unlike [`Self::finish`], this borrows the current contents without
/// consuming or resetting the underlying buffer.
pub fn as_slice(&self) -> &[u8] {
    self.buffer.as_slice()
}

#[inline]
pub fn finish(&mut self) -> Buffer {
let buf = std::mem::replace(&mut self.buffer, MutableBuffer::new(0));
Expand Down
9 changes: 7 additions & 2 deletions parquet/benches/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,13 @@ fn create_int32_primitive_array_reader(
column_desc: ColumnDescPtr,
) -> impl ArrayReader {
use parquet::arrow::array_reader::PrimitiveArrayReader;
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
.unwrap()
PrimitiveArrayReader::<Int32Type>::new_with_options(
Box::new(page_iterator),
column_desc,
None,
true,
)
.unwrap()
}

fn create_string_arrow_array_reader(
Expand Down
92 changes: 62 additions & 30 deletions parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,17 @@ where
pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
arrow_type: Option<ArrowType>,
) -> Result<Self> {
Self::new_with_options(pages, column_desc, arrow_type, false)
}

/// Construct primitive array reader with ability to only compute null mask and not
/// buffer level data
pub fn new_with_options(
pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
arrow_type: Option<ArrowType>,
null_mask_only: bool,
) -> Result<Self> {
// Check if Arrow type is specified, else create it from Parquet type
let data_type = match arrow_type {
Expand All @@ -256,7 +267,7 @@ where
.clone(),
};

let record_reader = RecordReader::<T>::new(column_desc.clone());
let record_reader = RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);

Ok(Self {
data_type,
Expand Down Expand Up @@ -1350,19 +1361,26 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
let mut new_context = context.clone();
new_context.path.append(vec![cur_type.name().to_string()]);

match cur_type.get_basic_info().repetition() {
let null_mask_only = match cur_type.get_basic_info().repetition() {
Repetition::REPEATED => {
new_context.def_level += 1;
new_context.rep_level += 1;
false
}
Repetition::OPTIONAL => {
new_context.def_level += 1;

// Can just compute null mask if no parent
context.def_level == 0 && context.rep_level == 0
}
_ => (),
}
_ => false,
};

let reader =
self.build_for_primitive_type_inner(cur_type.clone(), &new_context)?;
let reader = self.build_for_primitive_type_inner(
cur_type.clone(),
&new_context,
null_mask_only,
)?;

if cur_type.get_basic_info().repetition() == Repetition::REPEATED {
Err(ArrowError(
Expand Down Expand Up @@ -1641,6 +1659,7 @@ impl<'a> ArrayReaderBuilder {
&self,
cur_type: TypePtr,
context: &'a ArrayReaderBuilderContext,
null_mask_only: bool,
) -> Result<Box<dyn ArrayReader>> {
let column_desc = Arc::new(ColumnDescriptor::new(
cur_type.clone(),
Expand All @@ -1658,30 +1677,39 @@ impl<'a> ArrayReaderBuilder {
.map(|f| f.data_type().clone());

match cur_type.get_physical_type() {
PhysicalType::BOOLEAN => Ok(Box::new(PrimitiveArrayReader::<BoolType>::new(
page_iterator,
column_desc,
arrow_type,
)?)),
PhysicalType::BOOLEAN => Ok(Box::new(
PrimitiveArrayReader::<BoolType>::new_with_options(
page_iterator,
column_desc,
arrow_type,
null_mask_only,
)?,
)),
PhysicalType::INT32 => {
if let Some(ArrowType::Null) = arrow_type {
Ok(Box::new(NullArrayReader::<Int32Type>::new(
page_iterator,
column_desc,
)?))
} else {
Ok(Box::new(PrimitiveArrayReader::<Int32Type>::new(
page_iterator,
column_desc,
arrow_type,
)?))
Ok(Box::new(
PrimitiveArrayReader::<Int32Type>::new_with_options(
page_iterator,
column_desc,
arrow_type,
null_mask_only,
)?,
))
}
}
PhysicalType::INT64 => Ok(Box::new(PrimitiveArrayReader::<Int64Type>::new(
page_iterator,
column_desc,
arrow_type,
)?)),
PhysicalType::INT64 => Ok(Box::new(
PrimitiveArrayReader::<Int64Type>::new_with_options(
page_iterator,
column_desc,
arrow_type,
null_mask_only,
)?,
)),
PhysicalType::INT96 => {
// get the optional timezone information from arrow type
let timezone = arrow_type
Expand All @@ -1705,18 +1733,22 @@ impl<'a> ArrayReaderBuilder {
arrow_type,
)?))
}
PhysicalType::FLOAT => Ok(Box::new(PrimitiveArrayReader::<FloatType>::new(
page_iterator,
column_desc,
arrow_type,
)?)),
PhysicalType::DOUBLE => {
Ok(Box::new(PrimitiveArrayReader::<DoubleType>::new(
PhysicalType::FLOAT => Ok(Box::new(
PrimitiveArrayReader::<FloatType>::new_with_options(
page_iterator,
column_desc,
arrow_type,
)?))
}
null_mask_only,
)?,
)),
PhysicalType::DOUBLE => Ok(Box::new(
PrimitiveArrayReader::<DoubleType>::new_with_options(
page_iterator,
column_desc,
arrow_type,
null_mask_only,
)?,
)),
PhysicalType::BYTE_ARRAY => {
if cur_type.get_basic_info().converted_type() == ConvertedType::UTF8 {
if let Some(ArrowType::LargeUtf8) = arrow_type {
Expand Down
55 changes: 53 additions & 2 deletions parquet/src/arrow/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,11 @@ mod tests {
use crate::file::properties::{WriterProperties, WriterVersion};
use crate::file::reader::{FileReader, SerializedFileReader};
use crate::file::writer::{FileWriter, SerializedFileWriter};
use crate::schema::parser::parse_message_type;
use crate::schema::types::{Type, TypePtr};
use crate::util::test_common::{get_temp_filename, RandGen};
use crate::util::test_common::{get_temp_file, get_temp_filename, RandGen};
use arrow::array::*;
use arrow::datatypes::DataType as ArrowDataType;
use arrow::datatypes::{DataType as ArrowDataType, Field};
use arrow::record_batch::RecordBatchReader;
use rand::{thread_rng, RngCore};
use serde_json::json;
Expand Down Expand Up @@ -868,4 +869,54 @@ mod tests {
batch.unwrap();
}
}

#[test]
fn test_nested_nullability() {
let message_type = "message nested {
OPTIONAL Group group {
REQUIRED INT32 leaf;
}
}";

let file = get_temp_file("nested_nullability.parquet", &[]);
let schema = Arc::new(parse_message_type(message_type).unwrap());

{
// Write using low-level parquet API (#1167)
let writer_props = Arc::new(WriterProperties::builder().build());
let mut writer = SerializedFileWriter::new(
file.try_clone().unwrap(),
schema,
writer_props,
)
.unwrap();

let mut row_group_writer = writer.next_row_group().unwrap();
let mut column_writer = row_group_writer.next_column().unwrap().unwrap();

get_typed_column_writer_mut::<Int32Type>(&mut column_writer)
.write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
.unwrap();

row_group_writer.close_column(column_writer).unwrap();
writer.close_row_group(row_group_writer).unwrap();

writer.close().unwrap();
}

let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
let mut batch = ParquetFileArrowReader::new(file_reader);
let reader = batch.get_record_reader_by_columns(vec![0], 1024).unwrap();

let expected_schema = arrow::datatypes::Schema::new(vec![Field::new(
"group",
ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)]),
true,
)]);

let batch = reader.into_iter().next().unwrap().unwrap();
assert_eq!(batch.schema().as_ref(), &expected_schema);
assert_eq!(batch.num_rows(), 4);
assert_eq!(batch.column(0).data().null_count(), 2);
}
}
37 changes: 30 additions & 7 deletions parquet/src/arrow/record_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,35 @@ where
V: ValuesBuffer + Default,
CV: ColumnValueDecoder<Slice = V::Slice>,
{
/// Create a new [`GenericRecordReader`]
pub fn new(desc: ColumnDescPtr) -> Self {
let def_levels =
(desc.max_def_level() > 0).then(|| DefinitionLevelBuffer::new(&desc));
Self::new_with_options(desc, false)
}

/// Create a new [`GenericRecordReader`] with the ability to only generate the bitmask
///
/// If `null_mask_only` is true only the null bitmask will be generated and
/// [`Self::consume_def_levels`] and [`Self::consume_rep_levels`] will always return `None`
///
/// It is insufficient to solely check that the max definition level is 1, as we
/// need there to be no nullable parent array that would require decoded definition levels
///
/// In particular consider the case of:
///
/// ```ignore
/// message nested {
/// OPTIONAL Group group {
/// REQUIRED INT32 leaf;
/// }
/// }
/// ```
///
/// The maximum definition level of leaf is 1, however, we still need to decode the
/// definition levels so that the parent group can be constructed correctly
///
pub(crate) fn new_with_options(desc: ColumnDescPtr, null_mask_only: bool) -> Self {
let def_levels = (desc.max_def_level() > 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the use of null_mask_only here -- I thought null_mask_only would be set only if max_def_level() == 1

AKA https://github.com/apache/arrow-rs/pull/1054/files#diff-0d6bed48d78c5a2472b7680a8185cabdc0bd259d6484e184439ed7830060661fR1374

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment clarifying, its an edge case of nested nullability. Perhaps I should add an explicit test 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test added in 59846eb

.then(|| DefinitionLevelBuffer::new(&desc, null_mask_only));

let rep_levels = (desc.max_rep_level() > 0).then(ScalarBuffer::new);

Expand Down Expand Up @@ -171,7 +197,7 @@ where
/// as record values, e.g. those from `self.num_values` to `self.values_written`.
pub fn consume_def_levels(&mut self) -> Result<Option<Buffer>> {
Ok(match self.def_levels.as_mut() {
Some(x) => Some(x.split_off(self.num_values)),
Some(x) => x.split_levels(self.num_values),
None => None,
})
}
Expand Down Expand Up @@ -221,10 +247,7 @@ where
.as_mut()
.map(|levels| levels.spare_capacity_mut(batch_size));

let def_levels = self
.def_levels
.as_mut()
.map(|levels| levels.spare_capacity_mut(batch_size));
let def_levels = self.def_levels.as_mut();

let values = self.records.spare_capacity_mut(batch_size);

Expand Down
5 changes: 5 additions & 0 deletions parquet/src/arrow/record_reader/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ impl<T: ScalarValue> ScalarBuffer<T> {
self.len == 0
}

/// Resizes this buffer to hold exactly `len` elements of `T`,
/// zero-filling any newly added trailing bytes.
pub fn resize(&mut self, len: usize) {
    let new_byte_len = len * std::mem::size_of::<T>();
    self.buffer.resize(new_byte_len, 0);
    self.len = len;
}

#[inline]
pub fn as_slice(&self) -> &[T] {
let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::<T>() };
Expand Down
Loading