-
Notifications
You must be signed in to change notification settings - Fork 240
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Shuffle coalesce read supports split-and-retry #11598
base: branch-25.02
Are you sure you want to change the base?
Shuffle coalesce read supports split-and-retry #11598
Conversation
build |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a lot of code, and I have not even really started to dig into it. I am concerned about how much is being changed here. From the description it looks like someone has misconfigured the target batch size, so this is going to try and adjust that target size, but just for shuffle coalesce? I am really confused about the goals of this and would like to see examples of when this helps and what the performance impact is to the happy path.
I also am concerned that we are just masking other underlying problems. The spill framework should be set up so that if we hit a point where there is nothing more to spill, then we start to pause other threads. So instead of adjusting the target batch size we adjust the parallelism dynamically. So did the user set the target batch size so high that a single thread would not be able to process a single task? If so, then we need to work with them to set reasonable target batch sizes. If the size looks reasonable, then we need to understand what is using all of the GPU's memory that makes it so that we cannot get enough to copy a single batch to the GPU? We might have some memory leaks somewhere else that this is trying to mask.
metricsMap: Map[String, GpuMetric], | ||
prefetchFirstBatch: Boolean = false): Iterator[ColumnarBatch] = { | ||
if (readOption.useSplitRetryRead) { | ||
val reader = new GpuShuffleCoalesceReader(iter, targetSize, dataTypes, metricsMap) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am confused why we want to put the coalesce on the GPU if we want to try and split and retry the read? We have it on the CPU to speed up the processing, but the new default puts it on the GPU. Do you have some performance numbers to show the impact of this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thx for review. And the coalesce is still on the CPU. What this new GpuShuffleCoalesceReader
does is just a combination of HostShuffleCoalesceIterator
and GpuShuffleCoalesceIterator
, along with the additional split-retry support when trying to move the coalcesced buffer to the GPU. To know if GPU OOM happens for split-retry, we have to put all coalesce steps together in this new reader. So the reader can replace the iterators where the HostShuffleCoalesceIterator
and GpuShuffleCoalesceIterator
are used together. So far it is for only sized hash joins and the coalesce shuffle exec.
Split-retry here does not mean we will split the coalesced buffer. Instead we retry the entire coalesce process with smaller target size, that is re-collecting less shuffle buffers (total size <= target_size/2) from the cache, concatenating them and trying to move to the GPU again. The new coalesced buffer is about half size of the original one, so requires less GPU memory when copying it to the GPU.
The change is big but the idea is quite simple.
Pull in shuffle buffers according to the target size on CPU
and cache them
|
|
\ /
Take the first N buffers with total size up to the target_size <---
| |
| |
\ / |
Do the coalcesce on CPU (reduce the target_szie by half )
| |
| |
\ / |
Move the coalcesced big buffer to GPU ---- (GPU OOM) ---->
|
|
\ /
Done, remove the first N buffers from the cache
The change is big but the whole feature can be disabled by simply setting Yes, this adjusts the target size only for shuffle coalesce. And the whole task will be affected only when no other operators in this task will concatenate or split the batches for the output. This is mainly to improve the stability. It can possibly get better performance only when the time cost of this split retry is lesss than a Spark task retry and the task retry can really happen without this change. I rememer the target size was well tuned. Changing it will lead to longer operation time for some operators in the query. @winningsix may know more details.
We will try to collect more details when this OOM happen again. |
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
…e-split-retry Signed-off-by: Firestarman <firestarmanllc@gmail.com>
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
close #11584
This PR adds in the split-and-retry support when reading buffers from Shuffle for GpuShuffleCoalesceExec and sized hash joins.
When OOM happens, the retry will reduce the target size by half, re-collect buffers from the cache with total size up to the new target batch size, then concatenate the collected buffers and move the concatenated result to GPU. Since the concatenated buffer can not be split, we have to re-collect the buffers and concatenate them again with this new smaller target size.
The new target size will be used by all the following read in this task. The more split-and-retry it gets, the smaller the target size will be. This is just a choice. We prefer to believe that the GPU is busy with high memory pressure when getting an OOM. And more OOMs will possibly occur if using the original target batch size.
The change is big but the idea is quite simple.