[SPARK-33337][SQL] Support subexpression elimination in branches of conditional expressions #30245
dongjoon-hyun
left a comment
Thank you for pinging me, @viirya . I took a look briefly and this looks useful. I'll revisit tomorrow.
Thank you @dongjoon-hyun
Test build #130627 has finished for PR 30245 at commit
    }
  })
  exprSetForAll = exprSetForAll.intersect(exprSet)
}
Do we need to handle head and tail separately?
For the head expression, we add its underlying expressions into the exprSetForAll set. For the expressions in the tail, we keep the intersection of exprSetForAll and exprSet.
We could merge the two blocks, but inside the merged block we would need to check whether the current expression is the head expression and apply different logic based on that check.
I prefer the current version since it looks simpler.
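For readers following the head/tail discussion, here is a minimal sketch of the idea in isolation. It is not the code in this PR: plain strings stand in for Catalyst `Expression`s, and `commonSubexprs` is a hypothetical helper name. It seeds the running set with the head branch and intersects it with every tail branch, which is one way to express both steps in a single fold.

```scala
object CommonExprSketch {
  // Hypothetical stand-in: each branch is modelled as the set of
  // subexpressions it contains (strings instead of Catalyst Expressions).
  def commonSubexprs(branches: Seq[Set[String]]): Set[String] =
    branches match {
      case Seq() => Set.empty
      case head +: tail =>
        // Seed with the head branch, then keep intersecting with each tail branch.
        tail.foldLeft(head)((exprSetForAll, exprSet) => exprSetForAll.intersect(exprSet))
    }
}
```

Only expressions present in every branch survive the fold, which is the property the review thread is discussing.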
...atalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
childrenToRecurse.foreach(addExprTree)
// For some special expressions we cannot just recurse into all of its children, but we can
// recursively add the common expressions shared between all of its children.
def commonChildrenToRecurse: Seq[Seq[Expression]] = expr match {
nit. Although this is used only here, can we declare this outside of this function as a private method? Currently, addExprTree seems to grow unnecessarily.
Ok.
dongjoon-hyun
left a comment
Also, could you add a negative test case in which the expression cannot be eliminated from conditional expressions?
I mixed positive and negative test cases. I think I can add some comments to explain it.
Kubernetes integration test starting
Kubernetes integration test status success
Do you still remember why "subexpression elimination" must be eagerly executed? Is it because implementing "lazy" evaluation is expensive?
Test build #130705 has finished for PR 30245 at commit
I don't remember whether we have tried to implement "lazy" behavior in codegen. At the least it would add complexity, as we would need an extra variable to track whether a subexpression has already been evaluated. Every time we use a subexpression, we would first need to check that variable and decide whether to evaluate the subexpression or just reuse the evaluated value.
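To make the cost of the "lazy" variant concrete, here is a rough sketch in plain Scala rather than generated Java. `LazySubexpr` and `evalCount` are hypothetical names used only for illustration; nothing like this is claimed to exist in Spark's codegen. The point is the extra flag that has to be checked at every use site.

```scala
object LazySubexprSketch {
  // Hypothetical holder for one subexpression result, evaluated on first use.
  final class LazySubexpr[A](compute: () => A) {
    private var evaluated = false        // the extra variable mentioned above
    private var value: Option[A] = None
    var evalCount = 0                    // demonstration only: how often compute() ran

    def get: A = {
      if (!evaluated) {                  // this check runs on every use
        value = Some(compute())
        evaluated = true
        evalCount += 1
      }
      value.get
    }
  }
}
```

Eager evaluation avoids the flag and the per-use branch, at the price of sometimes computing a value that no branch ends up needing.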
 * For example, given two expressions `(a + (b + (c + 1)))` and `(d + (e + (c + 1)))`,
 * the common expression `(c + 1)` will be added into `equivalenceMap`.
 */
def addCommonExprs(exprs: Seq[Expression], addFunc: Expression => Boolean = addExpr): Unit = {
This can be private as well.
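The doc comment's example can be reproduced with a toy expression tree. This is only a sketch under assumptions: `Expr`, `Leaf`, and `Add` are hypothetical stand-ins for Catalyst classes, and skipping leaf nodes is a simplification chosen here so that only `(c + 1)` is reported.

```scala
object CommonSubtreeSketch {
  sealed trait Expr
  final case class Leaf(name: String) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr

  // Collects every non-leaf subtree of an expression, including the expression itself.
  def nonLeafSubtrees(e: Expr): Set[Expr] = e match {
    case Leaf(_)         => Set.empty
    case add @ Add(l, r) => nonLeafSubtrees(l) ++ nonLeafSubtrees(r) + add
  }

  // Subtrees that occur in every one of the given expressions.
  def common(exprs: Seq[Expr]): Set[Expr] =
    if (exprs.isEmpty) Set.empty
    else exprs.map(nonLeafSubtrees).reduce(_ intersect _)
}
```

With `(a + (b + (c + 1)))` and `(d + (e + (c + 1)))` as inputs, the only shared non-leaf subtree is `(c + 1)`.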
val otherExprSet = mutable.Set[Expr]()

addExprTree(expr, (innerExpr: Expression) => {
  if (innerExpr.deterministic) {
Similar code appears twice. Can we create a method for it?
val equivalence1 = new EquivalentExpressions
equivalence1.addExprTree(caseWhenExpr1)

// `add2` is repeatedly in all conditions.
add1 is also repeated. Why is it not included?
We treat the first condition specially because it is definitely run, so it counts once for add2. The other conditions all contain add2, so together they count once more. That is where the count of 2 for add2 comes from.
For add1, all values contain it, so it is definitely run and we count it once. If no other expression contains add1, we don't extract a subexpression for it, since it runs just once (only one value of a CaseWhen is evaluated).
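The counting rule in this reply can be modelled in a few lines. This is a sketch of the explanation as given at this point in the review, not the code in `EquivalentExpressions`; `Branch` and `counts` are hypothetical names, strings stand in for expressions, at least one branch is assumed, and there is no else value in the model.

```scala
object CaseWhenCountSketch {
  // Hypothetical model: (subexpressions in the condition, subexpressions in the value).
  type Branch = (Set[String], Set[String])

  private def common(sets: Seq[Set[String]]): Set[String] =
    if (sets.isEmpty) Set.empty else sets.reduce(_ intersect _)

  // Assumes `branches` is non-empty.
  def counts(branches: Seq[Branch]): Map[String, Int] = {
    // The first condition is always run, so everything in it counts once.
    val firstCondition = branches.head._1.toSeq
    // Expressions shared by all remaining conditions count once as a group.
    val otherConditions = common(branches.tail.map(_._1)).toSeq
    // Expressions shared by all values count once as a group.
    val values = common(branches.map(_._2)).toSeq
    (firstCondition ++ otherConditions ++ values)
      .groupBy(identity)
      .map { case (expr, occurrences) => expr -> occurrences.size }
  }
}
```

For three branches that each have `add2` in the condition and `add1` in the value, this gives a count of 2 for `add2` and 1 for `add1`, matching the reply.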
Test build #130813 has finished for PR 30245 at commit
Kubernetes integration test starting
Test build #130812 has finished for PR 30245 at commit
Kubernetes integration test status success
Test build #130835 has finished for PR 30245 at commit
retest this please
Kubernetes integration test starting
Kubernetes integration test status success
Test build #130853 has finished for PR 30245 at commit
Thanks! Passed Jenkins and GitHub Actions. Will merge this today.
Thanks! Merging to master.
assert(equivalence1.getAllEquivalentExprs.filter(_.size == 2).head == Seq(add, add))
// one-time expressions: only ifExpr and its predicate expression
assert(equivalence1.getAllEquivalentExprs.count(_.size == 1) == 2)
assert(equivalence1.getAllEquivalentExprs.filter(_.size == 1).head == Seq(ifExpr1))
Should we use the contains method? A HashMap cannot guarantee the order.
Ok, I will create a follow-up to make sure it cannot be flaky. Thanks.
Created #30371.
Thank you, @leoluan2009 and @viirya . The follow-up is merged to reduce the flakiness.
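For context on the flakiness concern, here is a small sketch of an order-independent check. It does not show what #30371 actually changed; `allGroups` and `hasSingletonGroup` are hypothetical helpers, and strings stand in for expressions. The idea is to assert membership with `contains` instead of relying on `.head` of a collection whose order comes from a hash map.

```scala
import scala.collection.mutable

object OrderIndependentAssertSketch {
  // Hypothetical stand-in for getAllEquivalentExprs: the groups come out of a hash map,
  // so the order in which they are returned is not guaranteed.
  def allGroups(m: mutable.HashMap[String, Seq[String]]): Seq[Seq[String]] = m.values.toSeq

  // True if some group of size 1 consists of exactly `expr`, wherever it sits in the sequence.
  def hasSingletonGroup(groups: Seq[Seq[String]], expr: String): Boolean =
    groups.filter(_.size == 1).contains(Seq(expr))
}
```

A test written this way passes regardless of which singleton group happens to be returned first.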
…es if elseValue is set

### What changes were proposed in this pull request?
This PR fixes a bug with subexpression elimination for CaseWhen statements. #30245 added support for creating subexpressions that are present in all branches of conditional statements. However, for a statement to be in "all branches" of a CaseWhen statement, it must also be in the elseValue.

### Why are the changes needed?
Fix a bug where a subexpression can be created and run for branches of a conditional that don't pass. This can cause issues especially with a UDF in a branch that gets executed assuming the condition is true.

### Does this PR introduce _any_ user-facing change?
Yes, fixes a potential bug where a UDF could be eagerly executed even though it might expect to have already passed some form of validation. For example:
```
val col = when($"id" < 0, myUdf($"id"))
spark.range(1).select(when(col > 0, col)).show()
```
`myUdf($"id")` is considered a subexpression and eagerly evaluated, because it is pulled out as a common expression from both executions of the when clause, but if `id >= 0` it should never actually be run.

### How was this patch tested?
Updated existing test with new case.

Closes #32595 from Kimahriman/bug-case-subexpr-elimination.

Authored-by: Adam Binford <adamq43@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
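The rule stated in this commit message ("all branches" must include the elseValue) can be sketched as follows. This is an illustration under assumptions, not the patch itself: `guaranteedValueExprs` is a hypothetical name, strings stand in for expressions, and returning an empty set when there is no else value is an inference from the `when(...)` example above, where no `otherwise` is given.

```scala
object ElseValueSketch {
  type Exprs = Set[String]

  // Subexpressions that every possible outcome of the CaseWhen evaluates.
  // Without an else value, the "no branch matches" outcome evaluates no value at all,
  // so nothing is guaranteed to run (assumption made for this sketch).
  def guaranteedValueExprs(values: Seq[Exprs], elseValue: Option[Exprs]): Exprs =
    elseValue match {
      case Some(e) => values.foldLeft(e)(_ intersect _)
      case None    => Set.empty
    }
}
```

Under this rule, `myUdf($"id")` in a `when` without an else value is never treated as common to all outcomes, so it would not be hoisted and eagerly evaluated.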
… values if elseValue is set

### What changes were proposed in this pull request?
This PR fixes a bug with subexpression elimination for CaseWhen statements. #30245 added support for creating subexpressions that are present in all branches of conditional statements. However, for a statement to be in "all branches" of a CaseWhen statement, it must also be in the elseValue.

### Why are the changes needed?
Fix a bug where a subexpression can be created and run for branches of a conditional that don't pass. This can cause issues especially with a UDF in a branch that gets executed assuming the condition is true.

### Does this PR introduce _any_ user-facing change?
Yes, fixes a potential bug where a UDF could be eagerly executed even though it might expect to have already passed some form of validation. For example:
```
val col = when($"id" < 0, myUdf($"id"))
spark.range(1).select(when(col > 0, col)).show()
```
`myUdf($"id")` is considered a subexpression and eagerly evaluated, because it is pulled out as a common expression from both executions of the when clause, but if `id >= 0` it should never actually be run.

### How was this patch tested?
Updated existing test with new case.

Closes #32651 from Kimahriman/bug-case-subexpr-elimination-3.1.

Authored-by: Adam Binford <adamq43@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
// if it is shared among conditions, but it doesn't need to be shared in values. Similarly,
// a subexpression among values doesn't need to be in conditions because no matter which
// condition is true, it will be evaluated.
val conditions = c.branches.tail.map(_._1)
There is a flaw here: we exclude the first condition, so a common subexpression in the rest of the conditions is not necessarily always evaluated.
e.g. CaseWhen(cond1, ..., cond2, ..., cond2, ...): cond2 is shared between the remaining conditions but it's not always evaluated.
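A tiny simulation shows why sharing among the remaining conditions is not enough. This is a sketch with hypothetical names (`Branch`, `eval`), assuming CaseWhen evaluates conditions in order and stops at the first one that is true.

```scala
import scala.collection.mutable.ListBuffer

object ShortCircuitSketch {
  // Hypothetical model of a CaseWhen branch: a named condition and a result value.
  final case class Branch(name: String, cond: () => Boolean, value: Int)

  // Evaluates branches in order and stops at the first true condition,
  // recording the names of the conditions that were actually evaluated.
  def eval(branches: Seq[Branch]): (Option[Int], List[String]) = {
    val evaluated = ListBuffer.empty[String]
    val hit = branches.find { b =>
      evaluated += b.name
      b.cond()
    }
    (hit.map(_.value), evaluated.toList)
  }
}
```

When cond1 is true, only cond1 is recorded as evaluated, so hoisting cond2 as a common subexpression would run code that short-circuit evaluation would have skipped.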
Yes, this is related to #32977. This looks like a more aggressive optimization. If we respect the short-circuit evaluation practice for CaseWhen, this might be an issue when users rely on short-circuit evaluation to guard later conditions.
The safest approach is to only consider all conditions.
WDYT? Should we only consider all conditions?
I think we should. I hit an issue caused by it in my refactor and I'll open a PR for the refactor with multiple bugs fixed.
Ok, thanks!
BTW, does #32980 conflict with your refactor?
Only some trivial conflicts. #32980 should be merged first as it has been reviewed and approved.
FWIW, I also addressed this issue in #32987, which assumes that CaseWhen (and Coalesce) should short-circuit and guard later conditions. The main benefit/difference is that if you have
CaseWhen(cond1, ..., cond1, ..., cond2, ...), cond1 gets pulled out as a subexpression, when I think it otherwise wouldn't be, even with #33142.
What changes were proposed in this pull request?
Currently we skip subexpression elimination in branches of conditional expressions including `If`, `CaseWhen`, and `Coalesce`. Actually we can do subexpression elimination for such branches if the subexpression is common across all branches. This patch proposes to support subexpression elimination in branches of conditional expressions.
Why are the changes needed?
We may miss subexpression elimination chances in branches of conditional expressions. This kind of subexpression is frequently seen. It may be written manually by users or come from the query optimizer. For example, project collapsing could embed expressions between two `Project`s and produce a conditional expression in which the same call, such as `jsonToStruct(json)`, appears in several branches.
If `jsonToStruct(json)` is a time-expensive expression, we currently don't eliminate the duplication and waste time running it repeatedly.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit test.
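The motivation in the description can be illustrated outside Spark. This is a plain Scala sketch, not Catalyst code: `expensiveParse` is a hypothetical stand-in for an expensive expression such as `jsonToStruct(json)`, and the `calls` counter exists only to show how often it runs.

```scala
object RepeatedCallSketch {
  var calls = 0

  // Hypothetical stand-in for an expensive expression such as jsonToStruct(json).
  def expensiveParse(json: String): Int = { calls += 1; json.length }

  // Without elimination: each branch condition re-runs the expensive call.
  def withoutElimination(json: String): Double =
    if (expensiveParse(json) == 1) 1.0
    else if (expensiveParse(json) == 2) 2.0
    else 0.0

  // With elimination: the shared subexpression is evaluated once and reused.
  def withElimination(json: String): Double = {
    val parsed = expensiveParse(json)
    if (parsed == 1) 1.0 else if (parsed == 2) 2.0 else 0.0
  }
}
```

Both functions return the same result; the second simply pays for the expensive call once per row instead of once per branch condition reached.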