[SPARK-55092][SQL] Disable partition grouping in KeyGroupedPartitioning when not needed
#53859
Conversation
JIRA Issue Information: Improvement SPARK-55092 (this comment was automatically generated by GitHub Actions)
(Force-pushed 04eed9a to a0e1d99)
KeyGroupedPartitioning: don't group partitions when not needed
(Force-pushed a0e1d99 to 1dc157a)
(Resolved review thread on sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala)
(Force-pushed 1dc157a to f4fbeed)
cc @szehon-ho, @sunchao, @viirya, @dongjoon-hyun
Thank you for pinging me, @peter-toth.
(Resolved review thread on sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala)
dongjoon-hyun left a comment:
Although we need to handle copyFromTag independently, the proposal itself sounds reasonable to me. Do you think you can share some supporting performance numbers based on the existing benchmark or from your production environment?
Why are the changes needed?
Improve performance.
Numbers depend heavily on the use case. In our case a customer would like to use SPJ between tables […]. The optimization in this PR is similar to what […].
dongjoon-hyun left a comment:
+1, LGTM. Thank you, @peter-toth .
BTW, this PR is not rebased to the master after merging #53884 . Did I understand correctly?
(Force-pushed f63091b to 4fd8026)
You are right. I've just rebased it. The diff should be ok now.
Thank you!
@szehon-ho, @sunchao, @viirya, do you have any concerns or comments?
    sparkSession: SparkSession,
    adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None,
    subquery: Boolean): Seq[Rule[SparkPlan]] = {
  val requiredDistribution = if (subquery) {
Not sure I get this: if it's not a subquery, do we pass in any requiredDistribution?
Yeah, let me change this tomorrow and pass in subquery directly into EnsureRequirements, that way this will be much cleaner.
    val newChild = disableKeyGroupingIfNotNeeded(c)
    ShuffleExchangeExec(newPartitioning, newChild, so, ps)
  case _ =>
    val newChild = disableKeyGroupingIfNotNeeded(child)
Could we make a method createShuffleExchangeExec(..., disableGrouping: Boolean) to reduce duplication?
Done in b04bb61.
(Resolved review thread on sql/core/src/main/scala/org/apache/spark/sql/execution/joins/StoragePartitionJoinParams.scala)
@chirag-s-db, do you also want to take a look?
    }
  }

  private def populateNoGroupingPartitionInfo(plan: SparkPlan): SparkPlan = plan match {
This looks like it can be done with the transform API?
Yes, I can change this to use transform() APIs.
Wanted to make it similar to the other 2 populate...() methods. Shall I change those as well?
I modified all 3 populate...()s in b04bb61.
      child, values, joinKeyPositions, reducers, applyPartialClustering, replicatePartitions))
  }

  private def disableKeyGroupingIfNotNeeded(child: SparkPlan) = {
More detailed comments on this method would be good, e.g., the conditions under which grouping can be safely disabled, etc.
I added comments in b04bb61.
    sparkSession: SparkSession,
    adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None,
    subquery: Boolean): Seq[Rule[SparkPlan]] = {
  val requiredDistribution = if (subquery) {
Can we add more detailed comments here? It looks confusing without any context when reading the code here.
I changed this and now passing in subquery and added comments in c28fc3f.
viirya left a comment:
Looks like a reasonable improvement.
Could we also support the case where the KeyGroupedPartitioning is the output of the plan with the following approach?
One advantage of this approach is that it allows us to avoid grouping for the (presumably not uncommon) case of a simple scan from a partitioned table, and it should still be safe for checkpointed scans (as the checkpointed scans would have a […]). FYI @szehon-ho
My concern with this approach is that we can introduce an extra shuffle above the checkpointed (ungrouped) data.
I really like @chirag-s-db's idea, it is quite clean. Otherwise we have to guard everywhere we make a Shuffle to disable it. But I also see the point about missing the opportunity to avoid a shuffle for a checkpointed KeyGrouped RDD, although I guess it's not a very common case. So in short, no strong opinion either way. @sunchao @viirya, any opinion?
Please note that not only checkpointed RDDs, but cached RDDs would also need an extra shuffle. Actually, I wonder if partition grouping by key is at the right place in […]. I'm happy to put together a POC PR; just let me know.
Yeah, that sounds like it would be cleaner, and cover the checkpoint/cache case. I'm not sure the detail about disableGrouping by default will work, but curious to see. Thanks @peter-toth
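A toy sketch of the concern discussed above (names are hypothetical, not Spark's actual API): once a scan is checkpointed or cached without key grouping, its output no longer satisfies a key-grouped requirement, so a later storage-partitioned join would have to insert a shuffle.

```python
# Toy model of the distribution check, not Spark's real planner code.
def satisfies(output_partitioning: str, required: str) -> bool:
    # A key-grouped requirement is only met by a matching key-grouped output.
    return required == "key_grouped" and output_partitioning == "key_grouped"

def plan_join_side(output_partitioning: str) -> str:
    # If the cached/checkpointed side lost its grouping, the planner must
    # add an exchange to restore the required distribution.
    if satisfies(output_partitioning, "key_grouped"):
        return "reuse partitions"
    return "insert shuffle"

print(plan_join_side("key_grouped"))  # → reuse partitions
print(plan_join_side("ungrouped"))    # → insert shuffle
```

This is only meant to illustrate why disabling grouping by default is risky for plans whose output may be consumed later.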
What changes were proposed in this pull request?
Currently KeyGroupedPartitioning always groups partitions by key, regardless of whether grouping is actually needed. This behaviour decreases parallelism and can lead to slower performance. This PR disables partition grouping of a scan with KeyGroupedPartitioning output partitioning if: […]
We can't disable partition grouping of a scan in a main query if it contributes to the output partitioning of the query result, because we don't know whether the query is cached/checkpointed or how the output of the query will be used later. The output must keep KeyGroupedPartitioning semantics in this case.
But we can disable partition grouping in subqueries when grouping is not needed for anything in the subquery plan. This is actually necessary to make sure broadcast exchange reuse happens correctly during dynamic partition pruning.
Why are the changes needed?
Improve performance.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
New UT added.
Was this patch authored or co-authored using generative AI tooling?
No.