-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: filter push down with InList
expressions
#2729
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -654,7 +654,7 @@ mod tests { | |
use async_trait::async_trait; | ||
use datafusion_common::DFSchema; | ||
use datafusion_expr::{ | ||
and, col, lit, | ||
and, col, in_list, lit, | ||
logical_plan::{builder::union_with_alias, JoinType}, | ||
sum, Expr, LogicalPlanBuilder, Operator, TableSource, TableType, | ||
}; | ||
|
@@ -1831,4 +1831,218 @@ mod tests { | |
|
||
Ok(()) | ||
} | ||
|
||
#[test] | ||
fn test_filter_with_alias() -> Result<()> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should keep these tests |
||
// in table scan the true col name is 'test.a', | ||
// but we rename it as 'b', and use col 'b' in filter | ||
// we need rewrite filter col before push down. | ||
let table_scan = test_table_scan()?; | ||
let plan = LogicalPlanBuilder::from(table_scan) | ||
.project(vec![col("a").alias("b"), col("c")])? | ||
.filter(and(col("b").gt(lit(10i64)), col("c").gt(lit(10i64))))? | ||
.build()?; | ||
|
||
// filter on col b | ||
assert_eq!( | ||
format!("{:?}", plan), | ||
"\ | ||
Filter: #b > Int64(10) AND #test.c > Int64(10)\ | ||
\n Projection: #test.a AS b, #test.c\ | ||
\n TableScan: test projection=None\ | ||
" | ||
); | ||
|
||
// rewrite filter col b to test.a | ||
let expected = "\ | ||
Projection: #test.a AS b, #test.c\ | ||
\n Filter: #test.a > Int64(10) AND #test.c > Int64(10)\ | ||
\n TableScan: test projection=None\ | ||
"; | ||
|
||
assert_optimized_plan_eq(&plan, expected); | ||
|
||
Ok(()) | ||
} | ||
|
||
#[test] | ||
fn test_filter_with_alias_2() -> Result<()> { | ||
// in table scan the true col name is 'test.a', | ||
// but we rename it as 'b', and use col 'b' in filter | ||
// we need rewrite filter col before push down. | ||
let table_scan = test_table_scan()?; | ||
let plan = LogicalPlanBuilder::from(table_scan) | ||
.project(vec![col("a").alias("b"), col("c")])? | ||
.project(vec![col("b"), col("c")])? | ||
.filter(and(col("b").gt(lit(10i64)), col("c").gt(lit(10i64))))? | ||
.build()?; | ||
|
||
// filter on col b | ||
assert_eq!( | ||
format!("{:?}", plan), | ||
Ted-Jiang marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"\ | ||
Filter: #b > Int64(10) AND #test.c > Int64(10)\ | ||
\n Projection: #b, #test.c\ | ||
\n Projection: #test.a AS b, #test.c\ | ||
\n TableScan: test projection=None\ | ||
" | ||
); | ||
|
||
// rewrite filter col b to test.a | ||
let expected = "\ | ||
Projection: #b, #test.c\ | ||
\n Projection: #test.a AS b, #test.c\ | ||
\n Filter: #test.a > Int64(10) AND #test.c > Int64(10)\ | ||
\n TableScan: test projection=None\ | ||
"; | ||
|
||
assert_optimized_plan_eq(&plan, expected); | ||
|
||
Ok(()) | ||
} | ||
|
||
#[test] | ||
fn test_filter_with_multi_alias() -> Result<()> { | ||
let table_scan = test_table_scan()?; | ||
let plan = LogicalPlanBuilder::from(table_scan) | ||
.project(vec![col("a").alias("b"), col("c").alias("d")])? | ||
.filter(and(col("b").gt(lit(10i64)), col("d").gt(lit(10i64))))? | ||
.build()?; | ||
|
||
// filter on col b and d | ||
assert_eq!( | ||
format!("{:?}", plan), | ||
"\ | ||
Filter: #b > Int64(10) AND #d > Int64(10)\ | ||
\n Projection: #test.a AS b, #test.c AS d\ | ||
\n TableScan: test projection=None\ | ||
" | ||
); | ||
|
||
// rewrite filter col b to test.a, col d to test.c | ||
let expected = "\ | ||
Projection: #test.a AS b, #test.c AS d\ | ||
\n Filter: #test.a > Int64(10) AND #test.c > Int64(10)\ | ||
\n TableScan: test projection=None\ | ||
"; | ||
|
||
assert_optimized_plan_eq(&plan, expected); | ||
|
||
Ok(()) | ||
} | ||
|
||
/// predicate on join key in filter expression should be pushed down to both inputs | ||
#[test] | ||
fn join_filter_with_alias() -> Result<()> { | ||
let table_scan = test_table_scan()?; | ||
let left = LogicalPlanBuilder::from(table_scan) | ||
.project(vec![col("a").alias("c")])? | ||
.build()?; | ||
let right_table_scan = test_table_scan_with_name("test2")?; | ||
let right = LogicalPlanBuilder::from(right_table_scan) | ||
.project(vec![col("b").alias("d")])? | ||
.build()?; | ||
let filter = col("c").gt(lit(1u32)); | ||
let plan = LogicalPlanBuilder::from(left) | ||
.join( | ||
&right, | ||
JoinType::Inner, | ||
(vec![Column::from_name("c")], vec![Column::from_name("d")]), | ||
Some(filter), | ||
)? | ||
.build()?; | ||
|
||
assert_eq!( | ||
format!("{:?}", plan), | ||
"\ | ||
Inner Join: #c = #d Filter: #c > UInt32(1)\ | ||
\n Projection: #test.a AS c\ | ||
\n TableScan: test projection=None\ | ||
\n Projection: #test2.b AS d\ | ||
\n TableScan: test2 projection=None" | ||
); | ||
|
||
// Change filter on col `c`, 'd' to `test.a`, 'test.b' | ||
let expected = "\ | ||
Inner Join: #c = #d\ | ||
\n Projection: #test.a AS c\ | ||
\n Filter: #test.a > UInt32(1)\ | ||
\n TableScan: test projection=None\ | ||
\n Projection: #test2.b AS d\ | ||
\n Filter: #test2.b > UInt32(1)\ | ||
\n TableScan: test2 projection=None"; | ||
assert_optimized_plan_eq(&plan, expected); | ||
Ok(()) | ||
} | ||
|
||
#[test] | ||
fn test_in_filter_with_alias() -> Result<()> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test will fail without code change. |
||
// in table scan the true col name is 'test.a', | ||
// but we rename it as 'b', and use col 'b' in filter | ||
// we need rewrite filter col before push down. | ||
let table_scan = test_table_scan()?; | ||
let filter_value = vec![lit(1u32), lit(2u32), lit(3u32), lit(4u32)]; | ||
let plan = LogicalPlanBuilder::from(table_scan) | ||
.project(vec![col("a").alias("b"), col("c")])? | ||
.filter(in_list(col("b"), filter_value, false))? | ||
.build()?; | ||
|
||
// filter on col b | ||
assert_eq!( | ||
format!("{:?}", plan), | ||
"\ | ||
Filter: #b IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\ | ||
\n Projection: #test.a AS b, #test.c\ | ||
\n TableScan: test projection=None\ | ||
" | ||
); | ||
|
||
// rewrite filter col b to test.a | ||
let expected = "\ | ||
Projection: #test.a AS b, #test.c\ | ||
\n Filter: #test.a IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\ | ||
\n TableScan: test projection=None\ | ||
"; | ||
|
||
assert_optimized_plan_eq(&plan, expected); | ||
|
||
Ok(()) | ||
} | ||
|
||
#[test] | ||
fn test_in_filter_with_alias_2() -> Result<()> { | ||
// in table scan the true col name is 'test.a', | ||
// but we rename it as 'b', and use col 'b' in filter | ||
// we need rewrite filter col before push down. | ||
let table_scan = test_table_scan()?; | ||
let filter_value = vec![lit(1u32), lit(2u32), lit(3u32), lit(4u32)]; | ||
let plan = LogicalPlanBuilder::from(table_scan) | ||
.project(vec![col("a").alias("b"), col("c")])? | ||
.project(vec![col("b"), col("c")])? | ||
.filter(in_list(col("b"), filter_value, false))? | ||
.build()?; | ||
|
||
// filter on col b | ||
assert_eq!( | ||
format!("{:?}", plan), | ||
"\ | ||
Filter: #b IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\ | ||
\n Projection: #b, #test.c\ | ||
\n Projection: #test.a AS b, #test.c\ | ||
\n TableScan: test projection=None\ | ||
" | ||
); | ||
|
||
// rewrite filter col b to test.a | ||
let expected = "\ | ||
Projection: #b, #test.c\ | ||
\n Projection: #test.a AS b, #test.c\ | ||
\n Filter: #test.a IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\ | ||
\n TableScan: test projection=None\ | ||
"; | ||
|
||
assert_optimized_plan_eq(&plan, expected); | ||
|
||
Ok(()) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -276,9 +276,13 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> { | |
} | ||
Expr::Not(_) => Ok(Expr::Not(Box::new(expressions[0].clone()))), | ||
Expr::Negative(_) => Ok(Expr::Negative(Box::new(expressions[0].clone()))), | ||
Expr::InList { list, negated, .. } => Ok(Expr::InList { | ||
expr: Box::new(expressions[0].clone()), | ||
list: list.clone(), | ||
negated: *negated, | ||
}), | ||
Expr::Column(_) | ||
| Expr::Literal(_) | ||
| Expr::InList { .. } | ||
| Expr::Exists { .. } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤔 looks like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree! |
||
| Expr::InSubquery { .. } | ||
| Expr::ScalarSubquery(_) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I remove the code changes in this PR the tests still pass 🤔
So when I apply this diff:
The tests all pass:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alamb you are right! without the code change the ut still passed
I think it is
not
adding an alias it should, but the ut show it doesn't 😂But I got a log from send logical plan to ballista-scheduler
It show not adding an alias it should 🤔, in our internal version datafusion may lost some commit, but add this pr ut still passed😭