
Commit cd3eea7

Implement ParquetDecoder push API
1 parent 3a61625

14 files changed (+2403, -283 lines)

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 16 additions & 0 deletions

@@ -58,6 +58,7 @@ pub mod statistics;
 ///
 /// * synchronous API: [`ParquetRecordBatchReaderBuilder::try_new`]
 /// * `async` API: [`ParquetRecordBatchStreamBuilder::new`]
+/// * decoder API: [`ParquetDecoderBuilder::new`]
 ///
 /// # Features
 /// * Projection pushdown: [`Self::with_projection`]
@@ -93,6 +94,7 @@ pub mod statistics;
 /// Millisecond Latency] Arrow blog post.
 ///
 /// [`ParquetRecordBatchStreamBuilder::new`]: crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new
+/// [`ParquetDecoderBuilder::new`]: crate::arrow::push_decoder::ParquetPushDecoderBuilder::new
 /// [Apache Arrow]: https://arrow.apache.org/
 /// [`StatisticsConverter`]: statistics::StatisticsConverter
 /// [Querying Parquet with Millisecond Latency]: https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
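The two hunks above document a push-style decoder as a third way to read Parquet, alongside the synchronous and `async` builders. The commit does not show the surface of `ParquetPushDecoderBuilder` itself in this file, so the following is only a minimal, self-contained sketch of the push/pull control flow such an API implies: the caller owns all I/O, pushes bytes into the decoder, and polls it for decoded output. Every name in the sketch (`PushDecoder`, `DecodeResult`, `push_data`, `try_decode`) is a hypothetical stand-in, not the API added by this commit.

/// Hypothetical stand-in illustrating a push-style decode loop; this is NOT
/// the type added by this commit.
enum DecodeResult<T> {
    /// The decoder needs more bytes before it can make progress
    NeedsData,
    /// A fully decoded value is ready
    Ready(T),
    /// The input is exhausted and decoding is complete
    Finished,
}

/// A toy decoder that "decodes" newline-delimited records from pushed bytes.
struct PushDecoder {
    buffer: Vec<u8>,
    finished: bool,
}

impl PushDecoder {
    fn new() -> Self {
        Self { buffer: Vec::new(), finished: false }
    }

    /// The caller pushes whatever bytes it has fetched (file, socket, object store, ...)
    fn push_data(&mut self, data: &[u8]) {
        self.buffer.extend_from_slice(data);
    }

    /// The caller signals that no more bytes will arrive
    fn finish(&mut self) {
        self.finished = true;
    }

    /// The caller polls for the next decoded record
    fn try_decode(&mut self) -> DecodeResult<String> {
        if let Some(pos) = self.buffer.iter().position(|b| *b == b'\n') {
            let record: Vec<u8> = self.buffer.drain(..=pos).collect();
            return DecodeResult::Ready(String::from_utf8_lossy(&record[..pos]).into_owned());
        }
        if self.finished {
            DecodeResult::Finished
        } else {
            DecodeResult::NeedsData
        }
    }
}

fn main() {
    // The caller drives all I/O: here the "I/O" is just chunks of an in-memory slice
    let chunks: Vec<&[u8]> = vec![b"row1\nro", b"w2\nrow3\n"];
    let mut chunks = chunks.into_iter();
    let mut decoder = PushDecoder::new();

    loop {
        match decoder.try_decode() {
            DecodeResult::Ready(record) => println!("decoded: {record}"),
            DecodeResult::NeedsData => match chunks.next() {
                Some(chunk) => decoder.push_data(chunk),
                None => decoder.finish(),
            },
            DecodeResult::Finished => break,
        }
    }
}

The point of the pattern is that the decoder never performs I/O itself, which is what distinguishes it from the pull-based synchronous reader and the `async` stream.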
@@ -991,12 +993,26 @@ impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
 
 /// An `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]
 /// read from a parquet data source
+///
+/// This reader is created by [`ParquetRecordBatchReaderBuilder`], and has all
+/// the buffered state (DataPages, etc) necessary to decode the parquet data into
+/// Arrow arrays.
 pub struct ParquetRecordBatchReader {
     array_reader: Box<dyn ArrayReader>,
     schema: SchemaRef,
     read_plan: ReadPlan,
 }
 
+impl Debug for ParquetRecordBatchReader {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ParquetRecordBatchReader")
+            .field("array_reader", &"...")
+            .field("schema", &self.schema)
+            .field("read_plan", &self.read_plan)
+            .finish()
+    }
+}
+
 impl Iterator for ParquetRecordBatchReader {
     type Item = Result<RecordBatch, ArrowError>;

parquet/src/arrow/arrow_reader/read_plan.rs

Lines changed: 2 additions & 3 deletions

@@ -28,7 +28,7 @@ use arrow_select::filter::prep_null_mask_filter;
 use std::collections::VecDeque;
 
 /// A builder for [`ReadPlan`]
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub(crate) struct ReadPlanBuilder {
     batch_size: usize,
     /// Current to apply, includes all filters
@@ -51,7 +51,6 @@ impl ReadPlanBuilder {
     }
 
     /// Returns the current selection, if any
-    #[cfg(feature = "async")]
     pub(crate) fn selection(&self) -> Option<&RowSelection> {
         self.selection.as_ref()
     }
@@ -76,7 +75,6 @@ impl ReadPlanBuilder {
     }
 
     /// Returns the number of rows selected, or `None` if all rows are selected.
-    #[cfg(feature = "async")]
     pub(crate) fn num_rows_selected(&self) -> Option<usize> {
         self.selection.as_ref().map(|s| s.row_count())
     }
@@ -230,6 +228,7 @@ impl LimitedReadPlanBuilder {
 /// A plan reading specific rows from a Parquet Row Group.
 ///
 /// See [`ReadPlanBuilder`] to create `ReadPlan`s
+#[derive(Debug)]
 pub(crate) struct ReadPlan {
     /// The number of rows to read in each batch
     batch_size: usize,
parquet/src/arrow/arrow_reader/selection.rs

Lines changed: 0 additions & 1 deletion

@@ -445,7 +445,6 @@ impl RowSelection {
     /// Expands the selection to align with batch boundaries.
     /// This is needed when using cached array readers to ensure that
     /// the cached data covers full batches.
-    #[cfg(feature = "async")]
     pub(crate) fn expand_to_batch_boundaries(&self, batch_size: usize, total_rows: usize) -> Self {
         if batch_size == 0 {
             return self.clone();
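With the `cfg` gate removed, `expand_to_batch_boundaries` is no longer tied to the `async` feature. Its doc comment describes aligning a row selection to batch boundaries; as a rough, standalone illustration of that idea (not the `RowSelection`-based implementation in the crate), aligning a single selected row range to a batch size could look like this:

/// Rough illustration only: align one selected row range [start, end) to batch
/// boundaries by rounding its start down and its end up to multiples of
/// `batch_size`. This is not the parquet crate's implementation.
fn expand_range_to_batch_boundaries(
    start: usize,
    end: usize,
    batch_size: usize,
    total_rows: usize,
) -> (usize, usize) {
    if batch_size == 0 {
        // Mirrors the guard in the diff above: nothing to align against
        return (start, end);
    }
    let aligned_start = (start / batch_size) * batch_size;
    // Round the end up to the next batch boundary, capped at the total row count
    let aligned_end = end.div_ceil(batch_size) * batch_size;
    (aligned_start, aligned_end.min(total_rows))
}

fn main() {
    // With a batch size of 100, rows 250..420 expand to 200..500
    assert_eq!(expand_range_to_batch_boundaries(250, 420, 100, 1000), (200, 500));
    // Expansion never runs past the end of the file
    assert_eq!(expand_range_to_batch_boundaries(950, 980, 100, 1000), (900, 1000));
    println!("ranges expanded as expected");
}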
