Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache common referred expression at the window input #9009

Merged
merged 26 commits into from
Jan 29, 2024
Merged

Cache common referred expression at the window input #9009

merged 26 commits into from
Jan 29, 2024

Conversation

mustafasrepo
Copy link
Contributor

Which issue does this PR close?

Closes #.

Rationale for this change

The PR8960 retracted 2 of the window tests. This PR fixes these retracted tests also adds a new feature for caching common expressions at the window input.

As an example consider the following query

SELECT c3,
    SUM(c9) OVER(ORDER BY c3+c4 DESC) as sum1,
    SUM(c9) OVER(ORDER BY c3+c4 ASC) as sum2
    FROM aggregate_test_100

which will generate following logical plan.

+   Projection: aggregate_test_100.c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2
+   --Limit: skip=0, fetch=5
+   ----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3 AS aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+   ------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3 AS aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+   --------Projection: aggregate_test_100.c3 + aggregate_test_100.c4 AS aggregate_test_100.c3 + aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3, aggregate_test_100.c3, aggregate_test_100.c9
+   ----------TableScan: aggregate_test_100 projection=[c3, c4, c9]

where expression c3+c4 is computed in the first Projection and its result (which is a column) is used in subsequent WindowAggr. This is done with the CommonSubexprEliminate rule.

However, for the following query

SELECT c3,
    SUM(c9) OVER(ORDER BY c3+c4 DESC, c9 DESC, c2 ASC) as sum1,
    SUM(c9) OVER(ORDER BY c3+c4 ASC, c9 ASC ) as sum2
    FROM aggregate_test_100

datafusion generates following plan:

+   Projection: aggregate_test_100.c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2
+   --WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+   ----Projection: aggregate_test_100.c3, aggregate_test_100.c4, aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+   ------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+   --------TableScan: aggregate_test_100 projection=[c2, c3, c4, c9]

where computation c3+c4 couldn't cache with a Projection before first WindowAggr. The reason is that, each WindowAggr refers to c3+c4 once. Hence CommonSubExpr rule doesn't think "removing it is helpful".

What changes are included in this PR?

This PR fixes above problem so that CommonSubExpr considers consecutive window operators during common sub expression substitute analysis. With this analysis we can generate following logical plan for the second query:

+   Projection: aggregate_test_100.c3, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS sum2
+   --WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3 AS aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+   ----Projection: aggregate_test_100.c3 + aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3, aggregate_test_100.c3, aggregate_test_100.c9, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+   ------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3 AS aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c3 + aggregate_test_100.c4 DESC NULLS FIRST, aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
+   --------Projection: aggregate_test_100.c3 + aggregate_test_100.c4 AS aggregate_test_100.c3 + aggregate_test_100.c4aggregate_test_100.c4aggregate_test_100.c3, aggregate_test_100.c2, aggregate_test_100.c3, aggregate_test_100.c9
+   ----------TableScan: aggregate_test_100 projection=[c2, c3, c4, c9]

where common computation is cached with Projection before first window.

Are these changes tested?

Yes

Are there any user-facing changes?

@github-actions github-actions bot added optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt) labels Jan 26, 2024
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this contribution @mustafasrepo -- this PR looks good to me. Also, I found the description on this PR very clear and well written. Thank you very much 🙏

One thought I had was will there be a problem if there is a subquery that would end up with a nested WindowAggExec that could be incorrectly optimized away 🤔

Something like

SELECT c3,
    SUM(c9) OVER(ORDER BY c3+c4 ASC) as sum2,
    sum1,
    FROM (
      SELECT c3, c4, c9, 
      SUM(c9) OVER(ORDER BY c3+c4 DESC) as sum1,
      FROM aggregate_test_100
    )

cc @waynexia and @haohuaijin

let input_schema = Arc::clone(input.schema());
let arrays =
to_arrays(window_expr, input_schema, &mut expr_set, ExprMask::Normal)?;
// Get all window expressions inside the consecutive window operators.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we can add a comment here about why this is recursively looking down into all window operations (e.g. because they all get the same input schema and append on some window functions, but the window functions can't refer to previous window functions).

I think perhaps you could reuse the (very nicely written) description from this PR which explains it very well

Copy link
Contributor

@haohuaijin haohuaijin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @mustafasrepo and @alamb, look great to me!

I also have the same question as @alamb.

datafusion/optimizer/src/common_subexpr_eliminate.rs Outdated Show resolved Hide resolved
mustafasrepo and others added 3 commits January 29, 2024 10:33
Co-authored-by: Huaijin <haohuaijin@gmail.com>
# Please enter a commit message to explain why this merge is necessary,
# especially if it merges an updated upstream into a topic branch.
#
# Lines starting with '#' will be ignored, and an empty message aborts
# the commit.
@mustafasrepo
Copy link
Contributor Author

Thank you for this contribution @mustafasrepo -- this PR looks good to me. Also, I found the description on this PR very clear and well written. Thank you very much 🙏

One thought I had was will there be a problem if there is a subquery that would end up with a nested WindowAggExec that could be incorrectly optimized away 🤔

Something like

SELECT c3,
    SUM(c9) OVER(ORDER BY c3+c4 ASC) as sum2,
    sum1,
    FROM (
      SELECT c3, c4, c9, 
      SUM(c9) OVER(ORDER BY c3+c4 DESC) as sum1,
      FROM aggregate_test_100
    )

I think, in these cases, we will generate a sub-optimal plan, where a complex expression is calculated more than once by subsequent operators. However, didn't cached (Previous behaviour). However, I don't think we will generate an invalid plan. I added your example as a test case also in this PR.

I think as a future PR, we can analyze plan from top down to count expression referral count, for better calculating referral counts across plan.

# Please enter a commit message to explain why this merge is necessary,
# especially if it merges an updated upstream into a topic branch.
#
# Lines starting with '#' will be ignored, and an empty message aborts
# the commit.
@alamb
Copy link
Contributor

alamb commented Jan 29, 2024

However, didn't cached (Previous behaviour). However, I don't think we will generate an invalid plan. I added your example as a test case also in this PR.

Thank you

Thank you for this contribution @mustafasrepo -- this PR looks good to me. Also, I found the description on this PR very clear and well written. Thank you very much 🙏

One thought I had was will there be a problem if there is a subquery that would end up with a nested WindowAggExec that could be incorrectly optimized away 🤔

Something like

SELECT c3,
SUM(c9) OVER(ORDER BY c3+c4 ASC) as sum2,
sum1,
FROM (
SELECT c3, c4, c9,
SUM(c9) OVER(ORDER BY c3+c4 DESC) as sum1,
FROM aggregate_test_100
)
I think, in these cases, we will generate a sub-optimal plan, where a complex expression is calculated more than once by subsequent operators. However, didn't cached (Previous behaviour). However, I don't think we will generate an invalid plan. I added your example as a test case also in this PR.

I think as a future PR, we can analyze plan from top down to count expression referral count, for better calculating referral counts across plan.

Yes, I agree there is no need to optimize this case as part of this PR, and since it gives correct results, lets 🚀

@alamb alamb merged commit a57e270 into apache:main Jan 29, 2024
22 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants