Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: filter push down with InList expressions #2729

Merged
merged 1 commit into from
Jun 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 215 additions & 1 deletion datafusion/optimizer/src/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ mod tests {
use async_trait::async_trait;
use datafusion_common::DFSchema;
use datafusion_expr::{
and, col, lit,
and, col, in_list, lit,
logical_plan::{builder::union_with_alias, JoinType},
sum, Expr, LogicalPlanBuilder, Operator, TableSource, TableType,
};
Expand Down Expand Up @@ -1831,4 +1831,218 @@ mod tests {

Ok(())
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I remove the code changes in this PR the tests still pass 🤔

So when I apply this diff:

diff --git a/datafusion/optimizer/src/filter_push_down.rs b/datafusion/optimizer/src/filter_push_down.rs
index 1629e95c5..2668eaaf7 100644
--- a/datafusion/optimizer/src/filter_push_down.rs
+++ b/datafusion/optimizer/src/filter_push_down.rs
@@ -122,27 +122,6 @@ fn remove_filters(
         .collect::<Vec<_>>()
 }
 
-// rename all filter columns which have alias name
-fn rename_filters_column_name(
-    filters: &mut [Predicate],
-    alias_cols_expr_and_name: &HashMap<&Expr, &String>,
-) {
-    for (expr, columns) in filters {
-        if alias_cols_expr_and_name.contains_key(expr) {
-            let col_string = <&std::string::String>::clone(
-                alias_cols_expr_and_name.get(expr).unwrap(),
-            );
-            let column = Column::from_qualified_name(col_string);
-            if let Expr::Column(col) = expr {
-                columns.remove(col);
-            } else {
-                unreachable!()
-            }
-            columns.insert(column);
-        }
-    }
-}
-
 /// builds a new [LogicalPlan] from `plan` by issuing new [LogicalPlan::Filter] if any of the filters
 /// in `state` depend on the columns `used_columns`.
 fn issue_filters(
@@ -357,18 +336,6 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
         LogicalPlan::Analyze { .. } => push_down(&state, plan),
         LogicalPlan::Filter(Filter { input, predicate }) => {
             let mut predicates = vec![];
-            let mut alias_cols_expr_and_name = HashMap::new();
-            //Need rewrite column name before push down
-            let input_plan = &*input.clone();
-            if let LogicalPlan::Projection(projection) = input_plan {
-                let exprs = &projection.expr;
-                for e in exprs {
-                    if let Expr::Alias(col_expr, alias_name) = e {
-                        alias_cols_expr_and_name.insert(col_expr.as_ref(), alias_name);
-                    }
-                }
-            }
-
             utils::split_conjunction(predicate, &mut predicates);
 
             // Predicates without referencing columns (WHERE FALSE, WHERE 1=1, etc.)
@@ -397,12 +364,6 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
                     &no_col_predicates,
                 ))
             } else {
-                if !alias_cols_expr_and_name.is_empty() {
-                    rename_filters_column_name(
-                        &mut state.filters,
-                        &alias_cols_expr_and_name,
-                    )
-                }
                 optimize(input, state)
             }
         }

The tests all pass:


-*- mode: compilation; default-directory: "~/Software/arrow-datafusion/" -*-
Compilation started at Tue Jun 14 16:27:52

cd /Users/alamb/Software/arrow-datafusion && RUST_BACKTRACE=1 CARGO_TARGET_DIR=/Users/alamb/Software/df-target cargo test -p datafusion-optimizer -- filter_push_down
   Compiling datafusion-optimizer v9.0.0 (/Users/alamb/Software/arrow-datafusion/datafusion/optimizer)
    Finished test [unoptimized + debuginfo] target(s) in 4.38s
     Running unittests src/lib.rs (/Users/alamb/Software/df-target/debug/deps/datafusion_optimizer-ad133d04102e7238)

