diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs
index c0ee6da4884a..1dc634b779a5 100644
--- a/datafusion/core/src/physical_plan/filter.rs
+++ b/datafusion/core/src/physical_plan/filter.rs
@@ -24,7 +24,7 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use super::expressions::PhysicalSortExpr;
-use super::{RecordBatchStream, SendableRecordBatchStream, Statistics};
+use super::{ColumnStatistics, RecordBatchStream, SendableRecordBatchStream, Statistics};
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::{
     metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
@@ -172,24 +172,46 @@ impl ExecutionPlan for FilterExec {
     /// predicate's selectivity value can be determined for the incoming data.
     fn statistics(&self) -> Statistics {
         let input_stats = self.input.statistics();
-        let analysis_ctx =
+        let starter_ctx =
             AnalysisContext::from_statistics(self.input.schema().as_ref(), &input_stats);
-        let predicate_selectivity = self
-            .predicate
-            .boundaries(&analysis_ctx)
-            .and_then(|bounds| bounds.selectivity);
-
-        match predicate_selectivity {
-            Some(selectivity) => Statistics {
-                num_rows: input_stats
-                    .num_rows
-                    .map(|num_rows| (num_rows as f64 * selectivity).ceil() as usize),
-                total_byte_size: input_stats.total_byte_size.map(|total_byte_size| {
-                    (total_byte_size as f64 * selectivity).ceil() as usize
-                }),
-                ..Default::default()
-            },
+        let analysis_ctx = self.predicate.analyze(starter_ctx);
+
+        match analysis_ctx.boundaries {
+            Some(boundaries) => {
+                // Build back the column level statistics from the boundaries inside the
+                // analysis context. It is possible that these are going to be different
+                // from the input statistics, especially when a comparison is made inside
+                // the predicate expression (e.g. `col1 > 100`).
+                let column_statistics = analysis_ctx
+                    .column_boundaries
+                    .iter()
+                    .map(|boundary| match boundary {
+                        Some(boundary) => ColumnStatistics {
+                            min_value: Some(boundary.min_value.clone()),
+                            max_value: Some(boundary.max_value.clone()),
+                            ..Default::default()
+                        },
+                        None => ColumnStatistics::default(),
+                    })
+                    .collect();
+
+                Statistics {
+                    num_rows: input_stats.num_rows.zip(boundaries.selectivity).map(
+                        |(num_rows, selectivity)| {
+                            (num_rows as f64 * selectivity).ceil() as usize
+                        },
+                    ),
+                    total_byte_size: input_stats
+                        .total_byte_size
+                        .zip(boundaries.selectivity)
+                        .map(|(total_byte_size, selectivity)| {
+                            (total_byte_size as f64 * selectivity).ceil() as usize
+                        }),
+                    column_statistics: Some(column_statistics),
+                    ..Default::default()
+                }
+            }
             None => Statistics::default(),
         }
     }
 
@@ -452,9 +474,6 @@ mod tests {
     }
 
     #[tokio::test]
-    #[ignore]
-    // This test requires propagation of column boundaries from the comparison analysis
-    // to the analysis context. This is not yet implemented.
    async fn test_filter_statistics_column_level_basic_expr() -> Result<()> {
         // Table:
         // a: min=1, max=100
@@ -481,6 +500,8 @@
             Arc::new(FilterExec::try_new(predicate, input)?);
 
         let statistics = filter.statistics();
+
+        // a must be in [1, 25] range now!
         assert_eq!(statistics.num_rows, Some(25));
         assert_eq!(
             statistics.column_statistics,
@@ -494,6 +515,126 @@
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_filter_statistics_column_level_nested() -> Result<()> {
+        // Table:
+        // a: min=1, max=100
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(100),
+                column_statistics: Some(vec![ColumnStatistics {
+                    min_value: Some(ScalarValue::Int32(Some(1))),
+                    max_value: Some(ScalarValue::Int32(Some(100))),
+                    ..Default::default()
+                }]),
+                ..Default::default()
+            },
+            schema.clone(),
+        ));
+
+        // WHERE a <= 25
+        let sub_filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
+            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
+            input,
+        )?);
+
+        // Nested filters (two separate physical plans, instead of AND chain in the expr)
+        // WHERE a >= 10
+        // WHERE a <= 25
+        let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
+            binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
+            sub_filter,
+        )?);
+
+        let statistics = filter.statistics();
+        assert_eq!(statistics.num_rows, Some(16));
+        assert_eq!(
+            statistics.column_statistics,
+            Some(vec![ColumnStatistics {
+                min_value: Some(ScalarValue::Int32(Some(10))),
+                max_value: Some(ScalarValue::Int32(Some(25))),
+                ..Default::default()
+            }])
+        );
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_filter_statistics_column_level_nested_multiple() -> Result<()> {
+        // Table:
+        // a: min=1, max=100
+        // b: min=1, max=50
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+        ]);
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(100),
+                column_statistics: Some(vec![
+                    ColumnStatistics {
+                        min_value: Some(ScalarValue::Int32(Some(1))),
+                        max_value: Some(ScalarValue::Int32(Some(100))),
+                        ..Default::default()
+                    },
+                    ColumnStatistics {
+                        min_value: Some(ScalarValue::Int32(Some(1))),
+                        max_value: Some(ScalarValue::Int32(Some(50))),
+                        ..Default::default()
+                    },
+                ]),
+                ..Default::default()
+            },
+            schema.clone(),
+        ));
+
+        // WHERE a <= 25
+        let a_lte_25: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
+            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?,
+            input,
+        )?);
+
+        // WHERE b > 45
+        let b_gt_45: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
+            binary(col("b", &schema)?, Operator::Gt, lit(45i32), &schema)?,
+            a_lte_25,
+        )?);
+
+        // WHERE a >= 10
+        let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
+            binary(col("a", &schema)?, Operator::GtEq, lit(10i32), &schema)?,
+            b_gt_45,
+        )?);
+
+        let statistics = filter.statistics();
+        // On a uniform distribution, sixteen rows will satisfy the
+        // filter that 'a' proposed (a >= 10 AND a <= 25) (16/100) and only
+        // 5 rows will satisfy the filter that 'b' proposed (b > 45) (5/50).
+        //
+        // This results in a selectivity of '16/100 * 5/50', or 0.016, which
+        // means about 1.6% of all rows (rounded up to 2 rows).
+        assert_eq!(statistics.num_rows, Some(2));
+        assert_eq!(
+            statistics.column_statistics,
+            Some(vec![
+                ColumnStatistics {
+                    min_value: Some(ScalarValue::Int32(Some(10))),
+                    max_value: Some(ScalarValue::Int32(Some(25))),
+                    ..Default::default()
+                },
+                ColumnStatistics {
+                    min_value: Some(ScalarValue::Int32(Some(45))),
+                    max_value: Some(ScalarValue::Int32(Some(50))),
+                    ..Default::default()
+                }
+            ])
+        );
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_filter_statistics_when_input_stats_missing() -> Result<()> {
         // Table:
diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs
index 83bc8087aed9..1b2c303c2f08 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -71,8 +71,9 @@ use kernels_arrow::{
 use arrow::datatypes::{DataType, Schema, TimeUnit};
 use arrow::record_batch::RecordBatch;
 
+use super::column::Column;
 use crate::physical_expr::down_cast_any_ref;
-use crate::{AnalysisContext, ExprBoundaries, PhysicalExpr};
+use crate::{analysis_expect, AnalysisContext, ExprBoundaries, PhysicalExpr};
 use datafusion_common::cast::{as_boolean_array, as_decimal128_array};
 use datafusion_common::ScalarValue;
 use datafusion_common::{DataFusionError, Result};
@@ -658,7 +659,7 @@ impl PhysicalExpr for BinaryExpr {
     }
 
     /// Return the boundaries of this binary expression's result.
-    fn boundaries(&self, context: &AnalysisContext) -> Option<ExprBoundaries> {
+    fn analyze(&self, context: AnalysisContext) -> AnalysisContext {
         match &self.op {
             Operator::Eq
             | Operator::Gt
@@ -667,26 +668,42 @@
             | Operator::GtEq => {
                 // We currently only support comparison when we know at least one of the sides are
                 // a known value (a scalar). This includes predicates like a > 20 or 5 > a.
-                let left_boundaries = self.left.boundaries(context)?;
-                let right_boundaries = self.right.boundaries(context)?;
-                let (op, left, right) = match right_boundaries.reduce() {
-                    Some(right_value) => {
+                let context = self.left.analyze(context);
+                let left_boundaries =
+                    analysis_expect!(context, context.boundaries()).clone();
+
+                let context = self.right.analyze(context);
+                let right_boundaries =
+                    analysis_expect!(context, context.boundaries.clone());
+
+                match (left_boundaries.reduce(), right_boundaries.reduce()) {
+                    (_, Some(right_value)) => {
                         // We know the right side is a scalar, so we can use the operator as is
-                        (self.op, self.left.clone(), right_value)
+                        analyze_expr_scalar_comparison(
+                            context,
+                            &self.op,
+                            &self.left,
+                            right_value,
+                        )
                     }
-                    None => {
+                    (Some(left_value), _) => {
                         // If not, we have to swap the operator and left/right (since this means
                         // left has to be a scalar).
-                        (
-                            self.op.swap()?,
-                            self.right.clone(),
-                            left_boundaries.reduce()?,
+                        let swapped_op = analysis_expect!(context, self.op.swap());
+                        analyze_expr_scalar_comparison(
+                            context,
+                            &swapped_op,
+                            &self.right,
+                            left_value,
                         )
                     }
-                };
-                analyze_expr_scalar_comparison(&op, context, &left, right)
+                    _ => {
+                        // Both sides are columns, so we give up.
+                        context.with_boundaries(None)
+                    }
+                }
             }
-            _ => None,
+            _ => context.with_boundaries(None),
         }
     }
 }
@@ -702,14 +719,16 @@ impl PartialEq<dyn Any> for BinaryExpr {
 
 // Analyze the comparison between an expression (on the left) and a scalar value
 // (on the right). The new boundaries will indicate whether it is always true, always
-// false, or unknown (with a probablistic selectivity value attached).
This operation +// will also include the new upper/lower boundaries for the operand on the left if +// they can be determined. fn analyze_expr_scalar_comparison( + context: AnalysisContext, op: &Operator, - context: &AnalysisContext, left: &Arc, right: ScalarValue, -) -> Option { - let left_bounds = left.boundaries(context)?; +) -> AnalysisContext { + let left_bounds = analysis_expect!(context, left.analyze(context.clone()).boundaries); let left_min = left_bounds.min_value; let left_max = left_bounds.max_value; @@ -737,8 +756,8 @@ fn analyze_expr_scalar_comparison( assert!(!(always_selects && never_selects)); let selectivity = match (always_selects, never_selects) { - (true, _) => Some(1.0), - (_, true) => Some(0.0), + (true, _) => 1.0, + (_, true) => 0.0, (false, false) => { // If there is a partial overlap, then we can estimate the selectivity // by computing the ratio of the existing overlap to the total range. Since we @@ -746,19 +765,65 @@ fn analyze_expr_scalar_comparison( // assumes a uniform distribution by default. // Our [min, max] is inclusive, so we need to add 1 to the difference. - let total_range = left_max.distance(&left_min)? + 1; - let overlap_between_boundaries = match op { - Operator::Lt => right.distance(&left_min)?, - Operator::Gt => left_max.distance(&right)?, - Operator::LtEq => right.distance(&left_min)? + 1, - Operator::GtEq => left_max.distance(&right)? + 1, - Operator::Eq => 1, + let total_range = analysis_expect!(context, left_max.distance(&left_min)) + 1; + let overlap_between_boundaries = analysis_expect!( + context, + match op { + Operator::Lt => right.distance(&left_min), + Operator::Gt => left_max.distance(&right), + Operator::LtEq => right.distance(&left_min).map(|dist| dist + 1), + Operator::GtEq => left_max.distance(&right).map(|dist| dist + 1), + Operator::Eq => Some(1), + _ => None, + } + ); + + overlap_between_boundaries as f64 / total_range as f64 + } + }; + + // The context represents all the knowledge we have gathered during the + // analysis process, which we can now add more since the expression's upper + // and lower boundaries might have changed. + let context = match left.as_any().downcast_ref::() { + Some(column_expr) => { + let (left_min, left_max) = match op { + // TODO: for lt/gt, we technically should shrink the possibility space + // by one since a < 5 means that 5 is not a possible value for `a`. However, + // it is currently tricky to do so (e.g. for floats, we can get away with 4.999 + // so we need a smarter logic to find out what is the closest value that is + // different from the scalar_value). + Operator::Lt | Operator::LtEq => { + // We only want to update the upper bound when we know it will help us (e.g. + // it is actually smaller than what we have right now) and it is a valid + // value (e.g. [0, 100] < -100 would update the boundaries to [0, -100] if + // there weren't the selectivity check). + if right < left_max && selectivity > 0.0 { + (left_min, right) + } else { + (left_min, left_max) + } + } + Operator::Gt | Operator::GtEq => { + // Same as above, but this time we want to limit the lower bound. + if right > left_min && selectivity > 0.0 { + (right, left_max) + } else { + (left_min, left_max) + } + } + // For equality, we don't have the range problem so even if the selectivity + // is 0.0, we can still update the boundaries. 
+                Operator::Eq => (right.clone(), right),
                 _ => unreachable!(),
             };
 
-            Some(overlap_between_boundaries as f64 / total_range as f64)
+            let left_bounds =
+                ExprBoundaries::new(left_min, left_max, left_bounds.distinct_count);
+            context.with_column_update(column_expr.index(), left_bounds)
         }
-    }?;
+        None => context,
+    };
 
     // The selectivity can't be be greater than 1.0.
     assert!(selectivity <= 1.0);
@@ -769,12 +834,13 @@
         _ => (false, true, 2),
     };
 
-    Some(ExprBoundaries::new_with_selectivity(
+    let result_boundaries = Some(ExprBoundaries::new_with_selectivity(
         ScalarValue::Boolean(Some(pred_min)),
         ScalarValue::Boolean(Some(pred_max)),
         Some(pred_distinct),
         Some(selectivity),
-    ))
+    ));
+    context.with_boundaries(result_boundaries)
 }
 
 /// unwrap underlying (non dictionary) value, if any, to pass to a scalar kernel
@@ -3109,13 +3175,16 @@
             ((Operator::GtEq, 200), (0.0, 1, 100)),
         ];
 
-        for ((operator, rhs), (exp_selectivity, _, _)) in cases {
+        for ((operator, rhs), (exp_selectivity, exp_min, exp_max)) in cases {
             let context = AnalysisContext::from_statistics(&schema, &statistics);
             let left = col("a", &schema).unwrap();
             let right = ScalarValue::Int64(Some(rhs));
-            let boundaries =
-                analyze_expr_scalar_comparison(&operator, &context, &left, right)
-                    .expect("this case should not return None");
+            let analysis_ctx =
+                analyze_expr_scalar_comparison(context, &operator, &left, right);
+            let boundaries = analysis_ctx
+                .boundaries
+                .as_ref()
+                .expect("Analysis must complete for this test!");
 
             assert_eq!(
                 boundaries
@@ -3137,10 +3206,18 @@
                 assert_eq!(boundaries.min_value, ScalarValue::Boolean(Some(false)));
                 assert_eq!(boundaries.max_value, ScalarValue::Boolean(Some(true)));
             }
+
+            // For getting the updated boundaries, we can simply analyze the LHS
+            // with the existing context.
+            let left_boundaries = left
+                .analyze(analysis_ctx)
+                .boundaries
+                .expect("this case should not return None");
+            assert_eq!(left_boundaries.min_value, ScalarValue::Int64(Some(exp_min)));
+            assert_eq!(left_boundaries.max_value, ScalarValue::Int64(Some(exp_max)));
         }
         Ok(())
     }
-
     #[test]
     fn test_comparison_result_estimate_different_type() -> Result<()> {
         // A table where the column 'a' has a min of 1.3, a max of 50.7.
@@ -3178,13 +3255,16 @@
             ((Operator::GtEq, 50.7), (1.0 / distance, 50.7, 50.7)),
         ];
 
-        for ((operator, rhs), (exp_selectivity, _, _)) in cases {
+        for ((operator, rhs), (exp_selectivity, exp_min, exp_max)) in cases {
             let context = AnalysisContext::from_statistics(&schema, &statistics);
             let left = col("a", &schema).unwrap();
             let right = ScalarValue::from(rhs);
-            let boundaries =
-                analyze_expr_scalar_comparison(&operator, &context, &left, right)
-                    .expect("this case should not return None");
+            let analysis_ctx =
+                analyze_expr_scalar_comparison(context, &operator, &left, right);
+            let boundaries = analysis_ctx
+                .clone()
+                .boundaries
+                .expect("Analysis must complete for this test!");
 
             assert_eq!(
                 boundaries
@@ -3206,6 +3286,19 @@
                 assert_eq!(boundaries.min_value, ScalarValue::from(false));
                 assert_eq!(boundaries.max_value, ScalarValue::from(true));
             }
+
+            let left_boundaries = left
+                .analyze(analysis_ctx)
+                .boundaries
+                .expect("this case should not return None");
+            assert_eq!(
+                left_boundaries.min_value,
+                ScalarValue::Float64(Some(exp_min))
+            );
+            assert_eq!(
+                left_boundaries.max_value,
+                ScalarValue::Float64(Some(exp_max))
+            );
         }
         Ok(())
     }
@@ -3227,7 +3320,8 @@
         let context = AnalysisContext::from_statistics(&schema, &statistics);
 
         let predicate_boundaries = gt
-            .boundaries(&context)
+            .analyze(context)
+            .boundaries
             .expect("boundaries should not be None");
 
         assert_eq!(predicate_boundaries.selectivity, Some(0.76));
@@ -3255,7 +3349,8 @@
         let context = AnalysisContext::from_statistics(&schema, &statistics);
 
         let predicate_boundaries = gt
-            .boundaries(&context)
+            .analyze(context)
+            .boundaries
             .expect("boundaries should not be None");
 
         assert_eq!(predicate_boundaries.selectivity, Some(0.5));
diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs
index 94e0dfbb8263..edbe6e27c6fd 100644
--- a/datafusion/physical-expr/src/expressions/column.rs
+++ b/datafusion/physical-expr/src/expressions/column.rs
@@ -26,7 +26,7 @@ use arrow::{
 };
 
 use crate::physical_expr::down_cast_any_ref;
-use crate::{AnalysisContext, ExprBoundaries, PhysicalExpr};
+use crate::{AnalysisContext, PhysicalExpr};
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::ColumnarValue;
@@ -104,9 +104,10 @@ impl PhysicalExpr for Column {
     }
 
     /// Return the boundaries of this column, if known.
-    fn boundaries(&self, context: &AnalysisContext) -> Option<ExprBoundaries> {
+    fn analyze(&self, context: AnalysisContext) -> AnalysisContext {
         assert!(self.index < context.column_boundaries.len());
-        context.column_boundaries[self.index].clone()
+        let col_bounds = context.column_boundaries[self.index].clone();
+        context.with_boundaries(col_bounds)
     }
 }
@@ -317,8 +318,8 @@ mod test {
         for (name, index, expected) in cases {
             let col = Column::new(name, index);
-            let boundaries = col.boundaries(&context);
-            assert_eq!(boundaries, expected);
+            let test_ctx = col.analyze(context.clone());
+            assert_eq!(test_ctx.boundaries, expected);
         }
 
         Ok(())
diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs
index 75d7829bb175..ea6f51027575 100644
--- a/datafusion/physical-expr/src/expressions/literal.rs
+++ b/datafusion/physical-expr/src/expressions/literal.rs
@@ -84,15 +84,14 @@ impl PhysicalExpr for Literal {
         Ok(self)
     }
 
-    #[allow(unused_variables)]
     /// Return the boundaries of this literal expression (which is the same as
     /// the value it represents).
-    fn boundaries(&self, context: &AnalysisContext) -> Option<ExprBoundaries> {
-        Some(ExprBoundaries::new(
+    fn analyze(&self, context: AnalysisContext) -> AnalysisContext {
+        context.with_boundaries(Some(ExprBoundaries::new(
             self.value.clone(),
             self.value.clone(),
             Some(1),
-        ))
+        )))
     }
 }
@@ -150,7 +149,8 @@ mod tests {
         let context = AnalysisContext::new(&schema, vec![]);
         let literal_expr = lit(42i32);
 
-        let boundaries = literal_expr.boundaries(&context).unwrap();
+        let result_ctx = literal_expr.analyze(context);
+        let boundaries = result_ctx.boundaries.unwrap();
         assert_eq!(boundaries.min_value, ScalarValue::Int32(Some(42)));
         assert_eq!(boundaries.max_value, ScalarValue::Int32(Some(42)));
         assert_eq!(boundaries.distinct_count, Some(1));
diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs
index 02b6eeb4efea..459ce8cd7b15 100644
--- a/datafusion/physical-expr/src/physical_expr.rs
+++ b/datafusion/physical-expr/src/physical_expr.rs
@@ -76,11 +76,10 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + PartialEq<dyn Any> {
         children: Vec<Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn PhysicalExpr>>;
 
-    #[allow(unused_variables)]
     /// Return the boundaries of this expression. This method (and all the
     /// related APIs) are experimental and subject to change.
-    fn boundaries(&self, context: &AnalysisContext) -> Option<ExprBoundaries> {
-        None
+    fn analyze(&self, context: AnalysisContext) -> AnalysisContext {
+        context
     }
 }
@@ -91,6 +90,8 @@ pub struct AnalysisContext {
     /// A list of known column boundaries, ordered by the index
     /// of the column in the current schema.
     pub column_boundaries: Vec<Option<ExprBoundaries>>,
+    /// Result of the current analysis.
+    pub boundaries: Option<ExprBoundaries>,
 }
 
 impl AnalysisContext {
@@ -99,7 +100,10 @@
         column_boundaries: Vec<Option<ExprBoundaries>>,
     ) -> Self {
         assert_eq!(input_schema.fields().len(), column_boundaries.len());
-        Self { column_boundaries }
+        Self {
+            column_boundaries,
+            boundaries: None,
+        }
     }
 
     /// Create a new analysis context from column statistics.
@@ -116,6 +120,26 @@
         };
         Self::new(input_schema, column_boundaries)
     }
+
+    pub fn boundaries(&self) -> Option<&ExprBoundaries> {
+        self.boundaries.as_ref()
+    }
+
+    /// Set the result of the current analysis.
+    pub fn with_boundaries(mut self, result: Option<ExprBoundaries>) -> Self {
+        self.boundaries = result;
+        self
+    }
+
+    /// Update the boundaries of a column.
+    pub fn with_column_update(
+        mut self,
+        column: usize,
+        boundaries: ExprBoundaries,
+    ) -> Self {
+        self.column_boundaries[column] = Some(boundaries);
+        self
+    }
 }
 
 /// Represents the boundaries of the resulting value from a physical expression,
@@ -267,6 +291,18 @@ fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
     Ok(make_array(data))
 }
 
+/// If the given expression is None, return the given context
+/// without setting the boundaries.
+#[macro_export]
+macro_rules! analysis_expect {
+    ($context: ident, $expr: expr) => {
+        match $expr {
+            Some(expr) => expr,
+            None => return $context.with_boundaries(None),
+        }
+    };
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
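
Taken together, the pieces above form a small pipeline: seed an `AnalysisContext` from the input statistics, run the predicate's `analyze` pass over it, then read the overall selectivity and the tightened per-column bounds back out of the returned context. The sketch below illustrates that flow for a caller shaped like `FilterExec::statistics()`. It is not part of the patch: it assumes only the APIs introduced in this diff (`AnalysisContext::from_statistics`, `PhysicalExpr::analyze`, the public `boundaries` field) plus the pre-existing `Statistics` type, and the import paths are best guesses that may differ between DataFusion releases.

```rust
use std::sync::Arc;

use arrow::datatypes::Schema;
use datafusion::physical_plan::Statistics; // assumed path for Statistics
use datafusion_physical_expr::{AnalysisContext, PhysicalExpr};

/// Estimate the post-filter row count for `predicate`, mirroring the scaling
/// that FilterExec::statistics() performs after this change.
fn estimate_filtered_rows(
    schema: &Schema,
    input_stats: &Statistics,
    predicate: &Arc<dyn PhysicalExpr>,
) -> Option<usize> {
    // Seed the context with per-column min/max bounds from the input statistics.
    let ctx = AnalysisContext::from_statistics(schema, input_stats);

    // The analysis pass returns the same context with `boundaries` filled in
    // (None when the predicate shape is unsupported) and with any per-column
    // bounds tightened through with_column_update().
    let ctx = predicate.analyze(ctx);
    let selectivity = ctx.boundaries?.selectivity?;

    // Scale the input row count by the estimated selectivity, rounding up.
    input_stats
        .num_rows
        .map(|num_rows| (num_rows as f64 * selectivity).ceil() as usize)
}
```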
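The selectivity arithmetic at the heart of `analyze_expr_scalar_comparison` is easy to verify in isolation: under the uniform-distribution assumption, the estimate for `col <op> scalar` is the overlap between the scalar and the column's inclusive `[min, max]` range divided by the width of that range (the `+ 1` terms account for the inclusive endpoints). The helper below is a hypothetical integer-only mirror of that match, written for illustration; the real code routes through `ScalarValue::distance` so that it also covers floating-point and other orderable types.

```rust
/// Hypothetical integer-only mirror of the overlap / total_range computation.
fn uniform_selectivity(min: i64, max: i64, op: &str, rhs: i64) -> Option<f64> {
    // The [min, max] interval is inclusive, so its width is (max - min) + 1.
    let total_range = (max - min) + 1;
    let overlap = match op {
        "<" => rhs - min,        // values strictly below rhs
        ">" => max - rhs,        // values strictly above rhs
        "<=" => (rhs - min) + 1, // one more value than the strict case
        ">=" => (max - rhs) + 1,
        "==" => 1,               // a single point out of the whole range
        _ => return None,        // unsupported operator: give up, like analysis_expect!
    };
    // A scalar outside [min, max] means "always" or "never" selects; the real
    // code handles those two cases before it ever reaches this division.
    Some(overlap.clamp(0, total_range) as f64 / total_range as f64)
}

// uniform_selectivity(1, 100, "<=", 25) == Some(0.25): applied to 100 input
// rows, that is the 25-row estimate asserted in the basic filter.rs test.
```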
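The bound-tightening step follows the same shape: `<` and `<=` can only pull the upper bound down, `>` and `>=` can only push the lower bound up, and `=` pins both ends. The `selectivity > 0.0` guard keeps a non-overlapping comparison such as `[0, 100] < -100` from producing an inverted range like `[0, -100]`. A standalone, hypothetical sketch of that rule for integer bounds:

```rust
/// Hypothetical mirror of the (left_min, left_max) update applied when the
/// left side of the comparison is a plain column reference.
fn tighten_bounds(min: i64, max: i64, op: &str, rhs: i64, selectivity: f64) -> (i64, i64) {
    match op {
        // Only shrink when rhs actually improves the bound and some rows survive.
        "<" | "<=" if rhs < max && selectivity > 0.0 => (min, rhs),
        ">" | ">=" if rhs > min && selectivity > 0.0 => (rhs, max),
        // Equality pins the range even when the selectivity is 0.0.
        "==" => (rhs, rhs),
        _ => (min, max),
    }
}

// tighten_bounds(1, 100, "<=", 25, 0.25) == (1, 25): exactly the [1, 25]
// range that the filter.rs tests assert for column 'a'.
```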