-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-45352][SQL] Eliminate foldable window partitions #43144
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OptimizeWindowPartitions -> EliminateWindowPartitions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| plan.transformAllExpressionsWithPruning(_.containsAnyPattern(WINDOW_EXPRESSION), ruleId) { | |
| plan.transformAllExpressionsWithPruning(_.containsPattern(WINDOW_EXPRESSION), ruleId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| case we @ WindowExpression(_, ws @ WindowSpecDefinition(ps, _, _)) => | |
| case we @ WindowExpression(_, ws @ WindowSpecDefinition(ps, _, _)) if ps.exists(_.foldable) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add more test cases:
- partitions only unfoldable.
- Mix unfoldable and foldable partition specs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@beliefer Thanks, updated all.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems we can simplify the code here with transformWithPruning only, then we can remove the removeWindowExpressionPartitions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If do as you say, this will cause the partitionSpec in WindowExpression to be inconsistent with the partitionSpec in Window, which may cause hidden dangers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What I mean is merge the logic of removeWindowExpressionPartitions into transformWithPruning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
case w @ Window(windowExpressions, partitionSpec, _, _) if partitionSpec.exists(_.foldable) =>
val newWindowExpressions =
windowExpressions.map(_.transformWithPruning(_.containsPattern(WINDOW_EXPRESSION)) {
case wsd @ WindowSpecDefinition(ps, _, _) if ps.exists(_.foldable) =>
wsd.copy(partitionSpec = ps.filter(!_.foldable))
}.asInstanceOf[NamedExpression])
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be done? WINDOW_EXPRESSION mismatch WindowSpecDefinition.
case w @ Window(we, ps, _, _) if ps.exists(_.foldable) =>
val newWe = we.map(_.transformWithPruning(_.containsPattern(WINDOW_EXPRESSION)) {
case _we @ WindowExpression(_, wsd @ WindowSpecDefinition(_ps, _, _))
if _ps.exists(_.foldable) =>
val newWsd = wsd.copy(partitionSpec = _ps.filter(!_.foldable))
_we.copy(windowSpec = newWsd)
}.asInstanceOf[NamedExpression])
w.copy(windowExpressions = newWe, partitionSpec = ps.filter(!_.foldable))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't actually built it, just for reference. You should ensure that it is correct.
Could we make the variable name more readable? e.g. we => windowExprs or windowExpressions and others.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I adjusted the above code
case w @ Window(windowExprs, partitionSpec, _, _) if partitionSpec.exists(_.foldable) =>
val newWe = windowExprs.map(_.transformWithPruning(_.containsPattern(WINDOW_EXPRESSION)) {
case windowExpr @ WindowExpression(_, wsd @ WindowSpecDefinition(ps, _, _))
if ps.exists(_.foldable) =>
val newWsd = wsd.copy(partitionSpec = ps.filter(!_.foldable))
windowExpr.copy(windowSpec = newWsd)
}.asInstanceOf[NamedExpression])
w.copy(windowExpressions = newWe, partitionSpec = partitionSpec.filter(!_.foldable))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Uh, I'm missing. Please rename newWe to newWindowExprs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
cc @wangyum |
beliefer
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM except one comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use withNewChildren instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Which object uses withNewChildren? What is the purpose of using withNewChildren?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reduce object copy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated, thanks.
|
cc @cloud-fan |
| /** | ||
| * Remove window partition if partition expressions are foldable | ||
| */ | ||
| object EliminateWindowPartitions extends Rule[LogicalPlan] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: let's put it in a new file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| Row(1) :: Row(1) :: Nil) | ||
| } | ||
|
|
||
| test("SPARK-45352: Remove window partition if partition expression are foldable") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for a new optimizer rule, we should add unit tests, like LimitPushdownSuite
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| val window1 = new WindowSpec(Seq(), Seq(sortOrder), UnspecifiedFrame) | ||
| val window2 = new WindowSpec(Seq(lit(1).expr), Seq(sortOrder), UnspecifiedFrame) | ||
| val df1 = ds.select(row_number().over(window1).alias("num")) | ||
| val df2 = ds.select(row_number().over(window2).alias("num")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for end-to-end tests, let's use end-to-end APIs, e.g. Window.partitionBy($"key").orderBy($"value")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated, thanks.
|
@cloud-fan Already fixed unit test, could you have time to take a look?Thanks. |
| case windowExpr @ WindowExpression(_, wsd @ WindowSpecDefinition(ps, _, _)) | ||
| if ps.exists(_.foldable) => | ||
| val newWsd = wsd.copy(partitionSpec = ps.filter(!_.foldable)) | ||
| windowExpr.withNewChildren(Seq(windowExpr.windowFunction, newWsd)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: windowExpr.copy(windowSpec = newWsd)
| testRelation | ||
| .select(a, b, | ||
| windowExpr(RowNumber(), | ||
| windowSpec(Nil, b.desc :: Nil, windowFrame)).as("rn")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@beliefer can you help confirm that no partition column means a single partition in the window operator?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. All the data rows in a single partition.
| windowExpr(RowNumber(), | ||
| windowSpec(a :: Nil, b.desc :: Nil, windowFrame)).as("rn")) | ||
|
|
||
| val correctAnswer = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: val correctAnswer = originalQuery
| Row(1) :: Row(1) :: Nil) | ||
| } | ||
|
|
||
| test("SPARK-45352: Eliminate foldable window partitions") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we turn on/off the new optimizer rule and run the test twice to make sure the result is the same?
|
Updated all. @cloud-fan |
|
thanks, merging to master! |
|
Thank you for review. @cloud-fan @beliefer |
### What changes were proposed in this pull request? This PR add a new optimizer rule `EliminateWindowPartitions`, it remove window partition if partition expressions are foldable. sql1: `select row_number() over(order by a) b from t ` sql2: `select row_number() over(partition by 1 order by a) b from t ` After this PR, the `optimizedPlan` for sql1 and sql2 is the same. ### Why are the changes needed? Foldable partition is redundant, remove it not only can simplify plan, but some rules can also take effect when the partitions are all foldable, such as `LimitPushDownThroughWindow`. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#43144 from zml1206/SPARK-45352. Authored-by: zml1206 <zhuml1206@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
This PR add a new optimizer rule
EliminateWindowPartitions, it remove window partition if partition expressions are foldable.sql1:
select row_number() over(order by a) b from tsql2:
select row_number() over(partition by 1 order by a) b from tAfter this PR, the
optimizedPlanfor sql1 and sql2 is the same.Why are the changes needed?
Foldable partition is redundant, remove it not only can simplify plan, but some rules can also take effect when the partitions are all foldable, such as
LimitPushDownThroughWindow.Does this PR introduce any user-facing change?
No
How was this patch tested?
UT
Was this patch authored or co-authored using generative AI tooling?
No