Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add EXISTS and IN subquery rewriting for correlated filters at filter depth 1 #2451

Closed

Conversation

jon-chuang
Copy link

@jon-chuang jon-chuang commented May 5, 2022

Which issue does this PR close?

Closes: #2351
Related: #2421

Rationale for this change

We perform a simple pattern match in the LogicalPlan optimizer on InSubquery(Projection(Filter(..))) and Exists(Filter(..)), extracting correlated/dependent columns to be used in the semi/anti join

TODO:

  1. Sanity abort if detect_correlated_columns in the remainder expression (needs to be correlation free to derive the benefits of semi/antijoin)? Future: rewrite infallibly as SEMIJOIN + UNION ALL.
  2. Test no optimization on non-conjunctive clauses containing correlated predicates?

What changes are included in this PR?

Are there any user-facing changes?

Future Work

  1. Rewrite nested disjunctive expressions in the filter as UNION ALL.
    For instance
SELECT * FROM t 
WHERE 
  EXISTS (..sq1..) OR 
  (a IN (..sq2..) AND a < 100) OR 
  a > 100

=>

SELECT * FROM t LEFT SEMIJOIN sq1 on ..
UNION ALL
SELECT * FROM t LEFT SEMIJOIN sq2 on a == r.a WHERE a < 100
UNION ALL
SELECT * FROM t WHERE a > 100

@github-actions github-actions bot added the datafusion Changes in the datafusion crate label May 5, 2022
@jon-chuang jon-chuang changed the title Add exists subquery rewriting and correlated filters at filter depth 1 Add EXISTS and IN subquery rewriting for correlated filters at filter depth 1 May 5, 2022
\n TableScan: sq projection=None [a:UInt32, b:UInt32, c:UInt32]\
\n Projection: #sq_nested.c [c:UInt32]\
\n TableScan: sq_nested projection=None [a:UInt32, b:UInt32, c:UInt32]";
\n Semi Join: #sq.a = #sq_nested.c [a:UInt32, b:UInt32, c:UInt32]\
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: this is a quirk of the implementation. It removes projections below the semi join in some cases (which are inconsequential).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One way to remove the quirk is to allow for multi-column InSubquerys.

The predicate pullup rule (next PR) will be able to "commute" a projection and a filter as follows:

