
Commit e9ea12b

alamb, mbrobbel, and adriangb authored
Implement Push Parquet Decoder (#7997)
# Which issue does this PR close?

- Part of #8000
- closes #7983

# Rationale for this change

This PR is the first part of separating IO and decode operations in the Rust Parquet decoder. Decoupling IO and CPU enables several important use cases:

1. Different IO patterns (e.g. not buffering the entire row group at once)
2. Different IO APIs, e.g. io_uring, OpenDAL, etc.
3. Deliberate prefetching within a file
4. Avoiding code duplication between the `ParquetRecordBatchStreamBuilder` and `ParquetRecordBatchReaderBuilder`

# What changes are included in this PR?

1. Add the new `ParquetDecoderBuilder` and `ParquetDecoder`, along with tests

It is effectively an explicit version of the state machine used in the existing async reader (where the state machine is encoded as Rust `async` / `await` structures).

# Are these changes tested?

Yes -- there are extensive tests for the new code.

Note that this PR actually adds a **3rd** path for control flow (even as I claim this work will remove duplication!). In follow-on PRs I will convert the existing readers to use this new pattern, similar to the sequence I followed for the metadata decoder:

- #8080
- #8340

Here is a preview of a PR that consolidates the async reader to use the push decoder internally (and removes one duplicate):

- #8159
- closes #8022

# Are there any user-facing changes?

Yes, a new API, but no changes to the existing APIs.

---------

Co-authored-by: Matthijs Brobbel <m1brobbel@gmail.com>
Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
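To make the push-based control flow concrete, here is a minimal sketch of how a caller might drive such a decoder. The names (`PushDecoder`, `DecodeResult`, `try_decode`, `push_data`) are illustrative assumptions for this description, not necessarily the exact API added by this PR:

```rust
use std::ops::Range;

/// Hypothetical result of one decode attempt (names are assumptions).
enum DecodeResult<T> {
    /// The decoder cannot proceed until these byte ranges are supplied.
    NeedsData(Vec<Range<u64>>),
    /// A decoded value (e.g. a RecordBatch) is ready.
    Data(T),
    /// All requested data has been decoded.
    Finished,
}

/// Hypothetical decoder surface: all CPU work, no IO.
trait PushDecoder<T> {
    fn try_decode(&mut self) -> DecodeResult<T>;
    fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Vec<u8>>);
}

/// The caller owns the IO loop, so any IO strategy (sync reads, tokio,
/// io_uring, OpenDAL, deliberate prefetching) can feed the same decoder.
fn drive<T, D, F>(decoder: &mut D, mut read_ranges: F) -> Vec<T>
where
    D: PushDecoder<T>,
    F: FnMut(&[Range<u64>]) -> Vec<Vec<u8>>,
{
    let mut out = Vec::new();
    loop {
        match decoder.try_decode() {
            DecodeResult::NeedsData(ranges) => {
                // IO happens here, entirely outside the decoder
                let buffers = read_ranges(&ranges);
                decoder.push_data(ranges, buffers);
            }
            DecodeResult::Data(batch) => out.push(batch),
            DecodeResult::Finished => return out,
        }
    }
}
```
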
1 parent 0745bb4 commit e9ea12b

File tree

14 files changed (+2689, -242 lines)


parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 16 additions & 0 deletions
@@ -58,6 +58,7 @@ pub mod statistics;
 ///
 /// * synchronous API: [`ParquetRecordBatchReaderBuilder::try_new`]
 /// * `async` API: [`ParquetRecordBatchStreamBuilder::new`]
+/// * decoder API: [`ParquetDecoderBuilder::new`]
 ///
 /// # Features
 /// * Projection pushdown: [`Self::with_projection`]
@@ -93,6 +94,7 @@ pub mod statistics;
 /// Millisecond Latency] Arrow blog post.
 ///
 /// [`ParquetRecordBatchStreamBuilder::new`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new
+/// [`ParquetDecoderBuilder::new`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder::new
 /// [Apache Arrow]: https://arrow.apache.org/
 /// [`StatisticsConverter`]: statistics::StatisticsConverter
 /// [Querying Parquet with Millisecond Latency]: https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
@@ -992,12 +994,26 @@ impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
 
 /// An `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]
 /// read from a parquet data source
+///
+/// This reader is created by [`ParquetRecordBatchReaderBuilder`], and has all
+/// the buffered state (DataPages, etc) necessary to decode the parquet data into
+/// Arrow arrays.
 pub struct ParquetRecordBatchReader {
     array_reader: Box<dyn ArrayReader>,
     schema: SchemaRef,
     read_plan: ReadPlan,
 }
 
+impl Debug for ParquetRecordBatchReader {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ParquetRecordBatchReader")
+            .field("array_reader", &"...")
+            .field("schema", &self.schema)
+            .field("read_plan", &self.read_plan)
+            .finish()
+    }
+}
+
 impl Iterator for ParquetRecordBatchReader {
     type Item = Result<RecordBatch, ArrowError>;
 
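The new doc comment above points at the builder; for context, here is a minimal sketch of the synchronous path it documents (the file path and batch size are arbitrary placeholders):

```rust
use std::fs::File;

use arrow_array::RecordBatch;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::errors::Result;

/// Decode every RecordBatch from a Parquet file using the sync reader.
fn read_all(path: &str) -> Result<Vec<RecordBatch>> {
    let file = File::open(path)?;
    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
        .with_batch_size(8192) // rows per yielded RecordBatch
        .build()?;
    // ParquetRecordBatchReader implements
    // Iterator<Item = Result<RecordBatch, ArrowError>>
    let mut batches = Vec::new();
    for batch in reader {
        batches.push(batch?);
    }
    Ok(batches)
}
```
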

parquet/src/arrow/arrow_reader/read_plan.rs

Lines changed: 2 additions & 3 deletions
@@ -28,7 +28,7 @@ use arrow_select::filter::prep_null_mask_filter;
 use std::collections::VecDeque;
 
 /// A builder for [`ReadPlan`]
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct ReadPlanBuilder {
     batch_size: usize,
     /// Current to apply, includes all filters
@@ -51,7 +51,6 @@ impl ReadPlanBuilder {
     }
 
     /// Returns the current selection, if any
-    #[cfg(feature = "async")]
     pub fn selection(&self) -> Option<&RowSelection> {
         self.selection.as_ref()
     }
@@ -76,7 +75,6 @@ impl ReadPlanBuilder {
     }
 
     /// Returns the number of rows selected, or `None` if all rows are selected.
-    #[cfg(feature = "async")]
     pub fn num_rows_selected(&self) -> Option<usize> {
         self.selection.as_ref().map(|s| s.row_count())
     }
@@ -230,6 +228,7 @@ impl LimitedReadPlanBuilder {
 /// A plan reading specific rows from a Parquet Row Group.
 ///
 /// See [`ReadPlanBuilder`] to create `ReadPlan`s
+#[derive(Debug)]
 pub struct ReadPlan {
     /// The number of rows to read in each batch
     batch_size: usize,
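Dropping the `#[cfg(feature = "async")]` gates makes these accessors available to the (non-async) push decoder as well. For reference, `num_rows_selected` surfaces `RowSelection::row_count`, which counts only the selected rows:

```rust
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn main() {
    // Select 100 rows, skip 50, then select 25 more
    let selection = RowSelection::from(vec![
        RowSelector::select(100),
        RowSelector::skip(50),
        RowSelector::select(25),
    ]);
    // This is the value num_rows_selected() reports when a selection is
    // set; with no selection it returns None (all rows are selected).
    assert_eq!(selection.row_count(), 125);
}
```
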

parquet/src/arrow/arrow_reader/selection.rs

Lines changed: 0 additions & 1 deletion
@@ -447,7 +447,6 @@ impl RowSelection {
     /// Expands the selection to align with batch boundaries.
     /// This is needed when using cached array readers to ensure that
     /// the cached data covers full batches.
-    #[cfg(feature = "async")]
     pub(crate) fn expand_to_batch_boundaries(&self, batch_size: usize, total_rows: usize) -> Self {
         if batch_size == 0 {
             return self.clone();
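For intuition about `expand_to_batch_boundaries`: it rounds the selected regions outward so cached readers always produce whole batches. A standalone sketch over half-open ranges (the real implementation works on `RowSelector` runs, so this is a simplification):

```rust
use std::ops::Range;

/// Round each selected row range outward to multiples of `batch_size`,
/// clamped to `total_rows`. Illustrative only; overlapping results would
/// still need merging, which the run-based RowSelection handles naturally.
fn expand_to_batch_boundaries(
    ranges: &[Range<usize>],
    batch_size: usize,
    total_rows: usize,
) -> Vec<Range<usize>> {
    ranges
        .iter()
        .map(|r| {
            let start = (r.start / batch_size) * batch_size;
            let end = r.end.div_ceil(batch_size) * batch_size;
            start..end.min(total_rows)
        })
        .collect()
}

fn main() {
    // With batch_size = 100, selected rows 150..160 expand to 100..200,
    // so a cached array reader sees full batches.
    assert_eq!(
        expand_to_batch_boundaries(&[150..160], 100, 1000),
        vec![100..200]
    );
}
```
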

parquet/src/arrow/async_reader/mod.rs

Lines changed: 19 additions & 231 deletions
@@ -29,7 +29,7 @@ use std::pin::Pin;
 use std::sync::{Arc, Mutex};
 use std::task::{Context, Poll};
 
-use bytes::{Buf, Bytes};
+use bytes::Bytes;
 use futures::future::{BoxFuture, FutureExt};
 use futures::ready;
 use futures::stream::Stream;
@@ -38,10 +38,6 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
 use arrow_array::RecordBatch;
 use arrow_schema::{DataType, Fields, Schema, SchemaRef};
 
-use crate::arrow::ProjectionMask;
-use crate::arrow::array_reader::{
-    ArrayReaderBuilder, CacheOptionsBuilder, RowGroupCache, RowGroups,
-};
 use crate::arrow::arrow_reader::{
     ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
     RowFilter, RowSelection,
@@ -51,20 +47,20 @@ use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash
 use crate::bloom_filter::{
     SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
 };
-use crate::column::page::{PageIterator, PageReader};
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
-use crate::file::page_index::offset_index::OffsetIndexMetaData;
-use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
 
 mod metadata;
 pub use metadata::*;
 
 #[cfg(feature = "object_store")]
 mod store;
 
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{ArrayReaderBuilder, CacheOptionsBuilder, RowGroupCache};
 use crate::arrow::arrow_reader::ReadPlanBuilder;
 use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
+use crate::arrow::in_memory_row_group::{FetchRanges, InMemoryRowGroup};
 use crate::arrow::schema::ParquetField;
 #[cfg(feature = "object_store")]
 pub use store::*;
@@ -571,6 +567,8 @@ struct ReaderFactory<T> {
     metrics: ArrowReaderMetrics,
 
     /// Maximum size of the predicate cache
+    ///
+    /// See [`RowGroupCache`] for details.
     max_predicate_cache_size: usize,
 }
 
@@ -967,245 +965,35 @@
     }
 }
 
-/// An in-memory collection of column chunks
-struct InMemoryRowGroup<'a> {
-    offset_index: Option<&'a [OffsetIndexMetaData]>,
-    /// Column chunks for this row group
-    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
-    row_count: usize,
-    row_group_idx: usize,
-    metadata: &'a ParquetMetaData,
-}
-
+// Note this implementation is not with the rest of the InMemoryRowGroup
+// implementation because it relies on several async traits and types
+// that are only available when the "async" feature is enabled.
 impl InMemoryRowGroup<'_> {
     /// Fetches any additional column data specified in `projection` that is not already
     /// present in `self.column_chunks`.
     ///
     /// If `selection` is provided, only the pages required for the selection
    /// are fetched. Otherwise, all pages are fetched.
-    async fn fetch<T: AsyncFileReader + Send>(
+    pub(crate) async fn fetch<T: AsyncFileReader + Send>(
         &mut self,
         input: &mut T,
         projection: &ProjectionMask,
         selection: Option<&RowSelection>,
         batch_size: usize,
         cache_mask: Option<&ProjectionMask>,
     ) -> Result<()> {
-        let metadata = self.metadata.row_group(self.row_group_idx);
-        if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
-            let expanded_selection =
-                selection.expand_to_batch_boundaries(batch_size, self.row_count);
-            // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
-            // `RowSelection`
-            let mut page_start_offsets: Vec<Vec<u64>> = vec![];
-
-            let fetch_ranges = self
-                .column_chunks
-                .iter()
-                .zip(metadata.columns())
-                .enumerate()
-                .filter(|&(idx, (chunk, _chunk_meta))| {
-                    chunk.is_none() && projection.leaf_included(idx)
-                })
-                .flat_map(|(idx, (_chunk, chunk_meta))| {
-                    // If the first page does not start at the beginning of the column,
-                    // then we need to also fetch a dictionary page.
-                    let mut ranges: Vec<Range<u64>> = vec![];
-                    let (start, _len) = chunk_meta.byte_range();
-                    match offset_index[idx].page_locations.first() {
-                        Some(first) if first.offset as u64 != start => {
-                            ranges.push(start..first.offset as u64);
-                        }
-                        _ => (),
-                    }
-
-                    // Expand selection to batch boundaries only for cached columns
-                    let use_expanded = cache_mask.map(|m| m.leaf_included(idx)).unwrap_or(false);
-                    if use_expanded {
-                        ranges.extend(
-                            expanded_selection.scan_ranges(&offset_index[idx].page_locations),
-                        );
-                    } else {
-                        ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
-                    }
-                    page_start_offsets.push(ranges.iter().map(|range| range.start).collect());
-
-                    ranges
-                })
-                .collect();
-
-            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
-            let mut page_start_offsets = page_start_offsets.into_iter();
-
-            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
-                if chunk.is_some() || !projection.leaf_included(idx) {
-                    continue;
-                }
-
-                if let Some(offsets) = page_start_offsets.next() {
-                    let mut chunks = Vec::with_capacity(offsets.len());
-                    for _ in 0..offsets.len() {
-                        chunks.push(chunk_data.next().unwrap());
-                    }
-
-                    *chunk = Some(Arc::new(ColumnChunkData::Sparse {
-                        length: metadata.column(idx).byte_range().1 as usize,
-                        data: offsets
-                            .into_iter()
-                            .map(|x| x as usize)
-                            .zip(chunks.into_iter())
-                            .collect(),
-                    }))
-                }
-            }
-        } else {
-            let fetch_ranges = self
-                .column_chunks
-                .iter()
-                .enumerate()
-                .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
-                .map(|(idx, _chunk)| {
-                    let column = metadata.column(idx);
-                    let (start, length) = column.byte_range();
-                    start..(start + length)
-                })
-                .collect();
-
-            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
-
-            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
-                if chunk.is_some() || !projection.leaf_included(idx) {
-                    continue;
-                }
-
-                if let Some(data) = chunk_data.next() {
-                    *chunk = Some(Arc::new(ColumnChunkData::Dense {
-                        offset: metadata.column(idx).byte_range().0 as usize,
-                        data,
-                    }));
-                }
-            }
-        }
-
+        // Figure out what ranges to fetch
+        let FetchRanges {
+            ranges,
+            page_start_offsets,
+        } = self.fetch_ranges(projection, selection, batch_size, cache_mask);
+        // do the actual fetch
+        let chunk_data = input.get_byte_ranges(ranges).await?.into_iter();
+        // update our in memory buffers (self.column_chunks) with the fetched data
+        self.fill_column_chunks(projection, page_start_offsets, chunk_data);
         Ok(())
     }
 }
-
-impl RowGroups for InMemoryRowGroup<'_> {
-    fn num_rows(&self) -> usize {
-        self.row_count
-    }
-
-    /// Return chunks for column i
-    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
-        match &self.column_chunks[i] {
-            None => Err(ParquetError::General(format!(
-                "Invalid column index {i}, column was not fetched"
-            ))),
-            Some(data) => {
-                let page_locations = self
-                    .offset_index
-                    // filter out empty offset indexes (old versions specified Some(vec![]) when no present)
-                    .filter(|index| !index.is_empty())
-                    .map(|index| index[i].page_locations.clone());
-                let column_chunk_metadata = self.metadata.row_group(self.row_group_idx).column(i);
-                let page_reader = SerializedPageReader::new(
-                    data.clone(),
-                    column_chunk_metadata,
-                    self.row_count,
-                    page_locations,
-                )?;
-                let page_reader = page_reader.add_crypto_context(
-                    self.row_group_idx,
-                    i,
-                    self.metadata,
-                    column_chunk_metadata,
-                )?;
-
-                let page_reader: Box<dyn PageReader> = Box::new(page_reader);
-
-                Ok(Box::new(ColumnChunkIterator {
-                    reader: Some(Ok(page_reader)),
-                }))
-            }
-        }
-    }
-}
-
-/// An in-memory column chunk
-#[derive(Clone)]
-enum ColumnChunkData {
-    /// Column chunk data representing only a subset of data pages
-    Sparse {
-        /// Length of the full column chunk
-        length: usize,
-        /// Subset of data pages included in this sparse chunk.
-        ///
-        /// Each element is a tuple of (page offset within file, page data).
-        /// Each entry is a complete page and the list is ordered by offset.
-        data: Vec<(usize, Bytes)>,
-    },
-    /// Full column chunk and the offset within the original file
-    Dense { offset: usize, data: Bytes },
-}
-
-impl ColumnChunkData {
-    /// Return the data for this column chunk at the given offset
-    fn get(&self, start: u64) -> Result<Bytes> {
-        match &self {
-            ColumnChunkData::Sparse { data, .. } => data
-                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
-                .map(|idx| data[idx].1.clone())
-                .map_err(|_| {
-                    ParquetError::General(format!(
-                        "Invalid offset in sparse column chunk data: {start}"
-                    ))
-                }),
-            ColumnChunkData::Dense { offset, data } => {
-                let start = start as usize - *offset;
-                Ok(data.slice(start..))
-            }
-        }
-    }
-}
-
-impl Length for ColumnChunkData {
-    /// Return the total length of the full column chunk
-    fn len(&self) -> u64 {
-        match &self {
-            ColumnChunkData::Sparse { length, .. } => *length as u64,
-            ColumnChunkData::Dense { data, .. } => data.len() as u64,
-        }
-    }
-}
-
-impl ChunkReader for ColumnChunkData {
-    type T = bytes::buf::Reader<Bytes>;
-
-    fn get_read(&self, start: u64) -> Result<Self::T> {
-        Ok(self.get(start)?.reader())
-    }
-
-    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
-        Ok(self.get(start)?.slice(..length))
-    }
-}
-
-/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
-struct ColumnChunkIterator {
-    reader: Option<Result<Box<dyn PageReader>>>,
-}
-
-impl Iterator for ColumnChunkIterator {
-    type Item = Result<Box<dyn PageReader>>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        self.reader.take()
-    }
-}
-
-impl PageIterator for ColumnChunkIterator {}
-
 #[cfg(test)]
 mod tests {
     use super::*;
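The refactored `fetch` above is the IO/decode separation in miniature: a pure planning step (`fetch_ranges`), one IO call (`get_byte_ranges`), and a pure state update (`fill_column_chunks`). A sketch of that shape with hypothetical types (`ByteSource` and `RowGroupBuffers` are illustrations, not the crate's API):

```rust
use std::ops::Range;

/// Hypothetical stand-in for whatever performs IO (sync file, tokio,
/// io_uring, OpenDAL, ...). Only this trait touches the outside world.
trait ByteSource {
    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> Vec<Vec<u8>>;
}

/// Hypothetical stand-in for InMemoryRowGroup: one slot per column chunk.
struct RowGroupBuffers {
    column_chunks: Vec<Option<Vec<u8>>>,
}

impl RowGroupBuffers {
    /// Pure planning (the role of the crate-private `fetch_ranges`):
    /// decide which byte ranges are still missing. The real logic consults
    /// the projection, row selection, and page index; elided here.
    fn missing_ranges(&self) -> Vec<Range<u64>> {
        Vec::new() // elided
    }

    /// Pure state update (the role of `fill_column_chunks`):
    /// install fetched bytes into the empty slots.
    fn fill(&mut self, data: Vec<Vec<u8>>) {
        let empty = self.column_chunks.iter_mut().filter(|s| s.is_none());
        for (slot, bytes) in empty.zip(data) {
            *slot = Some(bytes);
        }
    }

    /// Because planning and filling involve no IO, they can be unit tested
    /// without an async runtime, and the IO backend becomes a plug-in point.
    fn fetch(&mut self, source: &mut impl ByteSource) {
        let ranges = self.missing_ranges();
        let data = source.get_byte_ranges(ranges);
        self.fill(data);
    }
}
```
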
