Skip to content

Commit

Permalink
fix alias rewrite In_List for filter push down (apache#2729)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ted-Jiang authored and waynexia committed Jun 20, 2022
1 parent 6228b65 commit e53f6c5
Show file tree
Hide file tree
Showing 2 changed files with 220 additions and 2 deletions.
216 changes: 215 additions & 1 deletion datafusion/optimizer/src/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ mod tests {
use async_trait::async_trait;
use datafusion_common::DFSchema;
use datafusion_expr::{
and, col, lit,
and, col, in_list, lit,
logical_plan::{builder::union_with_alias, JoinType},
sum, Expr, LogicalPlanBuilder, Operator, TableSource, TableType,
};
Expand Down Expand Up @@ -1831,4 +1831,218 @@ mod tests {

Ok(())
}

#[test]
fn test_filter_with_alias() -> Result<()> {
    // The projection renames column `a` to `b`, and the filter is written
    // against the alias `b`. For the filter to move below the projection,
    // the optimizer has to rewrite `b` back to the underlying `test.a`.
    let scan = test_table_scan()?;
    let projected = vec![col("a").alias("b"), col("c")];
    let predicate = and(col("b").gt(lit(10i64)), col("c").gt(lit(10i64)));
    let plan = LogicalPlanBuilder::from(scan)
        .project(projected)?
        .filter(predicate)?
        .build()?;

    // Unoptimized plan: the filter sits on top and still references `b`.
    let unoptimized = format!("{:?}", plan);
    assert_eq!(
        unoptimized,
        "\
        Filter: #b > Int64(10) AND #test.c > Int64(10)\
        \n Projection: #test.a AS b, #test.c\
        \n TableScan: test projection=None\
        "
    );

    // Optimized plan: the filter is below the projection and uses `test.a`.
    let pushed_down = "\
        Projection: #test.a AS b, #test.c\
        \n Filter: #test.a > Int64(10) AND #test.c > Int64(10)\
        \n TableScan: test projection=None\
        ";
    assert_optimized_plan_eq(&plan, pushed_down);

    Ok(())
}

#[test]
fn test_filter_with_alias_2() -> Result<()> {
    // Same scenario as the single-projection alias test, but with a second
    // projection stacked on top that simply forwards `b` and `c`. The filter
    // on alias `b` must be rewritten to `test.a` and pushed below both
    // projections.
    let scan = test_table_scan()?;
    let aliasing = vec![col("a").alias("b"), col("c")];
    let forwarding = vec![col("b"), col("c")];
    let predicate = and(col("b").gt(lit(10i64)), col("c").gt(lit(10i64)));
    let plan = LogicalPlanBuilder::from(scan)
        .project(aliasing)?
        .project(forwarding)?
        .filter(predicate)?
        .build()?;

    // Unoptimized plan: the filter is above both projections and uses `b`.
    let unoptimized = format!("{:?}", plan);
    assert_eq!(
        unoptimized,
        "\
        Filter: #b > Int64(10) AND #test.c > Int64(10)\
        \n Projection: #b, #test.c\
        \n Projection: #test.a AS b, #test.c\
        \n TableScan: test projection=None\
        "
    );

    // Optimized plan: the filter is directly above the scan and uses `test.a`.
    let pushed_down = "\
        Projection: #b, #test.c\
        \n Projection: #test.a AS b, #test.c\
        \n Filter: #test.a > Int64(10) AND #test.c > Int64(10)\
        \n TableScan: test projection=None\
        ";
    assert_optimized_plan_eq(&plan, pushed_down);

    Ok(())
}

#[test]
fn test_filter_with_multi_alias() -> Result<()> {
    // Both projected columns are aliased (`a` -> `b`, `c` -> `d`) and the
    // filter references both aliases, so each side of the conjunction has to
    // be rewritten before the filter can be pushed down.
    let scan = test_table_scan()?;
    let projected = vec![col("a").alias("b"), col("c").alias("d")];
    let predicate = and(col("b").gt(lit(10i64)), col("d").gt(lit(10i64)));
    let plan = LogicalPlanBuilder::from(scan)
        .project(projected)?
        .filter(predicate)?
        .build()?;

    // Unoptimized plan: the filter references the aliases `b` and `d`.
    let unoptimized = format!("{:?}", plan);
    assert_eq!(
        unoptimized,
        "\
        Filter: #b > Int64(10) AND #d > Int64(10)\
        \n Projection: #test.a AS b, #test.c AS d\
        \n TableScan: test projection=None\
        "
    );

    // Optimized plan: `b` became `test.a` and `d` became `test.c`.
    let pushed_down = "\
        Projection: #test.a AS b, #test.c AS d\
        \n Filter: #test.a > Int64(10) AND #test.c > Int64(10)\
        \n TableScan: test projection=None\
        ";
    assert_optimized_plan_eq(&plan, pushed_down);

    Ok(())
}

/// A join-filter predicate on an aliased join key should be pushed down to
/// both join inputs, with the alias rewritten to each input's real column.
#[test]
fn join_filter_with_alias() -> Result<()> {
    // Left input: `test.a` exposed as `c`.
    let left = LogicalPlanBuilder::from(test_table_scan()?)
        .project(vec![col("a").alias("c")])?
        .build()?;
    // Right input: `test2.b` exposed as `d`.
    let right = LogicalPlanBuilder::from(test_table_scan_with_name("test2")?)
        .project(vec![col("b").alias("d")])?
        .build()?;

    // Join on `c = d`, with an extra join filter on the left key `c`.
    let join_filter = col("c").gt(lit(1u32));
    let join_keys = (vec![Column::from_name("c")], vec![Column::from_name("d")]);
    let plan = LogicalPlanBuilder::from(left)
        .join(&right, JoinType::Inner, join_keys, Some(join_filter))?
        .build()?;

    // Unoptimized plan: the predicate is attached to the join itself.
    let unoptimized = format!("{:?}", plan);
    assert_eq!(
        unoptimized,
        "\
        Inner Join: #c = #d Filter: #c > UInt32(1)\
        \n Projection: #test.a AS c\
        \n TableScan: test projection=None\
        \n Projection: #test2.b AS d\
        \n TableScan: test2 projection=None"
    );

    // Optimized plan: the predicate is removed from the join and appears
    // under each projection, rewritten to `test.a` on the left and to
    // `test2.b` on the right.
    let pushed_down = "\
        Inner Join: #c = #d\
        \n Projection: #test.a AS c\
        \n Filter: #test.a > UInt32(1)\
        \n TableScan: test projection=None\
        \n Projection: #test2.b AS d\
        \n Filter: #test2.b > UInt32(1)\
        \n TableScan: test2 projection=None";
    assert_optimized_plan_eq(&plan, pushed_down);

    Ok(())
}

#[test]
fn test_in_filter_with_alias() -> Result<()> {
    // The projection renames `a` to `b` and the filter is an IN-list over the
    // alias `b`. The IN-list's column must be rewritten to `test.a` before
    // the filter can be pushed below the projection.
    let scan = test_table_scan()?;
    let candidates = vec![lit(1u32), lit(2u32), lit(3u32), lit(4u32)];
    let projected = vec![col("a").alias("b"), col("c")];
    // `false` => a plain (non-negated) IN, as the rendered plan below shows.
    let predicate = in_list(col("b"), candidates, false);
    let plan = LogicalPlanBuilder::from(scan)
        .project(projected)?
        .filter(predicate)?
        .build()?;

    // Unoptimized plan: the IN-list filter references the alias `b`.
    let unoptimized = format!("{:?}", plan);
    assert_eq!(
        unoptimized,
        "\
        Filter: #b IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\
        \n Projection: #test.a AS b, #test.c\
        \n TableScan: test projection=None\
        "
    );

    // Optimized plan: the IN-list filter is below the projection on `test.a`.
    let pushed_down = "\
        Projection: #test.a AS b, #test.c\
        \n Filter: #test.a IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\
        \n TableScan: test projection=None\
        ";
    assert_optimized_plan_eq(&plan, pushed_down);

    Ok(())
}

#[test]
fn test_in_filter_with_alias_2() -> Result<()> {
    // IN-list variant of the stacked-projection case: one projection aliases
    // `a` as `b`, a second one forwards `b` and `c`, and the IN-list filter on
    // `b` must be rewritten to `test.a` and pushed below both projections.
    let scan = test_table_scan()?;
    let candidates = vec![lit(1u32), lit(2u32), lit(3u32), lit(4u32)];
    let aliasing = vec![col("a").alias("b"), col("c")];
    let forwarding = vec![col("b"), col("c")];
    // `false` => a plain (non-negated) IN, as the rendered plan below shows.
    let predicate = in_list(col("b"), candidates, false);
    let plan = LogicalPlanBuilder::from(scan)
        .project(aliasing)?
        .project(forwarding)?
        .filter(predicate)?
        .build()?;

    // Unoptimized plan: the IN-list filter is above both projections.
    let unoptimized = format!("{:?}", plan);
    assert_eq!(
        unoptimized,
        "\
        Filter: #b IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\
        \n Projection: #b, #test.c\
        \n Projection: #test.a AS b, #test.c\
        \n TableScan: test projection=None\
        "
    );

    // Optimized plan: the IN-list filter is directly above the scan on `test.a`.
    let pushed_down = "\
        Projection: #b, #test.c\
        \n Projection: #test.a AS b, #test.c\
        \n Filter: #test.a IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\
        \n TableScan: test projection=None\
        ";
    assert_optimized_plan_eq(&plan, pushed_down);

    Ok(())
}
}
6 changes: 5 additions & 1 deletion datafusion/optimizer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,13 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
}
Expr::Not(_) => Ok(Expr::Not(Box::new(expressions[0].clone()))),
Expr::Negative(_) => Ok(Expr::Negative(Box::new(expressions[0].clone()))),
Expr::InList { list, negated, .. } => Ok(Expr::InList {
expr: Box::new(expressions[0].clone()),
list: list.clone(),
negated: *negated,
}),
Expr::Column(_)
| Expr::Literal(_)
| Expr::InList { .. }
| Expr::Exists { .. }
| Expr::InSubquery { .. }
| Expr::ScalarSubquery(_)
Expand Down

0 comments on commit e53f6c5

Please sign in to comment.