Skip to content

Commit 7aed4d6

Browse files
authored
Fix propagation of optimized predicates on nested projections (apache#3228)
* Fix propagation of optimized predicates on nested projections
* Add SQL integration tests
* Alternative implementation on `issue_filters` (#1)
1 parent 873b071 commit 7aed4d6

File tree

2 files changed

+79
-26
lines changed

2 files changed

+79
-26
lines changed

datafusion/core/tests/sql/projection.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,3 +348,42 @@ async fn project_column_with_same_name_as_relation() -> Result<()> {
348348

349349
Ok(())
350350
}
351+
352+
#[tokio::test]
353+
async fn project_column_with_filters_that_cant_pushed_down_always_false() -> Result<()> {
354+
let ctx = SessionContext::new();
355+
356+
let sql = "select * from (select 1 as a) f where f.a=2;";
357+
let actual = execute_to_batches(&ctx, sql).await;
358+
359+
let expected = vec!["++", "++"];
360+
assert_batches_sorted_eq!(expected, &actual);
361+
362+
Ok(())
363+
}
364+
365+
#[tokio::test]
366+
async fn project_column_with_filters_that_cant_pushed_down_always_true() -> Result<()> {
367+
let ctx = SessionContext::new();
368+
369+
let sql = "select * from (select 1 as a) f where f.a=1;";
370+
let actual = execute_to_batches(&ctx, sql).await;
371+
372+
let expected = vec!["+---+", "| a |", "+---+", "| 1 |", "+---+"];
373+
assert_batches_sorted_eq!(expected, &actual);
374+
375+
Ok(())
376+
}
377+
378+
#[tokio::test]
379+
async fn project_columns_in_memory_without_propagation() -> Result<()> {
380+
let ctx = SessionContext::new();
381+
382+
let sql = "select column1 as a from (values (1), (2)) f where f.column1 = 2;";
383+
let actual = execute_to_batches(&ctx, sql).await;
384+
385+
let expected = vec!["+---+", "| a |", "+---+", "| 2 |", "+---+"];
386+
assert_batches_sorted_eq!(expected, &actual);
387+
388+
Ok(())
389+
}

datafusion/optimizer/src/filter_push_down.rs

Lines changed: 40 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ impl State {
8181
}
8282

8383
/// returns all predicates in `state` that depend on any of `used_columns`
84+
/// or the ones that do not reference any columns (e.g. WHERE 1=1)
8485
fn get_predicates<'a>(
8586
state: &'a State,
8687
used_columns: &HashSet<Column>,
@@ -89,10 +90,11 @@ fn get_predicates<'a>(
8990
.filters
9091
.iter()
9192
.filter(|(_, columns)| {
92-
!columns
93-
.intersection(used_columns)
94-
.collect::<HashSet<_>>()
95-
.is_empty()
93+
columns.is_empty()
94+
|| !columns
95+
.intersection(used_columns)
96+
.collect::<HashSet<_>>()
97+
.is_empty()
9698
})
9799
.map(|&(ref a, ref b)| (a, b))
98100
.unzip()
@@ -338,34 +340,16 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
338340
let mut predicates = vec![];
339341
utils::split_conjunction(predicate, &mut predicates);
340342

341-
// Predicates without referencing columns (WHERE FALSE, WHERE 1=1, etc.)
342-
let mut no_col_predicates = vec![];
343-
344343
predicates
345344
.into_iter()
346345
.try_for_each::<_, Result<()>>(|predicate| {
347346
let mut columns: HashSet<Column> = HashSet::new();
348347
expr_to_columns(predicate, &mut columns)?;
349-
if columns.is_empty() {
350-
no_col_predicates.push(predicate)
351-
} else {
352-
// collect the predicate
353-
state.filters.push((predicate.clone(), columns));
354-
}
348+
state.filters.push((predicate.clone(), columns));
355349
Ok(())
356350
})?;
357351

358-
// Predicates without columns will not be pushed down.
359-
// As those contain only literals, they could be optimized using constant folding
360-
// and removal of WHERE TRUE / WHERE FALSE
361-
if !no_col_predicates.is_empty() {
362-
Ok(utils::add_filter(
363-
optimize(input, state)?,
364-
&no_col_predicates,
365-
))
366-
} else {
367-
optimize(input, state)
368-
}
352+
optimize(input, state)
369353
}
370354
LogicalPlan::Projection(Projection {
371355
input,
@@ -401,8 +385,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
401385

402386
// optimize inner
403387
let new_input = optimize(input, state)?;
404-
405-
from_plan(plan, expr, &[new_input])
388+
Ok(from_plan(plan, expr, &[new_input])?)
406389
}
407390
LogicalPlan::Aggregate(Aggregate {
408391
aggr_expr, input, ..
@@ -2092,4 +2075,35 @@ mod tests {
20922075

20932076
Ok(())
20942077
}
2078+
2079+
#[test]
2080+
fn test_propagation_of_optimized_inner_filters_with_projections() -> Result<()> {
2081+
// SELECT a FROM (SELECT 0 AS a) b WHERE b.a = 1
2082+
let plan = LogicalPlanBuilder::empty(true)
2083+
.project_with_alias(vec![lit(0i64).alias("a")], Some("b".to_owned()))?
2084+
.project_with_alias(vec![col("b.a")], Some("b".to_owned()))?
2085+
.filter(col("b.a").eq(lit(1i64)))?
2086+
.project(vec![col("b.a")])?
2087+
.build()?;
2088+
2089+
let expected_before = "\
2090+
Projection: #b.a\
2091+
\n Filter: #b.a = Int64(1)\
2092+
\n Projection: #b.a, alias=b\
2093+
\n Projection: Int64(0) AS a, alias=b\
2094+
\n EmptyRelation";
2095+
assert_eq!(format!("{:?}", plan), expected_before);
2096+
2097+
// Ensure that the predicate without any columns (0 = 1) is
2098+
// still there.
2099+
let expected_after = "\
2100+
Projection: #b.a\
2101+
\n Projection: #b.a, alias=b\
2102+
\n Projection: Int64(0) AS a, alias=b\
2103+
\n Filter: Int64(0) = Int64(1)\
2104+
\n EmptyRelation";
2105+
assert_optimized_plan_eq(&plan, expected_after);
2106+
2107+
Ok(())
2108+
}
20952109
}

0 commit comments

Comments
 (0)