25 changes: 24 additions & 1 deletion datafusion/core/tests/parquet/mod.rs
@@ -37,6 +37,7 @@ use datafusion::{
prelude::{ParquetReadOptions, SessionConfig, SessionContext},
};
use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
+use datafusion_physical_plan::metrics::MetricValue;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::{EnabledStatistics, WriterProperties};
use std::sync::Arc;
@@ -155,8 +156,30 @@ impl TestOutput {
self.metric_value("row_groups_pruned_statistics")
}

+    /// Metric `files_ranges_pruned_statistics` tracks both pruned and matched counts;
+    /// for testing purposes, this helper aggregates only the `pruned` count.
    fn files_ranges_pruned_statistics(&self) -> Option<usize> {
-        self.metric_value("files_ranges_pruned_statistics")
+        let mut total_pruned = 0;
+        let mut found = false;
+
+        for metric in self.parquet_metrics.iter() {
+            let metric = metric.as_ref();
+            if metric.value().name() == "files_ranges_pruned_statistics" {
+                if let MetricValue::PruningMetrics {
+                    pruning_metrics, ..
+                } = metric.value()
+                {
+                    total_pruned += pruning_metrics.pruned();
+                    found = true;
+                }
+            }
+        }
+
+        if found {
+            Some(total_pruned)
+        } else {
+            None
+        }
}

/// The number of row_groups matched by bloom filter or statistics
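For orientation, here is a minimal sketch of what the `PruningMetrics` value used above could look like. It is inferred from the calls this PR makes (`add_pruned`, `add_matched`, `subtract_matched`, `pruned`) and from the builder comment noting that clones share their inner values via `Arc`; the real definition in `datafusion/physical-plan/src/metrics/value.rs` may differ.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical shape of the new metric value: two shared counters, so that
/// cloned handles all update (and read) the same totals.
#[derive(Debug, Clone, Default)]
pub struct PruningMetrics {
    pruned: Arc<AtomicUsize>,
    matched: Arc<AtomicUsize>,
}

impl PruningMetrics {
    pub fn new() -> Self {
        Self::default()
    }

    /// Record `n` file ranges skipped by pruning.
    pub fn add_pruned(&self, n: usize) {
        self.pruned.fetch_add(n, Ordering::Relaxed);
    }

    /// Record `n` file ranges that passed pruning.
    pub fn add_matched(&self, n: usize) {
        self.matched.fetch_add(n, Ordering::Relaxed);
    }

    /// Undo an earlier `add_matched`, e.g. when a range is pruned mid-scan.
    pub fn subtract_matched(&self, n: usize) {
        self.matched.fetch_sub(n, Ordering::Relaxed);
    }

    pub fn pruned(&self) -> usize {
        self.pruned.load(Ordering::Relaxed)
    }

    pub fn matched(&self) -> usize {
        self.matched.load(Ordering::Relaxed)
    }
}
```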
31 changes: 31 additions & 0 deletions datafusion/core/tests/sql/explain_analyze.rs
@@ -257,6 +257,37 @@ async fn explain_analyze_level_datasource_parquet() {
}
}

+#[tokio::test]
+async fn explain_analyze_parquet_pruning_metrics() {
+    let table_name = "tpch_lineitem_small";
+    let parquet_path = "tests/data/tpch_lineitem_small.parquet";
+    let ctx = SessionContext::new();
+    ctx.register_parquet(table_name, parquet_path, ParquetReadOptions::default())
+        .await
+        .expect("register parquet table for explain analyze test");
+
+    // Test scenario:
+    // This table's l_orderkey values span the range [1, 7], so the following
+    // query cannot prune the file:
+    //   select * from tpch_lineitem_small where l_orderkey = 5;
+    // If the filter is changed to `l_orderkey = 10`, the whole file can be
+    // pruned using statistics.
+    for (l_orderkey, expected_pruning_metrics) in
+        [(5, "1 total → 1 matched"), (10, "1 total → 0 matched")]
+    {
+        let sql = format!(
+            "explain analyze select * from {table_name} where l_orderkey = {l_orderkey};"
+        );
+
+        let plan =
+            collect_plan_with_context(&sql, &ctx, ExplainAnalyzeLevel::Summary).await;
+
+        let expected_metrics =
+            format!("files_ranges_pruned_statistics={expected_pruning_metrics}");
+
+        assert_metrics!(&plan, "DataSourceExec", &expected_metrics);
+    }
+}

#[tokio::test]
async fn csv_explain_plans() {
// This test verify the look of each plan in its full cycle plan creation
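The assertions above depend on how `MetricValue::PruningMetrics` is rendered in `EXPLAIN ANALYZE` output. A plausible formatter that would produce these exact strings (the actual `Display` logic may differ) is:

```rust
/// Hypothetical rendering: total ranges seen, and how many survived pruning.
/// Note that `total = pruned + matched`.
fn format_pruning_metrics(pruned: usize, matched: usize) -> String {
    let total = pruned + matched;
    format!("{total} total → {matched} matched")
}

#[test]
fn renders_like_the_assertions_above() {
    assert_eq!(format_pruning_metrics(0, 1), "1 total → 1 matched");
    assert_eq!(format_pruning_metrics(1, 0), "1 total → 0 matched");
}
```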
8 changes: 4 additions & 4 deletions datafusion/datasource-parquet/src/metrics.rs
@@ -16,7 +16,7 @@
// under the License.

use datafusion_physical_plan::metrics::{
-    Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType, Time,
+    Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType, PruningMetrics, Time,
};

/// Stores metrics about the parquet execution for a particular parquet file.
@@ -27,7 +27,7 @@ use datafusion_physical_plan::metrics::{
/// [`ParquetFileReaderFactory`]: super::ParquetFileReaderFactory
#[derive(Debug, Clone)]
pub struct ParquetFileMetrics {
-    /// Number of file **ranges** pruned by partition or file level statistics.
+    /// Number of file **ranges** pruned or matched by partition or file level statistics.
/// Pruning of files often happens at planning time but may happen at execution time
/// if dynamic filters (e.g. from a join) result in additional pruning.
///
Expand All @@ -41,7 +41,7 @@ pub struct ParquetFileMetrics {
/// pushdown optimization may fill up the TopK heap when reading the first part of a file,
/// then skip the second part if file statistics indicate it cannot contain rows
/// that would be in the TopK.
-    pub files_ranges_pruned_statistics: Count,
+    pub files_ranges_pruned_statistics: PruningMetrics,
/// Number of times the predicate could not be evaluated
pub predicate_evaluation_errors: Count,
/// Number of row groups whose bloom filters were checked and matched (not pruned)
@@ -132,7 +132,7 @@ impl ParquetFileMetrics {

let files_ranges_pruned_statistics = MetricBuilder::new(metrics)
.with_type(MetricType::SUMMARY)
-        .counter("files_ranges_pruned_statistics", partition);
+        .pruning_metrics("files_ranges_pruned_statistics", partition);

// -----------------------
// 'dev' level metrics
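To make the execution-time case described in the doc comment concrete, here is an illustrative sequence for TopK-driven range pruning; the query and filter shapes are for exposition only, while `FilePruner::should_prune`, `add_pruned`, and `add_matched` come from this PR:

```rust
// 1. Plan:  SELECT * FROM t ORDER BY k LIMIT 10   -- executed with a TopK operator
// 2. Scanning the first file range fills the TopK heap; the operator publishes
//    a dynamic filter roughly equivalent to `k < current_heap_max`.
// 3. Before (or while) reading the next range, FilePruner::should_prune()
//    re-evaluates that filter against the range's statistics: if the range
//    cannot contain qualifying rows it is counted with add_pruned(1),
//    otherwise it is opened and counted with add_matched(1).
```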
16 changes: 11 additions & 5 deletions datafusion/datasource-parquet/src/opener.rs
@@ -40,7 +40,9 @@ use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::{
is_dynamic_physical_expr, PhysicalExpr,
};
-use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder};
+use datafusion_physical_plan::metrics::{
+    Count, ExecutionPlanMetricsSet, MetricBuilder, PruningMetrics,
+};
use datafusion_pruning::{build_pruning_predicate, FilePruner, PruningPredicate};

#[cfg(feature = "parquet_encryption")]
@@ -195,11 +197,13 @@ impl FileOpener for ParquetOpener {
if let Some(file_pruner) = &mut file_pruner {
if file_pruner.should_prune()? {
// Return an empty stream immediately to skip the work of setting up the actual stream
-            file_metrics.files_ranges_pruned_statistics.add(1);
+            file_metrics.files_ranges_pruned_statistics.add_pruned(1);
return Ok(futures::stream::empty().boxed());
}
}

+        file_metrics.files_ranges_pruned_statistics.add_matched(1);

// Don't load the page index yet. Since it is not stored inline in
// the footer, loading the page index if it is not needed will do
// unnecessary I/O. We decide later if it is needed to evaluate the
@@ -480,7 +484,7 @@ struct EarlyStoppingStream<S> {
/// None
done: bool,
file_pruner: FilePruner,
-    files_ranges_pruned_statistics: Count,
+    files_ranges_pruned_statistics: PruningMetrics,
/// The inner stream
inner: S,
}
@@ -489,7 +493,7 @@ impl<S> EarlyStoppingStream<S> {
pub fn new(
stream: S,
file_pruner: FilePruner,
-        files_ranges_pruned_statistics: Count,
+        files_ranges_pruned_statistics: PruningMetrics,
) -> Self {
Self {
done: false,
@@ -509,7 +513,9 @@ where
// Since dynamic filters may have been updated, see if we can stop
// reading this stream entirely.
if self.file_pruner.should_prune()? {
-            self.files_ranges_pruned_statistics.add(1);
+            self.files_ranges_pruned_statistics.add_pruned(1);
+            // This file range was previously counted as matched; undo that
+            self.files_ranges_pruned_statistics.subtract_matched(1);
self.done = true;
Ok(None)
} else {
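The `subtract_matched` call is needed because every range that reaches this stream was already counted as matched when it was opened (see the opener change above); when a dynamic filter prunes the range mid-scan, the count must be moved rather than duplicated. A small sketch of the invariant, using the hypothetical `PruningMetrics` shape from earlier:

```rust
let m = PruningMetrics::new();

// Opening the range: it passed the initial pruning check.
m.add_matched(1);

// Mid-scan the dynamic filter tightens and the range is pruned after all:
m.add_pruned(1);
m.subtract_matched(1);

// The range is counted exactly once overall, now on the pruned side.
assert_eq!(m.pruned() + m.matched(), 1);
assert_eq!(m.pruned(), 1);
```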
18 changes: 17 additions & 1 deletion datafusion/physical-plan/src/metrics/builder.rs
@@ -19,7 +19,7 @@

use std::{borrow::Cow, sync::Arc};

-use crate::metrics::MetricType;
+use crate::metrics::{value::PruningMetrics, MetricType};

use super::{
Count, ExecutionPlanMetricsSet, Gauge, Label, Metric, MetricValue, Time, Timestamp,
@@ -250,4 +250,20 @@ impl<'a> MetricBuilder<'a> {
.build(MetricValue::EndTimestamp(timestamp.clone()));
timestamp
}

+    /// Consumes self and creates a new `PruningMetrics` registered with the metrics set
+    pub fn pruning_metrics(
+        self,
+        name: impl Into<Cow<'static, str>>,
+        partition: usize,
+    ) -> PruningMetrics {
+        let pruning_metrics = PruningMetrics::new();
+        self.with_partition(partition)
+            .build(MetricValue::PruningMetrics {
+                name: name.into(),
+                // the registered value shares its inner counters via `Arc::clone`
+                pruning_metrics: pruning_metrics.clone(),
+            });
+        pruning_metrics
+    }
}
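A hedged usage sketch: since the registered `MetricValue::PruningMetrics` holds a clone whose inner counters are shared, updates made through the returned handle are visible when the metrics set is later rendered. Names follow this PR; the setup is illustrative:

```rust
let metrics = ExecutionPlanMetricsSet::new();

// Register the metric for partition 0 and keep a live handle to it.
let pruning = MetricBuilder::new(&metrics)
    .pruning_metrics("files_ranges_pruned_statistics", 0);

// This update is reflected in `metrics`, since the handle and the
// registered value share the same inner counters.
pruning.add_matched(1);
```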
5 changes: 4 additions & 1 deletion datafusion/physical-plan/src/metrics/mod.rs
@@ -35,7 +35,9 @@ use datafusion_common::HashMap;
pub use baseline::{BaselineMetrics, RecordOutput, SpillMetrics, SplitMetrics};
pub use builder::MetricBuilder;
pub use custom::CustomMetricValue;
-pub use value::{Count, Gauge, MetricValue, ScopedTimerGuard, Time, Timestamp};
+pub use value::{
+    Count, Gauge, MetricValue, PruningMetrics, ScopedTimerGuard, Time, Timestamp,
+};

/// Something that tracks a value of interest (metric) of a DataFusion
/// [`ExecutionPlan`] execution.
@@ -302,6 +304,7 @@ impl MetricsSet {
MetricValue::Gauge { name, .. } => name == metric_name,
MetricValue::StartTimestamp(_) => false,
MetricValue::EndTimestamp(_) => false,
+            MetricValue::PruningMetrics { .. } => false,
MetricValue::Custom { .. } => false,
})
}
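Note: because the `MetricValue::PruningMetrics { .. }` arm returns `false` here, name-based lookups over a `MetricsSet` will not match this metric; that is why the test helper in `datafusion/core/tests/parquet/mod.rs` above aggregates `files_ranges_pruned_statistics` by iterating the metrics directly instead of calling `metric_value`.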