From 3e98eb7d6e23d465b177be481aa13f1597fa0a4d Mon Sep 17 00:00:00 2001
From: Yongting You <2010youy01@gmail.com>
Date: Mon, 27 Oct 2025 20:06:16 +0800
Subject: [PATCH 1/3] Add PruningMetrics metrics type; Use it on
 `files_ranges_pruning_statistics` metric in parquet scanner

---
 datafusion/core/tests/parquet/mod.rs          |  25 +++-
 datafusion/core/tests/sql/explain_analyze.rs  |  31 +++++
 datafusion/datasource-parquet/src/metrics.rs  |   8 +-
 datafusion/datasource-parquet/src/opener.rs   |  16 ++-
 .../physical-plan/src/metrics/builder.rs      |  22 ++-
 datafusion/physical-plan/src/metrics/mod.rs   |   5 +-
 datafusion/physical-plan/src/metrics/value.rs | 126 ++++++++++++++++--
 7 files changed, 210 insertions(+), 23 deletions(-)

diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index c44d14abd381..29eaef7cef4c 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -37,6 +37,7 @@ use datafusion::{
     prelude::{ParquetReadOptions, SessionConfig, SessionContext},
 };
 use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
+use datafusion_physical_plan::{metrics::MetricValue, Metric};
 use parquet::arrow::ArrowWriter;
 use parquet::file::properties::{EnabledStatistics, WriterProperties};
 use std::sync::Arc;
@@ -155,8 +156,30 @@ impl TestOutput {
         self.metric_value("row_groups_pruned_statistics")
     }
 
+    /// Metric `files_ranges_pruned_statistics` tracks both the pruned and the matched count;
+    /// for testing purposes, this helper only aggregates the `pruned` count.
     fn files_ranges_pruned_statistics(&self) -> Option<usize> {
-        self.metric_value("files_ranges_pruned_statistics")
+        let mut total_pruned = 0;
+        let mut found = false;
+
+        for metric in self.parquet_metrics.iter() {
+            let metric = metric.as_ref();
+            if metric.value().name() == "files_ranges_pruned_statistics" {
+                if let MetricValue::PruningMetrics {
+                    pruning_metrics, ..
+                } = metric.value()
+                {
+                    total_pruned += pruning_metrics.pruned();
+                    found = true;
+                }
+            }
+        }
+
+        if found {
+            Some(total_pruned)
+        } else {
+            None
+        }
     }
 
     /// The number of row_groups matched by bloom filter or statistics
diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index 43f79ead0257..a7cc30a9484c 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -257,6 +257,37 @@ async fn explain_analyze_level_datasource_parquet() {
     }
 }
 
+#[tokio::test]
+async fn explain_analyze_parquet_pruning_metrics() {
+    let table_name = "tpch_lineitem_small";
+    let parquet_path = "tests/data/tpch_lineitem_small.parquet";
+    let ctx = SessionContext::new();
+    ctx.register_parquet(table_name, parquet_path, ParquetReadOptions::default())
+        .await
+        .expect("register parquet table for explain analyze test");
+
+    // Test scenario:
+    // This table's l_orderkey has the range [1, 7],
+    // so the following query can't prune the file:
+    //   select * from tpch_lineitem_small where l_orderkey = 5;
+    // If the filter is changed to `l_orderkey = 10`, the whole file can be pruned using statistics.
+    for (l_orderkey, expected_pruning_metrics) in
+        [(5, "1 total → 1 matched"), (10, "1 total → 0 matched")]
+    {
+        let sql = format!(
+            "explain analyze select * from {table_name} where l_orderkey = {l_orderkey};"
+        );
+
+        let plan =
+            collect_plan_with_context(&sql, &ctx, ExplainAnalyzeLevel::Summary).await;
+
+        let expected_metrics =
+            format!("files_ranges_pruned_statistics={expected_pruning_metrics}");
+
+        assert_metrics!(&plan, "DataSourceExec", &expected_metrics);
+    }
+}
+
 #[tokio::test]
 async fn csv_explain_plans() {
     // This test verify the look of each plan in its full cycle plan creation
diff --git a/datafusion/datasource-parquet/src/metrics.rs b/datafusion/datasource-parquet/src/metrics.rs
index 5f17fbb4b9ee..9d86a3ae9f2d 100644
--- a/datafusion/datasource-parquet/src/metrics.rs
+++ b/datafusion/datasource-parquet/src/metrics.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use datafusion_physical_plan::metrics::{
-    Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType, Time,
+    Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType, PruningMetrics, Time,
 };
 
 /// Stores metrics about the parquet execution for a particular parquet file.
@@ -27,7 +27,7 @@ use datafusion_physical_plan::metrics::{
 /// [`ParquetFileReaderFactory`]: super::ParquetFileReaderFactory
 #[derive(Debug, Clone)]
 pub struct ParquetFileMetrics {
-    /// Number of file **ranges** pruned by partition or file level statistics.
+    /// Number of file **ranges** pruned or matched by partition or file level statistics.
     /// Pruning of files often happens at planning time but may happen at execution time
     /// if dynamic filters (e.g. from a join) result in additional pruning.
     ///
@@ -41,7 +41,7 @@ pub struct ParquetFileMetrics {
     /// pushdown optimization may fill up the TopK heap when reading the first part of a file,
     /// then skip the second part if file statistics indicate it cannot contain rows
     /// that would be in the TopK.
-    pub files_ranges_pruned_statistics: Count,
+    pub files_ranges_pruned_statistics: PruningMetrics,
     /// Number of times the predicate could not be evaluated
     pub predicate_evaluation_errors: Count,
     /// Number of row groups whose bloom filters were checked and matched (not pruned)
@@ -132,7 +132,7 @@ impl ParquetFileMetrics {
 
         let files_ranges_pruned_statistics = MetricBuilder::new(metrics)
             .with_type(MetricType::SUMMARY)
-            .counter("files_ranges_pruned_statistics", partition);
+            .pruning_metrics("files_ranges_pruned_statistics", partition);
 
         // -----------------------
         // 'dev' level metrics
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 167fc3c5147e..33f5e411c37f 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -40,7 +40,9 @@ use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::physical_expr::{
     is_dynamic_physical_expr, PhysicalExpr,
 };
-use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder};
+use datafusion_physical_plan::metrics::{
+    Count, ExecutionPlanMetricsSet, MetricBuilder, PruningMetrics,
+};
 use datafusion_pruning::{build_pruning_predicate, FilePruner, PruningPredicate};
 
 #[cfg(feature = "parquet_encryption")]
@@ -195,11 +197,13 @@ impl FileOpener for ParquetOpener {
         if let Some(file_pruner) = &mut file_pruner {
             if file_pruner.should_prune()? {
                 // Return an empty stream immediately to skip the work of setting up the actual stream
-                file_metrics.files_ranges_pruned_statistics.add(1);
+                file_metrics.files_ranges_pruned_statistics.add_pruned(1);
                 return Ok(futures::stream::empty().boxed());
             }
         }
 
+        file_metrics.files_ranges_pruned_statistics.add_matched(1);
+
         // Don't load the page index yet. Since it is not stored inline in
         // the footer, loading the page index if it is not needed will do
         // unnecessary I/O. We decide later if it is needed to evaluate the
@@ -480,7 +484,7 @@ struct EarlyStoppingStream<S> {
     /// None
     done: bool,
     file_pruner: FilePruner,
-    files_ranges_pruned_statistics: Count,
+    files_ranges_pruned_statistics: PruningMetrics,
     /// The inner stream
     inner: S,
 }
@@ -489,7 +493,7 @@ impl<S> EarlyStoppingStream<S> {
     pub fn new(
         stream: S,
         file_pruner: FilePruner,
-        files_ranges_pruned_statistics: Count,
+        files_ranges_pruned_statistics: PruningMetrics,
     ) -> Self {
         Self {
             done: false,
@@ -509,7 +513,9 @@ where
         // Since dynamic filters may have been updated, see if we can stop
         // reading this stream entirely.
         if self.file_pruner.should_prune()? {
-            self.files_ranges_pruned_statistics.add(1);
+            self.files_ranges_pruned_statistics.add_pruned(1);
+            // Previously this file range has been counted as matched
+            self.files_ranges_pruned_statistics.subtract_matched(1);
             self.done = true;
             Ok(None)
         } else {
diff --git a/datafusion/physical-plan/src/metrics/builder.rs b/datafusion/physical-plan/src/metrics/builder.rs
index 88ec1a3f67d1..8751f65f88a2 100644
--- a/datafusion/physical-plan/src/metrics/builder.rs
+++ b/datafusion/physical-plan/src/metrics/builder.rs
@@ -19,7 +19,7 @@
 
 use std::{borrow::Cow, sync::Arc};
 
-use crate::metrics::MetricType;
+use crate::metrics::{value::PruningMetrics, MetricType};
 
 use super::{
     Count, ExecutionPlanMetricsSet, Gauge, Label, Metric, MetricValue, Time, Timestamp,
@@ -250,4 +250,24 @@ impl<'a> MetricBuilder<'a> {
             .build(MetricValue::EndTimestamp(timestamp.clone()));
         timestamp
     }
+
+    pub fn pruning_metrics(
+        self,
+        name: impl Into<Cow<'static, str>>,
+        partition: usize,
+    ) -> PruningMetrics {
+        // let count = Count::new();
+        // self.with_partition(partition)
+        //     .build(MetricValue::OutputBytes(count.clone()));
+        // count
+
+        let pruning_metrics = PruningMetrics::new();
+        self.with_partition(partition)
+            .build(MetricValue::PruningMetrics {
+                name: name.into(),
+                // inner values will be `Arc::clone()`
+                pruning_metrics: pruning_metrics.clone(),
+            });
+        pruning_metrics
+    }
 }
diff --git a/datafusion/physical-plan/src/metrics/mod.rs b/datafusion/physical-plan/src/metrics/mod.rs
index 02aad6eb60ac..e66db8f0c911 100644
--- a/datafusion/physical-plan/src/metrics/mod.rs
+++ b/datafusion/physical-plan/src/metrics/mod.rs
@@ -35,7 +35,9 @@ use datafusion_common::HashMap;
 pub use baseline::{BaselineMetrics, RecordOutput, SpillMetrics, SplitMetrics};
 pub use builder::MetricBuilder;
 pub use custom::CustomMetricValue;
-pub use value::{Count, Gauge, MetricValue, ScopedTimerGuard, Time, Timestamp};
+pub use value::{
+    Count, Gauge, MetricValue, PruningMetrics, ScopedTimerGuard, Time, Timestamp,
+};
 
 /// Something that tracks a value of interest (metric) of a DataFusion
 /// [`ExecutionPlan`] execution.
@@ -302,6 +304,7 @@ impl MetricsSet {
             MetricValue::Gauge { name, .. } => name == metric_name,
             MetricValue::StartTimestamp(_) => false,
             MetricValue::EndTimestamp(_) => false,
+            MetricValue::PruningMetrics { .. } => false,
             MetricValue::Custom { .. } => false,
         })
     }
diff --git a/datafusion/physical-plan/src/metrics/value.rs b/datafusion/physical-plan/src/metrics/value.rs
index fc947935503c..0218f10702a6 100644
--- a/datafusion/physical-plan/src/metrics/value.rs
+++ b/datafusion/physical-plan/src/metrics/value.rs
@@ -362,6 +362,74 @@ impl Drop for ScopedTimerGuard<'_> {
     }
 }
 
+/// Counters tracking pruning metrics
+///
+/// For example, if a file scanner is initially planned to scan 10 files but skips
+/// 8 of them using statistics, the pruning metrics would read: 10 total -> 2 matched
+///
+/// Note: `clone`ing updates the same underlying metrics
+#[derive(Debug, Clone)]
+pub struct PruningMetrics {
+    pruned: Arc<AtomicUsize>,
+    matched: Arc<AtomicUsize>,
+}
+
+impl Display for PruningMetrics {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        let matched = self.matched.load(Ordering::Relaxed);
+        let total = self.pruned.load(Ordering::Relaxed) + matched;
+
+        write!(f, "{total} total → {matched} matched")
+    }
+}
+
+impl Default for PruningMetrics {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl PruningMetrics {
+    /// create a new counter
+    pub fn new() -> Self {
+        Self {
+            pruned: Arc::new(AtomicUsize::new(0)),
+            matched: Arc::new(AtomicUsize::new(0)),
+        }
+    }
+
+    /// Add `n` to the metric's value
+    pub fn add_pruned(&self, n: usize) {
+        // relaxed ordering for operations on `value` poses no issues
+        // we're purely using atomic ops with no associated memory ops
+        self.pruned.fetch_add(n, Ordering::Relaxed);
+    }
+
+    /// Add `n` to the metric's value
+    pub fn add_matched(&self, n: usize) {
+        // relaxed ordering for operations on `value` poses no issues
+        // we're purely using atomic ops with no associated memory ops
+        self.matched.fetch_add(n, Ordering::Relaxed);
+    }
+
+    /// Subtract `n` to the metric's value.
+    pub fn subtract_matched(&self, n: usize) {
+        // relaxed ordering for operations on `value` poses no issues
+        // we're purely using atomic ops with no associated memory ops
+        self.matched.fetch_sub(n, Ordering::Relaxed);
+    }
+
+    /// Number of items pruned
+    pub fn pruned(&self) -> usize {
+        self.pruned.load(Ordering::Relaxed)
+    }
+
+    /// Number of items matched (not pruned)
+    pub fn matched(&self) -> usize {
+        self.matched.load(Ordering::Relaxed)
+    }
+}
+
 /// Possible values for a [super::Metric].
 ///
 /// Among other differences, the metric types have different ways to
@@ -426,6 +494,11 @@ pub enum MetricValue {
     StartTimestamp(Timestamp),
     /// The time at which execution ended
     EndTimestamp(Timestamp),
+    /// Metrics related to scan pruning
+    PruningMetrics {
+        name: Cow<'static, str>,
+        pruning_metrics: PruningMetrics,
+    },
     Custom {
         /// The provided name of this metric
         name: Cow<'static, str>,
@@ -519,11 +592,13 @@ impl MetricValue {
             Self::Time { name, .. } => name.borrow(),
             Self::StartTimestamp(_) => "start_timestamp",
             Self::EndTimestamp(_) => "end_timestamp",
+            Self::PruningMetrics { name, .. } => name.borrow(),
             Self::Custom { name, .. } => name.borrow(),
         }
     }
 
-    /// Return the value of the metric as a usize value
+    /// Return the value of the metric as a usize value, used to aggregate metric
+    /// value across partitions.
     pub fn as_usize(&self) -> usize {
         match self {
             Self::OutputRows(count) => count.value(),
@@ -546,6 +621,11 @@
                 .and_then(|ts| ts.timestamp_nanos_opt())
                 .map(|nanos| nanos as usize)
                 .unwrap_or(0),
+            // This function is a utility for aggregating metrics, for complex metric
+            // like `PruningMetrics`, implement it inside `MetricsSet` directly.
+            Self::PruningMetrics { .. } => {
+                unreachable!()
+            }
             Self::Custom { value, .. } => value.as_usize(),
         }
     }
@@ -575,6 +655,10 @@
             },
             Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()),
             Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()),
+            Self::PruningMetrics { name, .. } => Self::PruningMetrics {
+                name: name.clone(),
+                pruning_metrics: PruningMetrics::new(),
+            },
             Self::Custom { name, value } => Self::Custom {
                 name: name.clone(),
                 value: value.new_empty(),
@@ -626,6 +710,20 @@ impl MetricValue {
             (Self::EndTimestamp(timestamp), Self::EndTimestamp(other_timestamp)) => {
                 timestamp.update_to_max(other_timestamp);
             }
+            (
+                Self::PruningMetrics {
+                    pruning_metrics, ..
+                },
+                Self::PruningMetrics {
+                    pruning_metrics: other_pruning_metrics,
+                    ..
+                },
+            ) => {
+                let pruned = other_pruning_metrics.pruned.load(Ordering::Relaxed);
+                let matched = other_pruning_metrics.matched.load(Ordering::Relaxed);
+                pruning_metrics.add_pruned(pruned);
+                pruning_metrics.add_matched(matched);
+            }
             (
                 Self::Custom { value, .. },
                 Self::Custom {
@@ -652,16 +750,17 @@ impl MetricValue {
             Self::ElapsedCompute(_) => 1,
             Self::OutputBytes(_) => 2,
             // Other metrics
-            Self::SpillCount(_) => 3,
-            Self::SpilledBytes(_) => 4,
-            Self::SpilledRows(_) => 5,
-            Self::CurrentMemoryUsage(_) => 6,
-            Self::Count { .. } => 7,
-            Self::Gauge { .. } => 8,
-            Self::Time { .. } => 9,
-            Self::StartTimestamp(_) => 10, // show timestamps last
-            Self::EndTimestamp(_) => 11,
-            Self::Custom { .. } => 12,
+            Self::PruningMetrics { .. } => 3,
+            Self::SpillCount(_) => 4,
+            Self::SpilledBytes(_) => 5,
+            Self::SpilledRows(_) => 6,
+            Self::CurrentMemoryUsage(_) => 7,
+            Self::Count { .. } => 8,
+            Self::Gauge { .. } => 9,
+            Self::Time { .. } => 10,
+            Self::StartTimestamp(_) => 11, // show timestamps last
+            Self::EndTimestamp(_) => 12,
+            Self::Custom { .. } => 13,
         }
     }
 
@@ -700,6 +799,11 @@ impl Display for MetricValue {
             Self::StartTimestamp(timestamp) | Self::EndTimestamp(timestamp) => {
                 write!(f, "{timestamp}")
             }
+            Self::PruningMetrics {
+                pruning_metrics, ..
+            } => {
+                write!(f, "{pruning_metrics}")
+            }
             Self::Custom { name, value } => {
                 write!(f, "name:{name} {value}")
             }

From 1066b82fdcb24e9536fb9b0560461748c67b6270 Mon Sep 17 00:00:00 2001
From: Yongting You <2010youy01@gmail.com>
Date: Mon, 27 Oct 2025 20:14:04 +0800
Subject: [PATCH 2/3] update comments

---
 datafusion/physical-plan/src/metrics/builder.rs | 6 +-----
 datafusion/physical-plan/src/metrics/value.rs   | 8 ++++----
 2 files changed, 5 insertions(+), 9 deletions(-)

diff --git a/datafusion/physical-plan/src/metrics/builder.rs b/datafusion/physical-plan/src/metrics/builder.rs
index 8751f65f88a2..bf59dccf6625 100644
--- a/datafusion/physical-plan/src/metrics/builder.rs
+++ b/datafusion/physical-plan/src/metrics/builder.rs
@@ -251,16 +251,12 @@ impl<'a> MetricBuilder<'a> {
         timestamp
     }
 
+    /// Consumes self and creates a new `PruningMetrics`
     pub fn pruning_metrics(
         self,
         name: impl Into<Cow<'static, str>>,
         partition: usize,
     ) -> PruningMetrics {
-        // let count = Count::new();
-        // self.with_partition(partition)
-        //     .build(MetricValue::OutputBytes(count.clone()));
-        // count
-
         let pruning_metrics = PruningMetrics::new();
         self.with_partition(partition)
             .build(MetricValue::PruningMetrics {
diff --git a/datafusion/physical-plan/src/metrics/value.rs b/datafusion/physical-plan/src/metrics/value.rs
index 0218f10702a6..0e8f806de415 100644
--- a/datafusion/physical-plan/src/metrics/value.rs
+++ b/datafusion/physical-plan/src/metrics/value.rs
@@ -390,7 +390,7 @@ impl Default for PruningMetrics {
     }
 }
 
 impl PruningMetrics {
-    /// create a new counter
+    /// create a new PruningMetrics
     pub fn new() -> Self {
         Self {
             pruned: Arc::new(AtomicUsize::new(0)),
@@ -398,21 +398,21 @@ impl PruningMetrics {
         }
     }
 
-    /// Add `n` to the metric's value
+    /// Add `n` to the metric's pruned value
     pub fn add_pruned(&self, n: usize) {
         // relaxed ordering for operations on `value` poses no issues
         // we're purely using atomic ops with no associated memory ops
         self.pruned.fetch_add(n, Ordering::Relaxed);
     }
 
-    /// Add `n` to the metric's value
+    /// Add `n` to the metric's matched value
     pub fn add_matched(&self, n: usize) {
         // relaxed ordering for operations on `value` poses no issues
         // we're purely using atomic ops with no associated memory ops
         self.matched.fetch_add(n, Ordering::Relaxed);
     }
 
-    /// Subtract `n` to the metric's value.
+    /// Subtract `n` from the metric's matched value.
     pub fn subtract_matched(&self, n: usize) {
         // relaxed ordering for operations on `value` poses no issues
         // we're purely using atomic ops with no associated memory ops

From 5d32fff05dd2d991060ecfd5c0c9c75dd5766d3a Mon Sep 17 00:00:00 2001
From: Yongting You <2010youy01@gmail.com>
Date: Mon, 27 Oct 2025 20:47:52 +0800
Subject: [PATCH 3/3] clippy + review feedback

---
 datafusion/core/tests/parquet/mod.rs          | 2 +-
 datafusion/physical-plan/src/metrics/value.rs | 7 +++----
 2 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index 29eaef7cef4c..34a48cdae374 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -37,7 +37,7 @@ use datafusion::{
     prelude::{ParquetReadOptions, SessionConfig, SessionContext},
 };
 use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
-use datafusion_physical_plan::{metrics::MetricValue, Metric};
+use datafusion_physical_plan::metrics::MetricValue;
 use parquet::arrow::ArrowWriter;
 use parquet::file::properties::{EnabledStatistics, WriterProperties};
 use std::sync::Arc;
diff --git a/datafusion/physical-plan/src/metrics/value.rs b/datafusion/physical-plan/src/metrics/value.rs
index 0e8f806de415..3b8aa7a2bd34 100644
--- a/datafusion/physical-plan/src/metrics/value.rs
+++ b/datafusion/physical-plan/src/metrics/value.rs
@@ -622,10 +622,9 @@ impl MetricValue {
                 .map(|nanos| nanos as usize)
                 .unwrap_or(0),
             // This function is a utility for aggregating metrics, for complex metric
-            // like `PruningMetrics`, implement it inside `MetricsSet` directly.
-            Self::PruningMetrics { .. } => {
-                unreachable!()
-            }
+            // like `PruningMetrics`, this function is not supposed to be called.
+            // Metrics aggregation for them is implemented inside `MetricsSet` directly.
+            Self::PruningMetrics { .. } => 0,
             Self::Custom { value, .. } => value.as_usize(),
         }
     }
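
Illustrative usage (not part of the patches above): a minimal sketch, assuming the API introduced in this series, of how an operator could register and update a `PruningMetrics` handle through `MetricBuilder::pruning_metrics`. The metric name, partition index, and counts are chosen for illustration only.

    use datafusion_physical_plan::metrics::{
        ExecutionPlanMetricsSet, MetricBuilder, MetricType, PruningMetrics,
    };

    fn main() {
        // Shared metrics registry, as an ExecutionPlan would normally hold.
        let metrics = ExecutionPlanMetricsSet::new();

        // Register a pruning metric for partition 0 (name and partition are illustrative).
        let pruning: PruningMetrics = MetricBuilder::new(&metrics)
            .with_type(MetricType::SUMMARY)
            .pruning_metrics("files_ranges_pruned_statistics", 0);

        // Suppose 10 file ranges were considered: 2 scanned, 8 pruned by statistics.
        pruning.add_matched(2);
        pruning.add_pruned(8);

        // Rendered by the Display impl added in PATCH 1/3.
        assert_eq!(format!("{pruning}"), "10 total → 2 matched");
    }

Because clones of `PruningMetrics` share the same underlying atomic counters, the handle returned by the builder and the copy stored in the `MetricValue` stay in sync, which is what lets `EXPLAIN ANALYZE` report the aggregated "total → matched" summary per partition.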