[ISSUE-380] Refactor the flush process to fix fallback fail #383
Conversation
| private final ShuffleBuffer shuffleBuffer; | ||
| private final AtomicInteger retryTimes = new AtomicInteger(); | ||
| private boolean isPended = false; | ||
| private StorageManager underStorageManager; |
Could we avoid using storageManager in a ShuffleDataFlushEvent? It's a little weird.
Yes. I also want to avoid this, but if we don't keep it in the event, we have to maintain this relation mapping in multipleStorageManager. However, we couldn't remove the mapping entry when the event's lifecycle finishes, because the storage manager has no interface for that.
Could we keep only a Storage field? Could we use the Storage to find its StorageManager?
Currently we can't, but this solution sounds great. We could extend the Storage interface.
> Currently it can't. But this solution sounds great. We could extend the interface of storage
I think we need to add an interface method, StorageManager#isBelongto(Storage storage). Storage is in the storage module and StorageManager is in the server module, so Storage shouldn't depend on StorageManager.
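The dependency direction suggested here (the server-side manager answers whether it owns a storage, so the storage module never references the manager) could look roughly like the sketch below. All types are simplified stand-ins rather than the real Uniffle classes, and `ownerOf` is a hypothetical helper introduced only for illustration.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Simplified stand-in for the Storage type in the storage module (assumption).
interface SketchStorage {
  String getStorageHost();
}

// Simplified stand-in for the server-side StorageManager (assumption).
interface SketchStorageManager {
  // Hypothetical method following the StorageManager#isBelongto idea from the review.
  boolean isBelongTo(SketchStorage storage);
}

final class SimpleStorageManager implements SketchStorageManager {
  private final Set<SketchStorage> storages;

  SimpleStorageManager(Set<SketchStorage> storages) {
    this.storages = storages;
  }

  @Override
  public boolean isBelongTo(SketchStorage storage) {
    return storages.contains(storage);
  }
}

final class StorageOwnerLookup {
  // Returns the first manager that claims the storage, or empty if none does.
  static Optional<SketchStorageManager> ownerOf(
      SketchStorage storage, List<SketchStorageManager> managers) {
    return managers.stream().filter(m -> m.isBelongTo(storage)).findFirst();
  }
}
```

With this shape the event would only need to hold a Storage, and the owning manager is looked up on demand instead of being stored on the event.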
Emmm… I still think a cache is not suitable, because it holds references to events and will cause a memory leak. We should implicitly remove the event reference once the event is flushed.
It's not acceptable for me to put StorageManager into ShuffleDataFlushEvent. I would rather give up this strategy.
I know. I don't insist on keeping this in the event. I'm just explaining that the cache is also unreasonable.
It is also a potential bug because of the memory leak.
Could we add a callback to ShuffleFlushEvent? The callback will remove the data. When we call the cleanupFlushEventData(event) method, we call event.clearCallback().
Sounds great!
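A minimal sketch of the callback idea agreed on above, assuming the event carries a list of cleanup actions that run once when its lifecycle ends. The class and method names (`SketchFlushEvent`, `addCleanupCallback`, `doCleanup`, `EventCacheSketch`) are hypothetical and do not claim to match what the PR finally merged.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for ShuffleDataFlushEvent (assumption, not the real class).
final class SketchFlushEvent {
  private final long eventId;
  private final List<Runnable> cleanupCallbacks = new ArrayList<>();

  SketchFlushEvent(long eventId) {
    this.eventId = eventId;
  }

  long getEventId() {
    return eventId;
  }

  // Registers an action to run when the event's lifecycle ends.
  void addCleanupCallback(Runnable callback) {
    cleanupCallbacks.add(callback);
  }

  // Runs every callback, then drops them so a second call is a no-op.
  void doCleanup() {
    cleanupCallbacks.forEach(Runnable::run);
    cleanupCallbacks.clear();
  }
}

// Hypothetical cache of event -> manager name that cleans itself through the callback.
final class EventCacheSketch {
  final Map<Long, String> eventToManager = new HashMap<>();

  void track(SketchFlushEvent event, String managerName) {
    eventToManager.put(event.getEventId(), managerName);
    // The entry is removed when the event is cleaned up, so the cache cannot leak it.
    event.addCleanupCallback(() -> eventToManager.remove(event.getEventId()));
  }
}
```

Tying the removal to the event itself addresses the memory-leak concern raised earlier: whoever finishes the event calls `doCleanup()` and does not need to know which caches referenced it.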
> The relation will help the event find the concrete under-storage-manager selected by the selectStorage method when invoked by the write method. Right? @xianjingfeng
Yes, if we remove the pending queue, we don't need to maintain this relation.
We should refactor the ShuffleFlushEvent. We need a method
To implement what you want
Codecov Report
@@             Coverage Diff              @@
##             master     #383      +/-   ##
============================================
+ Coverage     58.59%   58.78%   +0.18%
- Complexity     1589     1602      +13
============================================
  Files           193      193
  Lines         10910    10938      +28
  Branches        957      955       -2
============================================
+ Hits           6393     6430      +37
+ Misses         4139     4131       -8
+ Partials        378      377       -1
Makes sense.
For what?
I think
This is not related to this PR. Let's focus on this PR.
| ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost()); | ||
| continue; | ||
| } else { | ||
| if (event.isPended()) { |
Why do we need isPended?
| } else if (!event.isValid()) { | ||
| // avoid printing error log | ||
|
|
||
| while (true) { |
Why do we need while(true)?
If not, how should we handle it?
while (retryTimes < maxRetry)
To me, it is the same as this PR's current implementation.
OK, putting the exit condition in the while will be clearer.
I will correct it.
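The difference between the two loop shapes discussed in this thread can be sketched as follows. This is an illustrative helper under assumed names (`BoundedRetrySketch`, `writeWithRetry`), not the flush code from the PR; the single write attempt is abstracted as a `BooleanSupplier`.

```java
import java.util.function.BooleanSupplier;

final class BoundedRetrySketch {
  // Retries the write until it succeeds or maxRetry attempts have been used.
  // The exit condition lives in the loop header instead of a break inside while (true).
  static boolean writeWithRetry(BooleanSupplier writeOnce, int maxRetry) {
    int retryTimes = 0;
    while (retryTimes < maxRetry) {
      if (writeOnce.getAsBoolean()) {
        return true;
      }
      retryTimes++;
    }
    return false;
  }
}
```

Both forms behave the same when the `while (true)` body checks the retry count before continuing; the header condition only makes the bound visible at a glance.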
My mistake. It's necessary.
| long start = System.currentTimeMillis(); | ||
| List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks(); | ||
| boolean writeSuccess = false; | ||
| try { |
Why do we need to modify the flush process?
I think this logic needs to be refactored.
| ShuffleServerMetrics.gaugeEventQueueSize.set(flushQueue.size()); | ||
| ShuffleServerMetrics.gaugeWriteHandler.inc(); | ||
| flushToFile(event); | ||
| flushToFileImpl(event); |
Why do we rename it to flushToFileImpl?
Usually we name a method Impl only when there is already an interface method for it to implement.
Yes. I’ll correct it
If we can't make the process clear, I would rather remove the pending queue. There are too many bugs caused by the pending queue.
Removing it is OK with me.
Let's remove it. It's simpler.
| } | ||
|
|
||
| if (!storage.canWrite()) { | ||
| if (storageManager instanceof MultiStorageManager) { |
Could we avoid the instanceof check on MultiStorageManager?
If we want to remove the pending queue, this is unnecessary.
Could we add a supportPending method to the StorageManager interface?
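For illustration only, the supportPending suggestion could replace the type check with a capability query, as in the sketch below. It assumes the instanceof branch is what decides whether an event is parked in the pending queue; every name here (`PendingAwareManager`, `SingleManagerSketch`, `MultiManagerSketch`, `PendingDecision.shouldPend`) is hypothetical.

```java
// Simplified stand-in for the StorageManager interface (assumption).
interface PendingAwareManager {
  // Hypothetical capability flag; managers without a pending queue keep the default.
  default boolean supportPending() {
    return false;
  }
}

// Stand-in for a manager backed by a single storage type.
final class SingleManagerSketch implements PendingAwareManager {
}

// Stand-in for a manager that can fall back between storage types.
final class MultiManagerSketch implements PendingAwareManager {
  @Override
  public boolean supportPending() {
    return true;
  }
}

final class PendingDecision {
  // An event is parked only when the storage cannot be written and the manager allows pending.
  static boolean shouldPend(PendingAwareManager manager, boolean storageCanWrite) {
    return !storageCanWrite && manager.supportPending();
  }
}
```

The caller no longer needs to know the concrete manager class, which is the coupling the instanceof check introduced.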
Emm... It's weird, let me think again.
Let us leave a todo comment for later optimization, and make current PR small.
Is there any progress on this optimization?
After supporting multiple disk selection (#435), I think we could remove this pending queue.
|
|
||
| if (event.getRetryTimes() > retryMax) { | ||
| LOG.error("Failed to write data for {} in {} times, shuffle data will be lost", event, retryMax); | ||
| ShuffleServerMetrics.incStorageFailedCounter(event.getUnderStorage().getStorageHost()); |
getUnderStorage may be null.
Nice catch. Fixed.
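One way to guard the metric call against a null storage is sketched below. The thread does not show how the fix was actually made, so this is an assumption; `HostedStorage`, `FailureMetricSketch`, and the `"unknown"` label are all hypothetical.

```java
// Simplified stand-in for the storage attached to a flush event (assumption).
interface HostedStorage {
  String getStorageHost();
}

final class FailureMetricSketch {
  // Hypothetical placeholder label used when no storage was ever selected for the event.
  static final String UNKNOWN_HOST = "unknown";

  // Returns the host to tag the failure metric with, tolerating a null storage.
  static String hostForMetric(HostedStorage underStorage) {
    return underStorage == null ? UNKNOWN_HOST : underStorage.getStorageHost();
  }
}
```

An equally reasonable choice would be to skip the per-host counter entirely when the storage is null; the point is only that the dereference must not be unconditional.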
jerqi
left a comment
Except for MultiStorageManager, LGTM.
Merged. Thanks @zuston
…rs cache (#1627)

### What changes were proposed in this pull request?

Remove the meaningless eventOfUnderStorageManagers.

### Why are the changes needed?

Fix #1626 & #1620. It's also a follow-up PR for #383.

This cache only makes sense when the event retries after a failure. However, after the event fails, it is not appropriate to keep taking the original storageManager from the cache (because events usually fail due to high IO pressure, disk damage, or a full disk). In this case the cache is meaningless, so there is a contradiction here and we should remove it.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Tested in our env
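The reasoning in this commit message (a failed event should not go back to the manager it just failed on) can be illustrated with a selection step that runs again on every attempt instead of reading a cached choice. This is a sketch under assumptions: `ReselectSketch`, its nested `Manager` type, and the `canWrite` probe are hypothetical stand-ins, not Uniffle APIs.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.BooleanSupplier;

final class ReselectSketch {
  // Stand-in manager: a name plus a probe telling whether it can currently accept writes.
  static final class Manager {
    final String name;
    final BooleanSupplier canWrite;

    Manager(String name, BooleanSupplier canWrite) {
      this.name = name;
      this.canWrite = canWrite;
    }
  }

  // Picks the first manager that can write right now; called fresh on every attempt,
  // so a manager that became unhealthy after a failure is skipped on the retry.
  static Optional<Manager> select(List<Manager> managers) {
    return managers.stream().filter(m -> m.canWrite.getAsBoolean()).findFirst();
  }
}
```

Because the choice is recomputed per attempt, there is nothing to cache per event and nothing to clean up afterwards.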
What changes were proposed in this pull request?
- … MultipleStorageManager
- … ShuffleDataFlushEvent to scope all cleanup operation with event lifecycle.
- … selectStorage method after writing.
- … while loop, more simple and clear.
- … total_failed_written_event_num
Why are the changes needed?
To fix the fallback becoming invalid when a data-flush event enters the pending queue.
Does this PR introduce any user-facing change?
No
How was this patch tested?