From fb40185443ad36991384181f1775b3d3f11dc2ad Mon Sep 17 00:00:00 2001 From: jackwener Date: Sat, 19 Mar 2022 23:17:16 +0800 Subject: [PATCH 1/3] optimizer: fix the filter push down bug --- datafusion/src/optimizer/filter_push_down.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs index d8e43ed2175b..155fb9925422 100644 --- a/datafusion/src/optimizer/filter_push_down.rs +++ b/datafusion/src/optimizer/filter_push_down.rs @@ -523,7 +523,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { // Don't add expression again if it's already present in // pushed down filters. if new_filters.contains(filter_expr) { - break; + continue; } new_filters.push(filter_expr.clone()); } From 8973da1da440d7a5bb537611185deb7ac7f91043 Mon Sep 17 00:00:00 2001 From: jackwener Date: Sun, 20 Mar 2022 14:34:30 +0800 Subject: [PATCH 2/3] *: add unit test --- datafusion/src/optimizer/filter_push_down.rs | 20 ++++++++++++++++++++ datafusion/src/test/mod.rs | 10 +++++++++- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs index 155fb9925422..d493f7f37969 100644 --- a/datafusion/src/optimizer/filter_push_down.rs +++ b/datafusion/src/optimizer/filter_push_down.rs @@ -1471,4 +1471,24 @@ mod tests { assert_optimized_plan_eq(&plan, expected); Ok(()) } + + #[test] + fn multi_combined_filter() -> Result<()> { + let table_scan = + test_table_scan_with_name_projection("test", Some(vec![0, 1, 2]))?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(and(col("c").eq(lit(10i64)), col("b").eq(lit(10i64))))? + .filter(col("b").gt(lit(11i64)))? + .project(vec![col("a").alias("b"), col("c")])? + .filter(col("a").gt(lit(10i64)))? + .build()?; + + let expected = "Projection: #test.a AS b, #test.c\ + \n Filter: #test.a > Int64(10) AND #test.b > Int64(11) AND #test.c = Int64(10) AND #test.b = Int64(10)\ + \n TableScan: test projection=Some([0, 1, 2])"; + + assert_optimized_plan_eq(&plan, expected); + + Ok(()) + } } diff --git a/datafusion/src/test/mod.rs b/datafusion/src/test/mod.rs index cebd9ee02d1c..f191e8396a35 100644 --- a/datafusion/src/test/mod.rs +++ b/datafusion/src/test/mod.rs @@ -105,12 +105,20 @@ pub fn create_partitioned_csv( /// some tests share a common table with different names pub fn test_table_scan_with_name(name: &str) -> Result { + test_table_scan_with_name_projection(name, None) +} + +/// some tests share a common table with different names and specifiy projections +pub fn test_table_scan_with_name_projection( + name: &str, + projection: Option>, +) -> Result { let schema = Schema::new(vec![ Field::new("a", DataType::UInt32, false), Field::new("b", DataType::UInt32, false), Field::new("c", DataType::UInt32, false), ]); - LogicalPlanBuilder::scan_empty(Some(name), &schema, None)?.build() + LogicalPlanBuilder::scan_empty(Some(name), &schema, projection)?.build() } /// some tests share a common table From dc86542a7e16f721f2c96e1aca4d76945424e527 Mon Sep 17 00:00:00 2001 From: jackwener Date: Sun, 20 Mar 2022 15:26:40 +0800 Subject: [PATCH 3/3] *: add the reproduce unit test --- datafusion/src/optimizer/filter_push_down.rs | 34 ++++++++++++++------ datafusion/src/test/mod.rs | 10 +----- 2 files changed, 26 insertions(+), 18 deletions(-) diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs index d493f7f37969..795e7a2938c9 100644 --- a/datafusion/src/optimizer/filter_push_down.rs +++ b/datafusion/src/optimizer/filter_push_down.rs @@ -1376,6 +1376,11 @@ mod tests { arrow::datatypes::DataType::Int32, true, ), + arrow::datatypes::Field::new( + "b", + arrow::datatypes::DataType::Int32, + true, + ), ])) } @@ -1474,18 +1479,29 @@ mod tests { #[test] fn multi_combined_filter() -> Result<()> { - let table_scan = - test_table_scan_with_name_projection("test", Some(vec![0, 1, 2]))?; + let test_provider = PushDownProvider { + filter_support: TableProviderFilterPushDown::Inexact, + }; + + let table_scan = LogicalPlan::TableScan(TableScan { + table_name: "test".to_string(), + filters: vec![col("a").eq(lit(10i64)), col("b").gt(lit(11i64))], + projected_schema: Arc::new(DFSchema::try_from( + (*test_provider.schema()).clone(), + )?), + projection: Some(vec![0]), + source: Arc::new(test_provider), + limit: None, + }); + let plan = LogicalPlanBuilder::from(table_scan) - .filter(and(col("c").eq(lit(10i64)), col("b").eq(lit(10i64))))? - .filter(col("b").gt(lit(11i64)))? - .project(vec![col("a").alias("b"), col("c")])? - .filter(col("a").gt(lit(10i64)))? + .filter(and(col("a").eq(lit(10i64)), col("b").gt(lit(11i64))))? + .project(vec![col("a"), col("b")])? .build()?; - let expected = "Projection: #test.a AS b, #test.c\ - \n Filter: #test.a > Int64(10) AND #test.b > Int64(11) AND #test.c = Int64(10) AND #test.b = Int64(10)\ - \n TableScan: test projection=Some([0, 1, 2])"; + let expected ="Projection: #a, #b\ + \n Filter: #a = Int64(10) AND #b > Int64(11)\ + \n TableScan: test projection=Some([0]), filters=[#a = Int64(10), #b > Int64(11)]"; assert_optimized_plan_eq(&plan, expected); diff --git a/datafusion/src/test/mod.rs b/datafusion/src/test/mod.rs index f191e8396a35..cebd9ee02d1c 100644 --- a/datafusion/src/test/mod.rs +++ b/datafusion/src/test/mod.rs @@ -105,20 +105,12 @@ pub fn create_partitioned_csv( /// some tests share a common table with different names pub fn test_table_scan_with_name(name: &str) -> Result { - test_table_scan_with_name_projection(name, None) -} - -/// some tests share a common table with different names and specifiy projections -pub fn test_table_scan_with_name_projection( - name: &str, - projection: Option>, -) -> Result { let schema = Schema::new(vec![ Field::new("a", DataType::UInt32, false), Field::new("b", DataType::UInt32, false), Field::new("c", DataType::UInt32, false), ]); - LogicalPlanBuilder::scan_empty(Some(name), &schema, projection)?.build() + LogicalPlanBuilder::scan_empty(Some(name), &schema, None)?.build() } /// some tests share a common table