-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: filter push down with InList
expressions
#2729
Conversation
Codecov Report
@@ Coverage Diff @@
## master #2729 +/- ##
==========================================
+ Coverage 84.70% 84.91% +0.20%
==========================================
Files 270 270
Lines 47262 47874 +612
==========================================
+ Hits 40035 40650 +615
+ Misses 7227 7224 -3
Continue to review full report at Codecov.
|
let mut alias_cols_expr_and_name = HashMap::new(); | ||
//Need rewrite column name before push down | ||
let input_plan = &*input.clone(); | ||
if let LogicalPlan::Projection(projection) = input_plan { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think i should get all Expr::Alias
, not only one level🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if you can use the LogicalPlan
's output schema to detect the case you are looking for
I am still a little confused about where the issue is -- is the filter push down logic adding an extra alias? Or is it not adding an alias it should?
I will review this today |
let col_string = <&std::string::String>::clone( | ||
alias_cols_expr_and_name.get(expr).unwrap(), | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let col_string = <&std::string::String>::clone( | |
alias_cols_expr_and_name.get(expr).unwrap(), | |
); | |
let col_string = alias_cols_expr_and_name.get(expr).unwrap(); |
Seemed to work for me locally
let mut alias_cols_expr_and_name = HashMap::new(); | ||
//Need rewrite column name before push down | ||
let input_plan = &*input.clone(); | ||
if let LogicalPlan::Projection(projection) = input_plan { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if you can use the LogicalPlan
's output schema to detect the case you are looking for
I am still a little confused about where the issue is -- is the filter push down logic adding an extra alias? Or is it not adding an alias it should?
@@ -1831,4 +1870,111 @@ mod tests { | |||
|
|||
Ok(()) | |||
} | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I remove the code changes in this PR the tests still pass 🤔
So when I apply this diff:
diff --git a/datafusion/optimizer/src/filter_push_down.rs b/datafusion/optimizer/src/filter_push_down.rs
index 1629e95c5..2668eaaf7 100644
--- a/datafusion/optimizer/src/filter_push_down.rs
+++ b/datafusion/optimizer/src/filter_push_down.rs
@@ -122,27 +122,6 @@ fn remove_filters(
.collect::<Vec<_>>()
}
-// rename all filter columns which have alias name
-fn rename_filters_column_name(
- filters: &mut [Predicate],
- alias_cols_expr_and_name: &HashMap<&Expr, &String>,
-) {
- for (expr, columns) in filters {
- if alias_cols_expr_and_name.contains_key(expr) {
- let col_string = <&std::string::String>::clone(
- alias_cols_expr_and_name.get(expr).unwrap(),
- );
- let column = Column::from_qualified_name(col_string);
- if let Expr::Column(col) = expr {
- columns.remove(col);
- } else {
- unreachable!()
- }
- columns.insert(column);
- }
- }
-}
-
/// builds a new [LogicalPlan] from `plan` by issuing new [LogicalPlan::Filter] if any of the filters
/// in `state` depend on the columns `used_columns`.
fn issue_filters(
@@ -357,18 +336,6 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
LogicalPlan::Analyze { .. } => push_down(&state, plan),
LogicalPlan::Filter(Filter { input, predicate }) => {
let mut predicates = vec![];
- let mut alias_cols_expr_and_name = HashMap::new();
- //Need rewrite column name before push down
- let input_plan = &*input.clone();
- if let LogicalPlan::Projection(projection) = input_plan {
- let exprs = &projection.expr;
- for e in exprs {
- if let Expr::Alias(col_expr, alias_name) = e {
- alias_cols_expr_and_name.insert(col_expr.as_ref(), alias_name);
- }
- }
- }
-
utils::split_conjunction(predicate, &mut predicates);
// Predicates without referencing columns (WHERE FALSE, WHERE 1=1, etc.)
@@ -397,12 +364,6 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
&no_col_predicates,
))
} else {
- if !alias_cols_expr_and_name.is_empty() {
- rename_filters_column_name(
- &mut state.filters,
- &alias_cols_expr_and_name,
- )
- }
optimize(input, state)
}
}
The tests all pass:
-*- mode: compilation; default-directory: "~/Software/arrow-datafusion/" -*-
Compilation started at Tue Jun 14 16:27:52
cd /Users/alamb/Software/arrow-datafusion && RUST_BACKTRACE=1 CARGO_TARGET_DIR=/Users/alamb/Software/df-target cargo test -p datafusion-optimizer -- filter_push_down
Compiling datafusion-optimizer v9.0.0 (/Users/alamb/Software/arrow-datafusion/datafusion/optimizer)
Finished test [unoptimized + debuginfo] target(s) in 4.38s
Running unittests src/lib.rs (/Users/alamb/Software/df-target/debug/deps/datafusion_optimizer-ad133d04102e7238)
running 39 tests
test filter_push_down::tests::filter_no_columns ... ok
test filter_push_down::tests::filter_before_projection ... ok
test filter_push_down::tests::alias ... ok
test filter_push_down::tests::filter_jump_2_plans ... ok
test filter_push_down::tests::filter_after_limit ... ok
test filter_push_down::tests::complex_expression ... ok
test filter_push_down::tests::filter_keep_agg ... ok
test filter_push_down::tests::filter_move_agg ... ok
test filter_push_down::tests::complex_plan ... ok
test filter_push_down::tests::filter_2_breaks_limits ... ok
test filter_push_down::tests::double_limit ... ok
test filter_push_down::tests::filter_using_left_join ... ok
test filter_push_down::tests::filter_on_join_on_common_independent ... ok
test filter_push_down::tests::filter_join_on_one_side ... ok
test filter_push_down::tests::filter_join_on_common_dependent ... ok
test filter_push_down::tests::filter_with_table_provider_exact ... ok
test filter_push_down::tests::filter_using_join_on_common_independent ... ok
test filter_push_down::tests::filter_with_table_provider_inexact ... ok
test filter_push_down::tests::filter_using_left_join_on_common ... ok
test filter_push_down::tests::filter_with_table_provider_unsupported ... ok
test filter_push_down::tests::filters_user_defined_node ... ok
test filter_push_down::tests::filter_with_table_provider_multiple_invocations ... ok
test filter_push_down::tests::filter_using_right_join ... ok
test filter_push_down::tests::filter_using_right_join_on_common ... ok
test filter_push_down::tests::full_join_on_with_filter ... ok
test filter_push_down::tests::multi_combined_filter ... ok
test filter_push_down::tests::join_filter_on_common ... ok
test filter_push_down::tests::join_filter_with_alias ... ok
test filter_push_down::tests::join_filter_removed ... ok
test filter_push_down::tests::left_join_on_with_filter ... ok
test filter_push_down::tests::join_on_with_filter ... ok
test filter_push_down::tests::test_filter_with_alias ... ok
test filter_push_down::tests::test_filter_with_multi_alias ... ok
test filter_push_down::tests::two_filters_on_same_depth ... ok
test filter_push_down::tests::union_all ... ok
test filter_push_down::tests::union_all_with_alias ... ok
test filter_push_down::tests::multi_filter ... ok
test filter_push_down::tests::split_filter ... ok
test filter_push_down::tests::right_join_on_with_filter ... ok
test result: ok. 39 passed; 0 failed; 0 ignored; 0 measured; 118 filtered out; finished in 0.07s
Doc-tests datafusion_optimizer
running 0 tests
test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 2 filtered out; finished in 0.00s
Compilation finished at Tue Jun 14 16:27:56
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alamb you are right! without the code change the ut still passed
I am still a little confused about where the issue is -- is the filter push down logic adding an extra alias? Or is it not adding an alias it should?
I think it is not
adding an alias it should, but the ut show it doesn't 😂
But I got a log from send logical plan to ballista-scheduler
After apply projection_push_down rule:
Optimized logical plan:
Limit: 50000
Projection: #LINEORDER.LO_SHIPMODE, #ASS
Projection: #LO_SHIPMODE AS LINEORDER.LO_SHIPMODE, #KYLINAPPROXCOUNTDISTINCT(_KY_COUNT_DISTINCT_LINEORDER_LO_SHIPPRIORITY_LINEORDER_LO_SUPPKEY_,UInt8(10)) AS ASS
Aggregate: groupBy=[[#LO_SHIPMODE]], aggr=[[KYLINAPPROXCOUNTDISTINCT(#_KY_COUNT_DISTINCT_LINEORDER_LO_SHIPPRIORITY_LINEORDER_LO_SUPPKEY_, UInt8(10))]]
Projection: #LO_SHIPMODE, #_KY_COUNT_DISTINCT_LINEORDER_LO_SHIPPRIORITY_LINEORDER_LO_SUPPKEY_
hi Filter: #LO_SHIPMODE IN ([Utf8("SHIP"), Utf8("Rail"), Utf8("2321"), Utf8("MAIL")])
Projection: #LO_SHIPMODE, #_KY_COUNT_DISTINCT_LINEORDER_LO_SHIPPRIORITY_LINEORDER_LO_SUPPKEY_
alias Projection: #ssb@kylin_udaf_cube_update@17179869183.21 AS LO_SHIPMODE, #ssb@kylin_udaf_cube_update@17179869183.41 AS _KY_COUNT_DISTINCT_LINEORDER_LO_SHIPPRIORITY_LINEORDER_LO_SUPPKEY_
TableScan: ssb@kylin_udaf_cube_update@17179869183 projection=Some([12, 36])
After apply filter_push_down rule:
Optimized logical plan:
Limit: 50000
Projection: #LINEORDER.LO_SHIPMODE, #ASS
Projection: #LO_SHIPMODE AS LINEORDER.LO_SHIPMODE, #KYLINAPPROXCOUNTDISTINCT(_KY_COUNT_DISTINCT_LINEORDER_LO_SHIPPRIORITY_LINEORDER_LO_SUPPKEY_,UInt8(10)) AS ASS
Aggregate: groupBy=[[#LO_SHIPMODE]], aggr=[[KYLINAPPROXCOUNTDISTINCT(#_KY_COUNT_DISTINCT_LINEORDER_LO_SHIPPRIORITY_LINEORDER_LO_SUPPKEY_, UInt8(10))]]
Projection: #LO_SHIPMODE, #_KY_COUNT_DISTINCT_LINEORDER_LO_SHIPPRIORITY_LINEORDER_LO_SUPPKEY_
Projection: #LO_SHIPMODE, #_KY_COUNT_DISTINCT_LINEORDER_LO_SHIPPRIORITY_LINEORDER_LO_SUPPKEY_
alias Projection: #ssb@kylin_udaf_cube_update@17179869183.21 AS LO_SHIPMODE, #ssb@kylin_udaf_cube_update@17179869183.41 AS _KY_COUNT_DISTINCT_LINEORDER_LO_SHIPPRIORITY_LINEORDER_LO_SUPPKEY_
hi Filter: #LO_SHIPMODE IN ([Utf8("SHIP"), Utf8("Rail"), Utf8("2321"), Utf8("MAIL")])
TableScan: ssb@kylin_udaf_cube_update@17179869183 projection=Some([12, 36]), partial_filters=[#LO_SHIPMODE IN ([Utf8("SHIP"), Utf8("Rail"), Utf8("2321"), Utf8("MAIL")])]
It show not adding an alias it should 🤔, in our internal version datafusion may lost some commit, but add this pr ut still passed😭
Got the bug! :add fail test |
b40a85b
to
3f0cc57
Compare
} | ||
|
||
#[test] | ||
fn test_in_filter_with_alias() -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test will fail without code change.
@@ -1831,4 +1831,218 @@ mod tests { | |||
|
|||
Ok(()) | |||
} | |||
|
|||
#[test] | |||
fn test_filter_with_alias() -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should keep these tests
I am still planning on reviewing this and will hopefully have time today. |
The changes look good but I think the title should be updated to "Support filter push down with InList expressions"? It looks like we will also need to implement a similar change for |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a great find @Ted-Jiang -- thank you!
I verified that the tests fail without the code changes:
---- filter_push_down::tests::test_in_filter_with_alias stdout ----
thread 'filter_push_down::tests::test_in_filter_with_alias' panicked at 'assertion failed: `(left == right)`
left: `"Projection: #test.a AS b, #test.c\n Filter: #b IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\n TableScan: test projection=None"`,
right: `"Projection: #test.a AS b, #test.c\n Filter: #test.a IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\n TableScan: test projection=None"`', datafusion/optimizer/src/filter_push_down.rs:672:9
stack backtrace:
0: rust_begin_unwind
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/std/src/panicking.rs:584:5
1: core::panicking::panic_fmt
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/panicking.rs:143:14
2: core::panicking::assert_failed_inner
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/panicking.rs:225:17
3: core::panicking::assert_failed
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/panicking.rs:182:5
4: datafusion_optimizer::filter_push_down::tests::assert_optimized_plan_eq
at ./src/filter_push_down.rs:672:9
5: datafusion_optimizer::filter_push_down::tests::test_in_filter_with_alias
at ./src/filter_push_down.rs:2007:9
6: datafusion_optimizer::filter_push_down::tests::test_in_filter_with_alias::{{closure}}
at ./src/filter_push_down.rs:1979:5
7: core::ops::function::FnOnce::call_once
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/ops/function.rs:227:5
8: core::ops::function::FnOnce::call_once
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/ops/function.rs:227:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
---- filter_push_down::tests::test_in_filter_with_alias_2 stdout ----
thread 'filter_push_down::tests::test_in_filter_with_alias_2' panicked at 'assertion failed: `(left == right)`
left: `"Projection: #b, #test.c\n Projection: #test.a AS b, #test.c\n Filter: #b IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\n TableScan: test projection=None"`,
right: `"Projection: #b, #test.c\n Projection: #test.a AS b, #test.c\n Filter: #test.a IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\n TableScan: test projection=None"`', datafusion/optimizer/src/filter_push_down.rs:672:9
stack backtrace:
0: rust_begin_unwind
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/std/src/panicking.rs:584:5
1: core::panicking::panic_fmt
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/panicking.rs:143:14
2: core::panicking::assert_failed_inner
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/panicking.rs:225:17
3: core::panicking::assert_failed
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/panicking.rs:182:5
4: datafusion_optimizer::filter_push_down::tests::assert_optimized_plan_eq
at ./src/filter_push_down.rs:672:9
5: datafusion_optimizer::filter_push_down::tests::test_in_filter_with_alias_2
at ./src/filter_push_down.rs:2044:9
6: datafusion_optimizer::filter_push_down::tests::test_in_filter_with_alias_2::{{closure}}
at ./src/filter_push_down.rs:2013:5
7: core::ops::function::FnOnce::call_once
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/ops/function.rs:227:5
8: core::ops::function::FnOnce::call_once
at /rustc/fe5b13d681f25ee6474be29d748c65adcd91f69e/library/core/src/ops/function.rs:227:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
I will also file a follow on for the other Exprs this same issue may effect
Expr::Column(_) | ||
| Expr::Literal(_) | ||
| Expr::InList { .. } | ||
| Expr::Exists { .. } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔 looks like Exists
and Subquery
may need the same treatment (not in this PR though).... I'll file a follow on
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree!
InList
expressions
Filed #2736 for follow on task |
Thanks again @Ted-Jiang for pushing through and finding the root cause |
@alamb Thank you for your carefulness review and kind advice 👍 |
Which issue does this PR close?
Closes #2725 .
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?