Skip to content

Commit 195415d

Browse files
authored
feat: Introduce PruningMetrics and use it in parquet file pruning metric (#18297)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> part of #18195 ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> Make pruning related metrics display nicer. Before: `metrics=[...files_ranges_matched_statistics=3, files_ranges_pruned_statistics=7...]` PR: `metrics=[...files_ranges_pruned_statistics=10 total → 3 matched...]` ### Demo with `datafusion-cli` ``` CREATE EXTERNAL TABLE IF NOT EXISTS lineitem STORED AS parquet LOCATION '/Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem'; set datafusion.explain.analyze_level = summary; explain analyze select * from lineitem where l_orderkey = 3000000; 
+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | plan_type | plan | 
+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=5, elapsed_compute=384.635µs, output_bytes=1092.0 B] | | | FilterExec: l_orderkey@0 = 3000000, metrics=[output_rows=5, elapsed_compute=1.303305ms, output_bytes=530.8 KB] | | | DataSourceExec: file_groups={14 groups: [[Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-0.parquet:0..11525426], 
[Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-0.parquet:11525426..20311205, Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-1.parquet:0..2739647], [Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-1.parquet:2739647..14265073], [Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-1.parquet:14265073..20193593, Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-2.parquet:0..5596906], [Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-2.parquet:5596906..17122332], ...]}, projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment], file_type=parquet, predicate=l_orderkey@0 = 3000000, pruning_predicate=l_orderkey_null_count@2 != row_count@3 AND l_orderkey_min@0 <= 3000000 AND 3000000 <= l_orderkey_max@1, required_guarantees=[l_orderkey in (3000000)], metrics=[output_rows=19813, elapsed_compute=14ns, output_bytes=5.7 MB, files_ranges_pruned_statistics=21 total → 3 matched, bytes_scanned=2147308, page_index_rows_matched=19813, page_index_rows_pruned=729088, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, metadata_load_time=1.167622ms] | | | | 
+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ 1 row(s) fetched. Elapsed 0.051 seconds. ``` ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> 1. Introduce `PruningMetrics` metrics type 2. Update `files_ranges_pruned_metrics` with this new metric type. 
Note that this also applies to the other 6 metrics for row group/page level pruning in the parquet scanner; they are not included here to keep this PR easier to review. ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> UT ## Are there any user-facing changes? No <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent b579e60 commit 195415d

File tree

7 files changed

+205
-23
lines changed

7 files changed

+205
-23
lines changed

datafusion/core/tests/parquet/mod.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use datafusion::{
3737
prelude::{ParquetReadOptions, SessionConfig, SessionContext},
3838
};
3939
use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
40+
use datafusion_physical_plan::metrics::MetricValue;
4041
use parquet::arrow::ArrowWriter;
4142
use parquet::file::properties::{EnabledStatistics, WriterProperties};
4243
use std::sync::Arc;
@@ -155,8 +156,30 @@ impl TestOutput {
155156
self.metric_value("row_groups_pruned_statistics")
156157
}
157158

159+
/// Metric `files_ranges_pruned_statistics` tracks both pruned and matched counts,
160+
/// for testing purposes, this helper only aggregates the `pruned` count.
158161
fn files_ranges_pruned_statistics(&self) -> Option<usize> {
159-
self.metric_value("files_ranges_pruned_statistics")
162+
let mut total_pruned = 0;
163+
let mut found = false;
164+
165+
for metric in self.parquet_metrics.iter() {
166+
let metric = metric.as_ref();
167+
if metric.value().name() == "files_ranges_pruned_statistics" {
168+
if let MetricValue::PruningMetrics {
169+
pruning_metrics, ..
170+
} = metric.value()
171+
{
172+
total_pruned += pruning_metrics.pruned();
173+
found = true;
174+
}
175+
}
176+
}
177+
178+
if found {
179+
Some(total_pruned)
180+
} else {
181+
None
182+
}
160183
}
161184

162185
/// The number of row_groups matched by bloom filter or statistics

datafusion/core/tests/sql/explain_analyze.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,37 @@ async fn explain_analyze_level_datasource_parquet() {
257257
}
258258
}
259259

260+
#[tokio::test]
261+
async fn explain_analyze_parquet_pruning_metrics() {
262+
let table_name = "tpch_lineitem_small";
263+
let parquet_path = "tests/data/tpch_lineitem_small.parquet";
264+
let ctx = SessionContext::new();
265+
ctx.register_parquet(table_name, parquet_path, ParquetReadOptions::default())
266+
.await
267+
.expect("register parquet table for explain analyze test");
268+
269+
// Test scenario:
270+
// This table's l_orderkey has range [1, 7]
271+
// So the following query can't prune the file:
272+
// select * from tpch_lineitem_small where l_orderkey = 5;
273+
// If the filter is changed to `l_orderkey = 10`, the whole file can be pruned using statistics.
274+
for (l_orderkey, expected_pruning_metrics) in
275+
[(5, "1 total → 1 matched"), (10, "1 total → 0 matched")]
276+
{
277+
let sql = format!(
278+
"explain analyze select * from {table_name} where l_orderkey = {l_orderkey};"
279+
);
280+
281+
let plan =
282+
collect_plan_with_context(&sql, &ctx, ExplainAnalyzeLevel::Summary).await;
283+
284+
let expected_metrics =
285+
format!("files_ranges_pruned_statistics={expected_pruning_metrics}");
286+
287+
assert_metrics!(&plan, "DataSourceExec", &expected_metrics);
288+
}
289+
}
290+
260291
#[tokio::test]
261292
async fn csv_explain_plans() {
262293
// This test verifies the look of each plan across its full plan-creation cycle

datafusion/datasource-parquet/src/metrics.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
// under the License.
1717

1818
use datafusion_physical_plan::metrics::{
19-
Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType, Time,
19+
Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType, PruningMetrics, Time,
2020
};
2121

2222
/// Stores metrics about the parquet execution for a particular parquet file.
@@ -27,7 +27,7 @@ use datafusion_physical_plan::metrics::{
2727
/// [`ParquetFileReaderFactory`]: super::ParquetFileReaderFactory
2828
#[derive(Debug, Clone)]
2929
pub struct ParquetFileMetrics {
30-
/// Number of file **ranges** pruned by partition or file level statistics.
30+
/// Number of file **ranges** pruned or matched by partition or file level statistics.
3131
/// Pruning of files often happens at planning time but may happen at execution time
3232
/// if dynamic filters (e.g. from a join) result in additional pruning.
3333
///
@@ -41,7 +41,7 @@ pub struct ParquetFileMetrics {
4141
/// pushdown optimization may fill up the TopK heap when reading the first part of a file,
4242
/// then skip the second part if file statistics indicate it cannot contain rows
4343
/// that would be in the TopK.
44-
pub files_ranges_pruned_statistics: Count,
44+
pub files_ranges_pruned_statistics: PruningMetrics,
4545
/// Number of times the predicate could not be evaluated
4646
pub predicate_evaluation_errors: Count,
4747
/// Number of row groups whose bloom filters were checked and matched (not pruned)
@@ -132,7 +132,7 @@ impl ParquetFileMetrics {
132132

133133
let files_ranges_pruned_statistics = MetricBuilder::new(metrics)
134134
.with_type(MetricType::SUMMARY)
135-
.counter("files_ranges_pruned_statistics", partition);
135+
.pruning_metrics("files_ranges_pruned_statistics", partition);
136136

137137
// -----------------------
138138
// 'dev' level metrics

datafusion/datasource-parquet/src/opener.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
4040
use datafusion_physical_expr_common::physical_expr::{
4141
is_dynamic_physical_expr, PhysicalExpr,
4242
};
43-
use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder};
43+
use datafusion_physical_plan::metrics::{
44+
Count, ExecutionPlanMetricsSet, MetricBuilder, PruningMetrics,
45+
};
4446
use datafusion_pruning::{build_pruning_predicate, FilePruner, PruningPredicate};
4547

4648
#[cfg(feature = "parquet_encryption")]
@@ -195,11 +197,13 @@ impl FileOpener for ParquetOpener {
195197
if let Some(file_pruner) = &mut file_pruner {
196198
if file_pruner.should_prune()? {
197199
// Return an empty stream immediately to skip the work of setting up the actual stream
198-
file_metrics.files_ranges_pruned_statistics.add(1);
200+
file_metrics.files_ranges_pruned_statistics.add_pruned(1);
199201
return Ok(futures::stream::empty().boxed());
200202
}
201203
}
202204

205+
file_metrics.files_ranges_pruned_statistics.add_matched(1);
206+
203207
// Don't load the page index yet. Since it is not stored inline in
204208
// the footer, loading the page index if it is not needed will do
205209
// unnecessary I/O. We decide later if it is needed to evaluate the
@@ -480,7 +484,7 @@ struct EarlyStoppingStream<S> {
480484
/// None
481485
done: bool,
482486
file_pruner: FilePruner,
483-
files_ranges_pruned_statistics: Count,
487+
files_ranges_pruned_statistics: PruningMetrics,
484488
/// The inner stream
485489
inner: S,
486490
}
@@ -489,7 +493,7 @@ impl<S> EarlyStoppingStream<S> {
489493
pub fn new(
490494
stream: S,
491495
file_pruner: FilePruner,
492-
files_ranges_pruned_statistics: Count,
496+
files_ranges_pruned_statistics: PruningMetrics,
493497
) -> Self {
494498
Self {
495499
done: false,
@@ -509,7 +513,9 @@ where
509513
// Since dynamic filters may have been updated, see if we can stop
510514
// reading this stream entirely.
511515
if self.file_pruner.should_prune()? {
512-
self.files_ranges_pruned_statistics.add(1);
516+
self.files_ranges_pruned_statistics.add_pruned(1);
517+
// Previously this file range has been counted as matched
518+
self.files_ranges_pruned_statistics.subtract_matched(1);
513519
self.done = true;
514520
Ok(None)
515521
} else {

datafusion/physical-plan/src/metrics/builder.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
2020
use std::{borrow::Cow, sync::Arc};
2121

22-
use crate::metrics::MetricType;
22+
use crate::metrics::{value::PruningMetrics, MetricType};
2323

2424
use super::{
2525
Count, ExecutionPlanMetricsSet, Gauge, Label, Metric, MetricValue, Time, Timestamp,
@@ -250,4 +250,20 @@ impl<'a> MetricBuilder<'a> {
250250
.build(MetricValue::EndTimestamp(timestamp.clone()));
251251
timestamp
252252
}
253+
254+
/// Consumes self and creates a new `PruningMetrics`
255+
pub fn pruning_metrics(
256+
self,
257+
name: impl Into<Cow<'static, str>>,
258+
partition: usize,
259+
) -> PruningMetrics {
260+
let pruning_metrics = PruningMetrics::new();
261+
self.with_partition(partition)
262+
.build(MetricValue::PruningMetrics {
263+
name: name.into(),
264+
// inner values will be shared via `Arc::clone()`
265+
pruning_metrics: pruning_metrics.clone(),
266+
});
267+
pruning_metrics
268+
}
253269
}

datafusion/physical-plan/src/metrics/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ use datafusion_common::HashMap;
3535
pub use baseline::{BaselineMetrics, RecordOutput, SpillMetrics, SplitMetrics};
3636
pub use builder::MetricBuilder;
3737
pub use custom::CustomMetricValue;
38-
pub use value::{Count, Gauge, MetricValue, ScopedTimerGuard, Time, Timestamp};
38+
pub use value::{
39+
Count, Gauge, MetricValue, PruningMetrics, ScopedTimerGuard, Time, Timestamp,
40+
};
3941

4042
/// Something that tracks a value of interest (metric) of a DataFusion
4143
/// [`ExecutionPlan`] execution.
@@ -302,6 +304,7 @@ impl MetricsSet {
302304
MetricValue::Gauge { name, .. } => name == metric_name,
303305
MetricValue::StartTimestamp(_) => false,
304306
MetricValue::EndTimestamp(_) => false,
307+
MetricValue::PruningMetrics { .. } => false,
305308
MetricValue::Custom { .. } => false,
306309
})
307310
}

0 commit comments

Comments
 (0)