Enrich filter statistics with known column boundaries (#4519)
* Propagation of column boundary changes in subexpressions

* Mut-free re-implementation

* New test & minor code suggestions
isidentical authored Dec 10, 2022
1 parent 2a9009e commit 8966eac
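
The gist of the change: when FilterExec reports statistics, the predicate is now analyzed against the input's known column boundaries, so a comparison such as `a <= 25` both narrows the known [min, max] of column `a` and yields a selectivity estimate. A minimal standalone sketch of the idea (the `Interval` type and `analyze_lt_eq` helper are hypothetical illustrations, not DataFusion's `AnalysisContext` API, and the sketch assumes integer columns with uniformly distributed values):

// Toy illustration of boundary analysis for `col <= value`.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Interval {
    min: i64,
    max: i64,
}

// Narrow the column's interval and estimate the fraction of values kept,
// assuming integers uniformly distributed over [min, max].
fn analyze_lt_eq(col: Interval, value: i64) -> (Interval, f64) {
    let narrowed = Interval {
        min: col.min,
        max: value.min(col.max),
    };
    let selectivity = (narrowed.max - narrowed.min + 1) as f64
        / (col.max - col.min + 1) as f64;
    (narrowed, selectivity)
}

fn main() {
    // WHERE a <= 25, with a in [1, 100]:
    let (bounds, selectivity) = analyze_lt_eq(Interval { min: 1, max: 100 }, 25);
    assert_eq!(bounds, Interval { min: 1, max: 25 });
    assert_eq!(selectivity, 0.25); // 25 of 100 possible values survive
}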
Showing 5 changed files with 350 additions and 77 deletions.
181 changes: 161 additions & 20 deletions datafusion/core/src/physical_plan/filter.rs
@@ -24,7 +24,7 @@ use std::sync::Arc;
 use std::task::{Context, Poll};

 use super::expressions::PhysicalSortExpr;
-use super::{RecordBatchStream, SendableRecordBatchStream, Statistics};
+use super::{ColumnStatistics, RecordBatchStream, SendableRecordBatchStream, Statistics};
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::{
     metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
@@ -172,24 +172,46 @@ impl ExecutionPlan for FilterExec {
     /// predicate's selectivity value can be determined for the incoming data.
     fn statistics(&self) -> Statistics {
         let input_stats = self.input.statistics();
-        let analysis_ctx =
+        let starter_ctx =
             AnalysisContext::from_statistics(self.input.schema().as_ref(), &input_stats);

-        let predicate_selectivity = self
-            .predicate
-            .boundaries(&analysis_ctx)
-            .and_then(|bounds| bounds.selectivity);
-
-        match predicate_selectivity {
-            Some(selectivity) => Statistics {
-                num_rows: input_stats
-                    .num_rows
-                    .map(|num_rows| (num_rows as f64 * selectivity).ceil() as usize),
-                total_byte_size: input_stats.total_byte_size.map(|total_byte_size| {
-                    (total_byte_size as f64 * selectivity).ceil() as usize
-                }),
-                ..Default::default()
-            },
+        let analysis_ctx = self.predicate.analyze(starter_ctx);
+
+        match analysis_ctx.boundaries {
+            Some(boundaries) => {
+                // Build the column-level statistics back from the boundaries in the
+                // analysis context. These can differ from the input statistics,
+                // especially when the predicate compares a column against a
+                // literal (e.g. `col1 > 100`).
+                let column_statistics = analysis_ctx
+                    .column_boundaries
+                    .iter()
+                    .map(|boundary| match boundary {
+                        Some(boundary) => ColumnStatistics {
+                            min_value: Some(boundary.min_value.clone()),
+                            max_value: Some(boundary.max_value.clone()),
+                            ..Default::default()
+                        },
+                        None => ColumnStatistics::default(),
+                    })
+                    .collect();
+
+                Statistics {
+                    num_rows: input_stats.num_rows.zip(boundaries.selectivity).map(
+                        |(num_rows, selectivity)| {
+                            (num_rows as f64 * selectivity).ceil() as usize
+                        },
+                    ),
+                    total_byte_size: input_stats
+                        .total_byte_size
+                        .zip(boundaries.selectivity)
+                        .map(|(total_byte_size, selectivity)| {
+                            (total_byte_size as f64 * selectivity).ceil() as usize
+                        }),
+                    column_statistics: Some(column_statistics),
+                    ..Default::default()
+                }
+            }
             None => Statistics::default(),
         }
     }
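
Both estimates above are scaled by the same zip/map pattern. A minimal standalone sketch of that pattern (the `scale_estimate` helper is hypothetical, not part of the patch):

// `selectivity` stands in for `boundaries.selectivity`; the estimate
// stays unknown (None) whenever either input is unknown.
fn scale_estimate(estimate: Option<usize>, selectivity: Option<f64>) -> Option<usize> {
    estimate
        .zip(selectivity)
        .map(|(value, sel)| (value as f64 * sel).ceil() as usize)
}

fn main() {
    // 100 input rows, predicate estimated to keep 25% of them.
    assert_eq!(scale_estimate(Some(100), Some(0.25)), Some(25));
    // Unknown selectivity propagates as an unknown estimate.
    assert_eq!(scale_estimate(Some(100), None), None);
}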
@@ -452,9 +474,6 @@ mod tests {
     }

     #[tokio::test]
-    #[ignore]
-    // This test requires propagation of column boundaries from the comparison analysis
-    // to the analysis context. This is not yet implemented.
     async fn test_filter_statistics_column_level_basic_expr() -> Result<()> {
         // Table:
         // a: min=1, max=100
@@ -481,6 +500,8 @@
             Arc::new(FilterExec::try_new(predicate, input)?);

         let statistics = filter.statistics();
+
+        // a must be in the [1, 25] range now!
         assert_eq!(statistics.num_rows, Some(25));
         assert_eq!(
             statistics.column_statistics,
Expand All @@ -494,6 +515,126 @@ mod tests {
         Ok(())
     }

+    #[tokio::test]
+    async fn test_filter_statistics_column_level_nested() -> Result<()> {
+        // Table:
+        // a: min=1, max=100
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(100),
+                column_statistics: Some(vec![ColumnStatistics {
+                    min_value: Some(ScalarValue::Int32(Some(1))),
+                    max_value: Some(ScalarValue::Int32(Some(100))),
+                    ..Default::default()
+                }]),
+                ..Default::default()
+            },
+            schema.clone(),
+        ));
+
+        // WHERE a <= 25
+        let sub_filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
+            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
+            input,
+        )?);
+
+        // Nested filters (two separate physical plans, instead of an AND chain in the expr)
+        // WHERE a >= 10
+        // WHERE a <= 25
+        let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
+            binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
+            sub_filter,
+        )?);
+
+        let statistics = filter.statistics();
+        assert_eq!(statistics.num_rows, Some(16));
+        assert_eq!(
+            statistics.column_statistics,
+            Some(vec![ColumnStatistics {
+                min_value: Some(ScalarValue::Int32(Some(10))),
+                max_value: Some(ScalarValue::Int32(Some(25))),
+                ..Default::default()
+            }])
+        );
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_filter_statistics_column_level_nested_multiple() -> Result<()> {
+        // Table:
+        // a: min=1, max=100
+        // b: min=1, max=50
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+        ]);
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(100),
+                column_statistics: Some(vec![
+                    ColumnStatistics {
+                        min_value: Some(ScalarValue::Int32(Some(1))),
+                        max_value: Some(ScalarValue::Int32(Some(100))),
+                        ..Default::default()
+                    },
+                    ColumnStatistics {
+                        min_value: Some(ScalarValue::Int32(Some(1))),
+                        max_value: Some(ScalarValue::Int32(Some(50))),
+                        ..Default::default()
+                    },
+                ]),
+                ..Default::default()
+            },
+            schema.clone(),
+        ));
+
+        // WHERE a <= 25
+        let a_lte_25: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
+            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
+            input,
+        )?);
+
+        // WHERE b > 45
+        let b_gt_45: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
+            binary(col("b", &schema)?, Operator::Gt, lit(45i32), &schema)?,
+            a_lte_25,
+        )?);
+
+        // WHERE a >= 10
+        let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
+            binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
+            b_gt_45,
+        )?);
+
+        let statistics = filter.statistics();
+        // Assuming a uniform distribution, only fifteen rows satisfy the
+        // filter on 'a' (a >= 10 AND a <= 25), i.e. 15/100, and only
+        // 5 rows satisfy the filter on 'b' (b > 45), i.e. 5/50.
+        //
+        // This yields a combined selectivity of '15/100 * 5/50', or 0.015,
+        // meaning roughly 1.5% of all rows (rounded up to 2 rows).
+        assert_eq!(statistics.num_rows, Some(2));
+        assert_eq!(
+            statistics.column_statistics,
+            Some(vec![
+                ColumnStatistics {
+                    min_value: Some(ScalarValue::Int32(Some(10))),
+                    max_value: Some(ScalarValue::Int32(Some(25))),
+                    ..Default::default()
+                },
+                ColumnStatistics {
+                    min_value: Some(ScalarValue::Int32(Some(45))),
+                    max_value: Some(ScalarValue::Int32(Some(50))),
+                    ..Default::default()
+                }
+            ])
+        );
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_filter_statistics_when_input_stats_missing() -> Result<()> {
         // Table:
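
As a sanity check on the row counts asserted in the tests above (25, 16, and 2), here is a small standalone sketch of the arithmetic, assuming each integer range is treated as uniformly populated and stacked filters multiply selectivities:

// Back-of-the-envelope verification of the tests' expected estimates.
fn main() {
    // test_filter_statistics_column_level_basic_expr:
    // a in [1, 100]; WHERE a <= 25 keeps 25 of 100 values.
    assert_eq!((100.0_f64 * 25.0 / 100.0).ceil() as usize, 25);

    // test_filter_statistics_column_level_nested:
    // the inner filter narrows a to [1, 25] and estimates 25 rows;
    // WHERE a >= 10 then keeps 16 of those 25 values.
    assert_eq!((25.0_f64 * 16.0 / 25.0).ceil() as usize, 16);

    // test_filter_statistics_column_level_nested_multiple:
    // selectivities compose: 15/100 (from a) * 5/50 (from b) = 0.015,
    // and ceil(100 * 0.015) rounds up to 2 rows.
    assert_eq!((100.0_f64 * (15.0 / 100.0) * (5.0 / 50.0)).ceil() as usize, 2);
}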