Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
e)
connectFailedWorkers.put(
workerInfo,
(StatusCode.WORKER_UNKNOWN, System.currentTimeMillis()))
(StatusCode.WORKER_UNRESPONSIVE, System.currentTimeMillis()))
}
iter.remove()
}
Expand All @@ -611,7 +611,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
logError(s"Init rpc client failed for $shuffleId on $workerInfo during reserve slots, reason: Timeout.")
connectFailedWorkers.put(
workerInfo,
(StatusCode.WORKER_UNKNOWN, System.currentTimeMillis()))
(StatusCode.WORKER_UNRESPONSIVE, System.currentTimeMillis()))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,7 @@ class WorkerStatusTracker(
excludedWorkers.asScala.foreach {
case (workerInfo: WorkerInfo, (statusCode, registerTime)) =>
statusCode match {
case StatusCode.WORKER_UNKNOWN |
StatusCode.WORKER_UNRESPONSIVE |
case StatusCode.WORKER_UNRESPONSIVE |
StatusCode.COMMIT_FILE_EXCEPTION |
StatusCode.NO_AVAILABLE_WORKING_DIR |
StatusCode.RESERVE_SLOTS_FAILED |
Expand All @@ -185,7 +184,8 @@ class WorkerStatusTracker(
}
}
for (worker <- res.unknownWorkers.asScala) {
if (!excludedWorkers.containsKey(worker)) {
if (!excludedWorkers.containsKey(worker) || excludedWorkers.get(
worker)._1 != StatusCode.WORKER_UNKNOWN) {
excludedWorkers.put(worker, (StatusCode.WORKER_UNKNOWN, current))
statusChanged = true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,24 @@ class ReducePartitionCommitHandler(
if (mockShuffleLost) {
mockShuffleLostShuffle == shuffleId
} else {
dataLostShuffleSet.contains(shuffleId)
dataLostShuffleSet.contains(shuffleId) || isStageDataLostInUnknownWorker(shuffleId)
}
}

private def isStageDataLostInUnknownWorker(shuffleId: Int): Boolean = {
if (conf.clientShuffleDataLostOnUnknownWorkerEnabled && !conf.clientPushReplicateEnabled) {
val allocatedWorkers = shuffleAllocatedWorkers.get(shuffleId)
if (allocatedWorkers != null) {
return workerStatusTracker.excludedWorkers.asScala.collect {
case (workerId, (status, _))
if status == StatusCode.WORKER_UNKNOWN && allocatedWorkers.contains(workerId) =>
workerId
}.nonEmpty
}
}
false
}

override def isPartitionInProcess(shuffleId: Int, partitionId: Int): Boolean = {
isStageEndOrInProcess(shuffleId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite {
val statusTracker = new WorkerStatusTracker(celebornConf, null)

val registerTime = System.currentTimeMillis()
statusTracker.excludedWorkers.put(mock("host1"), (StatusCode.WORKER_UNKNOWN, registerTime))
statusTracker.excludedWorkers.put(mock("host1"), (StatusCode.WORKER_UNRESPONSIVE, registerTime))
statusTracker.excludedWorkers.put(mock("host2"), (StatusCode.WORKER_SHUTDOWN, registerTime))

// test reserve (only statusCode list in handleHeartbeatResponse)
Expand All @@ -46,7 +46,7 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite {
// only reserve host1
Assert.assertEquals(
statusTracker.excludedWorkers.get(mock("host1")),
(StatusCode.WORKER_UNKNOWN, registerTime))
(StatusCode.WORKER_UNRESPONSIVE, registerTime))
Assert.assertFalse(statusTracker.excludedWorkers.containsKey(mock("host2")))

// add shutdown/excluded worker
Expand All @@ -55,13 +55,20 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite {
statusTracker.handleHeartbeatResponse(response1)

// test keep Unknown register time
Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host1")))
Assert.assertEquals(
statusTracker.excludedWorkers.get(mock("host1")),
(StatusCode.WORKER_UNKNOWN, registerTime))
statusTracker.excludedWorkers.get(mock("host1"))._1,
StatusCode.WORKER_UNKNOWN)
Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host3")))
Assert.assertEquals(
statusTracker.excludedWorkers.get(mock("host3"))._1,
StatusCode.WORKER_UNKNOWN)

// test new added shutdown/excluded workers
Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host0")))
Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host3")))
Assert.assertEquals(
statusTracker.excludedWorkers.get(mock("host0"))._1,
StatusCode.WORKER_EXCLUDED)
Assert.assertTrue(!statusTracker.excludedWorkers.containsKey(mock("host4")))
Assert.assertTrue(statusTracker.shuttingWorkers.contains(mock("host4")))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
get(CLIENT_PUSH_SENDBUFFERPOOL_CHECKEXPIREINTERVAL)
def clientAdaptiveOptimizeSkewedPartitionReadEnabled: Boolean =
get(CLIENT_ADAPTIVE_OPTIMIZE_SKEWED_PARTITION_READ_ENABLED)
  // Accessor for celeborn.client.shuffleDataLostOnUnknownWorker.enabled (defaults to false).
  def clientShuffleDataLostOnUnknownWorkerEnabled: Boolean =
    get(CLIENT_SHUFFLE_DATA_LOST_ON_UNKNOWN_WORKER_ENABLED)

// //////////////////////////////////////////////////////
// Client Shuffle //
Expand Down Expand Up @@ -6742,4 +6744,11 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(2)

  // Feature flag (added in 0.7.0, default false): when enabled, the client may mark a
  // shuffle's data as lost upon detecting an unknown worker. Exposed on CelebornConf
  // via clientShuffleDataLostOnUnknownWorkerEnabled.
  val CLIENT_SHUFFLE_DATA_LOST_ON_UNKNOWN_WORKER_ENABLED: ConfigEntry[Boolean] =
    buildConf("celeborn.client.shuffleDataLostOnUnknownWorker.enabled")
      .categories("client")
      .version("0.7.0")
      .doc("Whether to mark shuffle data lost when unknown worker is detected.")
      .booleanConf
      .createWithDefault(false)
}
1 change: 1 addition & 0 deletions docs/configuration/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ license: |
| celeborn.client.shuffle.rangeReadFilter.enabled | false | false | If a spark application have skewed partition, this value can set to true to improve performance. | 0.2.0 | celeborn.shuffle.rangeReadFilter.enabled |
| celeborn.client.shuffle.register.filterExcludedWorker.enabled | false | false | Whether to filter excluded worker when register shuffle. | 0.4.0 | |
| celeborn.client.shuffle.reviseLostShuffles.enabled | false | false | Whether to revise lost shuffles. | 0.6.0 | |
| celeborn.client.shuffleDataLostOnUnknownWorker.enabled | false | false | Whether to mark shuffle data lost when unknown worker is detected. | 0.7.0 | |
| celeborn.client.slot.assign.maxWorkers | 10000 | false | Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one from Master side and Client side, see `celeborn.master.slot.assign.maxWorkers`. | 0.3.1 | |
| celeborn.client.spark.fetch.cleanFailedShuffle | false | false | whether to clean those disk space occupied by shuffles which cannot be fetched | 0.6.0 | |
| celeborn.client.spark.fetch.cleanFailedShuffleInterval | 1s | false | the interval to clean the failed-to-fetch shuffle files, only valid when celeborn.client.spark.fetch.cleanFailedShuffle is enabled | 0.6.0 | |
Expand Down
Loading