running 39 tests
test filter_push_down::tests::filter_no_columns ... ok
test filter_push_down::tests::filter_before_projection ... ok
test filter_push_down::tests::alias ... ok
test filter_push_down::tests::filter_jump_2_plans ... ok
test filter_push_down::tests::filter_after_limit ... ok
test filter_push_down::tests::complex_expression ... ok
test filter_push_down::tests::filter_keep_agg ... ok
test filter_push_down::tests::filter_move_agg ... ok
test filter_push_down::tests::complex_plan ... ok
test filter_push_down::tests::filter_2_breaks_limits ... ok
test filter_push_down::tests::double_limit ... ok
test filter_push_down::tests::filter_using_left_join ... ok
test filter_push_down::tests::filter_on_join_on_common_independent ... ok
test filter_push_down::tests::filter_join_on_one_side ... ok
test filter_push_down::tests::filter_join_on_common_dependent ... ok
test filter_push_down::tests::filter_with_table_provider_exact ... ok
test filter_push_down::tests::filter_using_join_on_common_independent ... ok
test filter_push_down::tests::filter_with_table_provider_inexact ... ok
test filter_push_down::tests::filter_using_left_join_on_common ... ok
test filter_push_down::tests::filter_with_table_provider_unsupported ... ok
test filter_push_down::tests::filters_user_defined_node ... ok
test filter_push_down::tests::filter_with_table_provider_multiple_invocations ... ok
test filter_push_down::tests::filter_using_right_join ... ok
test filter_push_down::tests::filter_using_right_join_on_common ... ok
test filter_push_down::tests::full_join_on_with_filter ... ok
test filter_push_down::tests::multi_combined_filter ... ok
test filter_push_down::tests::join_filter_on_common ... ok
test filter_push_down::tests::join_filter_with_alias ... ok
test filter_push_down::tests::join_filter_removed ... ok
test filter_push_down::tests::left_join_on_with_filter ... ok
test filter_push_down::tests::join_on_with_filter ... ok
test filter_push_down::tests::test_filter_with_alias ... ok
test filter_push_down::tests::test_filter_with_multi_alias ... ok
test filter_push_down::tests::two_filters_on_same_depth ... ok
test filter_push_down::tests::union_all ... ok
test filter_push_down::tests::union_all_with_alias ... ok
test filter_push_down::tests::multi_filter ... ok
test filter_push_down::tests::split_filter ... ok
test filter_push_down::tests::right_join_on_with_filter ... ok

test result: ok. 39 passed; 0 failed; 0 ignored; 0 measured; 118 filtered out; finished in 0.07s

   Doc-tests datafusion_optimizer

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 2 filtered out; finished in 0.00s


Compilation finished at Tue Jun 14 16:27:56

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb you are right! Without the code change, the unit tests still pass.

I am still a little confused about where the issue is -- is the filter push down logic adding an extra alias? Or is it not adding an alias it should?

I think it is not adding an alias it should, but the unit tests suggest otherwise 😂

But I got the following log when sending the logical plan to ballista-scheduler:

After applying the projection_push_down rule:

