23 changes: 21 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -175,6 +175,10 @@ private[spark] class DAGScheduler(
// TODO: Garbage collect information about failure epochs when we know there are no more
// stray messages to detect.
private val failedEpoch = new HashMap[String, Long]
// There will be a regression when an executor is lost and then causes a 'fetch failed'.
Review comment (Member): "There will be a regression" is misleading because it reads as if this PR itself introduces a regression.

// So we should distinguish the failedEpoch of 'executor lost' from the fetchFailedEpoch
// of 'fetch failed'.
private val fetchFailedEpoch = new HashMap[String, Long]

private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator

@@ -1682,6 +1686,7 @@ private[spark] class DAGScheduler(
execId = bmAddress.executorId,
fileLost = true,
hostToUnregisterOutputs = hostToUnregisterOutputs,
fetchFailed = true,
maybeEpoch = Some(task.epoch))
}
}
@@ -1838,12 +1843,21 @@ private[spark] class DAGScheduler(
execId: String,
fileLost: Boolean,
hostToUnregisterOutputs: Option[String],
fetchFailed: Boolean = false,
maybeEpoch: Option[Long] = None): Unit = {
val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
val executorNewlyFailed = !failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch
val executorNewlyFetchFailed =
!fetchFailedEpoch.contains(execId) || fetchFailedEpoch(execId) < currentEpoch
if (executorNewlyFailed || executorNewlyFetchFailed) {
if (executorNewlyFailed) {
blockManagerMaster.removeExecutor(execId)
}
if (fetchFailed) {
fetchFailedEpoch(execId) = currentEpoch
}
failedEpoch(execId) = currentEpoch
logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
blockManagerMaster.removeExecutor(execId)
if (fileLost) {
hostToUnregisterOutputs match {
case Some(host) =>
@@ -1887,6 +1901,11 @@ private[spark] class DAGScheduler(
logInfo("Host added was in lost list earlier: " + host)
failedEpoch -= execId
}
// Remove execId from fetchFailedEpoch as well.
if (fetchFailedEpoch.contains(execId)) {
logInfo("Host added was in lost list earlier: " + host)
fetchFailedEpoch -= execId
}
}

private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]): Unit = {
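To make the new gating logic easier to follow outside the diff, here is a small standalone sketch of the epoch bookkeeping this change introduces. It is illustrative only: EpochBookkeepingSketch, handleFailure, and the println output are hypothetical stand-ins rather than Spark's DAGScheduler code, and the actual output-unregistration step is reduced to a log line.

import scala.collection.mutable.HashMap

// Standalone illustration only -- not Spark source. It mimics the gating added above:
// a fetch failure can still trigger shuffle-output cleanup even when the executor was
// already marked lost at the same epoch.
object EpochBookkeepingSketch {
  private val failedEpoch = new HashMap[String, Long]
  private val fetchFailedEpoch = new HashMap[String, Long]

  def handleFailure(execId: String, currentEpoch: Long, fetchFailed: Boolean): Unit = {
    val executorNewlyFailed =
      !failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch
    val executorNewlyFetchFailed =
      !fetchFailedEpoch.contains(execId) || fetchFailedEpoch(execId) < currentEpoch
    if (executorNewlyFailed || executorNewlyFetchFailed) {
      if (fetchFailed) {
        fetchFailedEpoch(execId) = currentEpoch
      }
      failedEpoch(execId) = currentEpoch
      println(s"clean up outputs for $execId (epoch $currentEpoch, fetchFailed=$fetchFailed)")
    } else {
      println(s"ignore stale failure for $execId (epoch $currentEpoch)")
    }
  }

  def main(args: Array[String]): Unit = {
    // 1. The executor is reported lost first; its outputs are cleaned up once.
    handleFailure("exec-hostA2", currentEpoch = 3, fetchFailed = false)
    // 2. A FetchFailed from the same executor arrives at the same epoch. With only the
    //    original failedEpoch map this call would be treated as stale; the separate
    //    fetchFailedEpoch map lets it still trigger cleanup, which is the SPARK-29551 fix.
    handleFailure("exec-hostA2", currentEpoch = 3, fetchFailed = true)
  }
}

Running the sketch prints a cleanup line for both calls, mirroring what the tests below assert: the map outputs registered on exec-hostA2 are unregistered even when the executor was lost before the fetch failure occurred.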
161 changes: 161 additions & 0 deletions core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -537,6 +537,167 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
assert(mapStatus2(2).location.host === "hostB")
}

test("[SPARK-29551] All shuffle files on the executor should be cleaned up when " +
"executor lost and then causes 'fetch failed'") {
// whether to kill Executor or not before FetchFailed
Seq(true, false).foreach { killExecutor =>
afterEach()
val conf = new SparkConf()
conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
conf.set(config.UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE.key, "false")
init(conf)
runEvent(ExecutorAdded("exec-hostA1", "hostA"))
runEvent(ExecutorAdded("exec-hostA2", "hostA"))
runEvent(ExecutorAdded("exec-hostB", "hostB"))
val firstRDD = new MyRDD(sc, 3, Nil)
val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(3))
val firstShuffleId = firstShuffleDep.shuffleId
val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
val secondShuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))

submit(reduceRdd, Array(0))
// map stage1 completes successfully, with one task on each executor
complete(taskSets(0), Seq(
(Success,
MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345),
Array.fill[Long](1)(2), mapTaskId = 5)),
(Success,
MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345),
Array.fill[Long](1)(2), mapTaskId = 6)),
(Success, makeMapStatus("hostB", 1, mapTaskId = 7))
))
// map stage2 completes successfully, with one task on each executor
complete(taskSets(1), Seq(
(Success,
MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345),
Array.fill[Long](1)(2), mapTaskId = 8)),
(Success,
MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345),
Array.fill[Long](1)(2), mapTaskId = 9)),
(Success, makeMapStatus("hostB", 1))
))
// make sure our test setup is correct
val initialMapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses
// val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get
Review comment (Member): Shall we remove this commented-out code?

Reply (Contributor Author): Done.

assert(initialMapStatus1.count(_ != null) === 3)
assert(initialMapStatus1.map {
_.location.executorId
}.toSet ===
Set("exec-hostA1", "exec-hostA2", "exec-hostB"))
val initialMapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses
assert(initialMapStatus2.count(_ != null) === 3)
assert(initialMapStatus2.map {
_.location.executorId
}.toSet ===
Set("exec-hostA1", "exec-hostA2", "exec-hostB"))
// kill exec-hostA2
if (killExecutor) {
runEvent(ExecutorLost("exec-hostA2", ExecutorKilled))
}
// reduce stage fails with a fetch failure from one host
complete(taskSets(2), Seq(
(FetchFailed(BlockManagerId("exec-hostA2", "hostA", 12345),
secondShuffleId, 0L, 0, 0, "ignored"), null)
))
// Here is the main assertion -- make sure that we de-register
// the map outputs for exec-hostA2
val mapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses
assert(mapStatus1.count(_ != null) === 2)
assert(mapStatus1(0).location.executorId === "exec-hostA1")
assert(mapStatus1(0).location.host === "hostA")
assert(mapStatus1(2).location.executorId === "exec-hostB")
assert(mapStatus1(2).location.host === "hostB")

val mapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses
assert(mapStatus2.count(_ != null) === 2)
assert(mapStatus2(0).location.executorId === "exec-hostA1")
assert(mapStatus2(0).location.host === "hostA")
assert(mapStatus2(2).location.executorId === "exec-hostB")
assert(mapStatus2(2).location.host === "hostB")
}
}

test("[SPARK-29551] All shuffle files on the host should be cleaned up when host lost") {
// whether to kill Executor or not before FetchFailed
Seq(true, false).foreach { killExecutor =>
afterEach()
val conf = new SparkConf()
conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
conf.set(config.UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE.key, "true")
init(conf)
runEvent(ExecutorAdded("exec-hostA1", "hostA"))
runEvent(ExecutorAdded("exec-hostA2", "hostA"))
runEvent(ExecutorAdded("exec-hostB", "hostB"))
val firstRDD = new MyRDD(sc, 3, Nil)
val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(3))
val firstShuffleId = firstShuffleDep.shuffleId
val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
val secondShuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))

submit(reduceRdd, Array(0))
// map stage1 completes successfully, with one task on each executor
complete(taskSets(0), Seq(
(Success,
MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345),
Array.fill[Long](1)(2), mapTaskId = 5)),
(Success,
MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345),
Array.fill[Long](1)(2), mapTaskId = 6)),
(Success, makeMapStatus("hostB", 1, mapTaskId = 7))
))
// map stage2 completes successfully, with one task on each executor
complete(taskSets(1), Seq(
(Success,
MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345),
Array.fill[Long](1)(2), mapTaskId = 8)),
(Success,
MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345),
Array.fill[Long](1)(2), mapTaskId = 9)),
(Success, makeMapStatus("hostB", 1))
))
// make sure our test setup is correct
val initialMapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses
assert(initialMapStatus1.count(_ != null) === 3)
assert(initialMapStatus1.map {
_.location.executorId
}.toSet ===
Set("exec-hostA1", "exec-hostA2", "exec-hostB"))

val initialMapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses
assert(initialMapStatus2.count(_ != null) === 3)
assert(initialMapStatus2.map {
_.location.executorId
}.toSet ===
Set("exec-hostA1", "exec-hostA2", "exec-hostB"))
// kill exec-hostA2
if (killExecutor) {
runEvent(ExecutorLost("exec-hostA2", ExecutorKilled))
}
// reduce stage fails with a fetch failure from one host
complete(taskSets(2), Seq(
(FetchFailed(BlockManagerId("exec-hostA2", "hostA", 12345),
secondShuffleId, 0L, 0, 0, "ignored"),
null)
))

// Here is the main assertion -- make sure that we de-register
// the map outputs for both map stage from both executors on hostA
val mapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses
assert(mapStatus1.count(_ != null) === 1)
assert(mapStatus1(2).location.executorId === "exec-hostB")
assert(mapStatus1(2).location.host === "hostB")

val mapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses
assert(mapStatus2.count(_ != null) === 1)
assert(mapStatus2(2).location.executorId === "exec-hostB")
assert(mapStatus2(2).location.host === "hostB")
}
}

test("zero split job") {
var numResults = 0
var failureReason: Option[Exception] = None