
Improve parquet reading performance for columns with nulls by preserving bitmask when possible (#1037) #1054

Merged 6 commits into apache:master on Jan 13, 2022

Conversation

@tustvold (Contributor) commented Dec 17, 2021

Which issue does this PR close?

Highly experimental, builds on #1021 #1039 #1052 #1041

Closes #1037

Rationale for this change

See ticket.

This leads to anything from a 2-6x performance improvement when decoding columns containing nulls. As is to be expected, the biggest savings are where the other decode overheads are smaller - with the 6x return on "Int32Array, plain encoded, optional, half NULLs - old".

There is some funkiness with the benchmarks and the memory allocator on my local machine, where it is "faster" to preallocate a single 64-byte array before trying to read data.

What changes are included in this PR?

This changes RecordReader to use a new DefinitionLevelBuffer with a corresponding DefinitionLevelDecoder that can read directly from parquet, skipping intermediate buffering and avoiding decoding parquet bitmasks where not necessary.

Are there any user-facing changes?

No

@github-actions bot added the arrow (Changes to the arrow crate) and parquet (Changes to the parquet crate) labels on Dec 17, 2021
>;

#[doc(hidden)]
pub struct GenericColumnReader<R, D, V> {
Review comment (Contributor):

It would help me to document somewhere what R, D and V are intended for (to make reading this code easier)

@tustvold (Contributor, Author) commented Dec 21, 2021

Pushed a commit that fixes a bug in the handling of ColumnLevelDecoder::read w.r.t. ranges. This is an area of the traits that I'm currently not very happy with; it largely stems from preserving the ability to pass [i32] to ColumnLevelDecoderImpl, which has no state about where it has been written up to, and so needs this to be passed in. I hope to remove this derp prior to the merge of #1041 and, by extension, this PR.
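For context, a sketch of the shape of the range-based API under discussion; the signatures here are illustrative only, not the crate's actual trait. A bare level slice carries no cursor of its own, so the caller must pass in the range describing where to write:

use std::ops::Range;

// Illustrative only: a plain `&mut [i16]` has no notion of how much of
// it has already been filled, so the write position must be threaded
// through every call as an explicit range.
trait LevelDecoder {
    type Slice: ?Sized;

    /// Decode levels into `out[range]`, returning the number written.
    fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> usize;
}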

        } else if self.packed_count != self.packed_offset {
            // Drain the current bit-packed run: the source bytes are already
            // a little-endian bitmask, so append the bit range directly
            let to_read = (self.packed_count - self.packed_offset).min(len - read);
            let offset = self.data_offset * 8 + self.packed_offset;
            buffer.append_packed_range(offset..offset + to_read, self.data.as_ref());
@yordan-pavlov (Contributor) commented Dec 26, 2021:

Looks like this is the main change in this PR? How often does this case happen for def levels in practice?

@tustvold (Contributor, Author) commented Dec 29, 2021:

> how often does this case happen for def levels in practice

This depends on what you mean by "this" 😅

The major change in this PR is not decoding definition levels for columns without nested nullability - i.e. max_def_level == 1 - and instead decoding directly to the null bitmask. This is very common: almost all parquet data I've come across is flat.

My personal experience with projects trying to use nested data in parquet is that it eventually becomes too much of a pain due to the patchy ecosystem support, and the schema ends up just getting flattened.

Previously the code would allocate i16 buffers, populate them with the decoded data, and then deduce a null bitmask from these i16 buffers. This code will now decode directly to the null bitmask when max_def_level == 1, avoiding those allocations along with the costs of decoding and bitmask reconstruction.

As an added bonus, decoding directly lets us exploit the inherent properties of the hybrid encoding to improve performance: the packed representation is already a bitmask, and the RLE representation allows operations on runs of bits.
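To make the fast path concrete, here is a minimal, self-contained sketch of the idea - not the PR's actual code, and the helper names (decode_def_levels_to_mask, read_uleb128) are hypothetical: decoding parquet's hybrid RLE / bit-packed encoding with a bit width of 1 straight into a zero-initialized bitmask. It shows why a run of nulls costs almost nothing (the mask is already zero, only the position advances) and why a bit-packed run is essentially a bitmask copy. The PR goes further by appending whole bit ranges (append_packed_range) and runs (append_n) rather than the per-bit loops below.

/// Illustrative sketch, not the PR's implementation: decode definition
/// levels encoded with parquet's hybrid RLE / bit-packed encoding
/// (bit width 1) directly into a little-endian null bitmask.
fn decode_def_levels_to_mask(mut data: &[u8], num_values: usize) -> Vec<u8> {
    let mut mask = vec![0u8; (num_values + 7) / 8];
    let mut pos = 0; // number of values written so far

    while pos < num_values {
        // Each run starts with a ULEB128 header; the low bit selects
        // bit-packed (1) or RLE (0)
        let (header, rest) = read_uleb128(data);
        data = rest;

        if header & 1 == 1 {
            // Bit-packed run of (header >> 1) groups of 8 values: with a
            // bit width of 1 the payload is already a bitmask
            let groups = (header >> 1) as usize;
            let count = (groups * 8).min(num_values - pos);
            for i in 0..count {
                if data[i / 8] & (1 << (i % 8)) != 0 {
                    mask[(pos + i) / 8] |= 1 << ((pos + i) % 8);
                }
            }
            data = &data[groups..];
            pos += count;
        } else {
            // RLE run of (header >> 1) copies of a single 1-bit value,
            // stored in one byte. A run of 0s is free: the mask is already
            // zero, so only the position advances
            let count = ((header >> 1) as usize).min(num_values - pos);
            if data[0] & 1 == 1 {
                for i in pos..pos + count {
                    mask[i / 8] |= 1 << (i % 8);
                }
            }
            data = &data[1..];
            pos += count;
        }
    }
    mask
}

/// ULEB128 varint reader (hypothetical helper for the sketch above).
fn read_uleb128(data: &[u8]) -> (u64, &[u8]) {
    let (mut value, mut shift) = (0u64, 0);
    for (i, &byte) in data.iter().enumerate() {
        value |= u64::from(byte & 0x7F) << shift;
        if byte & 0x80 == 0 {
            return (value, &data[i + 1..]);
        }
        shift += 7;
    }
    (value, &[])
}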

Review comment (Contributor):

Apologies, I should have been more explicit; what I meant is: how common is it in practice to have max_def_level == 1 plus bit-packing of the def levels? That is where the biggest optimization is, isn't it? RLE-encoded def level reading would still be better than before (no intermediate translation into integers), and that's great, but probably not as fast as directly copying the bit-packed values. I do agree on flat parquet files being common, though; most parquet files I have seen have been flat as well.

Review comment (Contributor):

On second thought, reading of run-length-encoded def levels could be just as fast if append_packed could be used for it as well (except that the buffer to copy from would be a static buffer of all 1s of some fixed length).

@tustvold (Contributor, Author) commented Dec 29, 2021:

> plus bit-packing of the def levels

The logic within RleEncoder uses run-length encoding if the repetition count is greater than 8; otherwise it uses the bit-packed version. How common bit-packed sequences are therefore depends on the distribution of nulls within the data.

To be clear, what parquet calls RLE encoding is actually a hybrid encoding: a page isn't entirely bit-packed or run-length encoded, but contains blocks of either.

> but probably not as fast as directly copying the bit-packed values

I'm not sure I agree with this: copying the bit-packed values is actually potentially more expensive, as it requires shifting and masking the source data. By contrast, inserting a run of nulls is simply a case of incrementing the length of the buffer (as everything is 0-initialized), whereas setting sequences of valid bits can be done at the byte level (or possibly larger).
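To illustrate the byte-level point, here is a small hypothetical helper (set_bits is not the crate's API; it assumes a little-endian bitmask, as arrow uses) that sets a run of valid bits: only the partial bytes at either end need any bit twiddling, and every whole byte in between is a single 0xFF store, which is why long runs of valid values are cheap.

/// Set `count` bits starting at bit `start` in a little-endian bitmask.
fn set_bits(mask: &mut [u8], start: usize, count: usize) {
    let end = start + count;
    let first_byte = start / 8;
    let last_byte = (end - 1) / 8;

    if first_byte == last_byte {
        // Run fits within a single byte: set bits individually
        for i in start..end {
            mask[i / 8] |= 1 << (i % 8);
        }
        return;
    }

    // Leading partial byte
    mask[first_byte] |= !0u8 << (start % 8);
    // Whole bytes in the middle: one store each
    for b in &mut mask[first_byte + 1..last_byte] {
        *b = 0xFF;
    }
    // Trailing byte, which may be partial
    let rem = end % 8;
    if rem == 0 {
        mask[last_byte] = 0xFF;
    } else {
        mask[last_byte] |= (1u8 << rem) - 1;
    }
}

fn main() {
    let mut mask = vec![0u8; 4];
    set_bits(&mut mask, 3, 20); // sets bits 3..=22
    assert_eq!(mask, vec![0b1111_1000, 0xFF, 0b0111_1111, 0x00]);
}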

        while read != len {
            if self.rle_left != 0 {
                // Drain the current RLE run: append `to_read` copies of the
                // run value in a single call
                let to_read = self.rle_left.min(len - read);
                buffer.append_n(to_read, self.rle_value);
Review comment (Contributor):

I wonder if append_n could be made faster using append_packed

@codecov-commenter commented Jan 11, 2022

Codecov Report

Merging #1054 (55c2f6f) into master (06431ee) will increase coverage by 0.04%.
The diff coverage is 86.57%.


@@            Coverage Diff             @@
##           master    #1054      +/-   ##
==========================================
+ Coverage   82.53%   82.58%   +0.04%     
==========================================
  Files         173      173              
  Lines       50615    50876     +261     
==========================================
+ Hits        41774    42014     +240     
- Misses       8841     8862      +21     
Impacted Files Coverage Δ
parquet/src/arrow/array_reader.rs 77.16% <81.57%> (-0.14%) ⬇️
...rquet/src/arrow/record_reader/definition_levels.rs 86.20% <86.74%> (-4.12%) ⬇️
arrow/src/array/builder.rs 86.50% <100.00%> (+0.01%) ⬆️
parquet/src/arrow/record_reader.rs 94.75% <100.00%> (+0.74%) ⬆️
parquet/src/arrow/record_reader/buffer.rs 86.00% <100.00%> (+0.89%) ⬆️
parquet/src/column/reader/decoder.rs 76.27% <100.00%> (ø)
arrow/src/array/mod.rs 100.00% <0.00%> (ø)
arrow/src/array/cast.rs 91.66% <0.00%> (ø)
arrow/src/buffer/ops.rs 96.77% <0.00%> (ø)
... and 4 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 06431ee...55c2f6f.

@tustvold marked this pull request as ready for review on January 11, 2022 18:22
@tustvold (Contributor, Author) commented:
Looking into test failures

@alamb changed the title from "Preserve Parquet Bitmask (#1037)" to "Improve parquet reading performance for columns with nulls by preserving bitmask when possible (#1037)" on Jan 12, 2022
@alamb (Contributor) left a comment:

I think this is great work. Thank you @tustvold

@yordan-pavlov would you like to review again prior to merge?

/// [`Self::consume_def_levels`] and [`Self::consume_rep_levels`] will always return `None`
///
pub(crate) fn new_with_options(desc: ColumnDescPtr, null_mask_only: bool) -> Self {
    let def_levels = (desc.max_def_level() > 0)
Review comment (Contributor):

I don't understand the use of null_mask_only here -- I thought null_mask_only would be set only if max_def_level() == 1

AKA https://github.com/apache/arrow-rs/pull/1054/files#diff-0d6bed48d78c5a2472b7680a8185cabdc0bd259d6484e184439ed7830060661fR1374

@tustvold (Contributor, Author):

Added a comment clarifying; it's an edge case of nested nullability. Perhaps I should add an explicit test 🤔

@tustvold (Contributor, Author):

Test added in 59846eb

type Slice = [i16];
impl DefinitionLevelBuffer {
    pub fn new(desc: &ColumnDescPtr, null_mask_only: bool) -> Self {
        let inner = match null_mask_only {
Review comment (Contributor):

I wonder why null_mask_only is passed down all the way here only to be rechecked / assert!ed.

Would it be possible / feasible to decide here in DefinitionLevelBuffer::new to use BufferInner::Mask if max_def_level() is 1 and max_rep_levels() is 0, and thus avoid plumbing the argument around?

let decoder = match self.data.take() {
    Some(data) => self
        .packed_decoder
        .insert(PackedDecoder::new(self.encoding, data)),
Review comment (Contributor):

TIL: Option::insert 👍
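For reference, a tiny standalone example of std's Option::insert, which overwrites the Option with the given value and returns a mutable reference to it - convenient for lazily (re)creating a decoder and using it in the same expression:

fn main() {
    let mut slot: Option<String> = None;

    // `insert` stores Some(value) in the Option (replacing any previous
    // value) and hands back a mutable reference, usable immediately
    let s: &mut String = slot.insert(String::from("hello"));
    s.push_str(", world");

    assert_eq!(slot.as_deref(), Some("hello, world"));
}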

}
}

struct PackedDecoder {
Review comment (Contributor):

This code looks quite similar to BitReader: https://github.com/tustvold/arrow-rs/blob/bitmask-preservation/parquet/src/util/bit_util.rs#L501

I wonder if you looked at possibly reusing that implementation?

@tustvold (Contributor, Author):

The short answer is that not using that implementation is the major reason this PR represents a non-trivial speed-up: it can decode more optimally because it decodes directly using append_packed_range / append_n. Will add some comments clarifying.


use rand::{thread_rng, Rng, RngCore};

#[test]
Review comment (Contributor):

I know there is now significant coverage of this code via the fuzz tests -- #1156 and friends.

Do you think that is sufficient coverage for PackedDecoder? Or would some more targeted unit tests be valuable too?

@tustvold (Contributor, Author):

Should be possible to write a simple test that compares the output with that of BitReader 👍 Will do

@tustvold (Contributor, Author):

Test added in b001f11

    packed_offset: usize,
}

impl PackedDecoder {
Review comment (Contributor):

I don't understand the details of the parquet format well enough to truly evaluate the correctness of this code; perhaps some additional test coverage would help, but the fuzz testing may be good enough.

}
}

struct PackedDecoder {
Review comment (Contributor):

Suggested change:

-struct PackedDecoder {
+/// Specialized decoder for the bit-packed hybrid format (TODO link) that contains
+/// only 0 and 1 (for example, definition levels in a non-nested column)
+/// and directly decodes into a bitmask in the fastest possible way
+struct PackedDecoder {

Review comment (Contributor):

I am trying to leave breadcrumbs for the next person to look at this code. Is this a correct description of what this structure implements?

@alamb (Contributor) commented Jan 12, 2022

Likewise cc @nevi-me @sunchao in case you are interested

@alamb (Contributor) commented Jan 12, 2022

Unless anyone wants additional time to review, I'll plan to merge this tomorrow

@@ -228,6 +232,20 @@ impl ColumnLevelDecoder for DefinitionLevelDecoder {
}
}

/// An optimized decoder for decoding [RLE] and [BIT_PACKED] data with a bit width of 1
Review comment (Contributor):

👍

@alamb merged commit 231cf78 into apache:master on Jan 13, 2022
@alamb (Contributor) commented Jan 13, 2022

Thanks @tustvold -- this is pretty epic

assert_eq!(range.start + writer.len, nulls.len());

let decoder = match self.data.take() {
    Some(data) => self.column_decoder.insert(
Review comment (Contributor):

It looks like the intention is that self.data will only be used once (to create a ColumnLevelDecoderImpl), and if that's the case, why not move the entire match statement into the constructor?

@tustvold (Contributor, Author):

Because the type of writer determines the type of decoder: if it is BufferInner::Full, it constructs ColumnLevelDecoderImpl; otherwise it constructs PackedDecoder. I guess we could just construct both, but this way you'd get a panic if you change the writer type...
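A minimal sketch of the coupling described here, using simplified stand-in types rather than the crate's actual definitions: the buffer variant chosen at construction time determines which decoder the read path builds.

// Simplified stand-ins for the real types discussed above.
enum BufferInner {
    Full { levels: Vec<i16> }, // generic path: materialised i16 levels
    Mask { nulls: Vec<u8> },   // fast path: null bitmask only
}

enum LevelDecoder {
    Generic, // would wrap ColumnLevelDecoderImpl
    Packed,  // would wrap PackedDecoder
}

// Constructing only the decoder matching the writer means a later
// mismatch between writer and decoder fails loudly rather than silently
// decoding into the wrong representation.
fn decoder_for(writer: &BufferInner) -> LevelDecoder {
    match writer {
        BufferInner::Full { .. } => LevelDecoder::Generic,
        BufferInner::Mask { .. } => LevelDecoder::Packed,
    }
}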

Labels: arrow (Changes to the arrow crate), parquet (Changes to the parquet crate), performance

Successfully merging this pull request may close these issues: Parquet Preserve BitMask (#1037)

4 participants