Optimized logical plan:
Limit: 50000
  Projection: #LINEORDER.LO_SHIPMODE, #ASS
    Projection: #LO_SHIPMODE AS LINEORDER.LO_SHIPMODE, #KYLINAPPROXCOUNTDISTINCT(_KY_COUNT_DISTINCT_LINEORDER_LO_SHIPPRIORITY_LINEORDER_LO_SUPPKEY_,UInt8(10)) AS ASS
      Aggregate: groupBy=[[#LO_SHIPMODE]], aggr=[[KYLINAPPROXCOUNTDISTINCT(#_KY_COUNT_DISTINCT_LINEORDER_LO_SHIPPRIORITY_LINEORDER_LO_SUPPKEY_, UInt8(10))]]
        Projection: #LO_SHIPMODE, #_KY_COUNT_DISTINCT_LINEORDER_LO_SHIPPRIORITY_LINEORDER_LO_SUPPKEY_
  hi      Filter: #LO_SHIPMODE IN ([Utf8("SHIP"), Utf8("Rail"), Utf8("2321"), Utf8("MAIL")])
            Projection: #LO_SHIPMODE, #_KY_COUNT_DISTINCT_LINEORDER_LO_SHIPPRIORITY_LINEORDER_LO_SUPPKEY_
 alias         Projection: #ssb@kylin_udaf_cube_update@17179869183.21 AS LO_SHIPMODE, #ssb@kylin_udaf_cube_update@17179869183.41 AS _KY_COUNT_DISTINCT_LINEORDER_LO_SHIPPRIORITY_LINEORDER_LO_SUPPKEY_
                TableScan: ssb@kylin_udaf_cube_update@17179869183 projection=Some([12, 36])

After applying the filter_push_down rule:

Optimized logical plan:
Limit: 50000
  Projection: #LINEORDER.LO_SHIPMODE, #ASS
    Projection: #LO_SHIPMODE AS LINEORDER.LO_SHIPMODE, #KYLINAPPROXCOUNTDISTINCT(_KY_COUNT_DISTINCT_LINEORDER_LO_SHIPPRIORITY_LINEORDER_LO_SUPPKEY_,UInt8(10)) AS ASS
      Aggregate: groupBy=[[#LO_SHIPMODE]], aggr=[[KYLINAPPROXCOUNTDISTINCT(#_KY_COUNT_DISTINCT_LINEORDER_LO_SHIPPRIORITY_LINEORDER_LO_SUPPKEY_, UInt8(10))]]
        Projection: #LO_SHIPMODE, #_KY_COUNT_DISTINCT_LINEORDER_LO_SHIPPRIORITY_LINEORDER_LO_SUPPKEY_
          Projection: #LO_SHIPMODE, #_KY_COUNT_DISTINCT_LINEORDER_LO_SHIPPRIORITY_LINEORDER_LO_SUPPKEY_
  alias     Projection: #ssb@kylin_udaf_cube_update@17179869183.21 AS LO_SHIPMODE, #ssb@kylin_udaf_cube_update@17179869183.41 AS _KY_COUNT_DISTINCT_LINEORDER_LO_SHIPPRIORITY_LINEORDER_LO_SUPPKEY_
  hi          Filter: #LO_SHIPMODE IN ([Utf8("SHIP"), Utf8("Rail"), Utf8("2321"), Utf8("MAIL")])
                TableScan: ssb@kylin_udaf_cube_update@17179869183 projection=Some([12, 36]), partial_filters=[#LO_SHIPMODE IN ([Utf8("SHIP"), Utf8("Rail"), Utf8("2321"), Utf8("MAIL")])]

It shows that an alias that should be added is not being added 🤔. Our internal version of DataFusion may be missing some commits, but after adding this PR's unit tests, they still pass 😭

#[test]
fn test_filter_with_alias() -> Result<()> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should keep these tests

// in table scan the true col name is 'test.a',
// but we rename it as 'b', and use col 'b' in filter
// we need rewrite filter col before push down.
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![col("a").alias("b"), col("c")])?
.filter(and(col("b").gt(lit(10i64)), col("c").gt(lit(10i64))))?
.build()?;

// filter on col b
assert_eq!(
format!("{:?}", plan),
"\
Filter: #b > Int64(10) AND #test.c > Int64(10)\
\n Projection: #test.a AS b, #test.c\
\n TableScan: test projection=None\
"
);

// rewrite filter col b to test.a
let expected = "\
Projection: #test.a AS b, #test.c\
\n Filter: #test.a > Int64(10) AND #test.c > Int64(10)\
\n TableScan: test projection=None\
";

assert_optimized_plan_eq(&plan, expected);

Ok(())
}

/// Checks filter push-down through two stacked projections: the inner
/// projection aliases `test.a` as `b`, the outer one re-selects `b` and
/// `test.c`, and the filter on top refers to `b`.
#[test]
fn test_filter_with_alias_2() -> Result<()> {
// In the table scan the real column name is `test.a`, but the inner
// projection renames it to `b` and the filter uses `b`, so the filter
// column has to be rewritten before the filter is pushed down.
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![col("a").alias("b"), col("c")])?
.project(vec![col("b"), col("c")])?
.filter(and(col("b").gt(lit(10i64)), col("c").gt(lit(10i64))))?
.build()?;

// Sanity check of the unoptimized plan: the filter is the top node and
// still refers to the alias `b`.
assert_eq!(
format!("{:?}", plan),
"\
Filter: #b > Int64(10) AND #test.c > Int64(10)\
\n Projection: #b, #test.c\
\n Projection: #test.a AS b, #test.c\
\n TableScan: test projection=None\
"
);

// Expected result: the filter sits directly above the table scan, below
// both projections, with `b` rewritten to `test.a`.
let expected = "\
Projection: #b, #test.c\
\n Projection: #test.a AS b, #test.c\
\n Filter: #test.a > Int64(10) AND #test.c > Int64(10)\
\n TableScan: test projection=None\
";

// NOTE(review): `assert_optimized_plan_eq` is defined outside this view;
// presumably it runs the filter push-down rule on `plan` and compares the
// formatted result with `expected` -- confirm against the helper.
assert_optimized_plan_eq(&plan, expected);

Ok(())
}

/// Checks filter push-down through a projection that aliases two columns
/// (`test.a` as `b`, `test.c` as `d`) when the filter refers to both aliases.
#[test]
fn test_filter_with_multi_alias() -> Result<()> {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![col("a").alias("b"), col("c").alias("d")])?
.filter(and(col("b").gt(lit(10i64)), col("d").gt(lit(10i64))))?
.build()?;

// Sanity check of the unoptimized plan: the filter is the top node and
// refers to the aliases `b` and `d`.
assert_eq!(
format!("{:?}", plan),
"\
Filter: #b > Int64(10) AND #d > Int64(10)\
\n Projection: #test.a AS b, #test.c AS d\
\n TableScan: test projection=None\
"
);

// Expected result: the filter is below the projection, with `b` rewritten
// to `test.a` and `d` rewritten to `test.c`.
let expected = "\
Projection: #test.a AS b, #test.c AS d\
\n Filter: #test.a > Int64(10) AND #test.c > Int64(10)\
\n TableScan: test projection=None\
";

// NOTE(review): `assert_optimized_plan_eq` is defined outside this view;
// presumably it runs the filter push-down rule on `plan` and compares the
// formatted result with `expected` -- confirm against the helper.
assert_optimized_plan_eq(&plan, expected);

Ok(())
}

/// A predicate on the join key in the join's filter expression should be
/// pushed down to both inputs, even when each input exposes the key through
/// an aliasing projection (`test.a AS c` on the left, `test2.b AS d` on the
/// right).
#[test]
fn join_filter_with_alias() -> Result<()> {
let table_scan = test_table_scan()?;
let left = LogicalPlanBuilder::from(table_scan)
.project(vec![col("a").alias("c")])?
.build()?;
let right_table_scan = test_table_scan_with_name("test2")?;
let right = LogicalPlanBuilder::from(right_table_scan)
.project(vec![col("b").alias("d")])?
.build()?;
// The join filter only mentions the left-side alias `c`.
let filter = col("c").gt(lit(1u32));
let plan = LogicalPlanBuilder::from(left)
.join(
&right,
JoinType::Inner,
(vec![Column::from_name("c")], vec![Column::from_name("d")]),
Some(filter),
)?
.build()?;

// Sanity check of the unoptimized plan: the predicate is still attached to
// the join as its filter.
assert_eq!(
format!("{:?}", plan),
"\
Inner Join: #c = #d Filter: #c > UInt32(1)\
\n Projection: #test.a AS c\
\n TableScan: test projection=None\
\n Projection: #test2.b AS d\
\n TableScan: test2 projection=None"
);

// Expected result: the join no longer carries a filter; the predicate
// appears under each projection, rewritten to `test.a` on the left input
// and to `test2.b` on the right input.
let expected = "\
Inner Join: #c = #d\
\n Projection: #test.a AS c\
\n Filter: #test.a > UInt32(1)\
\n TableScan: test projection=None\
\n Projection: #test2.b AS d\
\n Filter: #test2.b > UInt32(1)\
\n TableScan: test2 projection=None";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}

#[test]
fn test_in_filter_with_alias() -> Result<()> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test will fail without code change.

// in table scan the true col name is 'test.a',
// but we rename it as 'b', and use col 'b' in filter
// we need rewrite filter col before push down.
let table_scan = test_table_scan()?;
let filter_value = vec![lit(1u32), lit(2u32), lit(3u32), lit(4u32)];
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![col("a").alias("b"), col("c")])?
.filter(in_list(col("b"), filter_value, false))?
.build()?;

