[#436] feat(client,server): Introduce multi-part LocalStorageManager #2253
maobaolong merged 8 commits into apache:master
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #2253      +/-   ##
============================================
- Coverage     52.65%   51.82%   -0.83%
+ Complexity     3376     2977     -399
============================================
  Files           514      475      -39
  Lines         27565    22645    -4920
  Branches       2582     2084     -498
============================================
- Hits          14513    11735    -2778
+ Misses        12092    10150    -1942
+ Partials        960      760     -200
  long crc = block.getCrc();
- long startOffset = dataWriter.nextOffset();
+ // put storage id to high 8 bits
+ long startOffset = ShuffleDataSegment.wrapOffset(dataWriter.nextOffset(), storageId);
This is too rough. If the high 8 bits of the offset are already in use, this will make the data inconsistent.
It is also incompatible with previous shuffle-server/client versions.
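To make the concern concrete, here is a minimal Java sketch of packing a storage id into the high 8 bits of a 64-bit offset. The names `OffsetPacking`, `wrap`, `storageId`, and `offset` are hypothetical and chosen for illustration only; this is not the actual `ShuffleDataSegment.wrapOffset` implementation, which is not shown in this thread.

```java
/** Hypothetical illustration of high-bit packing; not the code from this PR. */
final class OffsetPacking {
  private static final int STORAGE_ID_BITS = 8;
  private static final int OFFSET_BITS = Long.SIZE - STORAGE_ID_BITS; // 56 low bits
  private static final long OFFSET_MASK = (1L << OFFSET_BITS) - 1;

  private OffsetPacking() {}

  /** Packs storageId into the high 8 bits and the raw offset into the low 56 bits. */
  static long wrap(long offset, int storageId) {
    if (offset < 0 || offset > OFFSET_MASK) {
      // An offset that already uses the high bits would be silently corrupted.
      throw new IllegalArgumentException("offset does not fit in 56 bits: " + offset);
    }
    if (storageId < 0 || storageId > 0xFF) {
      throw new IllegalArgumentException("storageId does not fit in 8 bits: " + storageId);
    }
    return ((long) storageId << OFFSET_BITS) | offset;
  }

  /** Extracts the storage id from the high 8 bits. */
  static int storageId(long wrapped) {
    return (int) (wrapped >>> OFFSET_BITS);
  }

  /** Extracts the raw offset from the low 56 bits. */
  static long offset(long wrapped) {
    return wrapped & OFFSET_MASK;
  }
}
```

A reader that is unaware of the packing would interpret `OffsetPacking.wrap(1024, 3)` as a raw offset far beyond the end of the file, which is the compatibility problem raised in the comment. Only storage id 0 leaves the value unchanged.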
Thanks for proposing this. But I'm concerned about the storage index being implicitly hidden in the offset, which is dangerous. Why not introduce an extra storage index field into the protobuf?
@zuston Thanks for your review. In fact, I have implemented another approach with a proto change, which introduces storageId into the shuffle index file and changes the Netty encode/decode methods on both the client and server sides. It is more graceful than this PR and also works as expected. However, I cannot come up with a gray (gradual) rollout plan for it, because I cannot upgrade the clients and servers at the same time. With this PR, although both clients and servers need to be upgraded, I can upgrade all the clients first and then upgrade the servers.
I don't think the storage id should be recorded in the index file; it could be retrieved from the local storage index. If so, the client could make a compatible change for this.
@zuston Thanks for the discussion and suggestion. I have changed the approach to add storageIds to the response of
zuston left a comment
Overall LGTM now. Thanks for your effort @maobaolong
bufferSegments.add(
    new BufferSegment(blockId, bufferOffset, length, uncompressLength, crc, taskAttemptId));
bufferOffset += length;
boolean storageChanged = preStorageId != -1 && currentStorageId != preStorageId;
This class should be fully covered by unit tests, including the storage-changed situation.
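As a rough illustration of the storage-changed case, the following Java sketch splits a sequence of blocks into segments and closes the current segment whenever the storage id differs from the previous block's. It reuses the `storageChanged` condition from the snippet above, but everything else (`StorageAwareSplitSketch`, `Segment`, the simplified size rule) is a hypothetical stand-in, not the real splitter from this PR.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified splitter used only to illustrate the storage-changed case. */
final class StorageAwareSplitSketch {
  /** A run of consecutive blocks that all live on the same storage. */
  static final class Segment {
    final int storageId;
    final int length;
    final int blockCount;

    Segment(int storageId, int length, int blockCount) {
      this.storageId = storageId;
      this.length = length;
      this.blockCount = blockCount;
    }
  }

  private StorageAwareSplitSketch() {}

  /**
   * Closes the current segment when it has reached readBufferSize or when the next
   * block lives on a different storage than the previous one.
   */
  static List<Segment> split(int[] lengths, int[] storageIds, int readBufferSize) {
    List<Segment> segments = new ArrayList<>();
    int preStorageId = -1;
    int segmentLength = 0;
    int blockCount = 0;
    for (int i = 0; i < lengths.length; i++) {
      int currentStorageId = storageIds[i];
      boolean storageChanged = preStorageId != -1 && currentStorageId != preStorageId;
      if (segmentLength > 0 && (storageChanged || segmentLength >= readBufferSize)) {
        segments.add(new Segment(preStorageId, segmentLength, blockCount));
        segmentLength = 0;
        blockCount = 0;
      }
      segmentLength += lengths[i];
      blockCount++;
      preStorageId = currentStorageId;
    }
    if (segmentLength > 0) {
      // Flush the trailing segment.
      segments.add(new Segment(preStorageId, segmentLength, blockCount));
    }
    return segments;
  }
}
```

A unit test for the real class would presumably assert the same kind of property: no segment ever mixes blocks from two storages, even when the buffer size alone would allow them to be merged.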
common/src/main/java/org/apache/uniffle/common/segment/LocalOrderSegmentSplitter.java (outdated, resolved)
 * ShuffleDataSegment size should < readBufferSize 3. Single shuffleDataSegment's blocks should
 * be continuous
 */
int[] storageIds = shuffleIndexResult.getStorageIds();
For the fixed-size segment splitter and LocalOrderSegmentSplitter, the storage-change logic could be unified in the underlying abstract segment splitter.
server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java (outdated, resolved)
server/src/main/java/org/apache/uniffle/server/storage/MultiPartLocalStorageManager.java (outdated, resolved)
server/src/main/java/org/apache/uniffle/server/storage/MultiPartLocalStorageManager.java (outdated, resolved)
storage/src/main/java/org/apache/uniffle/storage/common/CompositeStorage.java (outdated, resolved)
BTW,
@zuston Thanks for your review. The rename-related minor comments have been addressed; I will update the others this weekend.
This cannot work for a gray upgrade.
import io.netty.channel.FileRegion;

public class CompositeFileRegion extends AbstractFileRegion {

  @Override
  public boolean release() {
    super.release();
The return value of super.release() is never checked.
Maybe use the return value to initialize released:
boolean released = super.release();
  }

  @Override
  protected void deallocate() {}
This is empty. Do we need to release extra resources here?
    position -= region.count();
  } else {
    long transferredNow = region.transferTo(target, position);
    totalTransferred += transferredNow;
totalTransferred -> totalBytesTransferred
transferredNow -> currentBytesTransferred
transferred -> bytesTransferred
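For readers unfamiliar with the position bookkeeping in the snippet above, here is a minimal, Netty-free Java sketch of transferring from a logical position across several concatenated regions, using the variable names suggested in the rename comment. Plain byte arrays stand in for file regions and a `ByteArrayOutputStream` stands in for the target channel; `CompositeTransferSketch` is a hypothetical name and this is not the PR's `CompositeFileRegion` code.

```java
import java.io.ByteArrayOutputStream;
import java.util.List;

/** Hypothetical stand-in showing how a position is mapped onto concatenated regions. */
final class CompositeTransferSketch {
  private CompositeTransferSketch() {}

  /**
   * Writes all bytes from the given logical position to the end of the concatenated
   * regions and returns the number of bytes written.
   */
  static long transferTo(List<byte[]> regions, ByteArrayOutputStream target, long position) {
    long totalBytesTransferred = 0;
    for (byte[] region : regions) {
      if (position >= region.length) {
        // The requested position lies beyond this region: skip it entirely.
        position -= region.length;
      } else {
        int start = (int) position;
        int currentBytesTransferred = region.length - start;
        target.write(region, start, currentBytesTransferred);
        totalBytesTransferred += currentBytesTransferred;
        // Every following region is transferred from its beginning.
        position = 0;
      }
    }
    return totalBytesTransferred;
  }
}
```

With regions `"abc"`, `"defg"`, and `"hi"`, starting at position 4 skips the first region and the first byte of the second, so the target receives `efghi`.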
@Override
public boolean release(int decrement) {
  super.release(decrement);
ditto.
The return value of super.release(decrement) is never checked.
@rickyma Thanks for your review and suggestions. They help make this PR better. I updated the code and addressed your comments, PTAL.
The title and description of this PR should be improved.
@rickyma The title has been updated.
The description of this PR was not changed.
…2253)

### What changes were proposed in this pull request?

- Introduce a factory to create a specific LocalStorageManager by config.
- Introduce a multiple-disk LocalStorageManager.

### Why are the changes needed?

Fix: #436

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

- Existing UTs and newly added UTs.
- Tested on our pressure test cluster.
  - new client -> old server ✓
  - new client -> new server ✓
  - old client -> new server ❌, so we have to upgrade the clients first, then upgrade the servers.