Project([col(a)])
  Filter(col(b)=(col(t.b))
=>
Filter(col(b)=(col(t.b))
  Project([col(a), col(b)])

This way, one can move the Filter to the top of the subquery tree, just like with Exists.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The predicate pullup rule (next PR) will be able to "commute" a projection and a filter as follows:

This sounds like a subset of the filter "pushdown" rule -- so maybe we can just invoke that again rather than adding a new special case rule

// NOTE: We only pattern match against Projection(Filter(..)). We will have another optimization rule
// which tries to pull up all correlated predicates in an InSubquery into a Projection(Filter(..))
// at the root node of the InSubquery's subquery. The Projection at the root must have as its expression
// a single Column.
Copy link
Author

@jon-chuang jon-chuang May 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may be somewhat restrictive. It can be expanded but it needs some work. In particular joins will need to accept expressions rather than just joining blindly on rows.

Edit: Actually, its probably fine to have the following:

Projection(col(a))
  Filter(col(b)=col(outer.b))
    Projection(alias(col(a).plus(col(c)), a), col(b))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I agree that re-using a Projection to calculate expressions is reasonable.

I think there are certain types of OUTER joins where the actual evaluation needs to happen within the join (as the presence of a NULL means something different than the row being filtered)

@alamb
Copy link
Contributor

alamb commented May 6, 2022

Thanks @jon-chuang -- I will try and review this carefully tomorrow

@jon-chuang jon-chuang force-pushed the exists-simple-pattern-match branch from ff90c1a to e7f0365 Compare May 7, 2022 04:02
@alamb
Copy link
Contributor

alamb commented May 8, 2022

Sorry I haven't had a chance to review this PR yet @jon-chuang -- I will put it on my queue and hopefully get to it shortly

@alamb
Copy link
Contributor

alamb commented May 9, 2022

I have made it 1/2 way through this PR -- I will finish it up later today. Thanks again for your patience @jon-chuang

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really nicely done @jon-chuang -- thank you 🙏

cc @andygrove

For other reviewers, here is the PR for equality IN/NOT IN support: #2421

Semi Join: #table_a.a = #table_b.a [a:UInt32, b:UInt32]\
\n Projection: #table_a.a, #table_a.b [a:UInt32, b:UInt32]\
\n TableScan: table_a projection=None [a:UInt32, b:UInt32, c:UInt32]\
\n Filter: #table_b.b > Utf8(\"5\") [a:UInt32, b:UInt32, c:UInt32]\
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I vaguely remember some corner cases related to these non equality predicates and SEMI joins, but when I tried to come up with some examples of issues with this plan I could not. 👍

let subquery = LogicalPlanBuilder::from(table_b)
.filter(
(col("table_a.c").eq(col("table_b.c"))).and(
(col("table_a.a").eq(col("table_b.a")))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest at least one predicate that has the subquery / table_b on the left -- like table_b.a = table_a. as all these predicates have table_a on the left

Ok(())
}

// We only test not exists for the simplest case since all the other code paths
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

I think there might be some subtleties involving NULLs (like if the subquery only has nulls and there is a non equality filter like table_b.a > 5

// returns a new [LogicalPlan] that wraps `plan` in a [LogicalPlan::Filter] with
/// its predicate with all `predicates` ANDed.
pub fn filter_by_all(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan {
if let Some(predicate) = combine_conjunctive(predicates) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

// NOTE: We only pattern match against Projection(Filter(..)). We will have another optimization rule
// which tries to pull up all correlated predicates in an InSubquery into a Projection(Filter(..))
// at the root node of the InSubquery's subquery. The Projection at the root must have as its expression
// a single Column.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I agree that re-using a Projection to calculate expressions is reasonable.

I think there are certain types of OUTER joins where the actual evaluation needs to happen within the join (as the presence of a NULL means something different than the row being filtered)

&mut correlated_join_columns,
);

// Strip the projection away and use its input for the semi/anti-join
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why this stripping is needed? I don't understand it

\n TableScan: sq projection=None [a:UInt32, b:UInt32, c:UInt32]\
\n Projection: #sq_nested.c [c:UInt32]\
\n TableScan: sq_nested projection=None [a:UInt32, b:UInt32, c:UInt32]";
\n Semi Join: #sq.a = #sq_nested.c [a:UInt32, b:UInt32, c:UInt32]\
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The predicate pullup rule (next PR) will be able to "commute" a projection and a filter as follows:

This sounds like a subset of the filter "pushdown" rule -- so maybe we can just invoke that again rather than adding a new special case rule

let new_input = subquery_filters.iter().try_fold(
optimized_input,
|outer_plan, &subquery_expr| {
self.rewrite_correlated_subquery_as_join(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Comment on lines +715 to +720
let mut contains_correlated_columns = false;
expr.accept(CorrelatedColumnsVisitor {
outer_schema,
contains_correlated_columns: &mut contains_correlated_columns,
})?;
Ok(contains_correlated_columns)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it really matters, but this type of pattern could also be expressed like this, to avoid some mut (assuming contains_correlated_columns has been changed to bool):

Suggested change
let mut contains_correlated_columns = false;
expr.accept(CorrelatedColumnsVisitor {
outer_schema,
contains_correlated_columns: &mut contains_correlated_columns,
})?;
Ok(contains_correlated_columns)
let visitor = expr.accept(CorrelatedColumnsVisitor {
outer_schema,
contains_correlated_columns: false,
})?;
Ok(visitor.contains_correlated_columns)

@alamb alamb mentioned this pull request May 11, 2022
@jackwener
Copy link
Member

It's a great job!😀❤My work is a little busy.
I will review it on weekends.

@alamb
Copy link
Contributor

alamb commented May 26, 2022

@jon-chuang do you have time to work on this PR (looks like it has accumulated some conflicts) and there are some suggestions about tests. If not, I can find some time to do a final polish and merge it in

.filter(
(col("table_a.c").eq(col("table_b.c"))).and(
(col("table_a.a").eq(col("table_b.a")))
.and(col("table_a.b").eq(col("table_b.b"))),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need test like A or (B and c)?

Because it can't be split .

@@ -64,105 +233,41 @@ impl OptimizerRule for SubqueryFilterToJoin {
utils::split_conjunction(predicate, &mut filters);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A or (B anc C) can't be splited.

I'm not sure that it will have a affact.

@andygrove andygrove removed the datafusion Changes in the datafusion crate label Jun 3, 2022
@alamb alamb marked this pull request as draft June 7, 2022 17:22
@alamb
Copy link
Contributor

alamb commented Jun 7, 2022

Marking as draft as it needs some work before merging

@alamb
Copy link
Contributor

alamb commented Jul 21, 2022

@avantgardnerio noted that #2885 has some non trivial overlap with this PR, if you ever get back to it @jon-chuang

@alamb
Copy link
Contributor

alamb commented Aug 6, 2022

I believe this PR has been superseded now, so closing. Please feel free to reopen if you plan to keep working on it.

@alamb alamb closed this Aug 6, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Optimize EXISTS subquery expressions by rewriting as semi-join
4 participants