[SPARK-24917][CORE] make chunk size configurable #21933
Conversation
Add an option in the Spark configuration to change the chunk size (which is 4 MB by default). This would allow bypassing the issue mentioned in SPARK-24917 when fetching large partitions (a bit less than 2 GB).
@vincent-grosbois, thanks! I am a bot who has found some folks who might be able to help with the review: @JoshRosen, @rxin and @vanzin

ok to test

Test build #93867 has finished for PR 21933 at commit

Can you add

Hello, I updated the description and title

retest this please

Test build #94080 has finished for PR 21933 at commit

retest this please

Test build #94100 has finished for PR 21933 at commit

Test build #94098 has finished for PR 21933 at commit

retest this please

Test build #94114 has finished for PR 21933 at commit
```scala
// Whether to compress shuffle output temporarily spilled to disk
private[this] val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true)
// Size of the chunks to be used in the ChunkedByteBuffer
private[this] val chunkSizeMb = conf.getSizeAsMb("spark.memory.chunkSize", "4m").toInt
```
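For context, a minimal sketch (not part of the diff above, and only an assumption about the surrounding SerializerManager code) of how a configured chunk size like this is typically consumed: the value replaces a previously hard-coded 4 MB buffer size when the serializer output stream is created.

```scala
import java.nio.ByteBuffer

import org.apache.spark.util.io.ChunkedByteBufferOutputStream

// Sketch only: build the serializer output stream with the configured chunk
// size (in MB) instead of a hard-coded 4 MB. `chunkSizeMb` refers to the
// field read from "spark.memory.chunkSize" above.
val bbos = new ChunkedByteBufferOutputStream(chunkSizeMb * 1024 * 1024, ByteBuffer.allocate)
```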
The name spark.memory.chunkSize looks too generic.
How about spark.memory.serializer.chunkSize or others?
I renamed it to spark.memory.serializer.chunkSize
rename spark.memory.chunkSize to spark.memory.serializer.chunkSize
Test build #94138 has finished for PR 21933 at commit

retest this please

Test build #94346 has finished for PR 21933 at commit

retest this please

Test build #94358 has finished for PR 21933 at commit

LGTM
```scala
// Whether to compress shuffle output temporarily spilled to disk
private[this] val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true)
// Size of the chunks to be used in the ChunkedByteBuffer
private[this] val chunkSizeMb = conf.getSizeAsMb("spark.memory.serializer.chunkSize", "4m").toInt
```
Why don't we do byteStringAsBytes and remove 1024 * 1024 below?
@vincent-grosbois WDYT about this?
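For illustration only (this is not code from the PR), the reviewer's suggestion could look roughly like the following, using SparkConf.getSizeAsBytes, the byte-granularity counterpart of getSizeAsMb, so the later 1024 * 1024 multiplication is no longer needed.

```scala
// Sketch of the suggested alternative: read the size in bytes up front so the
// caller no longer multiplies by 1024 * 1024. The field name is illustrative.
private[this] val chunkSizeBytes: Int =
  conf.getSizeAsBytes("spark.memory.serializer.chunkSize", "4m").toInt
```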
cc @squito too.
HyukjinKwon left a comment
The issue and the fix look coherent and good to me.
Thanks for the detailed analysis @vincent-grosbois. I agree with everything, but as you noted you won't hit this particular issue anymore with

Not really! This would be a useful feature to backport to all Spark branches that will never benefit from an upgrade to netty 4.1.28. However, I think that for Spark 2.4 and the master branch the netty dependency will eventually be bumped to 4.1.28 or more, which would make this commit useless...

Yeah, this can be closed; we updated to 4.1.30
Closes apache#22567
Closes apache#18457
Closes apache#21517
Closes apache#21858
Closes apache#22383
Closes apache#19219
Closes apache#22401
Closes apache#22811
Closes apache#20405
Closes apache#21933
Closes apache#22819 from srowen/ClosePRs.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
Add an option in the Spark configuration to change the chunk size (which is 4 MB by default).
This allows users to bypass the issue mentioned in SPARK-24917 by defining larger chunks.
Explanation:
Currently, using netty < 4.1.28 (i.e. before netty/netty@9b95b8e), sending a ChunkedByteBuffer with more than 16 chunks over the network triggers a "merge" of all the blocks into one big transient array that is then sent over the network. This is problematic because the total memory for all chunks can be high (up to 2 GB), so the merge triggers an allocation of up to 2 GB, which causes OOM errors.
One way to bypass this netty behavior is to make sure the data is never split into more than 16 chunks, which can be done by creating bigger chunks; the chunk size is currently hard-coded to 4 MB. This commit lets users define a bigger chunk size for their job, which allowed us to avoid this OOM error.
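As a hypothetical usage example (the 256m value, app name, and master are illustrative and not from the patch), a job that fetches partitions close to 2 GB could raise the chunk size when building its SparkConf:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example: with 256 MB chunks, a partition just under 2 GB is
// split into at most 8 chunks, staying below netty's 16-chunk merge threshold.
val conf = new SparkConf()
  .setAppName("large-partition-job")
  .setMaster("local[*]") // illustrative; use your real master in practice
  .set("spark.memory.serializer.chunkSize", "256m")
val sc = new SparkContext(conf)
```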
What changes were proposed in this pull request?
I'm introducing a configuration parameter to define the chunk size.
How was this patch tested?
Tested on several Spark jobs; the changes actually work and generate "chunks" of the indicated size.