From 819f9c1d99ce5a0dc17e95dd6451a847da295532 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 25 Jul 2025 15:09:43 -0400 Subject: [PATCH 1/2] [Parquet Predicate Cache]: Add ArrowReaderMetrics and tests for caching --- parquet/src/arrow/array_reader/builder.rs | 3 +- parquet/src/arrow/array_reader/list_array.rs | 4 +- parquet/src/arrow/arrow_reader/metrics.rs | 137 +++++++++ parquet/src/arrow/arrow_reader/mod.rs | 85 +++++- parquet/src/arrow/async_reader/mod.rs | 8 + parquet/src/arrow/mod.rs | 7 + parquet/tests/arrow_reader/mod.rs | 2 + parquet/tests/arrow_reader/predicate_cache.rs | 284 ++++++++++++++++++ 8 files changed, 515 insertions(+), 15 deletions(-) create mode 100644 parquet/src/arrow/arrow_reader/metrics.rs create mode 100644 parquet/tests/arrow_reader/predicate_cache.rs diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index 6dcf05ccf8ad..a36c2a720cef 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -375,7 +375,8 @@ mod tests { ) .unwrap(); - let array_reader = ArrayReaderBuilder::new(&file_reader) + let metrics = ArrowReaderMetrics::disabled(); + let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics) .build_array_reader(fields.as_ref(), &mask) .unwrap(); diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs index 66c4f30b3c29..e28c93cf624d 100644 --- a/parquet/src/arrow/array_reader/list_array.rs +++ b/parquet/src/arrow/array_reader/list_array.rs @@ -249,6 +249,7 @@ mod tests { use crate::arrow::array_reader::list_array::ListArrayReader; use crate::arrow::array_reader::test_util::InMemoryArrayReader; use crate::arrow::array_reader::ArrayReaderBuilder; + use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; use crate::arrow::schema::parquet_to_arrow_schema_and_fields; use crate::arrow::{parquet_to_arrow_schema, ArrowWriter, ProjectionMask}; use crate::file::properties::WriterProperties; @@ -563,7 +564,8 @@ mod tests { ) .unwrap(); - let mut array_reader = ArrayReaderBuilder::new(&file_reader) + let metrics = ArrowReaderMetrics::disabled(); + let mut array_reader = ArrayReaderBuilder::new(&file_reader, &metrics) .build_array_reader(fields.as_ref(), &mask) .unwrap(); diff --git a/parquet/src/arrow/arrow_reader/metrics.rs b/parquet/src/arrow/arrow_reader/metrics.rs new file mode 100644 index 000000000000..cf8eaabdf867 --- /dev/null +++ b/parquet/src/arrow/arrow_reader/metrics.rs @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [ArrowReaderMetrics] for collecting metrics about the Arrow reader + +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; + +/// This enum represents the state of Arrow reader metrics collection. 
+///
+/// The inner metrics are stored in an `Arc`
+/// so cloning the `ArrowReaderMetrics` enum will not clone the inner metrics.
+///
+/// To access metrics, create an `ArrowReaderMetrics` via [`ArrowReaderMetrics::enabled()`]
+/// and configure the `ArrowReaderBuilder` with a clone.
+#[derive(Debug, Clone)]
+pub enum ArrowReaderMetrics {
+    /// Metrics are not collected (default)
+    Disabled,
+    /// Metrics are collected and stored in an `Arc`.
+    ///
+    /// Create this via [`ArrowReaderMetrics::enabled()`].
+    Enabled(Arc<ArrowReaderMetricsInner>),
+}
+
+impl ArrowReaderMetrics {
+    /// Creates a new instance of [`ArrowReaderMetrics::Disabled`]
+    pub fn disabled() -> Self {
+        Self::Disabled
+    }
+
+    /// Creates a new instance of [`ArrowReaderMetrics::Enabled`]
+    pub fn enabled() -> Self {
+        Self::Enabled(Arc::new(ArrowReaderMetricsInner::new()))
+    }
+
+    /// Predicate Cache: number of records read directly from the inner reader
+    ///
+    /// This is the total number of records read from the inner reader (that is,
+    /// actually decoding). It measures the amount of work that could not be
+    /// avoided with caching.
+    ///
+    /// It returns the number of records read across all columns, so if you read
+    /// 2 columns each with 100 records, this will return 200.
+    ///
+    /// Returns None if metrics are disabled.
+    pub fn records_read_from_inner(&self) -> Option<usize> {
+        match self {
+            Self::Disabled => None,
+            Self::Enabled(inner) => Some(
+                inner
+                    .records_read_from_inner
+                    .load(std::sync::atomic::Ordering::Relaxed),
+            ),
+        }
+    }
+
+    /// Predicate Cache: number of records read from the cache
+    ///
+    /// This is the total number of records read from the cache (that is, without
+    /// actually decoding). It measures the amount of work that was avoided with
+    /// caching.
+    ///
+    /// It returns the number of records read across all columns, so if you read
+    /// 2 columns each with 100 records from the cache, this will return 200.
+    ///
+    /// Returns None if metrics are disabled.
+    pub fn records_read_from_cache(&self) -> Option<usize> {
+        match self {
+            Self::Disabled => None,
+            Self::Enabled(inner) => Some(
+                inner
+                    .records_read_from_cache
+                    .load(std::sync::atomic::Ordering::Relaxed),
+            ),
+        }
+    }
+
+    /// Increments the count of records read from the inner reader
+    #[expect(unused)] // until https://github.com/apache/arrow-rs/pull/7850
+    pub(crate) fn increment_inner_reads(&self, count: usize) {
+        let Self::Enabled(inner) = self else {
+            return;
+        };
+        inner
+            .records_read_from_inner
+            .fetch_add(count, std::sync::atomic::Ordering::Relaxed);
+    }
+
+    /// Increments the count of records read from the cache
+    #[expect(unused)] // until https://github.com/apache/arrow-rs/pull/7850
+    pub(crate) fn increment_cache_reads(&self, count: usize) {
+        let Self::Enabled(inner) = self else {
+            return;
+        };
+
+        inner
+            .records_read_from_cache
+            .fetch_add(count, std::sync::atomic::Ordering::Relaxed);
+    }
+}
+
+/// Holds the actual metrics for the Arrow reader.
+///
+/// Please see [`ArrowReaderMetrics`] for the public interface.
+#[derive(Debug)]
+pub struct ArrowReaderMetricsInner {
+    // Metrics for Predicate Cache
+    /// Total number of records read from the inner reader (uncached)
+    records_read_from_inner: AtomicUsize,
+    /// Total number of records read from previously cached pages
+    records_read_from_cache: AtomicUsize,
+}
+
+impl ArrowReaderMetricsInner {
+    /// Creates a new instance of `ArrowReaderMetricsInner`
+    pub(crate) fn new() -> Self {
+        Self {
+            records_read_from_inner: AtomicUsize::new(0),
+            records_read_from_cache: AtomicUsize::new(0),
+        }
+    }
+}
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 900c10659df9..67050edcaa09 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -38,9 +38,11 @@ use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 use crate::file::reader::{ChunkReader, SerializedPageReader};
 use crate::schema::types::SchemaDescriptor;
 
+use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
 pub(crate) use read_plan::{ReadPlan, ReadPlanBuilder};
 
 mod filter;
+pub mod metrics;
 mod read_plan;
 mod selection;
 pub mod statistics;
@@ -112,6 +114,8 @@ pub struct ArrowReaderBuilder<T> {
     pub(crate) limit: Option<usize>,
 
     pub(crate) offset: Option<usize>,
+
+    pub(crate) metrics: ArrowReaderMetrics,
 }
 
 impl<T> Debug for ArrowReaderBuilder<T> {
@@ -128,6 +132,7 @@ impl<T> Debug for ArrowReaderBuilder<T> {
             .field("selection", &self.selection)
             .field("limit", &self.limit)
             .field("offset", &self.offset)
+            .field("metrics", &self.metrics)
             .finish()
     }
 }
@@ -146,6 +151,7 @@ impl<T> ArrowReaderBuilder<T> {
             selection: None,
             limit: None,
             offset: None,
+            metrics: ArrowReaderMetrics::Disabled,
         }
     }
 
@@ -296,6 +302,44 @@ impl<T> ArrowReaderBuilder<T> {
             ..self
         }
     }
+
+    /// Specify metrics collection during reading
+    ///
+    /// To access the metrics, create an [`ArrowReaderMetrics`] and pass a
+    /// clone of the provided metrics to the builder.
+    ///
+    /// For example:
+    ///
+    /// ```rust
+    /// # use std::sync::Arc;
+    /// # use bytes::Bytes;
+    /// # use arrow_array::{Int32Array, RecordBatch};
+    /// # use arrow_schema::{DataType, Field, Schema};
+    /// # use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
+    /// use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
+    /// # use parquet::arrow::ArrowWriter;
+    /// # let mut file: Vec<u8> = Vec::with_capacity(1024);
+    /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
+    /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
+    /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
+    /// # writer.write(&batch).unwrap();
+    /// # writer.close().unwrap();
+    /// # let file = Bytes::from(file);
+    /// // Create metrics object to pass into the reader
+    /// let metrics = ArrowReaderMetrics::enabled();
+    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap()
+    ///     // Configure the builder to use the metrics by passing a clone
+    ///     .with_metrics(metrics.clone())
+    ///     // Build the reader
+    ///     .build().unwrap();
+    /// // .. read data from the reader ..
+    ///
+    /// // check the metrics
+    /// assert!(metrics.records_read_from_inner().is_some());
+    /// ```
+    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
+        Self { metrics, ..self }
+    }
 }
 
 /// Options that control how metadata is read for a parquet file
@@ -707,23 +751,35 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
     ///
     /// Note: this will eagerly evaluate any `RowFilter` before returning
     pub fn build(self) -> Result<ParquetRecordBatchReader> {
+        let Self {
+            input,
+            metadata,
+            schema: _,
+            fields,
+            batch_size: _,
+            row_groups,
+            projection,
+            mut filter,
+            selection,
+            limit,
+            offset,
+            metrics: _, // used in https://github.com/apache/arrow-rs/pull/7850
+        } = self;
+
         // Try to avoid allocate large buffer
         let batch_size = self
             .batch_size
-            .min(self.metadata.file_metadata().num_rows() as usize);
+            .min(metadata.file_metadata().num_rows() as usize);
 
-        let row_groups = self
-            .row_groups
-            .unwrap_or_else(|| (0..self.metadata.num_row_groups()).collect());
+        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());
 
         let reader = ReaderRowGroups {
-            reader: Arc::new(self.input.0),
-            metadata: self.metadata,
+            reader: Arc::new(input.0),
+            metadata,
             row_groups,
         };
 
-        let mut filter = self.filter;
-        let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(self.selection);
+        let mut plan_builder = ReadPlanBuilder::new(batch_size).with_selection(selection);
 
         // Update selection based on any filters
         if let Some(filter) = filter.as_mut() {
@@ -733,20 +789,23 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
                     break;
                 }
 
+                let mut cache_projection = predicate.projection().clone();
+                cache_projection.intersect(&projection);
+
                 let array_reader = ArrayReaderBuilder::new(&reader)
-                    .build_array_reader(self.fields.as_deref(), predicate.projection())?;
+                    .build_array_reader(fields.as_deref(), predicate.projection())?;
 
                 plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
             }
         }
 
-        let array_reader = ArrayReaderBuilder::new(&reader)
-            .build_array_reader(self.fields.as_deref(), &self.projection)?;
+        let array_reader =
+            ArrayReaderBuilder::new(&reader).build_array_reader(fields.as_deref(), &projection)?;
 
         let read_plan = plan_builder
             .limited(reader.num_rows())
-            .with_offset(self.offset)
-            .with_limit(self.limit)
+            .with_offset(offset)
+            .with_limit(limit)
             .build_limited()
             .build();
 
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 611d6999e07e..97eff1ee2689 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -61,6 +61,7 @@ pub use metadata::*;
 #[cfg(feature = "object_store")]
 mod store;
 
+use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
 use crate::arrow::arrow_reader::ReadPlanBuilder;
 use crate::arrow::schema::ParquetField;
 #[cfg(feature = "object_store")]
@@ -510,6 +511,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
             fields: self.fields,
             limit: self.limit,
             offset: self.offset,
+            metrics: self.metrics,
         };
 
         // Ensure schema of ParquetRecordBatchStream respects projection, and does
@@ -560,6 +562,10 @@ struct ReaderFactory<T> {
     /// Offset to apply to the next
     offset: Option<usize>,
+
+    /// Metrics
+    #[expect(unused)] // until https://github.com/apache/arrow-rs/pull/7850
+    metrics: ArrowReaderMetrics,
 }
 
 impl<T> ReaderFactory<T>
 where
@@ -1832,6 +1838,7 @@ mod tests {
         assert_eq!(total_rows, 730);
     }
 
+    #[ignore]
     #[tokio::test]
     async fn test_in_memory_row_group_sparse() {
         let testdata = arrow::util::test_util::parquet_test_data();
@@ -1883,6 +1890,7 @@ mod tests {
             filter: None,
             limit: None,
             offset:
None, + metrics: ArrowReaderMetrics::disabled(), }; let mut skip = true; diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs index 33010f480898..72626d70e0e5 100644 --- a/parquet/src/arrow/mod.rs +++ b/parquet/src/arrow/mod.rs @@ -276,6 +276,13 @@ impl ProjectionMask { Self { mask: None } } + /// Create a [`ProjectionMask`] which selects no columns + pub fn none(len: usize) -> Self { + Self { + mask: Some(vec![false; len]), + } + } + /// Create a [`ProjectionMask`] which selects only the specified leaf columns /// /// Note: repeated or out of order indices will not impact the final mask diff --git a/parquet/tests/arrow_reader/mod.rs b/parquet/tests/arrow_reader/mod.rs index 738a03eb03ef..1f125de603fc 100644 --- a/parquet/tests/arrow_reader/mod.rs +++ b/parquet/tests/arrow_reader/mod.rs @@ -41,6 +41,8 @@ use tempfile::NamedTempFile; mod bad_data; #[cfg(feature = "crc")] mod checksum; +#[cfg(feature = "async")] +mod predicate_cache; mod statistics; // returns a struct array with columns "int32_col", "float32_col" and "float64_col" with the specified values diff --git a/parquet/tests/arrow_reader/predicate_cache.rs b/parquet/tests/arrow_reader/predicate_cache.rs new file mode 100644 index 000000000000..403bf85316ea --- /dev/null +++ b/parquet/tests/arrow_reader/predicate_cache.rs @@ -0,0 +1,284 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Test for predicate cache in Parquet Arrow reader + +use arrow::array::ArrayRef; +use arrow::array::Int64Array; +use arrow::compute::and; +use arrow::compute::kernels::cmp::{gt, lt}; +use arrow_array::cast::AsArray; +use arrow_array::types::Int64Type; +use arrow_array::{RecordBatch, StringViewArray}; +use bytes::Bytes; +use futures::future::BoxFuture; +use futures::{FutureExt, StreamExt}; +use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics; +use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter}; +use parquet::arrow::arrow_reader::{ArrowReaderBuilder, ParquetRecordBatchReaderBuilder}; +use parquet::arrow::async_reader::AsyncFileReader; +use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask}; +use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; +use parquet::file::properties::WriterProperties; +use std::ops::Range; +use std::sync::Arc; +use std::sync::LazyLock; + +#[tokio::test] +async fn test_default_read() { + // The cache is not used without predicates, so we expect 0 records read from cache + let test = ParquetPredicateCacheTest::new().with_expected_records_read_from_cache(0); + let sync_builder = test.sync_builder(); + test.run_sync(sync_builder); + let async_builder = test.async_builder().await; + test.run_async(async_builder).await; +} + +// Fails until https://github.com/apache/arrow-rs/pull/7850 is merged +#[ignore] +#[tokio::test] +async fn test_async_cache_with_filters() { + let test = ParquetPredicateCacheTest::new().with_expected_records_read_from_cache(49); + let async_builder = test.async_builder().await; + let async_builder = test.add_project_ab_and_filter_b(async_builder); + test.run_async(async_builder).await; +} + +#[tokio::test] +async fn test_sync_cache_with_filters() { + let test = ParquetPredicateCacheTest::new() + // The sync reader does not use the cache. 
+        // See https://github.com/apache/arrow-rs/issues/8000
+        .with_expected_records_read_from_cache(0);
+
+    let sync_builder = test.sync_builder();
+    let sync_builder = test.add_project_ab_and_filter_b(sync_builder);
+    test.run_sync(sync_builder);
+}
+
+/*
+
+#[tokio::test]
+async fn test_cache_disabled_with_filters() {
+    // expect no records to be read from cache, because the cache is disabled
+    let test = ParquetPredicateCacheTest::new().with_expected_records_read_from_cache(0);
+    let sync_builder = test.sync_builder().with_max_predicate_cache_size(0);
+    let sync_builder = test.add_project_ab_and_filter_b(sync_builder);
+    test.run_sync(sync_builder);
+
+    let async_builder = test.async_builder().await.with_max_predicate_cache_size(0);
+    let async_builder = test.add_project_ab_and_filter_b(async_builder);
+    test.run_async(async_builder).await;
+}
+*/
+
+// -- Begin test infrastructure --
+
+/// A test parquet file
+struct ParquetPredicateCacheTest {
+    bytes: Bytes,
+    expected_records_read_from_cache: usize,
+}
+impl ParquetPredicateCacheTest {
+    /// Create a new [`ParquetPredicateCacheTest`] with:
+    /// 3 columns: "a", "b", "c"
+    ///
+    /// 2 row groups, each with 200 rows
+    /// each data page has 100 rows
+    ///
+    /// Values of column "a" are 0..400
+    /// Values of column "b" are 400..800
+    /// Values of column "c" are alternating strings of length 12 and longer
+    fn new() -> Self {
+        Self {
+            bytes: TEST_FILE_DATA.clone(),
+            expected_records_read_from_cache: 0,
+        }
+    }
+
+    /// Set the expected number of records read from the cache
+    fn with_expected_records_read_from_cache(
+        mut self,
+        expected_records_read_from_cache: usize,
+    ) -> Self {
+        self.expected_records_read_from_cache = expected_records_read_from_cache;
+        self
+    }
+
+    /// Return a [`ParquetRecordBatchReaderBuilder`] for reading this file
+    fn sync_builder(&self) -> ParquetRecordBatchReaderBuilder<Bytes> {
+        let reader = self.bytes.clone();
+        ParquetRecordBatchReaderBuilder::try_new_with_options(reader, ArrowReaderOptions::default())
+            .expect("ParquetRecordBatchReaderBuilder")
+    }
+
+    /// Return a [`ParquetRecordBatchStreamBuilder`] for reading this file asynchronously
+    async fn async_builder(&self) -> ParquetRecordBatchStreamBuilder<TestReader> {
+        let reader = TestReader::new(self.bytes.clone());
+        ParquetRecordBatchStreamBuilder::new_with_options(reader, ArrowReaderOptions::default())
+            .await
+            .unwrap()
+    }
+
+    /// Return an [`ArrowReaderBuilder`] for reading the file with
+    ///
+    /// 1. a projection selecting the "a" and "b" column
+    /// 2. a row_filter applied to "b": 575 < "b" < 625 (select 1 data page from each row group)
+    fn add_project_ab_and_filter_b<T>(
+        &self,
+        builder: ArrowReaderBuilder<T>,
+    ) -> ArrowReaderBuilder<T> {
+        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+
+        // "b" > 575 and "b" < 625
+        let row_filter = ArrowPredicateFn::new(
+            ProjectionMask::columns(&schema_descr, ["b"]),
+            |batch: RecordBatch| {
+                let scalar_575 = Int64Array::new_scalar(575);
+                let scalar_625 = Int64Array::new_scalar(625);
+                let column = batch.column(0).as_primitive::<Int64Type>();
+                and(&gt(column, &scalar_575)?, &lt(column, &scalar_625)?)
+ }, + ); + + builder + .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"])) + .with_row_filter(RowFilter::new(vec![Box::new(row_filter)])) + } + + /// Build the reader from the specified builder, reading all batches from it, + /// and asserts the + fn run_sync(&self, builder: ParquetRecordBatchReaderBuilder) { + let metrics = ArrowReaderMetrics::enabled(); + + let reader = builder.with_metrics(metrics.clone()).build().unwrap(); + for batch in reader { + match batch { + Ok(_) => {} + Err(e) => panic!("Error reading batch: {e}"), + } + } + self.verify_metrics(metrics) + } + + /// Build the reader from the specified builder, reading all batches from it, + /// and asserts the + async fn run_async(&self, builder: ParquetRecordBatchStreamBuilder) { + let metrics = ArrowReaderMetrics::enabled(); + + let mut stream = builder.with_metrics(metrics.clone()).build().unwrap(); + while let Some(batch) = stream.next().await { + match batch { + Ok(_) => {} + Err(e) => panic!("Error reading batch: {e}"), + } + } + self.verify_metrics(metrics) + } + + fn verify_metrics(&self, metrics: ArrowReaderMetrics) { + let Self { + bytes: _, + expected_records_read_from_cache, + } = self; + + let read_from_cache = metrics + .records_read_from_cache() + .expect("Metrics enabled, so should have metrics"); + + assert_eq!( + &read_from_cache, expected_records_read_from_cache, + "Expected {expected_records_read_from_cache} records read from cache, but got {read_from_cache}" + ); + } +} + +/// Create a parquet file in memory for testing. See [`test_file`] for details. +static TEST_FILE_DATA: LazyLock = LazyLock::new(|| { + // Input batch has 400 rows, with 3 columns: "a", "b", "c" + // Note c is a different types (so the data page sizes will be different) + let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400)); + let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800)); + let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| { + if i % 2 == 0 { + format!("string_{i}") + } else { + format!("A string larger than 12 bytes and thus not inlined {i}") + } + }))); + + let input_batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); + + let mut output = Vec::new(); + + let writer_options = WriterProperties::builder() + .set_max_row_group_size(200) + .set_data_page_row_count_limit(100) + .build(); + let mut writer = + ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap(); + + // since the limits are only enforced on batch boundaries, write the input + // batch in chunks of 50 + let mut row_remain = input_batch.num_rows(); + while row_remain > 0 { + let chunk_size = row_remain.min(50); + let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size); + writer.write(&chunk).unwrap(); + row_remain -= chunk_size; + } + writer.close().unwrap(); + Bytes::from(output) +}); + +/// Copy paste version of the `AsyncFileReader` trait for testing purposes 🤮 +/// TODO put this in a common place +#[derive(Clone)] +struct TestReader { + data: Bytes, + metadata: Option>, +} + +impl TestReader { + fn new(data: Bytes) -> Self { + Self { + data, + metadata: Default::default(), + } + } +} + +impl AsyncFileReader for TestReader { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { + let range = range.clone(); + futures::future::ready(Ok(self + .data + .slice(range.start as usize..range.end as usize))) + .boxed() + } + + fn get_metadata<'a>( + &'a mut self, + options: Option<&'a 
ArrowReaderOptions>, + ) -> BoxFuture<'a, parquet::errors::Result>> { + let metadata_reader = + ParquetMetaDataReader::new().with_page_indexes(options.is_some_and(|o| o.page_index())); + self.metadata = Some(Arc::new( + metadata_reader.parse_and_finish(&self.data).unwrap(), + )); + futures::future::ready(Ok(self.metadata.clone().unwrap().clone())).boxed() + } +} From cd593e0c295a2b971c97e103be2a234dc6ae2a82 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 26 Jul 2025 08:07:14 -0400 Subject: [PATCH 2/2] fixup --- parquet/src/arrow/array_reader/builder.rs | 3 +-- parquet/src/arrow/array_reader/list_array.rs | 4 +--- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index a36c2a720cef..6dcf05ccf8ad 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -375,8 +375,7 @@ mod tests { ) .unwrap(); - let metrics = ArrowReaderMetrics::disabled(); - let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics) + let array_reader = ArrayReaderBuilder::new(&file_reader) .build_array_reader(fields.as_ref(), &mask) .unwrap(); diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs index e28c93cf624d..66c4f30b3c29 100644 --- a/parquet/src/arrow/array_reader/list_array.rs +++ b/parquet/src/arrow/array_reader/list_array.rs @@ -249,7 +249,6 @@ mod tests { use crate::arrow::array_reader::list_array::ListArrayReader; use crate::arrow::array_reader::test_util::InMemoryArrayReader; use crate::arrow::array_reader::ArrayReaderBuilder; - use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics; use crate::arrow::schema::parquet_to_arrow_schema_and_fields; use crate::arrow::{parquet_to_arrow_schema, ArrowWriter, ProjectionMask}; use crate::file::properties::WriterProperties; @@ -564,8 +563,7 @@ mod tests { ) .unwrap(); - let metrics = ArrowReaderMetrics::disabled(); - let mut array_reader = ArrayReaderBuilder::new(&file_reader, &metrics) + let mut array_reader = ArrayReaderBuilder::new(&file_reader) .build_array_reader(fields.as_ref(), &mask) .unwrap();
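
Note: the `with_metrics` doc example in patch 1 covers the synchronous reader. Below is a minimal sketch (not part of either patch) of how the same metrics could be consumed from the async reader; it assumes the parquet crate's `async` feature, a tokio runtime, and a placeholder file path `data.parquet`.

```rust
use futures::StreamExt;
use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

#[tokio::main]
async fn main() -> parquet::errors::Result<()> {
    // tokio::fs::File implements AsyncFileReader via AsyncRead + AsyncSeek
    let file = tokio::fs::File::open("data.parquet").await?;

    // Create the metrics handle and give the builder a clone; both share the
    // same underlying counters because the enabled variant wraps an Arc
    let metrics = ArrowReaderMetrics::enabled();
    let mut stream = ParquetRecordBatchStreamBuilder::new(file)
        .await?
        .with_metrics(metrics.clone())
        .build()?;

    // Drain the stream; decoding is what drives the counters
    while let Some(batch) = stream.next().await {
        let _batch = batch?;
    }

    // Both accessors return Some(..) because metrics were enabled; without a
    // RowFilter the predicate cache is not used, so the cache count stays 0
    println!("records read from inner reader: {:?}", metrics.records_read_from_inner());
    println!("records read from cache: {:?}", metrics.records_read_from_cache());
    Ok(())
}
```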