Skip to content

Commit

Permalink
Alternative implementation on issue_filters
Browse files Browse the repository at this point in the history
  • Loading branch information
isidentical committed Aug 27, 2022
1 parent 39c6459 commit 169e6b6
Showing 1 changed file with 11 additions and 45 deletions.
56 changes: 11 additions & 45 deletions datafusion/optimizer/src/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ impl State {
}

/// returns all predicates in `state` that depend on any of `used_columns`
/// or the ones that do not reference any columns (e.g. WHERE 1=1)
fn get_predicates<'a>(
state: &'a State,
used_columns: &HashSet<Column>,
Expand All @@ -89,10 +90,11 @@ fn get_predicates<'a>(
.filters
.iter()
.filter(|(_, columns)| {
!columns
.intersection(used_columns)
.collect::<HashSet<_>>()
.is_empty()
columns.is_empty()
| !columns
.intersection(used_columns)
.collect::<HashSet<_>>()
.is_empty()
})
.map(|&(ref a, ref b)| (a, b))
.unzip()
Expand Down Expand Up @@ -338,34 +340,16 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
let mut predicates = vec![];
utils::split_conjunction(predicate, &mut predicates);

// Predicates that do not reference any columns (WHERE FALSE, WHERE 1=1, etc.)
let mut no_col_predicates = vec![];

predicates
.into_iter()
.try_for_each::<_, Result<()>>(|predicate| {
let mut columns: HashSet<Column> = HashSet::new();
expr_to_columns(predicate, &mut columns)?;
if columns.is_empty() {
no_col_predicates.push(predicate)
} else {
// collect the predicate
state.filters.push((predicate.clone(), columns));
}
state.filters.push((predicate.clone(), columns));
Ok(())
})?;

// Predicates without columns will not be pushed down.
// As those contain only literals, they could be optimized using constant folding
// and removal of WHERE TRUE / WHERE FALSE
if !no_col_predicates.is_empty() {
Ok(utils::add_filter(
optimize(input, state)?,
&no_col_predicates,
))
} else {
optimize(input, state)
}
optimize(input, state)
}
LogicalPlan::Projection(Projection {
input,
Expand All @@ -390,36 +374,18 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
})
.collect::<HashMap<_, _>>();

// Predicates that do not reference any columns (WHERE FALSE, WHERE 1=1, etc.)
let mut no_col_predicates = vec![];

// re-write all filters based on this projection
// E.g. in `Filter: #b\n Projection: #a > 1 as b`, we can swap them, but the filter must be "#a > 1"
for (predicate, columns) in state.filters.iter_mut() {
*predicate = replace_cols_by_name(predicate.clone(), &projection)?;

columns.clear();
expr_to_columns(predicate, columns)?;
if columns.is_empty() {
no_col_predicates.push(predicate.clone())
}
}
// Don't pushdown columnless predicates.
state
.filters
.retain(|(predicate, _)| !no_col_predicates.contains(predicate));

// optimize inner
let new_input = optimize(input, state)?;
let inlined_plan = from_plan(plan, expr, &[new_input])?;
if !no_col_predicates.is_empty() {
Ok(utils::add_filter(
inlined_plan,
no_col_predicates.iter().collect::<Vec<&Expr>>().as_slice(),
))
} else {
Ok(inlined_plan)
}
Ok(from_plan(plan, expr, &[new_input])?)
}
LogicalPlan::Aggregate(Aggregate {
aggr_expr, input, ..
Expand Down Expand Up @@ -2133,8 +2099,8 @@ mod tests {
let expected_after = "\
Projection: #b.a\
\n Projection: #b.a, alias=b\
\n Filter: Int64(0) = Int64(1)\
\n Projection: Int64(0) AS a, alias=b\
\n Projection: Int64(0) AS a, alias=b\
\n Filter: Int64(0) = Int64(1)\
\n EmptyRelation";
assert_optimized_plan_eq(&plan, expected_after);

Expand Down

0 comments on commit 169e6b6

Please sign in to comment.