[SPARK-38005][core] Support cleaning up merged shuffle files and state from external shuffle service #38560
Conversation
+CC @otterc
I decided to change the driver to not send reduceIds (only send shuffleId and appId), because only the shuffle service ultimately knows which merged shuffle data is actually stored, regardless of how the driver constructs the message.
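A minimal sketch of what such a driver-to-shuffle-service message could look like (the class and field names here are hypothetical placeholders, not the actual Spark RPC classes): the driver identifies only *which* application and shuffle to clean, and the shuffle service itself decides *what* merged files that maps to on disk.

```java
import java.util.Objects;

// Hypothetical RPC payload: carries only the identifiers, no reduceIds.
// The shuffle service enumerates the merged files it actually holds.
public final class RemoveShuffleMergeMsg {
    public final String appId;
    public final int shuffleId;

    public RemoveShuffleMergeMsg(String appId, int shuffleId) {
        this.appId = Objects.requireNonNull(appId);
        this.shuffleId = shuffleId;
    }

    @Override
    public String toString() {
        return "RemoveShuffleMergeMsg(appId=" + appId + ", shuffleId=" + shuffleId + ")";
    }
}
```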
Can one of the admins verify this patch?
@mridulm, as per your comment #37922 (comment), I want to improve this part of the deletion logic.
~~@mridulm @wankunde @otterc I'm not sure if I missed any logic; please help review my code, thanks. I will improve my code style later.
For now I haven't changed the code in BlockManagerMasterEndpoint as #37922 does. Could this be split into two PRs? I would implement the shuffle service part first, and @wankunde could finish the rest since he has already done that work.~~
Unified the push-based shuffle identification variables here; they will be used in both the YARN external shuffle service and the Spark core module.
I checked the appAttemptShuffleMergeId in the code earlier.
I think if we want to delete the merged data of partitions, we should also delete the corresponding ShuffleMergeId from the DB (otherwise an inconsistency will occur when restoring shuffle info from the DB).
And I think when deleting partitions, we shouldn't store the shuffleMergeId in the DB anymore.
The previous code is difficult to understand because of the Scala syntax. For example, in `mapOutputTracker.shuffleStatuses.get(shuffleId).foreach`, the `foreach` is not actually iterating a collection; it runs the body only if the `Option` is defined. Similarly, in `externalBlockStoreClient.map`, the `map` is over an `Option`, not over an iterator.
I didn't change the code logic, just the style.
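For readers less familiar with Scala, `Option.foreach` behaves like Java's `Optional.ifPresent`: the body executes at most once, and only when a value is present. A minimal Java sketch of the same pattern (the map contents here are made up for illustration):

```java
import java.util.Map;
import java.util.Optional;

public class OptionForeachDemo {
    public static void main(String[] args) {
        Map<Integer, String> shuffleStatuses = Map.of(7, "status-for-shuffle-7");

        // Like mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { ... } in Scala:
        // the lambda runs once if the key is present, and not at all otherwise.
        Optional.ofNullable(shuffleStatuses.get(7))
                .ifPresent(status -> System.out.println("found: " + status));

        // With a missing key, nothing runs -- no iteration, no NullPointerException.
        Optional.ofNullable(shuffleStatuses.get(42))
                .ifPresent(status -> System.out.println("never reached"));
    }
}
```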
What if the shuffle statuses do not exist?
What if the shuffle statuses do not exist?
I think that will not happen; please see the case match code.
I will try to get to this later this week; do let me know if you are still working on it, though.
In this case, we should fire a remove immediately; we are not going to use it for this app anyway.
@mridulm Yes, there is nontrivial overlap with #37922. I can cherry-pick some code and address those review comments in this PR.
I wrote some thoughts on these changes here.
To add, @wankunde's PR is very close to being done. Thoughts?
@mridulm Sorry, in my previous implementation I needed to pass the reduceId to the external shuffle service, but I found a problem: the driver cannot record the complete set of merged reduceIds (see my comment for the reason).
What if the shuffle statuses do not exist?
```java
mergeStatuses = new MergeStatuses(msg.shuffleId, msg.shuffleMergeId,
  bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds),
  Longs.toArray(sizes));
appShuffleInfo.shuffles.get(msg.shuffleId).setFinalizedPartitions(Ints.toArray(reduceIds));
```
The finalizedPartitions will be empty after the shuffle service restarts, which will cause the merged shuffle files to leak.
Thanks for your review @wankunde!
Yes, to solve this case completely we would need to store the finalized partitions in the DB.
On the other hand, when the application is removed, all of its merged data will eventually be cleaned up.
I'm not sure whether we can just ignore this case to simplify the logic, since everything is eventually cleaned up anyway.
What if the shuffle statuses do not exist?
I think that will not happen; please see the case match code.
```java
try {
  File metaFile =
    shuffleInfo.getMergedShuffleMetaFile(shuffleId, mergeId, partition);
  File indexFile = new File(
    shuffleInfo.getMergedShuffleIndexFilePath(shuffleId, mergeId, partition));
  File dataFile =
    shuffleInfo.getMergedShuffleDataFile(shuffleId, mergeId, partition);
  metaFile.delete();
  indexFile.delete();
  dataFile.delete();
} catch (Exception e) {
  logger.error("Error delete shuffle files for {}", shuffleMergeId, e);
}
```
Just like the closeAllFilesAndDeleteIfNeeded method, can we continue deleting the other files if one delete() fails?
done
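The behavior the reviewer asks for can be sketched as follows (a minimal, hypothetical illustration, not the actual Spark code; the class name and logging are placeholders): each delete is attempted independently, so one failure does not skip the remaining files.

```java
import java.io.File;
import java.util.List;

public class BestEffortDelete {
    // Attempt to delete every file, continuing past individual failures.
    // Returns the number of files actually deleted.
    static int deleteAll(List<File> files) {
        int deleted = 0;
        for (File f : files) {
            try {
                if (f.delete()) {
                    deleted++;
                } else {
                    System.err.println("Could not delete " + f);
                }
            } catch (SecurityException e) {
                // Log and keep going instead of aborting the whole cleanup.
                System.err.println("Error deleting " + f + ": " + e);
            }
        }
        return deleted;
    }
}
```

The key difference from the original snippet above is that the try/catch sits inside the loop body (per file) rather than wrapping all three deletes at once.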
```scala
val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm =>
  bm.storageEndpoint.ask[Boolean](removeMsg).recover {
    // use false as default value means no shuffle data were removed
    handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
```
I just moved removeShuffleFromExecutorsFutures to the end.
It needs to be invoked last to avoid cleaning up the shuffleStatuses in mapOutputTracker too early; otherwise mapOutputTracker.shuffleStatuses.get(shuffleId) may sometimes be None.
Please refer to the unregisterShuffle code.
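The ordering concern can be illustrated with a small, hypothetical sketch (the names below are placeholders, not the actual Spark classes): any cleanup step that reads the tracked shuffle status must run before the step that unregisters it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class CleanupOrderingDemo {
    // Hypothetical stand-in for mapOutputTracker.shuffleStatuses.
    static final Map<Integer, String> shuffleStatuses = new HashMap<>();

    // Cleanup that *reads* the tracked status (like the merged-shuffle removal,
    // which needs the status to know what to clean on the shuffle service).
    static Optional<String> cleanupMergedData(int shuffleId) {
        return Optional.ofNullable(shuffleStatuses.get(shuffleId));
    }

    // Cleanup that *unregisters* the shuffle (like the executor-side removal
    // that ends up clearing the tracker entry).
    static void unregisterShuffle(int shuffleId) {
        shuffleStatuses.remove(shuffleId);
    }

    public static void main(String[] args) {
        shuffleStatuses.put(7, "status-7");

        // Correct order: status-dependent cleanup first, unregistration last.
        Optional<String> seen = cleanupMergedData(7);
        unregisterShuffle(7);

        // Reversed order would have produced an empty Optional here,
        // silently skipping the merged-data cleanup.
        System.out.println("status seen by cleanup: " + seen.orElse("<none>"));
    }
}
```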
```scala
    mergeManager2, mergeManager2DB) == 1)
  assert(ShuffleTestAccessor.getOutdatedFinalizedShuffleCountDuringDBReload(
    mergeManager2, mergeManager2DB) == 2)
    mergeManager2, mergeManager2DB) == 1)
```
This is as expected, because we delete both the current merged partitions and the currently outdated merge status in the DB (which was not cleaned up before this PR).
Please refer to the code.
@mridulm If you are back and have time, please review my PR; I think the functionality is almost done. Let me know if anything is inappropriate and I will fix it soon, thanks!
Apologies for the delay in getting to this @yabola; I will try to get to it next week. My recommendation would be to keep the change as close to @wankunde's PR as possible, and fix the pending issues there to expedite the reviews (since that change was already reviewed quite a lot). If there are additional functional gaps in that PR, we can address them in follow-up PRs.
@wankunde According to the comments above, could you fix the remaining review comments in your PR?
Hi, @yabola @mridulm, I will update SPARK-40480 this weekend.
Closes #37922.