[CELEBORN-2264] Support cancel shuffle when write bytes exceeds threshold #3601
yew1eb wants to merge 4 commits into apache:main
Codecov Report
✅ All modified and coverable lines are covered by tests.

Additional details and impacted files

@@            Coverage Diff             @@
##             main    #3601      +/-   ##
==========================================
- Coverage   67.13%   66.91%   -0.22%
==========================================
  Files         357      357
  Lines       21860    21932      +72
  Branches     1943     1949       +6
==========================================
  Hits        14674    14674
- Misses       6166     6244      +78
+ Partials     1020     1014       -6
eolivelli left a comment:
I have left one question about protocol compatibility.
Can you please add some unit tests?
  bytesWrittenPerPartition: Array[Long],
- serdeVersion: SerdeVersion)
+ serdeVersion: SerdeVersion,
+ bytesWritten: Long)

IIUC this patch changes the protocol. How is this handled? Will the new client be compatible with the new server version and vice versa?

Reply: It's not a big deal since MapperEnd is only utilized on the engine side.
RexXiong left a comment:
Thanks. BTW, when the configuration is changed, you should also execute:
UPDATE=1 build/mvn clean test -pl common -am -Dtest=none -DwildcardSuites=org.apache.celeborn.ConfigurationSuite

  private val shuffleWriteLimitEnabled = conf.shuffleWriteLimitEnabled
  private val shuffleWriteLimitThreshold = conf.shuffleWriteLimitThreshold
  private val shuffleTotalWrittenBytes = JavaUtils.newConcurrentHashMap[Int, AtomicLong]()

The data kept per shuffleId should be cleaned up when the shuffle expires.
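
To make the cleanup request concrete, here is a minimal sketch of per-shuffle byte counters that are dropped when a shuffle goes away. The class name `ShuffleWriteTracker` and the methods `record`, `total`, and `unregister` are hypothetical and are not taken from the patch; only the `Int -> AtomicLong` map shape mirrors the diff above.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for the per-shuffle counters shown in the diff.
class ShuffleWriteTracker {
  private val shuffleTotalWrittenBytes = new ConcurrentHashMap[Int, AtomicLong]()

  /** Adds the bytes of one finished map task and returns the running total. */
  def record(shuffleId: Int, bytesWritten: Long): Long = {
    val fresh = new AtomicLong(0L)
    // putIfAbsent returns null when our fresh counter was installed.
    val existing = shuffleTotalWrittenBytes.putIfAbsent(shuffleId, fresh)
    val counter = if (existing == null) fresh else existing
    counter.addAndGet(bytesWritten)
  }

  /** Current total for a shuffle, or 0 if nothing is tracked for it. */
  def total(shuffleId: Int): Long = {
    val counter = shuffleTotalWrittenBytes.get(shuffleId)
    if (counter == null) 0L else counter.get()
  }

  /** Assumed to be called from whatever hook runs when a shuffle expires. */
  def unregister(shuffleId: Int): Unit = {
    shuffleTotalWrittenBytes.remove(shuffleId)
  }
}
```

Without a call like `unregister`, the map would keep one entry per shuffle for the lifetime of the application, which is the leak the review comment points at.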
  crc32PerPartition = crc32PerPartition,
  bytesWrittenPerPartition = bytesWrittenPerPartition)

  if (mapperAttemptFinishedSuccess && shuffleWriteLimitEnabled) {

Should we consider adding a negative test case for this feature?
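
As an illustration of what negative cases could cover, the sketch below checks a pure predicate against the situations where the shuffle must not be canceled. The predicate `shouldCancel` and its parameters are hypothetical; the strict `>` comparison is an assumption based on the word "exceeds" in the PR title, not on the patch itself.

```scala
object WriteLimitNegativeCases {
  // Hypothetical predicate standing in for the check guarded by
  // `mapperAttemptFinishedSuccess && shuffleWriteLimitEnabled` in the diff.
  def shouldCancel(
      attemptSucceeded: Boolean,
      limitEnabled: Boolean,
      totalBytes: Long,
      threshold: Long): Boolean =
    attemptSucceeded && limitEnabled && totalBytes > threshold

  def main(args: Array[String]): Unit = {
    // Negative cases: none of these should cancel the shuffle.
    assert(!shouldCancel(attemptSucceeded = true, limitEnabled = false, 200L, 100L)) // feature off
    assert(!shouldCancel(attemptSucceeded = false, limitEnabled = true, 200L, 100L)) // attempt not finished
    assert(!shouldCancel(attemptSucceeded = true, limitEnabled = true, 100L, 100L))  // at, not over, the limit
    // Positive control, so the negative cases are not passing vacuously.
    assert(shouldCancel(attemptSucceeded = true, limitEnabled = true, 101L, 100L))
  }
}
```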
  .createWithDefault(false)

  val SHUFFLE_WRITE_LIMIT_ENABLED: ConfigEntry[Boolean] =
    buildConf("celeborn.client.shuffle.write.limit.enabled")

It seems this only takes effect when using Spark, so it might be better to change the key to celeborn.client.spark.shuffle.write.limit.enabled.
What changes were proposed in this pull request?
This patch adds a configurable threshold check on the number of bytes written by a shuffle.
Why are the changes needed?
A shuffle is canceled automatically once its written bytes exceed the threshold, which avoids exhausting cluster resources.
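
The behavior described here can be sketched roughly as follows: each finished map task adds its bytes to a per-shuffle total, and the shuffle is flagged for cancellation once the total passes the threshold. All names below (`WriteLimitSketch`, `onMapperEnd`, `Decision`, `Continue`, `Cancel`) are hypothetical and only illustrate the idea; the actual patch may structure this differently.

```scala
import java.util.concurrent.atomic.AtomicLong

object WriteLimitSketch {
  // Hypothetical outcome type; the real patch may signal cancellation differently.
  sealed trait Decision
  case object Continue extends Decision
  final case class Cancel(reason: String) extends Decision

  /**
   * Called once per successfully finished map task (an assumption).
   * Adds the task's bytes to the shuffle total and compares it to the limit.
   */
  def onMapperEnd(
      shuffleTotal: AtomicLong,
      bytesWritten: Long,
      limitEnabled: Boolean,
      threshold: Long): Decision = {
    if (!limitEnabled) {
      Continue
    } else {
      // addAndGet is atomic, so concurrent map tasks cannot lose updates.
      val total = shuffleTotal.addAndGet(bytesWritten)
      if (total > threshold) Cancel(s"shuffle wrote $total bytes, limit is $threshold")
      else Continue
    }
  }
}
```

With a threshold of 100 bytes, two map tasks of 60 bytes each would pass the first check and trigger `Cancel` on the second, since the running total reaches 120.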
Does this PR resolve a correctness bug?
No
Does this PR introduce any user-facing change?
No
How was this patch tested?
CI and manual testing.