// filter on col b
assert_eq!(
format!("{:?}", plan),
"\
Filter: #b IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\
\n Projection: #test.a AS b, #test.c\
\n TableScan: test projection=None\
"
);

// rewrite filter col b to test.a
let expected = "\
Projection: #test.a AS b, #test.c\
\n Filter: #test.a IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\
\n TableScan: test projection=None\
";

assert_optimized_plan_eq(&plan, expected);

Ok(())
}

/// Checks push-down of an `IN` list filter through two stacked projections:
/// the inner projection aliases `test.a` as `b`, the outer one re-selects `b`
/// and `test.c`, and the `IN` filter on top refers to `b`.
#[test]
fn test_in_filter_with_alias_2() -> Result<()> {
// In the table scan the real column name is `test.a`, but the inner
// projection renames it to `b` and the filter uses `b`, so the filter
// column has to be rewritten before the filter is pushed down.
let table_scan = test_table_scan()?;
let filter_value = vec![lit(1u32), lit(2u32), lit(3u32), lit(4u32)];
let plan = LogicalPlanBuilder::from(table_scan)
.project(vec![col("a").alias("b"), col("c")])?
.project(vec![col("b"), col("c")])?
.filter(in_list(col("b"), filter_value, false))?
.build()?;

// Sanity check of the unoptimized plan: the `IN` filter is the top node and
// still refers to the alias `b`.
assert_eq!(
format!("{:?}", plan),
"\
Filter: #b IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\
\n Projection: #b, #test.c\
\n Projection: #test.a AS b, #test.c\
\n TableScan: test projection=None\
"
);

// Expected result: the filter sits directly above the table scan, below
// both projections, with `b` rewritten to `test.a` inside the `IN` list
// expression.
let expected = "\
Projection: #b, #test.c\
\n Projection: #test.a AS b, #test.c\
\n Filter: #test.a IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\
\n TableScan: test projection=None\
";

// NOTE(review): `assert_optimized_plan_eq` is defined outside this view;
// presumably it runs the filter push-down rule on `plan` and compares the
// formatted result with `expected` -- confirm against the helper.
assert_optimized_plan_eq(&plan, expected);

Ok(())
}
}
6 changes: 5 additions & 1 deletion datafusion/optimizer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,13 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
}
Expr::Not(_) => Ok(Expr::Not(Box::new(expressions[0].clone()))),
Expr::Negative(_) => Ok(Expr::Negative(Box::new(expressions[0].clone()))),
Expr::InList { list, negated, .. } => Ok(Expr::InList {
expr: Box::new(expressions[0].clone()),
list: list.clone(),
negated: *negated,
}),
Expr::Column(_)
| Expr::Literal(_)
| Expr::InList { .. }
| Expr::Exists { .. }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 It looks like Exists and Subquery may need the same treatment (not in this PR though)... I'll file a follow-on issue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree!

| Expr::InSubquery { .. }
| Expr::ScalarSubquery(_)
Expand Down