[#1472][part-5] Use UnpooledByteBufAllocator to obtain accurate ByteBuf sizes to fix inaccurate usedMemory issue causing OOM #1534
Conversation
@jerqi @zuston PTAL.
Codecov Report
Attention:
Additional details and impacted files
@@ Coverage Diff @@
## master #1534 +/- ##
============================================
+ Coverage 54.17% 55.12% +0.94%
- Complexity 2822 2824 +2
============================================
Files 435 415 -20
Lines 24501 22155 -2346
Branches 2074 2079 +5
============================================
- Hits 13274 12212 -1062
+ Misses 10397 9179 -1218
+ Partials 830 764 -66
☔ View full report in Codecov by Sentry.
Why does the Rust CI always fail?
What's the initial motivation of using
In the previous implementation, the pool cache was not used either. |
protected Map<String, Map<Integer, AtomicLong>> shuffleSizeMap = JavaUtils.newConcurrentMap();

public ShuffleBufferManager(ShuffleServerConf conf, ShuffleFlushManager shuffleFlushManager) {
  this.nettyServerEnabled = conf.get(ShuffleServerConf.NETTY_SERVER_PORT) >= 0;
Why not use RPC_SERVER_TYPE=GRPC_NETTY for this check?
public ShuffleBufferManager(ShuffleServerConf conf, ShuffleFlushManager shuffleFlushManager) {
  this.nettyServerEnabled = conf.get(ShuffleServerConf.NETTY_SERVER_PORT) >= 0;
  long heapSize = Runtime.getRuntime().maxMemory();
PlatformDependent.maxDirectMemory() already contains the logic of Runtime.getRuntime().maxMemory(). Can these be merged?
We allow users to configure the maximum direct memory through the MAX_DIRECT_MEMORY_SIZE variable in the rss-env.sh script, so the value of PlatformDependent.maxDirectMemory() may differ from Runtime.getRuntime().maxMemory(). Therefore we cannot merge this part of the code.
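A minimal sketch, assuming only Netty's public PlatformDependent utility, of why the two limits can diverge (the class name MemoryLimitCheck is just illustrative):

```java
import io.netty.util.internal.PlatformDependent;

public class MemoryLimitCheck {
  public static void main(String[] args) {
    // Heap limit, controlled by -Xmx.
    long heapMax = Runtime.getRuntime().maxMemory();
    // Direct-memory limit as Netty sees it. It honors -XX:MaxDirectMemorySize,
    // which rss-env.sh can set through MAX_DIRECT_MEMORY_SIZE, and falls back
    // to a heap-based estimate only when no explicit limit is configured.
    long directMax = PlatformDependent.maxDirectMemory();
    System.out.printf("heap max = %d bytes, direct max = %d bytes%n", heapMax, directMax);
  }
}
```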
Force-pushed from 5da2661 to bcbd1be.
Force-pushed from bcbd1be to d159181.
Yeah, that seems right. Because the parameters
Force-pushed from c53111c to 408ffcf.
Force-pushed from 408ffcf to d84cf09.
zuston left a comment
Overall LGTM. Left some minor comments
final long start = System.currentTimeMillis();
List<ShufflePartitionedData> shufflePartitionedData = toPartitionedData(req);
long alreadyReleasedSize = 0;
boolean hasFailureOccurred = false;
This is not related to this PR. You can submit another PR to fix it.
The code logic of gRPC and Netty is basically the same. To solve the problem of usedMemory being inaccurate in the Netty scenario, I fixed the Netty path and made the same changes on the gRPC side.
So, do we keep the changes for hasFailureOccurred in Netty? It would be a bit weird to modify the same logic only on the Netty side.
protected Map<String, Map<Integer, AtomicLong>> shuffleSizeMap = JavaUtils.newConcurrentMap();

public ShuffleBufferManager(ShuffleServerConf conf, ShuffleFlushManager shuffleFlushManager) {
  this.nettyServerEnabled = conf.get(ShuffleServerConf.RPC_SERVER_TYPE) == ServerType.GRPC_NETTY;
This could be defined by the ShuffleServer
Do you want to pass nettyServerEnabled from ShuffleServer into ShuffleBufferManager?
The single-source principle should be ensured. And the checking logic is not the same; you can see the current code in ShuffleServer.java:
nettyServerEnabled = shuffleServerConf.get(ShuffleServerConf.NETTY_SERVER_PORT) >= 0;
if (nettyServerEnabled) {
streamServer = new StreamServer(this);
}
Done in #1540.
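For context, a hypothetical sketch of the single-source approach discussed above (the extra constructor parameter is illustrative only; see #1540 for the actual change):

```java
// In ShuffleServer: derive the flag once from the server configuration.
nettyServerEnabled = shuffleServerConf.get(ShuffleServerConf.NETTY_SERVER_PORT) >= 0;
if (nettyServerEnabled) {
  streamServer = new StreamServer(this);
}
// Hand the same flag to ShuffleBufferManager instead of re-deriving it there
// (hypothetical constructor signature, shown for illustration).
shuffleBufferManager =
    new ShuffleBufferManager(shuffleServerConf, shuffleFlushManager, nettyServerEnabled);
```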
public void releasePreAllocatedSize(long delta) {
  preAllocatedSize.addAndGet(-delta);
  if (preAllocatedSize.get() >= delta) {
It could be optimized like this:
long allocated = preAllocatedSize.addAndGet(-delta);
if (allocated < 0) {
  LOG.warn(
      "Current pre-allocated memory["
          + preAllocatedSize.get()
          + "] is less than released["
          + delta
          + "], set pre-allocated memory to 0");
  preAllocatedSize.set(0L);
}
I'm just following the same pattern in releaseMemory, releaseFlushMemory and releaseReadMemory.
Maybe we can refactor the pattern in all of the above methods, as you suggested, in another PR later?
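A hypothetical sketch of what that shared pattern could look like once extracted (the helper name and signature are illustrative, not from the codebase):

```java
// Decrement a counter and clamp it back to zero if the release overshoots,
// mirroring the pattern in releaseMemory, releaseFlushMemory and releaseReadMemory.
private void release(AtomicLong counter, long delta, String name) {
  long remaining = counter.addAndGet(-delta);
  if (remaining < 0) {
    LOG.warn("Current {}[{}] is less than released[{}], set {} to 0",
        name, remaining, delta, name);
    counter.set(0L);
  }
}

public void releasePreAllocatedSize(long delta) {
  release(preAllocatedSize, delta, "pre-allocated memory");
}
```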
There is a flaky test. You can see https://github.com/apache/incubator-uniffle/actions/runs/7983021033/job/21797479973
I don't think it's related to this PR. It does not happen every time; I'll take a deeper look in the next couple of days.
Done by #1536. @jerqi
… when failing to cache shuffle data (#1597)
What changes were proposed in this pull request?
Release memory more accurately when failing to cache shuffle data.
Why are the changes needed?
A follow-up PR for #1534.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing UTs.

What changes were proposed in this pull request?
When we use UnpooledByteBufAllocator to allocate off-heap ByteBuf, Netty requests off-heap memory directly from the operating system instead of allocating it according to pageSize and chunkSize. This way, we can obtain the exact ByteBuf size during the pre-allocation of memory, avoiding distortion of metrics such as usedMemory.
Moreover, we have restored the changes of PR #1521. We ensure that there is sufficient direct memory for the Netty server when decoding sendShuffleDataRequest by taking the encodedLength of the ByteBuf into account in advance during the pre-allocation of memory, thus avoiding OOM while decoding sendShuffleDataRequest.
Since we are not using PooledByteBufAllocator, PR #1524 is no longer needed.
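A small, self-contained sketch of the sizing difference that motivates this change, using only Netty's standard allocator APIs (not Uniffle code); the requested size of 5000 bytes is an arbitrary example:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;

public class AllocatorSizeDemo {
  public static void main(String[] args) {
    int requested = 5000;

    UnpooledByteBufAllocator unpooled = new UnpooledByteBufAllocator(true);
    ByteBuf a = unpooled.directBuffer(requested);
    // Unpooled: direct memory grows by exactly what was requested, so
    // usedMemory bookkeeping based on requested sizes stays accurate.
    System.out.println("unpooled used direct memory = " + unpooled.metric().usedDirectMemory());

    PooledByteBufAllocator pooled = new PooledByteBufAllocator(true);
    ByteBuf b = pooled.directBuffer(requested);
    // Pooled: the buffer is carved out of a pre-allocated chunk sized by
    // pageSize/chunkSize, so the direct memory actually reserved from the OS
    // is far larger than the requested size.
    System.out.println("pooled used direct memory = " + pooled.metric().usedDirectMemory());

    a.release();
    b.release();
  }
}
```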
Why are the changes needed?
A sub-PR of #1519.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing UTs.