Implement Push Parquet Decoder #7997
Conversation
Update: I spent this morning creating a "push" based ParquetMetadataDecoder, as I found the metadata is, of course, needed to configure the parquet decoding process. I am quite pleased with the results.
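(For readers following along, here is a minimal sketch of what a push-based metadata decode loop looks like from the caller's side. The type name, method names, and `DecodeResult` variants below are assumptions for illustration, not necessarily the final API.)

```rust
use std::ops::Range;
use bytes::Bytes;

// Sketch only: `MetadataPushDecoder`, `DecodeResult`, and the method names are
// assumed for illustration. The caller owns all IO and pushes bytes into the
// decoder until it has enough to produce the `ParquetMetaData`.
fn decode_metadata(
    mut decoder: MetadataPushDecoder,
    mut read_range: impl FnMut(Range<u64>) -> Bytes, // caller-provided IO
) -> Result<ParquetMetaData, ParquetError> {
    loop {
        match decoder.try_decode()? {
            // The decoder needs more bytes: do the IO and push the data in
            DecodeResult::NeedsData(ranges) => {
                let buffers: Vec<Bytes> = ranges.iter().cloned().map(&mut read_range).collect();
                decoder.push_ranges(ranges, buffers)?;
            }
            // The footer and metadata have been fully decoded
            DecodeResult::Data(metadata) => return Ok(metadata),
        }
    }
}
```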
And the tests pass 🥁 🚀 Now I will do some test PRs to use this one implementation instead of the async reader (and I'll do the sync reader too).
alamb left a comment:
Thank you so much @adriangb and @mbrobbel for your comments on this PR. Super super helpful.
I'll plan to merge this over the next few days, after I:
- File a few more follow-on tasks
- Verify benchmarks on this PR again
- Get the PR to rewrite the async decoder ready
I also plan a blog post about this work:
```rust
    metadata: &'a ParquetMetaData,
}

impl InMemoryRowGroup<'_> {
```
Moved!
```rust
// If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
// `RowSelection`
let mut page_start_offsets: Vec<Vec<u64>> = vec![];
```
```rust
    _ => (),
}

// Expand selection to batch boundaries only for cached columns
```
Good call -- I added some additional comments with context about the predicate cache and links
```rust
/// Column chunk data representing only a subset of data pages
Sparse {
    /// Length of the full column chunk
    length: usize,
    /// Subset of data pages included in this sparse chunk.
    ///
    /// Each element is a tuple of (page offset within file, page data).
    /// Each entry is a complete page and the list is ordered by offset.
    data: Vec<(usize, Bytes)>,
},
/// Full column chunk and the offset within the original file
Dense { offset: usize, data: Bytes },
```
Thanks -- this is just giving a name to what was already happening in `arrow-rs/parquet/src/arrow/async_reader/mod.rs` (lines 989 to 1083 at b9c2bf7):
```rust
let metadata = self.metadata.row_group(self.row_group_idx);
if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
    let expanded_selection =
        selection.expand_to_batch_boundaries(batch_size, self.row_count);
    // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
    // `RowSelection`
    let mut page_start_offsets: Vec<Vec<u64>> = vec![];
    let fetch_ranges = self
        .column_chunks
        .iter()
        .zip(metadata.columns())
        .enumerate()
        .filter(|&(idx, (chunk, _chunk_meta))| {
            chunk.is_none() && projection.leaf_included(idx)
        })
        .flat_map(|(idx, (_chunk, chunk_meta))| {
            // If the first page does not start at the beginning of the column,
            // then we need to also fetch a dictionary page.
            let mut ranges: Vec<Range<u64>> = vec![];
            let (start, _len) = chunk_meta.byte_range();
            match offset_index[idx].page_locations.first() {
                Some(first) if first.offset as u64 != start => {
                    ranges.push(start..first.offset as u64);
                }
                _ => (),
            }
            // Expand selection to batch boundaries only for cached columns
            let use_expanded = cache_mask.map(|m| m.leaf_included(idx)).unwrap_or(false);
            if use_expanded {
                ranges.extend(
                    expanded_selection.scan_ranges(&offset_index[idx].page_locations),
                );
            } else {
                ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
            }
            page_start_offsets.push(ranges.iter().map(|range| range.start).collect());
            ranges
        })
        .collect();
    let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
    let mut page_start_offsets = page_start_offsets.into_iter();
    for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
        if chunk.is_some() || !projection.leaf_included(idx) {
            continue;
        }
        if let Some(offsets) = page_start_offsets.next() {
            let mut chunks = Vec::with_capacity(offsets.len());
            for _ in 0..offsets.len() {
                chunks.push(chunk_data.next().unwrap());
            }
            *chunk = Some(Arc::new(ColumnChunkData::Sparse {
                length: metadata.column(idx).byte_range().1 as usize,
                data: offsets
                    .into_iter()
                    .map(|x| x as usize)
                    .zip(chunks.into_iter())
                    .collect(),
            }))
        }
    }
} else {
    let fetch_ranges = self
        .column_chunks
        .iter()
        .enumerate()
        .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
        .map(|(idx, _chunk)| {
            let column = metadata.column(idx);
            let (start, length) = column.byte_range();
            start..(start + length)
        })
        .collect();
    let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
    for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
        if chunk.is_some() || !projection.leaf_included(idx) {
            continue;
        }
        if let Some(data) = chunk_data.next() {
            *chunk = Some(Arc::new(ColumnChunkData::Dense {
                offset: metadata.column(idx).byte_range().0 as usize,
                data,
            }));
        }
    }
}
```
(but was stored in local variable names)
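(To make the `Sparse` / `Dense` distinction above concrete, here is a rough sketch of how a read at an absolute file offset could be served from each variant. This is illustrative only; the real code implements the `ChunkReader` trait, returns `Result`s, and may differ in detail.)

```rust
use bytes::Bytes;

// Illustrative stand-in for the enum discussed above (not the real definition).
enum ColumnChunkData {
    /// Only a subset of pages, stored as (absolute page offset, page bytes),
    /// ordered by offset
    Sparse { length: usize, data: Vec<(usize, Bytes)> },
    /// The full column chunk plus its starting offset within the file
    Dense { offset: usize, data: Bytes },
}

impl ColumnChunkData {
    /// Return the bytes starting at the given absolute file offset.
    /// Sketch only: the signature and error handling are assumptions.
    fn get(&self, start: usize) -> Option<Bytes> {
        match self {
            ColumnChunkData::Sparse { data, .. } => {
                // Page reads always begin at a page boundary, so look up the
                // page whose offset matches `start` exactly
                let idx = data
                    .binary_search_by_key(&start, |(offset, _)| *offset)
                    .ok()?;
                Some(data[idx].1.clone())
            }
            ColumnChunkData::Dense { offset, data } => {
                // Translate the absolute file offset into an offset within the chunk
                Some(data.slice(start - offset..))
            }
        }
    }
}
```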
```rust
///
/// // In a loop, ask the decoder what it needs next, and provide it with the required data
/// loop {
///     match decoder.try_decode().unwrap() {
```
Yes indeed, excellent call -- captured in
```rust
/// Offset to apply to remaining row groups (decremented as rows are read)
offset: Option<usize>,

/// The size in bytes of the predicate cache
```
added a link to RowGroupCache
```rust
pub(crate) fn try_build(
    &mut self,
) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
    loop {
```
It isn't called concurrently. `try_transition` can change the state as decoding proceeds. If the decoder already has enough data to potentially move on to the next state, it begins doing so immediately via this loop -- I will make this clearer in the comments.
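(A compressed sketch of that loop shape. The state handling is simplified and the variant names are invented for illustration; they do not match the real code one for one.)

```rust
// Sketch of the "keep transitioning until we either need more data or can
// return a result" shape described above.
fn try_build(&mut self) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
    loop {
        match self.try_transition()? {
            // The transition succeeded and we may already have enough data
            // buffered to advance again, so loop immediately
            NextState::Again(new_state) => self.state = new_state,
            // No progress is possible without more bytes from the caller
            NextState::NeedsData(ranges) => return Ok(DecodeResult::NeedsData(ranges)),
            // A reader for the next row group is ready to hand back
            NextState::Ready(reader) => return Ok(DecodeResult::Data(reader)),
        }
    }
}
```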
```rust
    self.exclude_nested_columns_from_cache(&cache_projection)
}

/// Exclude leaves belonging to roots that span multiple parquet leaves (i.e. nested columns)
```
Short answer is I don't know -- this just preserves the existing semantics
Maybe @XiangpengHao can explain in more detail
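(Mechanically, the doc comment above means something like the following sketch: keep only cached leaves whose root column maps to a single parquet leaf. This is illustrative only, not the helper in this PR, and the schema accessor names are from memory.)

```rust
use parquet::arrow::ProjectionMask;
use parquet::schema::types::SchemaDescriptor;

// Sketch: drop cached leaves that belong to nested (multi-leaf) root columns.
fn exclude_nested_leaves(schema: &SchemaDescriptor, cache_projection: &ProjectionMask) -> ProjectionMask {
    // Count how many parquet leaves each root column contributes
    let mut leaves_per_root = vec![0usize; schema.root_schema().get_fields().len()];
    for leaf_idx in 0..schema.num_columns() {
        leaves_per_root[schema.get_column_root_idx(leaf_idx)] += 1;
    }
    // Keep only cached leaves whose root has exactly one leaf (i.e. not nested)
    let kept: Vec<usize> = (0..schema.num_columns())
        .filter(|&leaf_idx| {
            cache_projection.leaf_included(leaf_idx)
                && leaves_per_root[schema.get_column_root_idx(leaf_idx)] == 1
        })
        .collect();
    ProjectionMask::leaves(schema, kept)
}
```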
```rust
    self.offset = offset;
    self
}
```
I will file a ticket to track
```rust
let Some(filter) = self.filter.take() else {
    // no filter, start trying to read data immediately
    return Ok(NextState::again(RowGroupDecoderState::StartData {
        row_group_info,
        column_chunks,
        cache_info: None,
    }));
};
// no predicates in filter, so start reading immediately
if filter.predicates.is_empty() {
    return Ok(NextState::again(RowGroupDecoderState::StartData {
        row_group_info,
        column_chunks,
        cache_info: None,
    }));
};
```
Since it happens for each row group (i.e. `row_group_info` changes on each row group) I don't think it could be in `RowGroupReaderBuilder::new()`.
We could possibly inline it in `RowGroupReaderBuilder::next_row_group()` 🤔
https://github.com/apache/arrow-rs/blob/10aae0fc6bc8020f77d584a5fa3cc0c5da605211/parquet/src/arrow/push_decoder/reader_builder/mod.rs#L244-L243
But I am not sure how much better that would be as it just moves the code around?
I agree it would be good to provide granular requests for data, but I think it is orthogonal to 'what data to wait for until decoding can start'. Right now the readers (including the push decoder) wait until all the data for a RowGroup (after filtering) is fetched. Once the decoder can tell the caller how much data it really needs to decode something, I think we'll be in a much better position to control CPU vs memory.
# Which issue does this PR close?
- Part of #7983
- Part of #8000
- closes #8677

I am also working on a blog post about this - #8035

# TODOs
- [x] Rewrite `test_cache_projection_excludes_nested_columns` in terms of higher level APIs (#8754)
- [x] Benchmarks
- [x] Benchmarks with DataFusion: apache/datafusion#18385

# Rationale for this change
A new ParquetPushDecoder was implemented here - #7997

I need to refactor the async and sync readers to use the new push decoder in order to:
1. Avoid the [xkcd standards effect](https://xkcd.com/927/) (aka there are now three control loops)
2. Prove that the push decoder works (by passing all the tests of the other two)
3. Set the stage for improving filter pushdown more with a single control loop

<img width="400" alt="image" src="https://github.com/user-attachments/assets/e6886ee9-58b3-4a1e-8e88-9d2d03132b19" />

# What changes are included in this PR?
1. Refactor the `ParquetRecordBatchStream` to use `ParquetPushDecoder`

# Are these changes tested?
Yes, by the existing CI tests. I also ran several benchmarks, both in arrow-rs and in DataFusion, and I do not see any substantial performance difference (as expected):
- apache/datafusion#18385

# Are there any user-facing changes?
No

---------

Co-authored-by: Vukasin Stefanovic <vukasin.stefanovic92@gmail.com>
# Which issue does this PR close?
- Part of [Epic] Parquet Reader Improvement Plan / Proposal - July 2025 #8000
- closes Decouple IO and CPU operations in the Parquet Reader (push decoder) #7983
# Rationale for this change
This PR is the first part of separating IO and decode operations in the Rust parquet decoder.
Decoupling IO and CPU enables several important use cases:
- Using the same code for `ParquetRecordBatchStreamBuilder` and `ParquetRecordBatchReaderBuilder`

# What changes are included in this PR?
Adds `ParquetDecoderBuilder` and `ParquetDecoder`, plus tests. It is effectively an explicit version of the state machine used in the existing async reader (where the state machine is encoded as Rust `async`/`await` structures).
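To make the "explicit state machine" point concrete, here is a rough sketch of how a caller with plain blocking file IO might drive the push decoder. The decoder name, its methods, and the `DecodeResult` variants are assumptions for illustration (and the error conversion from `io::Error` is assumed); an async wrapper would look the same but `await` its reads.

```rust
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
use bytes::Bytes;

// Sketch only: the decoder type, its methods, and the `DecodeResult` variants
// are assumptions for illustration. The caller performs all IO (here with a
// blocking `File`) and pushes the bytes into the decoder.
fn read_all_batches(
    mut file: File,
    mut decoder: ParquetPushDecoder,
) -> Result<Vec<RecordBatch>, ParquetError> {
    let mut batches = vec![];
    loop {
        match decoder.try_decode()? {
            // The decoder cannot make progress without these byte ranges
            DecodeResult::NeedsData(ranges) => {
                let buffers = ranges
                    .iter()
                    .map(|r| -> Result<Bytes, ParquetError> {
                        let mut buf = vec![0u8; (r.end - r.start) as usize];
                        file.seek(SeekFrom::Start(r.start))?;
                        file.read_exact(&mut buf)?;
                        Ok(Bytes::from(buf))
                    })
                    .collect::<Result<Vec<_>, _>>()?;
                decoder.push_ranges(ranges, buffers)?;
            }
            // A decoded `RecordBatch` is ready
            DecodeResult::Data(batch) => batches.push(batch),
            // All requested rows have been decoded
            DecodeResult::Finished => return Ok(batches),
        }
    }
}
```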
# Are these changes tested?
Yes -- there are extensive tests for the new code
Note that this PR actually adds a third path for control flow (even though I claim this will remove duplication!). In follow-on PRs I will convert the existing readers to use this new pattern, similarly to the sequence I followed for the metadata decoder:
Here is a preview of a PR that consolidates the async reader to use the push decoder internally (and removes one duplicate):
- Rewrite `ParquetRecordBatchStream` in terms of the PushDecoder #8159 (closes [Parquet] Refactor InMemoryRowGroup to separate CPU and IO #8022)
# Are there any user-facing changes?
Yes, a new API, but no changes to the existing APIs.