From a6b65fbcbd8846d93dd761fd61d94d023511586f Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Mon, 30 Jan 2023 13:02:30 +0200 Subject: [PATCH 1/4] Add limit to ArrowReaderBuilder to push limit down to parquet reader --- parquet/src/arrow/arrow_reader/mod.rs | 56 +++++++++++- parquet/src/arrow/arrow_reader/selection.rs | 95 ++++++++++++++++++++- 2 files changed, 149 insertions(+), 2 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 87165ef8e575..70ab098ab025 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -69,6 +69,8 @@ pub struct ArrowReaderBuilder { pub(crate) filter: Option, pub(crate) selection: Option, + + pub(crate) limit: Option, } impl ArrowReaderBuilder { @@ -98,6 +100,7 @@ impl ArrowReaderBuilder { projection: ProjectionMask::all(), filter: None, selection: None, + limit: None, }) } @@ -167,6 +170,17 @@ impl ArrowReaderBuilder { ..self } } + + /// Provide a limit to the number of rows to be read + /// + /// The limit will be used to generate an `RowSelection` so only `limit` + /// rows are decoded + pub fn with_limit(self, limit: usize) -> Self { + Self { + limit: Some(limit), + ..self + } + } } /// Arrow reader api. @@ -453,6 +467,17 @@ impl ArrowReaderBuilder> { selection = Some(RowSelection::from(vec![])); } + // If a limit is defined, apply it to the final `RowSelection` + if let Some(limit) = self.limit { + selection = Some( + selection + .map(|selection| selection.limit(limit)) + .unwrap_or_else(|| { + RowSelection::from_limit(limit, reader.num_rows()) + }), + ); + } + Ok(ParquetRecordBatchReader::new( batch_size, array_reader, @@ -1215,6 +1240,8 @@ mod tests { row_selections: Option<(RowSelection, usize)>, /// row filter row_filter: Option>, + /// limit + limit: Option, } /// Manually implement this to avoid printing entire contents of row_selections and row_filter @@ -1233,6 +1260,7 @@ mod tests { .field("encoding", &self.encoding) .field("row_selections", &self.row_selections.is_some()) .field("row_filter", &self.row_filter.is_some()) + .field("limit", &self.limit) .finish() } } @@ -1252,6 +1280,7 @@ mod tests { encoding: Encoding::PLAIN, row_selections: None, row_filter: None, + limit: None, } } } @@ -1323,6 +1352,13 @@ mod tests { } } + fn with_limit(self, limit: usize) -> Self { + Self { + limit: Some(limit), + ..self + } + } + fn writer_props(&self) -> WriterProperties { let builder = WriterProperties::builder() .set_data_pagesize_limit(self.max_data_page_size) @@ -1381,6 +1417,14 @@ mod tests { TestOptions::new(2, 256, 127).with_null_percent(0), // Test optional with nulls TestOptions::new(2, 256, 93).with_null_percent(25), + // Test with limit of 0 + TestOptions::new(4, 100, 25).with_limit(0), + // Test with limit of 50 + TestOptions::new(4, 100, 25).with_limit(50), + // Test with limit equal to number of rows + TestOptions::new(4, 100, 25).with_limit(10), + // Test with limit larger than number of rows + TestOptions::new(4, 100, 25).with_limit(101), // Test with no page-level statistics TestOptions::new(2, 256, 91) .with_null_percent(25) @@ -1423,6 +1467,11 @@ mod tests { TestOptions::new(2, 256, 93) .with_null_percent(25) .with_row_selections(), + // Test optional with nulls + TestOptions::new(2, 256, 93) + .with_null_percent(25) + .with_row_selections() + .with_limit(10), // Test filter // Test with row filter @@ -1592,7 +1641,7 @@ mod tests { } }; - let expected_data = match opts.row_filter { + let mut expected_data = match opts.row_filter { Some(filter) => { let expected_data = expected_data .into_iter() @@ -1622,6 +1671,11 @@ mod tests { None => expected_data, }; + if let Some(limit) = opts.limit { + builder = builder.with_limit(limit); + expected_data = expected_data.into_iter().take(limit).collect(); + } + let mut record_reader = builder .with_batch_size(opts.record_batch_size) .build() diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index 03c7e01e0840..1e32b5510760 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -19,6 +19,7 @@ use arrow_array::{Array, BooleanArray}; use arrow_select::filter::SlicesIterator; use std::cmp::Ordering; use std::collections::VecDeque; +use std::mem; use std::ops::Range; /// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when @@ -110,8 +111,18 @@ impl RowSelection { Self::from_consecutive_ranges(iter, total_rows) } + /// Creates a [`RowSelection`] that will select `limit` rows and skip all remaining rows. + pub(crate) fn from_limit(limit: usize, total_rows: usize) -> Self { + Self { + selectors: vec![ + RowSelector::select(limit), + RowSelector::skip(total_rows.saturating_sub(limit)), + ], + } + } + /// Creates a [`RowSelection`] from an iterator of consecutive ranges to keep - fn from_consecutive_ranges>>( + pub(crate) fn from_consecutive_ranges>>( ranges: I, total_rows: usize, ) -> Self { @@ -371,6 +382,35 @@ impl RowSelection { self } + /// Limit this [`RowSelection`] to only select `limit` rows + pub(crate) fn limit(mut self, mut limit: usize) -> Self { + let mut remaining = 0; + let mut new_selectors = Vec::with_capacity(self.selectors.len()); + for selection in mem::take(&mut self.selectors) { + if limit == 0 { + remaining += selection.row_count; + } else if !selection.skip { + if selection.row_count > limit { + remaining += selection.row_count - limit; + new_selectors.push(RowSelector::select(limit)); + limit = 0; + } else { + limit -= selection.row_count; + new_selectors.push(selection); + } + } else { + new_selectors.push(selection); + } + } + + if remaining > 0 { + new_selectors.push(RowSelector::skip(remaining)); + } + + self.selectors = new_selectors; + self + } + /// Returns an iterator over the [`RowSelector`]s for this /// [`RowSelection`]. pub fn iter(&self) -> impl Iterator { @@ -841,6 +881,59 @@ mod tests { assert_eq!(selectors, round_tripped); } + #[test] + fn test_limit() { + // Limit to existing limit should no-op + let selection = RowSelection::from_limit(10, 100); + let limited = selection.clone().limit(10); + assert_eq!(selection, limited); + + let selection = RowSelection::from(vec![ + RowSelector::select(10), + RowSelector::skip(10), + RowSelector::select(10), + RowSelector::skip(10), + RowSelector::select(10), + ]); + + let limited = selection.clone().limit(5); + let expected = vec![RowSelector::select(5), RowSelector::skip(45)]; + assert_eq!(limited.selectors, expected); + + let limited = selection.clone().limit(15); + let expected = vec![ + RowSelector::select(10), + RowSelector::skip(10), + RowSelector::select(5), + RowSelector::skip(25), + ]; + assert_eq!(limited.selectors, expected); + + let limited = selection.clone().limit(0); + let expected = vec![RowSelector::skip(50)]; + assert_eq!(limited.selectors, expected); + + let limited = selection.clone().limit(30); + let expected = vec![ + RowSelector::select(10), + RowSelector::skip(10), + RowSelector::select(10), + RowSelector::skip(10), + RowSelector::select(10), + ]; + assert_eq!(limited.selectors, expected); + + let limited = selection.limit(100); + let expected = vec![ + RowSelector::select(10), + RowSelector::skip(10), + RowSelector::select(10), + RowSelector::skip(10), + RowSelector::select(10), + ]; + assert_eq!(limited.selectors, expected); + } + #[test] fn test_scan_ranges() { let index = vec![ From db9c2e7ba8299c2b95dcb816b9305d4c638e5e95 Mon Sep 17 00:00:00 2001 From: Dan Harris <1327726+thinkharderdev@users.noreply.github.com> Date: Mon, 30 Jan 2023 14:14:46 +0200 Subject: [PATCH 2/4] Update parquet/src/arrow/arrow_reader/mod.rs Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> --- parquet/src/arrow/arrow_reader/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 70ab098ab025..d4acbf86dc2f 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -173,8 +173,8 @@ impl ArrowReaderBuilder { /// Provide a limit to the number of rows to be read /// - /// The limit will be used to generate an `RowSelection` so only `limit` - /// rows are decoded + /// The limit will be applied after any [`Self::with_row_selection`] and [`Self::with_row_filter`] + /// allowing it to limit the final set of rows decoded after any pushed down predicates pub fn with_limit(self, limit: usize) -> Self { Self { limit: Some(limit), From 59eccc544784cec087225d6dd6330b20721b8244 Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Mon, 30 Jan 2023 15:04:16 +0200 Subject: [PATCH 3/4] pr comments --- parquet/src/arrow/arrow_reader/mod.rs | 4 +- parquet/src/arrow/arrow_reader/selection.rs | 43 ++++------- parquet/src/arrow/async_reader/mod.rs | 84 ++++++++++++++++++++- 3 files changed, 100 insertions(+), 31 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index d4acbf86dc2f..c4b645da7ce5 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -473,7 +473,9 @@ impl ArrowReaderBuilder> { selection .map(|selection| selection.limit(limit)) .unwrap_or_else(|| { - RowSelection::from_limit(limit, reader.num_rows()) + RowSelection::from(vec![RowSelector::select( + limit.min(reader.num_rows()), + )]) }), ); } diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index 1e32b5510760..d2af4516dd08 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -111,16 +111,6 @@ impl RowSelection { Self::from_consecutive_ranges(iter, total_rows) } - /// Creates a [`RowSelection`] that will select `limit` rows and skip all remaining rows. - pub(crate) fn from_limit(limit: usize, total_rows: usize) -> Self { - Self { - selectors: vec![ - RowSelector::select(limit), - RowSelector::skip(total_rows.saturating_sub(limit)), - ], - } - } - /// Creates a [`RowSelection`] from an iterator of consecutive ranges to keep pub(crate) fn from_consecutive_ranges>>( ranges: I, @@ -384,16 +374,17 @@ impl RowSelection { /// Limit this [`RowSelection`] to only select `limit` rows pub(crate) fn limit(mut self, mut limit: usize) -> Self { - let mut remaining = 0; let mut new_selectors = Vec::with_capacity(self.selectors.len()); - for selection in mem::take(&mut self.selectors) { + for mut selection in mem::take(&mut self.selectors) { if limit == 0 { - remaining += selection.row_count; - } else if !selection.skip { - if selection.row_count > limit { - remaining += selection.row_count - limit; - new_selectors.push(RowSelector::select(limit)); - limit = 0; + break; + } + + if !selection.skip { + if selection.row_count >= limit { + selection.row_count = limit; + new_selectors.push(selection); + break; } else { limit -= selection.row_count; new_selectors.push(selection); @@ -403,10 +394,6 @@ impl RowSelection { } } - if remaining > 0 { - new_selectors.push(RowSelector::skip(remaining)); - } - self.selectors = new_selectors; self } @@ -884,9 +871,10 @@ mod tests { #[test] fn test_limit() { // Limit to existing limit should no-op - let selection = RowSelection::from_limit(10, 100); - let limited = selection.clone().limit(10); - assert_eq!(selection, limited); + let selection = + RowSelection::from(vec![RowSelector::select(10), RowSelector::skip(90)]); + let limited = selection.limit(10); + assert_eq!(RowSelection::from(vec![RowSelector::select(10)]), limited); let selection = RowSelection::from(vec![ RowSelector::select(10), @@ -897,7 +885,7 @@ mod tests { ]); let limited = selection.clone().limit(5); - let expected = vec![RowSelector::select(5), RowSelector::skip(45)]; + let expected = vec![RowSelector::select(5)]; assert_eq!(limited.selectors, expected); let limited = selection.clone().limit(15); @@ -905,12 +893,11 @@ mod tests { RowSelector::select(10), RowSelector::skip(10), RowSelector::select(5), - RowSelector::skip(25), ]; assert_eq!(limited.selectors, expected); let limited = selection.clone().limit(0); - let expected = vec![RowSelector::skip(50)]; + let expected = vec![]; assert_eq!(limited.selectors, expected); let limited = selection.clone().limit(30); diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 0397df206bff..0ba8a1631775 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -99,7 +99,7 @@ use arrow_schema::SchemaRef; use crate::arrow::array_reader::{build_array_reader, RowGroupCollection}; use crate::arrow::arrow_reader::{ evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderOptions, - ParquetRecordBatchReader, RowFilter, RowSelection, + ParquetRecordBatchReader, RowFilter, RowSelection, RowSelector, }; use crate::arrow::schema::ParquetField; use crate::arrow::ProjectionMask; @@ -352,6 +352,7 @@ impl ArrowReaderBuilder> { Ok(ParquetRecordBatchStream { metadata: self.metadata, batch_size, + limit: self.limit, row_groups, projection: self.projection, selection: self.selection, @@ -389,6 +390,7 @@ where mut selection: Option, projection: ProjectionMask, batch_size: usize, + limit: Option, ) -> ReadResult { // TODO: calling build_array multiple times is wasteful @@ -430,6 +432,17 @@ where return Ok((self, None)); } + // If a limit is defined, apply it to the final `RowSelection` + if let Some(limit) = limit { + selection = Some( + selection + .map(|selection| selection.limit(limit)) + .unwrap_or_else(|| { + RowSelection::from(vec![RowSelector::select(limit)]) + }), + ); + } + row_group .fetch(&mut self.input, &projection, selection.as_ref()) .await?; @@ -479,6 +492,8 @@ pub struct ParquetRecordBatchStream { batch_size: usize, + limit: Option, + selection: Option, /// This is an option so it can be moved into a future @@ -548,6 +563,7 @@ where selection, self.projection.clone(), self.batch_size, + self.limit, ) .boxed(); @@ -943,6 +959,70 @@ mod tests { assert_eq!(async_batches, sync_batches); } + #[tokio::test] + async fn test_async_reader_with_limit() { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet"); + let data = Bytes::from(std::fs::read(path).unwrap()); + + let metadata = parse_metadata(&data).unwrap(); + let metadata = Arc::new(metadata); + + assert_eq!(metadata.num_row_groups(), 1); + + let async_reader = TestReader { + data: data.clone(), + metadata: metadata.clone(), + requests: Default::default(), + }; + + let options = ArrowReaderOptions::new().with_page_index(true); + let builder = + ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options) + .await + .unwrap(); + + // The builder should have page and offset indexes loaded now + let metadata_with_index = builder.metadata(); + + // Check offset indexes are present for all columns + for rg in metadata_with_index.row_groups() { + let page_locations = + rg.page_offset_index().expect("expected page offset index"); + assert_eq!(page_locations.len(), rg.columns().len()) + } + + // Check page indexes are present for all columns + let page_indexes = metadata_with_index + .page_indexes() + .expect("expected page indexes"); + for (idx, rg) in metadata_with_index.row_groups().iter().enumerate() { + assert_eq!(page_indexes[idx].len(), rg.columns().len()) + } + + let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]); + let stream = builder + .with_projection(mask.clone()) + .with_batch_size(1024) + .with_limit(1) + .build() + .unwrap(); + + let async_batches: Vec<_> = stream.try_collect().await.unwrap(); + + let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data) + .unwrap() + .with_projection(mask) + .with_batch_size(1024) + .with_limit(1) + .build() + .unwrap() + .collect::>>() + .unwrap(); + + assert_eq!(async_batches, sync_batches); + } + #[tokio::test] async fn test_async_reader_skip_pages() { let testdata = arrow::util::test_util::parquet_test_data(); @@ -1330,7 +1410,7 @@ mod tests { let selection = RowSelection::from(selectors); let (_factory, _reader) = reader_factory - .read_row_group(0, Some(selection), projection.clone(), 48) + .read_row_group(0, Some(selection), projection.clone(), 48, None) .await .expect("reading row group"); From 6805164e3fcf31c3d609bd93c74ba80a98abf4fe Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Mon, 30 Jan 2023 18:24:00 +0200 Subject: [PATCH 4/4] Apply limit to entire file instead of each row group --- parquet/src/arrow/async_reader/mod.rs | 63 ++++++++++++++++++++++++++- 1 file changed, 62 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 0ba8a1631775..71f95e07a756 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -534,7 +534,12 @@ where loop { match &mut self.state { StreamState::Decoding(batch_reader) => match batch_reader.next() { - Some(Ok(batch)) => return Poll::Ready(Some(Ok(batch))), + Some(Ok(batch)) => { + if let Some(limit) = self.limit.as_mut() { + *limit -= batch.num_rows(); + } + return Poll::Ready(Some(Ok(batch))); + } Some(Err(e)) => { self.state = StreamState::Error; return Poll::Ready(Some(Err(ParquetError::ArrowError( @@ -819,6 +824,7 @@ mod tests { use crate::arrow::ArrowWriter; use crate::file::footer::parse_metadata; use crate::file::page_index::index_reader; + use crate::file::properties::WriterProperties; use arrow::error::Result as ArrowResult; use arrow_array::{Array, ArrayRef, Int32Array, StringArray}; use futures::TryStreamExt; @@ -1284,6 +1290,61 @@ mod tests { assert_eq!(requests.lock().unwrap().len(), 3); } + #[tokio::test] + async fn test_limit_multiple_row_groups() { + let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]); + let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]); + let c = Int32Array::from_iter(0..6); + let data = RecordBatch::try_from_iter([ + ("a", Arc::new(a) as ArrayRef), + ("b", Arc::new(b) as ArrayRef), + ("c", Arc::new(c) as ArrayRef), + ]) + .unwrap(); + + let mut buf = Vec::with_capacity(1024); + let props = WriterProperties::builder() + .set_max_row_group_size(3) + .build(); + let mut writer = + ArrowWriter::try_new(&mut buf, data.schema(), Some(props)).unwrap(); + writer.write(&data).unwrap(); + writer.close().unwrap(); + + let data: Bytes = buf.into(); + let metadata = parse_metadata(&data).unwrap(); + + assert_eq!(metadata.num_row_groups(), 2); + + let test = TestReader { + data, + metadata: Arc::new(metadata), + requests: Default::default(), + }; + + let stream = ParquetRecordBatchStreamBuilder::new(test) + .await + .unwrap() + .with_batch_size(1024) + .with_limit(4) + .build() + .unwrap(); + + let batches: Vec<_> = stream.try_collect().await.unwrap(); + // Expect one batch for each row group + assert_eq!(batches.len(), 2); + + let batch = &batches[0]; + // First batch should contain all rows + assert_eq!(batch.num_rows(), 3); + assert_eq!(batch.num_columns(), 3); + + let batch = &batches[1]; + // Second batch should trigger the limit and only have one row + assert_eq!(batch.num_rows(), 1); + assert_eq!(batch.num_columns(), 3); + } + #[tokio::test] async fn test_row_filter_with_index() { let testdata = arrow::util::test_util::parquet_test_data();