[SPARK-34575][SQL] Push down limit through window when partitionSpec is empty #31691
Conversation
Kubernetes integration test starting
Kubernetes integration test status success
Test build #135579 has finished for PR 31691 at commit

Test build #135590 has finished for PR 31691 at commit
(Two resolved review threads on sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala, now outdated.)
    WindowSpecDefinition(Nil, orderSpec,
      SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _)), _, _, child))
    if child.maxRows.forall(_ > limitVal) =>
      LocalLimit(
Do we still need the LocalLimit here? We already restrict the window expression to be RankLike and RowNumber, so we know the number of rows will not change before and after the window, right?
It should be optimized away by EliminateLimits later. Otherwise, the plan cannot be further optimized, like this:
!GlobalLimit 2
!+- Window [row_number() windowspecdefinition(c#0 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#0], [c#0 DESC NULLS LAST]
! +- GlobalLimit 2
! +- LocalLimit 2
! +- Sort [c#0 DESC NULLS LAST], true
! +- LocalRelation [a#0, b#0, c#0]
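For context, here is a minimal sketch of the simplification EliminateLimits enables, assuming the standard Catalyst APIs (illustrative only, not the actual rule in Optimizer.scala): a Limit becomes redundant once its child is known to produce no more rows than the limit value.

import org.apache.spark.sql.catalyst.expressions.IntegerLiteral
import org.apache.spark.sql.catalyst.plans.logical.{Limit, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Illustrative only: drop a Limit whose child already guarantees at most
// `limit` rows, e.g. because an equal Limit was pushed below the Window.
object SimplifiedEliminateLimits extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
    case Limit(IntegerLiteral(limit), child) if child.maxRows.exists(_ <= limit) =>
      child
  }
}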
    if child.maxRows.forall(_ > limitVal) =>
      LocalLimit(
        limitExpr = limitExpr,
        child = window.copy(child = Limit(limitExpr, Sort(orderSpec, true, child))))
Wondering why we need an extra Sort here? Shouldn't the physical plan rule EnsureRequirements add the sort between window and limit?
Yes, the Sort is needed because we need a global sort.
If no partitionSpec is specified, I think the planner inserts an exchange to make a single partition. So we don't need a global sort here?
The current logic sorts first and then limits:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Window [row_number() windowspecdefinition(a#10L ASC NULLS FIRST, b#11L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rowId#8], [a#10L ASC NULLS FIRST, b#11L ASC NULLS FIRST]
+- TakeOrderedAndProject(limit=5, orderBy=[a#10L ASC NULLS FIRST,b#11L ASC NULLS FIRST], output=[a#10L,b#11L])
+- FileScan parquet default.t1[a#10L,b#11L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/opensource/spark/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
If we remove the Sort, the logic limits first and then sorts:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Window [row_number() windowspecdefinition(a#10L ASC NULLS FIRST, b#11L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rowId#8], [a#10L ASC NULLS FIRST, b#11L ASC NULLS FIRST]
+- Sort [a#10L ASC NULLS FIRST, b#11L ASC NULLS FIRST], false, 0
+- GlobalLimit 5
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#30]
+- LocalLimit 5
+- FileScan parquet default.t1[a#10L,b#11L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/opensource/spark/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
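To make the difference concrete, a small hypothetical example (the DataFrame below is made up) of why the Sort must come before the Limit:

// Sorting before limiting keeps the 5 smallest values of a; limiting before
// sorting takes an arbitrary 5 rows first and only then sorts them, which
// would change the result of the query.
val df = spark.range(100).toDF("a")
df.orderBy("a").limit(5).show()  // deterministic: the 5 smallest values
df.limit(5).orderBy("a").show()  // some 5 rows of df, sorted afterwards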
Ah, I see. I got it. Thanks. I think it's better to leave some comments about why we need it here.
Thanks for the explanation, @wangyum. +1 for adding some comments.
Kubernetes integration test starting
Kubernetes integration test status success
Test build #135619 has finished for PR 31691 at commit

cc @cloud-fan
(Resolved review thread on sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala, now outdated.)
    // Adding an extra Limit below WINDOW when there is only one RankLike/RowNumber
    // window function and partitionSpec is empty.
    case LocalLimit(limitExpr @ IntegerLiteral(limitVal),
Shall we respect TOP_K_SORT_FALLBACK_THRESHOLD here?
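For reference, the threshold being referred to lives in SQLConf; a sketch of what respecting it could look like (the guard below is an assumption about where it would be checked, not the merged code):

import org.apache.spark.sql.internal.SQLConf

// spark.sql.execution.topKSortFallbackThreshold: below this value the planner
// can use TakeOrderedAndProject (top-K) instead of a full sort + limit, so the
// pushed-down Sort + Limit stays cheap. The rule could guard on it like:
//   case LocalLimit(limitExpr @ IntegerLiteral(limit), ...)
//       if limit < conf.topKSortFallbackThreshold => ...
val threshold = SQLConf.get.getConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD)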
I'm trying to understand the before/after data flow.
Before: input -> shuffle to one partition -> local sort and run rank function -> limit
After:
The optimization makes sense, but it seems like we can remove the final limit?
More questions: why only allow a single rank function? What is the requirement on the window frame?

If we know the final limit will always be removed, why do we add it in the first place?
Fixed by b328375. This needs a small change to
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #135705 has finished for PR 31691 at commit
    // Adding an extra Limit below WINDOW when there is only one RankLike/RowNumber
    // window function and partitionSpec is empty.
    case LocalLimit(limitExpr @ IntegerLiteral(limit),
        window @ Window(Seq(Alias(WindowExpression(_: RankLike | _: RowNumber,
Why only allow one rank function?
OK. Added support for multiple window functions when the partitionSpec of all window functions is empty and the same order is used. For example:
val numRows = 10
spark.range(numRows).selectExpr("IF (id % 2 = 0, null, id) AS a", s"${numRows} - id AS b", "id AS c").write.saveAsTable("t1")
spark.sql("SELECT *, ROW_NUMBER() OVER(ORDER BY a) AS rn, DENSE_RANK() OVER(ORDER BY a) AS rk FROM t1 LIMIT 5").explain("cost")Before:
GlobalLimit 5, Statistics(sizeInBytes=200.0 B, rowCount=5)
+- LocalLimit 5, Statistics(sizeInBytes=2.4 KiB)
+- Window [row_number() windowspecdefinition(a#16L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#11, dense_rank(a#16L) windowspecdefinition(a#16L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rk#12], [a#16L ASC NULLS FIRST], Statistics(sizeInBytes=2.4 KiB)
+- Relation default.t1[a#16L,b#17L,c#18L] parquet, Statistics(sizeInBytes=1994.0 B)
After:
Window [row_number() windowspecdefinition(a#16L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#11, dense_rank(a#16L) windowspecdefinition(a#16L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rk#12], [a#16L ASC NULLS FIRST], Statistics(sizeInBytes=200.0 B)
+- GlobalLimit 5, Statistics(sizeInBytes=160.0 B, rowCount=5)
+- LocalLimit 5, Statistics(sizeInBytes=1994.0 B)
+- Sort [a#16L ASC NULLS FIRST], true, Statistics(sizeInBytes=1994.0 B)
+- Relation default.t1[a#16L,b#17L,c#18L] parquet, Statistics(sizeInBytes=1994.0 B)
    case LocalLimit(limitExpr @ IntegerLiteral(limit),
        window @ Window(Seq(Alias(WindowExpression(_: RankLike | _: RowNumber,
        WindowSpecDefinition(Nil, orderSpec,
        SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _)), _, _, child))
Why must the frame be UnboundedPreceding, CurrentRow?
Removed this check.
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #135746 has finished for PR 31691 at commit

Kubernetes integration test starting
Kubernetes integration test status success
Test build #135852 has finished for PR 31691 at commit
(Resolved review thread on sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala, now outdated.)
    private def isSupportPushdownThroughWindow(
        windowExpressions: Seq[NamedExpression]): Boolean = windowExpressions.forall {
      case Alias(WindowExpression(_: RankLike | _: RowNumberLike,
          WindowSpecDefinition(Nil, _, _)), _) => true
Does it really work with any kind of window frame? Can we add some comments to explain it?
The window frame of RankLike and RowNumberLike is UNBOUNDED PRECEDING to CURRENT ROW.
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala, line 514 (at 32a523b):

    override val frame: WindowFrame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)
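A quick way to check that claim (a hypothetical one-liner; RowNumberLike hard-codes the frame, so user-specified frames do not apply):

import org.apache.spark.sql.catalyst.expressions.{CurrentRow, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding}

// RowNumber (a RowNumberLike) always reports the fixed frame shown above.
assert(RowNumber().frame == SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))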
(Resolved review thread on ...src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownThroughWindowSuite.scala.)
Test build #135938 has finished for PR 31691 at commit
dongjoon-hyun left a comment:
Hi @wangyum, @cloud-fan, @maropu, @c21.
For safety, if you don't mind, can we have an independent rule, LimitPushDownThroughWindow?
cc @gatorsmile
+1 for adding an independent rule. It seems we can push down more cases. For example:

SELECT *, ROW_NUMBER() OVER(PARTITION BY a ORDER BY b) AS rn FROM t LIMIT 10 =>
SELECT *, ROW_NUMBER() OVER(PARTITION BY a ORDER BY b) AS rn FROM (SELECT * FROM t ORDER BY a, b LIMIT 10) tmp

SELECT * FROM (SELECT *, ROW_NUMBER() OVER(PARTITION BY a ORDER BY b) AS rn FROM t) tmp WHERE rn < 100 LIMIT 10 =>
SELECT *, ROW_NUMBER() OVER(PARTITION BY a ORDER BY b) AS rn FROM (SELECT * FROM t ORDER BY a, b LIMIT 10) tmp
Thank you, @wangyum!
@dongjoon-hyun If the point is to be able to safely exclude the rule through a config if we find a bug after the next release, I am +1 for adding it as a separate rule. BTW, I think it'd be good if we can rename the existing rule as well. Have two rules of
Yes, definitely, that was my point.

BTW, for the renaming proposal, I'm not sure.
     * Pushes down [[LocalLimit]] beneath WINDOW.
     */
    object LimitPushDownThroughWindow extends Rule[LogicalPlan] {
      // The window frame of RankLike and RowNumberLike is UNBOUNDED PRECEDING to CURRENT ROW.
is -> can only be
It's probably better to add an assert below to prove this comment.
    val originalQuery = testRelation
      .select(a, b, c,
        windowExpr(RowNumber(), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn"))
      .limit(20)
To test the non-empty partitionSpec case independently, we need .limit(2), don't we?
Good catch.
    }

    /**
     * Pushes down [[LocalLimit]] beneath WINDOW.
Shall we itemize the functionality and limitations here? LimitPushDownThroughWindow has limited functionality, and it's difficult to track by reading the code. It would be helpful when we add a new feature or maintain this optimizer.
Yes, I plan to explain it with SQL:

/**
 * Pushes down [[LocalLimit]] beneath WINDOW. This rule optimizes the following case:
 * {{{
 *   SELECT *, ROW_NUMBER() OVER(ORDER BY a) AS rn FROM Tab1 LIMIT 5 ==>
 *   SELECT *, ROW_NUMBER() OVER(ORDER BY a) AS rn FROM (SELECT * FROM Tab1 ORDER BY a LIMIT 5) t
 * }}}
 */

    private def supportsPushdownThroughWindow(
        windowExpressions: Seq[NamedExpression]): Boolean = windowExpressions.forall {
      case Alias(WindowExpression(_: RankLike | _: RowNumberLike,
          WindowSpecDefinition(Nil, _,
If you don't mind, can we merge lines 636 and 637?

case Alias(WindowExpression(_: RankLike | _: RowNumberLike, WindowSpecDefinition(Nil, _,
    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
dongjoon-hyun left a comment:
In addition, please put this new optimizer in a new file, @wangyum.
Kubernetes integration test starting
Kubernetes integration test status success
The Scala 2.13 failure is unrelated. Thanks, merging to master!

What changes were proposed in this pull request?
Push down limit through Window when the partitionSpec of all window functions is empty and the same order is used. This is a real case from production. This PR supports 2 cases:
Why are the changes needed?
Improve query performance.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit test.
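To tie the discussion together, a minimal sketch of the shape the rule takes after this review (a simplification for illustration, not the exact merged code; the pattern names follow the diff snippets above, and the TOP_K_SORT_FALLBACK_THRESHOLD guard suggested during review is omitted for brevity):

import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentRow, IntegerLiteral, NamedExpression, RankLike, RowFrame, RowNumberLike, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
import org.apache.spark.sql.catalyst.plans.logical.{Limit, LocalLimit, LogicalPlan, Sort, Window}
import org.apache.spark.sql.catalyst.rules.Rule

// Simplified sketch of LimitPushDownThroughWindow, for illustration only.
object LimitPushDownThroughWindowSketch extends Rule[LogicalPlan] {
  // The window frame of RankLike and RowNumberLike can only be
  // UNBOUNDED PRECEDING to CURRENT ROW, so Window preserves the row count.
  private def supportsPushdown(windowExpressions: Seq[NamedExpression]): Boolean =
    windowExpressions.forall {
      case Alias(WindowExpression(_: RankLike | _: RowNumberLike,
          WindowSpecDefinition(Nil, _,
          SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
      case _ => false
    }

  def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
    // Add an extra Limit below Window when all window functions are
    // RankLike/RowNumberLike with an empty partitionSpec and pushing down
    // actually reduces the number of rows.
    case LocalLimit(limitExpr @ IntegerLiteral(limit),
        window @ Window(windowExprs, Nil, orderSpec, child))
        if supportsPushdown(windowExprs) && child.maxRows.forall(_ > limit) =>
      // The global Sort is required so the limit keeps the top rows of the
      // full ordering; the outer LocalLimit is later removed by EliminateLimits.
      LocalLimit(limitExpr,
        window.copy(child = Limit(limitExpr, Sort(orderSpec, global = true, child))))
  }
}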