Skip to content

Commit

Permalink
fix alias rewrite In_List for filter push down (apache#2729)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ted-Jiang authored and MazterQyou committed May 19, 2023
1 parent 4f60376 commit de2adf4
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 1 deletion.
171 changes: 171 additions & 0 deletions datafusion/core/src/optimizer/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,7 @@ mod tests {
use crate::{logical_plan::col, prelude::JoinType};
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_expr::expr_fn::in_list;

fn optimize_plan(plan: &LogicalPlan) -> LogicalPlan {
let rule = FilterPushDown::new();
Expand Down Expand Up @@ -1506,4 +1507,174 @@ mod tests {

Ok(())
}

#[test]
fn test_filter_with_alias() -> Result<()> {
    // The projection exposes `test.a` under the alias `b`, and the filter
    // refers to that alias. The filter can only move below the projection
    // if `b` is first rewritten back to the column it aliases.
    let predicate = and(col("b").gt(lit(10i64)), col("c").gt(lit(10i64)));
    let plan = LogicalPlanBuilder::from(test_table_scan()?)
        .project(vec![col("a").alias("b"), col("c")])?
        .filter(predicate)?
        .build()?;

    // Sanity check: before optimization the filter still names `b`.
    let unoptimized = format!("{:?}", plan);
    assert_eq!(
        unoptimized,
        "\
        Filter: #b > Int64(10) AND #test.c > Int64(10)\
        \n Projection: #test.a AS b, #test.c\
        \n TableScan: test projection=None\
        "
    );

    // After optimization the filter sits under the projection and
    // references `test.a` instead of the alias.
    let optimized = "\
        Projection: #test.a AS b, #test.c\
        \n Filter: #test.a > Int64(10) AND #test.c > Int64(10)\
        \n TableScan: test projection=None\
        ";
    assert_optimized_plan_eq(&plan, optimized);

    Ok(())
}

#[test]
fn test_filter_with_alias_2() -> Result<()> {
    // Same scenario as the single-projection alias test, but with a second
    // projection stacked on top that passes `b` through unchanged. The
    // filter has to travel through both projections, and the alias `b`
    // must be rewritten to `test.a` on the way down.
    let predicate = and(col("b").gt(lit(10i64)), col("c").gt(lit(10i64)));
    let plan = LogicalPlanBuilder::from(test_table_scan()?)
        .project(vec![col("a").alias("b"), col("c")])?
        .project(vec![col("b"), col("c")])?
        .filter(predicate)?
        .build()?;

    // Sanity check: before optimization the filter still names `b`.
    let unoptimized = format!("{:?}", plan);
    assert_eq!(
        unoptimized,
        "\
        Filter: #b > Int64(10) AND #test.c > Int64(10)\
        \n Projection: #b, #test.c\
        \n Projection: #test.a AS b, #test.c\
        \n TableScan: test projection=None\
        "
    );

    // After optimization the filter sits directly above the table scan
    // and references `test.a` instead of the alias.
    let optimized = "\
        Projection: #b, #test.c\
        \n Projection: #test.a AS b, #test.c\
        \n Filter: #test.a > Int64(10) AND #test.c > Int64(10)\
        \n TableScan: test projection=None\
        ";
    assert_optimized_plan_eq(&plan, optimized);

    Ok(())
}

#[test]
fn test_filter_with_multi_alias() -> Result<()> {
    // Both projected columns are aliased (`test.a` as `b`, `test.c` as `d`)
    // and the filter uses both aliases, so every column reference in the
    // predicate has to be rewritten before the filter is pushed down.
    let predicate = and(col("b").gt(lit(10i64)), col("d").gt(lit(10i64)));
    let plan = LogicalPlanBuilder::from(test_table_scan()?)
        .project(vec![col("a").alias("b"), col("c").alias("d")])?
        .filter(predicate)?
        .build()?;

    // Sanity check: before optimization the filter names `b` and `d`.
    let unoptimized = format!("{:?}", plan);
    assert_eq!(
        unoptimized,
        "\
        Filter: #b > Int64(10) AND #d > Int64(10)\
        \n Projection: #test.a AS b, #test.c AS d\
        \n TableScan: test projection=None\
        "
    );

    // After optimization `b` has become `test.a` and `d` has become
    // `test.c` in the pushed-down filter.
    let optimized = "\
        Projection: #test.a AS b, #test.c AS d\
        \n Filter: #test.a > Int64(10) AND #test.c > Int64(10)\
        \n TableScan: test projection=None\
        ";
    assert_optimized_plan_eq(&plan, optimized);

    Ok(())
}

#[test]
fn test_in_filter_with_alias() -> Result<()> {
    // The projection exposes `test.a` under the alias `b`, and the filter
    // is an IN-list over that alias. The tested expression of the IN-list
    // must be rewritten to `test.a` before the filter is pushed down.
    let candidates = vec![lit(1u32), lit(2u32), lit(3u32), lit(4u32)];
    let predicate = in_list(col("b"), candidates, false);
    let plan = LogicalPlanBuilder::from(test_table_scan()?)
        .project(vec![col("a").alias("b"), col("c")])?
        .filter(predicate)?
        .build()?;

    // Sanity check: before optimization the IN-list still names `b`.
    let unoptimized = format!("{:?}", plan);
    assert_eq!(
        unoptimized,
        "\
        Filter: #b IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\
        \n Projection: #test.a AS b, #test.c\
        \n TableScan: test projection=None\
        "
    );

    // After optimization the IN-list filter sits under the projection and
    // tests `test.a` instead of the alias.
    let optimized = "\
        Projection: #test.a AS b, #test.c\
        \n Filter: #test.a IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\
        \n TableScan: test projection=None\
        ";
    assert_optimized_plan_eq(&plan, optimized);

    Ok(())
}

#[test]
fn test_in_filter_with_alias_2() -> Result<()> {
    // IN-list variant of the stacked-projection case: `test.a` is aliased
    // to `b`, a second projection passes `b` through, and the filter is an
    // IN-list over `b`. The filter must pass through both projections with
    // the alias rewritten to `test.a`.
    let candidates = vec![lit(1u32), lit(2u32), lit(3u32), lit(4u32)];
    let predicate = in_list(col("b"), candidates, false);
    let plan = LogicalPlanBuilder::from(test_table_scan()?)
        .project(vec![col("a").alias("b"), col("c")])?
        .project(vec![col("b"), col("c")])?
        .filter(predicate)?
        .build()?;

    // Sanity check: before optimization the IN-list still names `b`.
    let unoptimized = format!("{:?}", plan);
    assert_eq!(
        unoptimized,
        "\
        Filter: #b IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\
        \n Projection: #b, #test.c\
        \n Projection: #test.a AS b, #test.c\
        \n TableScan: test projection=None\
        "
    );

    // After optimization the IN-list filter sits directly above the table
    // scan and tests `test.a` instead of the alias.
    let optimized = "\
        Projection: #b, #test.c\
        \n Projection: #test.a AS b, #test.c\
        \n Filter: #test.a IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\
        \n TableScan: test projection=None\
        ";
    assert_optimized_plan_eq(&plan, optimized);

    Ok(())
}
}
6 changes: 5 additions & 1 deletion datafusion/core/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,10 +560,14 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
}
Expr::Not(_) => Ok(Expr::Not(Box::new(expressions[0].clone()))),
Expr::Negative(_) => Ok(Expr::Negative(Box::new(expressions[0].clone()))),
Expr::InList { list, negated, .. } => Ok(Expr::InList {
expr: Box::new(expressions[0].clone()),
list: list.clone(),
negated: *negated,
}),
Expr::Column(_)
| Expr::OuterColumn(_, _)
| Expr::Literal(_)
| Expr::InList { .. }
| Expr::ScalarVariable(_, _) => Ok(expr.clone()),
Expr::Sort {
asc, nulls_first, ..
Expand Down

0 comments on commit de2adf4

Please sign in to comment.