Skip to content

Commit 9e9eb94

Browse files
2010YOUY01alamb
andauthored
feat: Better parquet row-group/page pruning metrics display (#18321)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #18299 ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> See writeup in #18297 This PR is for the remaining metrics in `DataSourceExec` with parquet data source. ### Demo In datafusion-cli ``` CREATE EXTERNAL TABLE IF NOT EXISTS lineitem STORED AS parquet LOCATION '/Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem'; set datafusion.explain.analyze_level = summary; explain analyze select * from lineitem where l_orderkey = 3000000; +-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | plan_type | plan | +-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=5, elapsed_compute=48.677µs, output_bytes=1092.0 B] | | | FilterExec: l_orderkey@0 = 3000000, metrics=[output_rows=5, elapsed_compute=1.65872ms, output_bytes=530.8 KB] | | | DataSourceExec: file_groups={14 groups: [[Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-0.parquet:0..11525426], [Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-0.parquet:11525426..20311205, Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-1.parquet:0..2739647], [Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-1.parquet:2739647..14265073], [Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-1.parquet:14265073..20193593, Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-2.parquet:0..5596906], [Users/yongting/Code/datafusion/benchmarks/data/tpch_sf1/lineitem/part-2.parquet:5596906..17122332], ...]}, projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment], file_type=parquet, predicate=l_orderkey@0 = 3000000, pruning_predicate=l_orderkey_null_count@2 != row_count@3 AND l_orderkey_min@0 <= 3000000 AND 3000000 <= l_orderkey_max@1, required_guarantees=[l_orderkey in (3000000)], metrics=[output_rows=19813, elapsed_compute=14ns, output_bytes=5.7 MB, files_ranges_pruned_statistics=21 total → 3 matched, page_index_rows_pruned=748901 total → 19813 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, bytes_scanned=2147308, metadata_load_time=1.794289ms] | | | | +-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ 1 row(s) fetched. Elapsed 0.081 seconds. ``` ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> Update `row_groups_pruned_statistics`, `row_groups_pruned_bloom_filter`, `page_index_rows_pruned` with the new `PruningMetrics` metric type. The functional changes in the pr are in `datafusion/datasource-parquet/src/*`, it's only a few of lines, most changes are fixing tests. ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> UTs are updated for the new metrics ## Are there any user-facing changes? No <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 0a650a0 commit 9e9eb94

File tree

13 files changed

+254
-166
lines changed

13 files changed

+254
-166
lines changed

datafusion-examples/examples/json_shredding.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ async fn main() -> Result<()> {
142142
.await?;
143143
let plan = format!("{}", arrow::util::pretty::pretty_format_batches(&batches)?);
144144
println!("{plan}");
145-
assert_contains!(&plan, "row_groups_pruned_statistics=1");
145+
assert_contains!(&plan, "row_groups_pruned_statistics=2 total → 1 matched");
146146
assert_contains!(&plan, "pushdown_rows_pruned=1");
147147

148148
Ok(())

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ mod tests {
6565
use datafusion_physical_plan::analyze::AnalyzeExec;
6666
use datafusion_physical_plan::collect;
6767
use datafusion_physical_plan::metrics::{
68-
ExecutionPlanMetricsSet, MetricType, MetricsSet,
68+
ExecutionPlanMetricsSet, MetricType, MetricValue, MetricsSet,
6969
};
7070
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
7171

@@ -1175,8 +1175,10 @@ mod tests {
11751175
// There are 4 rows pruned in each of batch2, batch3, and
11761176
// batch4 for a total of 12. batch1 had no pruning as c2 was
11771177
// filled in as null
1178-
assert_eq!(get_value(&metrics, "page_index_rows_pruned"), 12);
1179-
assert_eq!(get_value(&metrics, "page_index_rows_matched"), 6);
1178+
let (page_index_pruned, page_index_matched) =
1179+
get_pruning_metric(&metrics, "page_index_rows_pruned");
1180+
assert_eq!(page_index_pruned, 12);
1181+
assert_eq!(page_index_matched, 6);
11801182
}
11811183

11821184
#[tokio::test]
@@ -1776,8 +1778,10 @@ mod tests {
17761778
| 5 |
17771779
+-----+
17781780
"###);
1779-
assert_eq!(get_value(&metrics, "page_index_rows_pruned"), 4);
1780-
assert_eq!(get_value(&metrics, "page_index_rows_matched"), 2);
1781+
let (page_index_pruned, page_index_matched) =
1782+
get_pruning_metric(&metrics, "page_index_rows_pruned");
1783+
assert_eq!(page_index_pruned, 4);
1784+
assert_eq!(page_index_matched, 2);
17811785
assert!(
17821786
get_value(&metrics, "page_index_eval_time") > 0,
17831787
"no eval time in metrics: {metrics:#?}"
@@ -1866,8 +1870,10 @@ mod tests {
18661870
assert_contains!(&explain, "predicate=c1@0 != bar");
18671871

18681872
// there's a single row group, but we can check that it matched
1869-
// if no pruning was done this would be 0 instead of 1
1870-
assert_contains!(&explain, "row_groups_matched_statistics=1");
1873+
assert_contains!(
1874+
&explain,
1875+
"row_groups_pruned_statistics=1 total \u{2192} 1 matched"
1876+
);
18711877

18721878
// check the projection
18731879
assert_contains!(&explain, "projection=[c1]");
@@ -1898,8 +1904,10 @@ mod tests {
18981904

18991905
// When both matched and pruned are 0, it means that the pruning predicate
19001906
// was not used at all.
1901-
assert_contains!(&explain, "row_groups_matched_statistics=0");
1902-
assert_contains!(&explain, "row_groups_pruned_statistics=0");
1907+
assert_contains!(
1908+
&explain,
1909+
"row_groups_pruned_statistics=1 total \u{2192} 1 matched"
1910+
);
19031911

19041912
// But pushdown predicate should be present
19051913
assert_contains!(
@@ -1952,7 +1960,12 @@ mod tests {
19521960
/// Panics if no such metric.
19531961
fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
19541962
match metrics.sum_by_name(metric_name) {
1955-
Some(v) => v.as_usize(),
1963+
Some(v) => match v {
1964+
MetricValue::PruningMetrics {
1965+
pruning_metrics, ..
1966+
} => pruning_metrics.pruned(),
1967+
_ => v.as_usize(),
1968+
},
19561969
_ => {
19571970
panic!(
19581971
"Expected metric not found. Looking for '{metric_name}' in\n\n{metrics:#?}"
@@ -1961,6 +1974,20 @@ mod tests {
19611974
}
19621975
}
19631976

1977+
fn get_pruning_metric(metrics: &MetricsSet, metric_name: &str) -> (usize, usize) {
1978+
match metrics.sum_by_name(metric_name) {
1979+
Some(MetricValue::PruningMetrics {
1980+
pruning_metrics, ..
1981+
}) => (pruning_metrics.pruned(), pruning_metrics.matched()),
1982+
Some(_) => panic!(
1983+
"Metric '{metric_name}' is not a pruning metric in\n\n{metrics:#?}"
1984+
),
1985+
None => panic!(
1986+
"Expected metric not found. Looking for '{metric_name}' in\n\n{metrics:#?}"
1987+
),
1988+
}
1989+
}
1990+
19641991
fn populate_csv_partitions(
19651992
tmp_dir: &TempDir,
19661993
partition_count: usize,

datafusion/core/tests/parquet/external_access_plan.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use datafusion_common::{assert_contains, DFSchema};
3333
use datafusion_datasource_parquet::{ParquetAccessPlan, RowGroupAccess};
3434
use datafusion_execution::object_store::ObjectStoreUrl;
3535
use datafusion_expr::{col, lit, Expr};
36-
use datafusion_physical_plan::metrics::MetricsSet;
36+
use datafusion_physical_plan::metrics::{MetricValue, MetricsSet};
3737
use datafusion_physical_plan::ExecutionPlan;
3838

3939
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
@@ -178,12 +178,21 @@ async fn plan_and_filter() {
178178
.unwrap();
179179

180180
// Verify that row group pruning still happens for just that group
181-
let row_groups_pruned_statistics =
182-
metric_value(&parquet_metrics, "row_groups_pruned_statistics").unwrap();
183-
assert_eq!(
184-
row_groups_pruned_statistics, 1,
185-
"metrics : {parquet_metrics:#?}",
186-
);
181+
let row_groups_pruned_statistics = parquet_metrics
182+
.sum_by_name("row_groups_pruned_statistics")
183+
.unwrap();
184+
if let MetricValue::PruningMetrics {
185+
pruning_metrics, ..
186+
} = row_groups_pruned_statistics
187+
{
188+
assert_eq!(
189+
pruning_metrics.pruned(),
190+
1,
191+
"metrics : {parquet_metrics:#?}",
192+
);
193+
} else {
194+
unreachable!("metrics `row_groups_pruned_statistics` should exist")
195+
}
187196
}
188197

189198
#[tokio::test]

datafusion/core/tests/parquet/filter_pushdown.rs

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
use arrow::compute::concat_batches;
3030
use arrow::record_batch::RecordBatch;
3131
use datafusion::physical_plan::collect;
32-
use datafusion::physical_plan::metrics::MetricsSet;
32+
use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
3333
use datafusion::prelude::{
3434
col, lit, lit_timestamp_nano, Expr, ParquetReadOptions, SessionContext,
3535
};
@@ -563,9 +563,9 @@ impl<'a> TestCase<'a> {
563563
}
564564
};
565565

566-
let page_index_rows_pruned = get_value(&metrics, "page_index_rows_pruned");
566+
let (page_index_rows_pruned, page_index_rows_matched) =
567+
get_pruning_metrics(&metrics, "page_index_rows_pruned");
567568
println!(" page_index_rows_pruned: {page_index_rows_pruned}");
568-
let page_index_rows_matched = get_value(&metrics, "page_index_rows_matched");
569569
println!(" page_index_rows_matched: {page_index_rows_matched}");
570570

571571
let page_index_filtering_expected = if scan_options.enable_page_index {
@@ -592,14 +592,29 @@ impl<'a> TestCase<'a> {
592592
}
593593
}
594594

595+
fn get_pruning_metrics(metrics: &MetricsSet, metric_name: &str) -> (usize, usize) {
596+
match metrics.sum_by_name(metric_name) {
597+
Some(MetricValue::PruningMetrics {
598+
pruning_metrics, ..
599+
}) => (pruning_metrics.pruned(), pruning_metrics.matched()),
600+
Some(_) => {
601+
panic!("Metric '{metric_name}' is not a pruning metric in\n\n{metrics:#?}")
602+
}
603+
None => panic!(
604+
"Expected metric not found. Looking for '{metric_name}' in\n\n{metrics:#?}"
605+
),
606+
}
607+
}
608+
595609
fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
596610
match metrics.sum_by_name(metric_name) {
611+
Some(MetricValue::PruningMetrics {
612+
pruning_metrics, ..
613+
}) => pruning_metrics.pruned(),
597614
Some(v) => v.as_usize(),
598-
_ => {
599-
panic!(
600-
"Expected metric not found. Looking for '{metric_name}' in\n\n{metrics:#?}"
601-
);
602-
}
615+
None => panic!(
616+
"Expected metric not found. Looking for '{metric_name}' in\n\n{metrics:#?}"
617+
),
603618
}
604619
}
605620

datafusion/core/tests/parquet/mod.rs

Lines changed: 54 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -126,79 +126,97 @@ struct TestOutput {
126126
impl TestOutput {
127127
/// retrieve the value of the named metric, if any
128128
fn metric_value(&self, metric_name: &str) -> Option<usize> {
129+
if let Some((pruned, _matched)) = self.pruning_metric(metric_name) {
130+
return Some(pruned);
131+
}
132+
129133
self.parquet_metrics
130134
.sum(|metric| metric.value().name() == metric_name)
131-
.map(|v| v.as_usize())
132-
}
133-
134-
/// The number of times the pruning predicate evaluation errors
135-
fn predicate_evaluation_errors(&self) -> Option<usize> {
136-
self.metric_value("predicate_evaluation_errors")
137-
}
138-
139-
/// The number of row_groups matched by bloom filter
140-
fn row_groups_matched_bloom_filter(&self) -> Option<usize> {
141-
self.metric_value("row_groups_matched_bloom_filter")
142-
}
143-
144-
/// The number of row_groups pruned by bloom filter
145-
fn row_groups_pruned_bloom_filter(&self) -> Option<usize> {
146-
self.metric_value("row_groups_pruned_bloom_filter")
147-
}
148-
149-
/// The number of row_groups matched by statistics
150-
fn row_groups_matched_statistics(&self) -> Option<usize> {
151-
self.metric_value("row_groups_matched_statistics")
152-
}
153-
154-
/// The number of row_groups pruned by statistics
155-
fn row_groups_pruned_statistics(&self) -> Option<usize> {
156-
self.metric_value("row_groups_pruned_statistics")
135+
.map(|v| match v {
136+
MetricValue::PruningMetrics {
137+
pruning_metrics, ..
138+
} => pruning_metrics.pruned(),
139+
_ => v.as_usize(),
140+
})
157141
}
158142

159-
/// Metric `files_ranges_pruned_statistics` tracks both pruned and matched count,
160-
/// for testing purpose, here it only aggregate the `pruned` count.
161-
fn files_ranges_pruned_statistics(&self) -> Option<usize> {
143+
fn pruning_metric(&self, metric_name: &str) -> Option<(usize, usize)> {
162144
let mut total_pruned = 0;
145+
let mut total_matched = 0;
163146
let mut found = false;
164147

165148
for metric in self.parquet_metrics.iter() {
166149
let metric = metric.as_ref();
167-
if metric.value().name() == "files_ranges_pruned_statistics" {
150+
if metric.value().name() == metric_name {
168151
if let MetricValue::PruningMetrics {
169152
pruning_metrics, ..
170153
} = metric.value()
171154
{
172155
total_pruned += pruning_metrics.pruned();
156+
total_matched += pruning_metrics.matched();
173157
found = true;
174158
}
175159
}
176160
}
177161

178162
if found {
179-
Some(total_pruned)
163+
Some((total_pruned, total_matched))
180164
} else {
181165
None
182166
}
183167
}
184168

169+
/// The number of times the pruning predicate evaluation errors
170+
fn predicate_evaluation_errors(&self) -> Option<usize> {
171+
self.metric_value("predicate_evaluation_errors")
172+
}
173+
174+
/// The number of row_groups pruned / matched by bloom filter
175+
fn row_groups_bloom_filter(&self) -> Option<(usize, usize)> {
176+
self.pruning_metric("row_groups_pruned_bloom_filter")
177+
}
178+
179+
/// The number of row_groups matched by statistics
180+
fn row_groups_matched_statistics(&self) -> Option<usize> {
181+
self.pruning_metric("row_groups_pruned_statistics")
182+
.map(|(_pruned, matched)| matched)
183+
}
184+
185+
/// The number of row_groups pruned by statistics
186+
fn row_groups_pruned_statistics(&self) -> Option<usize> {
187+
self.pruning_metric("row_groups_pruned_statistics")
188+
.map(|(pruned, _matched)| pruned)
189+
}
190+
191+
/// Metric `files_ranges_pruned_statistics` tracks both pruned and matched count,
192+
/// for testing purpose, here it only aggregate the `pruned` count.
193+
fn files_ranges_pruned_statistics(&self) -> Option<usize> {
194+
self.pruning_metric("files_ranges_pruned_statistics")
195+
.map(|(pruned, _matched)| pruned)
196+
}
197+
185198
/// The number of row_groups matched by bloom filter or statistics
199+
///
200+
/// E.g. starting with 10 row groups, statistics: 10 total -> 7 matched, bloom
201+
/// filter: 7 total -> 3 matched, this function returns 3 for the final matched
202+
/// count.
186203
fn row_groups_matched(&self) -> Option<usize> {
187-
self.row_groups_matched_bloom_filter()
188-
.zip(self.row_groups_matched_statistics())
189-
.map(|(a, b)| a + b)
204+
self.row_groups_bloom_filter()
205+
.map(|(_pruned, matched)| matched)
190206
}
191207

192208
/// The number of row_groups pruned
193209
fn row_groups_pruned(&self) -> Option<usize> {
194-
self.row_groups_pruned_bloom_filter()
210+
self.row_groups_bloom_filter()
211+
.map(|(pruned, _matched)| pruned)
195212
.zip(self.row_groups_pruned_statistics())
196213
.map(|(a, b)| a + b)
197214
}
198215

199216
/// The number of row pages pruned
200217
fn row_pages_pruned(&self) -> Option<usize> {
201-
self.metric_value("page_index_rows_pruned")
218+
self.pruning_metric("page_index_rows_pruned")
219+
.map(|(pruned, _matched)| pruned)
202220
}
203221

204222
fn description(&self) -> String {

0 commit comments

Comments
 (0)