From 6bb02ab3be884b096ee60c293d1c20c2c10acd98 Mon Sep 17 00:00:00 2001 From: theirix Date: Mon, 18 Nov 2024 19:25:36 +0000 Subject: [PATCH 1/7] Avoid pushdown of volatile functions to tablescan --- datafusion/optimizer/src/push_down_filter.rs | 40 +++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 23cd46803c78d..307ff3674af0c 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -998,7 +998,20 @@ impl OptimizerRule for PushDownFilter { filter_predicates.len()); } - let zip = filter_predicates.into_iter().zip(results); + let zip = + filter_predicates + .into_iter() + .zip(results) + .map(|(&ref expr, res)| { + let filter_pushdown_type = if expr.is_volatile() { + // Do not push down predicate with volatile functions to scan + TableProviderFilterPushDown::Unsupported + } else { + res + }; + (expr, filter_pushdown_type) + }); + let new_scan_filters = zip .clone() @@ -3385,4 +3398,29 @@ Projection: a, b \n TableScan: test2"; assert_optimized_plan_eq(plan, expected) } + + #[test] + fn test_push_down_volatile_table_scan() -> Result<()> { + // SELECT test.a, test.b FROM test as t WHERE TestScalarUDF() > 0.1; + let table_scan = test_table_scan()?; + let fun = ScalarUDF::new_from_impl(TestScalarUDF { + signature: Signature::exact(vec![], Volatility::Volatile), + }); + let expr = Expr::ScalarFunction(ScalarFunction::new_udf(Arc::new(fun), vec![])); + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("a"), col("b")])? + .filter(expr.gt(lit(0.1)))? 
+ .build()?; + + let expected_before = "Filter: TestScalarUDF() > Float64(0.1)\ + \n Projection: test.a, test.b\ + \n TableScan: test"; + assert_eq!(format!("{plan}"), expected_before); + + let expected_after = "Projection: test.a, test.b\ + \n Filter: TestScalarUDF() > Float64(0.1)\ + \n TableScan: test"; + assert_optimized_plan_eq(plan, expected_after) + } + } From 79c75b3957f7a72c3d6602086535f08d993f303e Mon Sep 17 00:00:00 2001 From: theirix Date: Mon, 18 Nov 2024 21:20:42 +0000 Subject: [PATCH 2/7] Apply formatting and linting --- datafusion/optimizer/src/push_down_filter.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 307ff3674af0c..25eced8524301 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -1002,7 +1002,7 @@ impl OptimizerRule for PushDownFilter { filter_predicates .into_iter() .zip(results) - .map(|(&ref expr, res)| { + .map(|(expr, res)| { let filter_pushdown_type = if expr.is_volatile() { // Do not push down predicate with volatile functions to scan TableProviderFilterPushDown::Unsupported @@ -1012,7 +1012,6 @@ impl OptimizerRule for PushDownFilter { (expr, filter_pushdown_type) }); - let new_scan_filters = zip .clone() .filter(|(_, res)| res != &TableProviderFilterPushDown::Unsupported) @@ -3422,5 +3421,4 @@ Projection: a, b \n TableScan: test"; assert_optimized_plan_eq(plan, expected_after) } - } From 0d473cedd5d0940646e4a072c142108cc77f5917 Mon Sep 17 00:00:00 2001 From: theirix Date: Tue, 19 Nov 2024 20:03:18 +0000 Subject: [PATCH 3/7] Do not check for volatility for unsupported predicates --- datafusion/optimizer/src/push_down_filter.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 25eced8524301..187df3836d7ee 100644 --- 
a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -1002,14 +1002,18 @@ impl OptimizerRule for PushDownFilter { filter_predicates .into_iter() .zip(results) - .map(|(expr, res)| { - let filter_pushdown_type = if expr.is_volatile() { + .map(|(pred, res)| { + let filter_pushdown_type = if !matches!( + res, + TableProviderFilterPushDown::Unsupported + ) && pred.is_volatile() + { // Do not push down predicate with volatile functions to scan TableProviderFilterPushDown::Unsupported } else { res }; - (expr, filter_pushdown_type) + (pred, filter_pushdown_type) }); let new_scan_filters = zip From 5fd716f81f3044b2a9510170f91804d45351ec0c Mon Sep 17 00:00:00 2001 From: theirix Date: Wed, 20 Nov 2024 22:29:40 +0000 Subject: [PATCH 4/7] Check volatility before passing expressions to TableSource --- datafusion/optimizer/src/push_down_filter.rs | 44 ++++++++++---------- 1 file changed, 21 insertions(+), 23 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 187df3836d7ee..3c7d6ec4c6e40 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -988,38 +988,32 @@ impl OptimizerRule for PushDownFilter { LogicalPlan::Join(join) => push_down_join(join, Some(&filter.predicate)), LogicalPlan::TableScan(scan) => { let filter_predicates = split_conjunction(&filter.predicate); - let results = scan + + let (volatile_filters, non_volatile_filters): (Vec<&Expr>, Vec<&Expr>) = + filter_predicates + .into_iter() + .partition(|pred| pred.is_volatile()); + + // Check which non-volatile filters are supported by source + let supported_filters = scan .source - .supports_filters_pushdown(filter_predicates.as_slice())?; - if filter_predicates.len() != results.len() { + .supports_filters_pushdown(non_volatile_filters.as_slice())?; + if non_volatile_filters.len() != supported_filters.len() { return internal_err!( "Vec returned 
length: {} from supports_filters_pushdown is not the same size as the filters passed, which length is: {}", - results.len(), - filter_predicates.len()); + supported_filters.len(), + non_volatile_filters.len()); } - let zip = - filter_predicates - .into_iter() - .zip(results) - .map(|(pred, res)| { - let filter_pushdown_type = if !matches!( - res, - TableProviderFilterPushDown::Unsupported - ) && pred.is_volatile() - { - // Do not push down predicate with volatile functions to scan - TableProviderFilterPushDown::Unsupported - } else { - res - }; - (pred, filter_pushdown_type) - }); + // Compose scan filters from non-volatile filters of `Exact` or `Inexact` pushdown type + let zip = non_volatile_filters.into_iter().zip(supported_filters); let new_scan_filters = zip .clone() .filter(|(_, res)| res != &TableProviderFilterPushDown::Unsupported) .map(|(pred, _)| pred); + + // Add new scan filters let new_scan_filters: Vec<Expr> = scan .filters .iter() @@ -1027,9 +1021,13 @@ impl OptimizerRule for PushDownFilter { .unique() .cloned() .collect(); + + // Compose predicates to be of `Unsupported` or `Inexact` pushdown type, and also include volatile filters let new_predicate: Vec<Expr> = zip .filter(|(_, res)| res != &TableProviderFilterPushDown::Exact) - .map(|(pred, _)| pred.clone()) + .map(|(pred, _)| pred) + .chain(volatile_filters) + .cloned() .collect(); let new_scan = LogicalPlan::TableScan(TableScan { From 059ceddedeb38386bc20ecd800a30c15d8933f08 Mon Sep 17 00:00:00 2001 From: theirix Date: Wed, 20 Nov 2024 22:32:44 +0000 Subject: [PATCH 5/7] Document non-volatile expressions contract for supports_filters_pushdown --- datafusion/expr/src/table_source.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/expr/src/table_source.rs b/datafusion/expr/src/table_source.rs index bdb602d48dee5..e9a677de50c13 100644 --- a/datafusion/expr/src/table_source.rs +++ b/datafusion/expr/src/table_source.rs @@ -99,7 +99,7 @@ pub trait TableSource: Sync + Send { } /// Tests 
whether the table provider can make use of any or all filter expressions - /// to optimise data retrieval. + /// to optimise data retrieval. Only non-volatile expressions are passed to this function. fn supports_filters_pushdown( &self, filters: &[&Expr], From ca9a9d2ff960ed99549b046e93b823133ae02a5d Mon Sep 17 00:00:00 2001 From: theirix Date: Wed, 20 Nov 2024 22:44:57 +0000 Subject: [PATCH 6/7] Add test for Unsupported pushdown type --- datafusion/optimizer/src/push_down_filter.rs | 59 ++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 3c7d6ec4c6e40..af63ce7687e13 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -3423,4 +3423,63 @@ Projection: a, b \n TableScan: test"; assert_optimized_plan_eq(plan, expected_after) } + + #[test] + fn test_push_down_volatile_mixed_table_scan() -> Result<()> { + // SELECT test.a, test.b FROM test as t WHERE TestScalarUDF() > 0.1 and test.a > 5 and test.b > 10; + let table_scan = test_table_scan()?; + let fun = ScalarUDF::new_from_impl(TestScalarUDF { + signature: Signature::exact(vec![], Volatility::Volatile), + }); + let expr = Expr::ScalarFunction(ScalarFunction::new_udf(Arc::new(fun), vec![])); + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("a"), col("b")])? + .filter( + expr.gt(lit(0.1)) + .and(col("t.a").gt(lit(5))) + .and(col("t.b").gt(lit(10))), + )? 
+ .build()?; + + let expected_before = "Filter: TestScalarUDF() > Float64(0.1) AND t.a > Int32(5) AND t.b > Int32(10)\ + \n Projection: test.a, test.b\ + \n TableScan: test"; + assert_eq!(format!("{plan}"), expected_before); + + let expected_after = "Projection: test.a, test.b\ + \n Filter: TestScalarUDF() > Float64(0.1)\ + \n TableScan: test, full_filters=[t.a > Int32(5), t.b > Int32(10)]"; + assert_optimized_plan_eq(plan, expected_after) + } + + #[test] + fn test_push_down_volatile_mixed_unsupported_table_scan() -> Result<()> { + // SELECT test.a, test.b FROM test as t WHERE TestScalarUDF() > 0.1 and test.a > 5 and test.b > 10; + let fun = ScalarUDF::new_from_impl(TestScalarUDF { + signature: Signature::exact(vec![], Volatility::Volatile), + }); + let expr = Expr::ScalarFunction(ScalarFunction::new_udf(Arc::new(fun), vec![])); + let plan = table_scan_with_pushdown_provider_builder( + TableProviderFilterPushDown::Unsupported, + vec![], + None, + )? + .project(vec![col("a"), col("b")])? + .filter( + expr.gt(lit(0.1)) + .and(col("t.a").gt(lit(5))) + .and(col("t.b").gt(lit(10))), + )? 
+ .build()?; + + let expected_before = "Filter: TestScalarUDF() > Float64(0.1) AND t.a > Int32(5) AND t.b > Int32(10)\ + \n Projection: a, b\ + \n TableScan: test"; + assert_eq!(format!("{plan}"), expected_before); + + let expected_after = "Projection: a, b\ + \n Filter: t.a > Int32(5) AND t.b > Int32(10) AND TestScalarUDF() > Float64(0.1)\ + \n TableScan: test"; + assert_optimized_plan_eq(plan, expected_after) + } } From 0066c0be3242d9d7cd46bbc75b63c511b77ed81b Mon Sep 17 00:00:00 2001 From: theirix Date: Wed, 20 Nov 2024 22:45:21 +0000 Subject: [PATCH 7/7] Refactor tests using supports_filters_pushdown --- datafusion/optimizer/src/push_down_filter.rs | 72 ++++++++------------ 1 file changed, 29 insertions(+), 43 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index af63ce7687e13..195dc06578b2b 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -2529,23 +2529,31 @@ mod tests { } } - fn table_scan_with_pushdown_provider( + fn table_scan_with_pushdown_provider_builder( filter_support: TableProviderFilterPushDown, - ) -> Result<LogicalPlan> { + filters: Vec<Expr>, + projection: Option<Vec<usize>>, + ) -> Result<LogicalPlanBuilder> { let test_provider = PushDownProvider { filter_support }; let table_scan = LogicalPlan::TableScan(TableScan { table_name: "test".into(), - filters: vec![], + filters, projected_schema: Arc::new(DFSchema::try_from( (*test_provider.schema()).clone(), )?), - projection: None, + projection, source: Arc::new(test_provider), fetch: None, }); - LogicalPlanBuilder::from(table_scan) + Ok(LogicalPlanBuilder::from(table_scan)) + } + + fn table_scan_with_pushdown_provider( + filter_support: TableProviderFilterPushDown, + ) -> Result<LogicalPlan> { + table_scan_with_pushdown_provider_builder(filter_support, vec![], None)? .filter(col("a").eq(lit(1i64)))? 
.build() } @@ -2602,25 +2610,14 @@ mod tests { #[test] fn multi_combined_filter() -> Result<()> { - let test_provider = PushDownProvider { - filter_support: TableProviderFilterPushDown::Inexact, - }; - - let table_scan = LogicalPlan::TableScan(TableScan { - table_name: "test".into(), - filters: vec![col("a").eq(lit(10i64)), col("b").gt(lit(11i64))], - projected_schema: Arc::new(DFSchema::try_from( - (*test_provider.schema()).clone(), - )?), - projection: Some(vec![0]), - source: Arc::new(test_provider), - fetch: None, - }); - - let plan = LogicalPlanBuilder::from(table_scan) - .filter(and(col("a").eq(lit(10i64)), col("b").gt(lit(11i64))))? - .project(vec![col("a"), col("b")])? - .build()?; + let plan = table_scan_with_pushdown_provider_builder( + TableProviderFilterPushDown::Inexact, + vec![col("a").eq(lit(10i64)), col("b").gt(lit(11i64))], + Some(vec![0]), + )? + .filter(and(col("a").eq(lit(10i64)), col("b").gt(lit(11i64))))? + .project(vec![col("a"), col("b")])? + .build()?; let expected = "Projection: a, b\ \n Filter: a = Int64(10) AND b > Int64(11)\ @@ -2631,25 +2628,14 @@ mod tests { #[test] fn multi_combined_filter_exact() -> Result<()> { - let test_provider = PushDownProvider { - filter_support: TableProviderFilterPushDown::Exact, - }; - - let table_scan = LogicalPlan::TableScan(TableScan { - table_name: "test".into(), - filters: vec![], - projected_schema: Arc::new(DFSchema::try_from( - (*test_provider.schema()).clone(), - )?), - projection: Some(vec![0]), - source: Arc::new(test_provider), - fetch: None, - }); - - let plan = LogicalPlanBuilder::from(table_scan) - .filter(and(col("a").eq(lit(10i64)), col("b").gt(lit(11i64))))? - .project(vec![col("a"), col("b")])? - .build()?; + let plan = table_scan_with_pushdown_provider_builder( + TableProviderFilterPushDown::Exact, + vec![], + Some(vec![0]), + )? + .filter(and(col("a").eq(lit(10i64)), col("b").gt(lit(11i64))))? + .project(vec![col("a"), col("b")])? + .build()?; let expected = r#" Projection: a, b