@@ -61,6 +61,16 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
private val blacklistedExecs = new HashSet[String]()
private val blacklistedNodes = new HashSet[String]()

private var latestFailureReason: String = null

/**
* Get the most recent failure reason of this TaskSet.
* @return the most recent failure reason, or null if no task in this TaskSet has failed yet
*/
def getLatestFailureReason: String = {
Contributor:
@jerryshao the whole class is private[scheduler], so I think it is OK.

Contributor:
@squito yes, from a scope level it is fine. My thought is that this exposes the class member to other classes unnecessarily. It is not a big deal, just my personal preference.

Contributor (Author):
Thanks @jerryshao @squito. Could you help trigger another Jenkins test? The last one had a PySpark failure.

latestFailureReason
}

/**
* Return true if this executor is blacklisted for the given task. This does *not*
* need to return true if the executor is blacklisted for the entire stage, or blacklisted
@@ -94,7 +104,9 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
private[scheduler] def updateBlacklistForFailedTask(
host: String,
exec: String,
index: Int): Unit = {
index: Int,
failureReason: String): Unit = {
latestFailureReason = failureReason
val execFailures = execToFailures.getOrElseUpdate(exec, new ExecutorFailuresInTaskSet(host))
execFailures.updateWithFailure(index, clock.getTimeMillis())
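
The review thread above weighs exposing latestFailureReason through a getter against keeping it purely internal. A minimal, self-contained sketch of that trade-off (illustrative only, not the actual Spark source; the class name and the recordFailure helper are hypothetical) could look like this:

package org.apache.spark.scheduler

// Illustrative stand-in for TaskSetBlacklist. Because the class itself is
// private[scheduler], even a public getter is reachable only from other classes
// in the scheduler package (e.g. TaskSetManager), but it still widens access to
// the field beyond this one class, which is the reviewers' point.
private[scheduler] class TaskSetBlacklistSketch {

  // Mirrors the latestFailureReason field added in this PR; null until a task fails.
  private var latestFailureReason: String = null

  // The getter under discussion: other scheduler classes can read the reason
  // but cannot assign it.
  def getLatestFailureReason: String = latestFailureReason

  // Hypothetical mutator standing in for updateBlacklistForFailedTask.
  private[scheduler] def recordFailure(reason: String): Unit = {
    latestFailureReason = reason
  }
}

Since TaskSetManager only needs read access for its abort message, the getter keeps the field itself private while still making the reason available where the message is built.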

@@ -670,9 +670,14 @@ private[spark] class TaskSetManager(
}
if (blacklistedEverywhere) {
val partition = tasks(indexInTaskSet).partitionId
abort(s"Aborting $taskSet because task $indexInTaskSet (partition $partition) " +
s"cannot run anywhere due to node and executor blacklist. Blacklisting behavior " +
s"can be configured via spark.blacklist.*.")
abort(s"""
|Aborting $taskSet because task $indexInTaskSet (partition $partition)
|cannot run anywhere due to node and executor blacklist.
|Most recent failure:
|${taskSetBlacklist.getLatestFailureReason}
|
|Blacklisting behavior can be configured via spark.blacklist.*.
|""".stripMargin)
}
}
}
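
The new abort message points users at the spark.blacklist.* settings. As a rough illustration of the kind of configuration it refers to (the values below are arbitrary examples, not recommendations), blacklisting can be tuned on a SparkConf:

import org.apache.spark.SparkConf

object BlacklistConfExample {
  // Example-only thresholds; adjust for your own workload.
  val conf: SparkConf = new SparkConf()
    // Blacklisting is off by default in this era of Spark.
    .set("spark.blacklist.enabled", "true")
    // How many times a task may fail on one executor before that executor is
    // blacklisted for that task.
    .set("spark.blacklist.task.maxTaskAttemptsPerExecutor", "1")
    // How many distinct tasks must fail on one executor within a stage before
    // the executor is blacklisted for the whole stage.
    .set("spark.blacklist.stage.maxFailedTasksPerExecutor", "2")
    // How long an executor or node stays blacklisted before being tried again.
    .set("spark.blacklist.timeout", "1h")
}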
@@ -837,9 +842,9 @@ private[spark] class TaskSetManager(
sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)

if (!isZombie && reason.countTowardsTaskFailures) {
taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
info.host, info.executorId, index))
assert (null != failureReason)
taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
info.host, info.executorId, index, failureReason))
numFailures(index) += 1
if (numFailures(index) >= maxTaskFailures) {
logError("Task %d in stage %s failed %d times; aborting job".format(
@@ -115,8 +115,9 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
withBackend(runBackend _) {
val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
awaitJobTermination(jobFuture, duration)
val pattern = ("Aborting TaskSet 0.0 because task .* " +
"cannot run anywhere due to node and executor blacklist").r
val pattern = (
s"""|Aborting TaskSet 0.0 because task .*
|cannot run anywhere due to node and executor blacklist""".stripMargin).r
assert(pattern.findFirstIn(failure.getMessage).isDefined,
s"Couldn't find $pattern in ${failure.getMessage()}")
}
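
Both the new abort message and the expected pattern in this test lean on Scala's stripMargin. A tiny self-contained demo (names and values are illustrative) of how the leading | margin works:

object StripMarginDemo extends App {
  // The '|' at the start of each line marks where the margin is stripped, so the
  // source file's indentation does not leak into the resulting string.
  val msg =
    """|Aborting TaskSet 0.0 because task 3 (partition 3)
       |cannot run anywhere due to node and executor blacklist.
       |""".stripMargin
  // Prints the two lines flush left, followed by a trailing newline.
  print(msg)
}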
@@ -110,7 +110,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
val taskSetBlacklist = createTaskSetBlacklist(stageId)
if (stageId % 2 == 0) {
// fail one task in every other taskset
taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
taskSetBlacklist.updateBlacklistForFailedTask(
"hostA", exec = "1", index = 0, failureReason = "testing")
failuresSoFar += 1
}
blacklist.updateBlacklistForSuccessfulTaskSet(stageId, 0, taskSetBlacklist.execToFailures)
@@ -132,7 +133,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// for many different stages, executor 1 fails a task, and then the taskSet fails.
(0 until failuresUntilBlacklisted * 10).foreach { stage =>
val taskSetBlacklist = createTaskSetBlacklist(stage)
taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
taskSetBlacklist.updateBlacklistForFailedTask(
"hostA", exec = "1", index = 0, failureReason = "testing")
}
assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
}
@@ -147,7 +149,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
val numFailures = math.max(conf.get(config.MAX_FAILURES_PER_EXEC),
conf.get(config.MAX_FAILURES_PER_EXEC_STAGE))
(0 until numFailures).foreach { index =>
taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = index)
taskSetBlacklist.updateBlacklistForFailedTask(
"hostA", exec = "1", index = index, failureReason = "testing")
}
assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
@@ -170,7 +173,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// Fail 4 tasks in one task set on executor 1, so that executor gets blacklisted for the whole
// application.
(0 until 4).foreach { partition =>
taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
taskSetBlacklist0.updateBlacklistForFailedTask(
"hostA", exec = "1", index = partition, failureReason = "testing")
}
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures)
assert(blacklist.nodeBlacklist() === Set())
@@ -183,7 +187,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// application. Since that's the second executor that is blacklisted on the same node, we also
// blacklist that node.
(0 until 4).foreach { partition =>
taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
taskSetBlacklist1.updateBlacklistForFailedTask(
"hostA", exec = "2", index = partition, failureReason = "testing")
}
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist1.execToFailures)
assert(blacklist.nodeBlacklist() === Set("hostA"))
@@ -207,7 +212,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// Fail one more task, but executor isn't put back into blacklist since the count of failures
// on that executor should have been reset to 0.
val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 2)
taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
taskSetBlacklist2.updateBlacklistForFailedTask(
"hostA", exec = "1", index = 0, failureReason = "testing")
blacklist.updateBlacklistForSuccessfulTaskSet(2, 0, taskSetBlacklist2.execToFailures)
assert(blacklist.nodeBlacklist() === Set())
assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
@@ -221,7 +227,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// Lets say that executor 1 dies completely. We get some task failures, but
// the taskset then finishes successfully (elsewhere).
(0 until 4).foreach { partition =>
taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
taskSetBlacklist0.updateBlacklistForFailedTask(
"hostA", exec = "1", index = partition, failureReason = "testing")
}
blacklist.handleRemovedExecutor("1")
blacklist.updateBlacklistForSuccessfulTaskSet(
@@ -236,7 +243,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// Now another executor gets spun up on that host, but it also dies.
val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
(0 until 4).foreach { partition =>
taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
taskSetBlacklist1.updateBlacklistForFailedTask(
"hostA", exec = "2", index = partition, failureReason = "testing")
}
blacklist.handleRemovedExecutor("2")
blacklist.updateBlacklistForSuccessfulTaskSet(
@@ -279,7 +287,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M

def failOneTaskInTaskSet(exec: String): Unit = {
val taskSetBlacklist = createTaskSetBlacklist(stageId = stageId)
taskSetBlacklist.updateBlacklistForFailedTask("host-" + exec, exec, 0)
taskSetBlacklist.updateBlacklistForFailedTask("host-" + exec, exec, 0, "testing")
blacklist.updateBlacklistForSuccessfulTaskSet(stageId, 0, taskSetBlacklist.execToFailures)
stageId += 1
}
@@ -354,12 +362,12 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 2)
// Taskset1 has one failure immediately
taskSetBlacklist1.updateBlacklistForFailedTask("host-1", "1", 0)
taskSetBlacklist1.updateBlacklistForFailedTask("host-1", "1", 0, "testing")
// Then we have a *long* delay, much longer than the timeout, before any other failures or
// taskset completion
clock.advance(blacklist.BLACKLIST_TIMEOUT_MILLIS * 5)
// After the long delay, we have one failure on taskset 2, on the same executor
taskSetBlacklist2.updateBlacklistForFailedTask("host-1", "1", 0)
taskSetBlacklist2.updateBlacklistForFailedTask("host-1", "1", 0, "testing")
// Finally, we complete both tasksets. Its important here to complete taskset2 *first*. We
// want to make sure that when taskset 1 finishes, even though we've now got two task failures,
// we realize that the task failure we just added was well before the timeout.
@@ -377,16 +385,20 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// we blacklist executors on two different hosts -- make sure that doesn't lead to any
// node blacklisting
val taskSetBlacklist0 = createTaskSetBlacklist(stageId = 0)
taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = 1)
taskSetBlacklist0.updateBlacklistForFailedTask(
"hostA", exec = "1", index = 0, failureReason = "testing")
taskSetBlacklist0.updateBlacklistForFailedTask(
"hostA", exec = "1", index = 1, failureReason = "testing")
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures)
assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1"))
verify(listenerBusMock).post(SparkListenerExecutorBlacklisted(0, "1", 2))
assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())

val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
taskSetBlacklist1.updateBlacklistForFailedTask("hostB", exec = "2", index = 0)
taskSetBlacklist1.updateBlacklistForFailedTask("hostB", exec = "2", index = 1)
taskSetBlacklist1.updateBlacklistForFailedTask(
"hostB", exec = "2", index = 0, failureReason = "testing")
taskSetBlacklist1.updateBlacklistForFailedTask(
"hostB", exec = "2", index = 1, failureReason = "testing")
blacklist.updateBlacklistForSuccessfulTaskSet(1, 0, taskSetBlacklist1.execToFailures)
assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "2"))
verify(listenerBusMock).post(SparkListenerExecutorBlacklisted(0, "2", 2))
@@ -395,8 +407,10 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// Finally, blacklist another executor on the same node as the original blacklisted executor,
// and make sure this time we *do* blacklist the node.
val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 0)
taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "3", index = 0)
taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "3", index = 1)
taskSetBlacklist2.updateBlacklistForFailedTask(
"hostA", exec = "3", index = 0, failureReason = "testing")
taskSetBlacklist2.updateBlacklistForFailedTask(
"hostA", exec = "3", index = 1, failureReason = "testing")
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist2.execToFailures)
assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "2", "3"))
verify(listenerBusMock).post(SparkListenerExecutorBlacklisted(0, "3", 2))
@@ -486,7 +500,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// Fail 4 tasks in one task set on executor 1, so that executor gets blacklisted for the whole
// application.
(0 until 4).foreach { partition =>
taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
taskSetBlacklist0.updateBlacklistForFailedTask(
"hostA", exec = "1", index = partition, failureReason = "testing")
}
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures)

@@ -497,7 +512,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// application. Since that's the second executor that is blacklisted on the same node, we also
// blacklist that node.
(0 until 4).foreach { partition =>
taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
taskSetBlacklist1.updateBlacklistForFailedTask(
"hostA", exec = "2", index = partition, failureReason = "testing")
}
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist1.execToFailures)

@@ -512,7 +528,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// Fail 4 tasks in one task set on executor 1, so that executor gets blacklisted for the whole
// application.
(0 until 4).foreach { partition =>
taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
taskSetBlacklist2.updateBlacklistForFailedTask(
"hostA", exec = "1", index = partition, failureReason = "testing")
}
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist2.execToFailures)

@@ -523,7 +540,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
// application. Since that's the second executor that is blacklisted on the same node, we also
// blacklist that node.
(0 until 4).foreach { partition =>
taskSetBlacklist3.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
taskSetBlacklist3.updateBlacklistForFailedTask(
"hostA", exec = "2", index = partition, failureReason = "testing")
}
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist3.execToFailures)

@@ -660,9 +660,14 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(tsm.isZombie)
assert(failedTaskSet)
val idx = failedTask.index
assert(failedTaskSetReason === s"Aborting TaskSet 0.0 because task $idx (partition $idx) " +
s"cannot run anywhere due to node and executor blacklist. Blacklisting behavior can be " +
s"configured via spark.blacklist.*.")
assert(failedTaskSetReason === s"""
|Aborting $taskSet because task $idx (partition $idx)
|cannot run anywhere due to node and executor blacklist.
|Most recent failure:
|${tsm.taskSetBlacklistHelperOpt.get.getLatestFailureReason}
|
|Blacklisting behavior can be configured via spark.blacklist.*.
|""".stripMargin)
}

test("don't abort if there is an executor available, though it hasn't had scheduled tasks yet") {