@@ -149,6 +149,11 @@ package object config {
.internal()
.timeConf(TimeUnit.MILLISECONDS)
.createOptional

private[spark] val BLACKLIST_FETCH_FAILURE_ENABLED =
ConfigBuilder("spark.blacklist.application.fetchFailure.enabled")
.booleanConf
.createWithDefault(false)
// End blacklist confs

private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
@@ -61,6 +61,7 @@ private[scheduler] class BlacklistTracker (
private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC)
private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE)
val BLACKLIST_TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf)
private val BLACKLIST_FETCH_FAILURE_ENABLED = conf.get(config.BLACKLIST_FETCH_FAILURE_ENABLED)

/**
* A map from executorId to information on task failures. Tracks the time of each task failure,
@@ -145,6 +146,74 @@ private[scheduler] class BlacklistTracker (
nextExpiryTime = math.min(execMinExpiry, nodeMinExpiry)
}

private def killBlacklistedExecutor(exec: String): Unit = {
if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
Contributor: Why not make this a private val?

Contributor (Author): It might just be a personal preference; this code was not originally written by me, I only refactored it.

allocationClient match {
case Some(a) =>
logInfo(s"Killing blacklisted executor id $exec " +
s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
a.killExecutors(Seq(exec), true, true)
case None =>
logWarning(s"Not attempting to kill blacklisted executor id $exec " +
s"since allocation client is not defined.")
}
}
}

private def killExecutorsOnBlacklistedNode(node: String): Unit = {
if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
allocationClient match {
case Some(a) =>
logInfo(s"Killing all executors on blacklisted host $node " +
s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
if (a.killExecutorsOnHost(node) == false) {
logError(s"Killing executors on node $node failed.")
}
case None =>
logWarning(s"Not attempting to kill executors on blacklisted host $node " +
s"since allocation client is not defined.")
}
}
}

def updateBlacklistForFetchFailure(host: String, exec: String): Unit = {
if (BLACKLIST_FETCH_FAILURE_ENABLED) {
// If we blacklist on fetch failures, we are implicitly saying that we believe the failure is
// non-transient, and can't be recovered from (even if this is the first fetch failure,
// stage is retried after just one failure, so we don't always get a chance to collect
// multiple fetch failures).
// If the external shuffle-service is on, then every other executor on this node would
// be suffering from the same issue, so we should blacklist (and potentially kill) all
// of them immediately.

val now = clock.getTimeMillis()
val expiryTimeForNewBlacklists = now + BLACKLIST_TIMEOUT_MILLIS

if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
if (!nodeIdToBlacklistExpiryTime.contains(host)) {
logInfo(s"blacklisting node $host due to fetch failure of external shuffle service")

nodeIdToBlacklistExpiryTime.put(host, expiryTimeForNewBlacklists)
listenerBus.post(SparkListenerNodeBlacklisted(now, host, 1))
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
killExecutorsOnBlacklistedNode(host)
Contributor: I think we need to call updateNextExpiryTime() here now, since there isn't an explicit executor id being blacklisted, which would normally have called it.

Contributor: It would also be nice to make sure we have a test for the expiry.
updateNextExpiryTime()
}
} else if (!executorIdToBlacklistStatus.contains(exec)) {
logInfo(s"Blacklisting executor $exec due to fetch failure")

executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(host, expiryTimeForNewBlacklists))
// We hardcode the number of failed tasks to 1 for a fetch failure, because there's no
// reattempt for such a failure.
listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, 1))
updateNextExpiryTime()
killBlacklistedExecutor(exec)

val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(exec, HashSet[String]())
blacklistedExecsOnNode += exec
}
}
}

def updateBlacklistForSuccessfulTaskSet(
stageId: Int,
@@ -174,17 +243,7 @@ private[scheduler] class BlacklistTracker (
listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
executorIdToFailureList.remove(exec)
updateNextExpiryTime()
if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
allocationClient match {
case Some(allocationClient) =>
logInfo(s"Killing blacklisted executor id $exec " +
s"since spark.blacklist.killBlacklistedExecutors is set.")
allocationClient.killExecutors(Seq(exec), true, true)
case None =>
logWarning(s"Not attempting to kill blacklisted executor id $exec " +
s"since allocation client is not defined.")
}
}
killBlacklistedExecutor(exec)

// In addition to blacklisting the executor, we also update the data for failures on the
// node, and potentially put the entire node into a blacklist as well.
@@ -199,19 +258,7 @@ private[scheduler] class BlacklistTracker (
nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
allocationClient match {
case Some(allocationClient) =>
logInfo(s"Killing all executors on blacklisted host $node " +
s"since spark.blacklist.killBlacklistedExecutors is set.")
if (allocationClient.killExecutorsOnHost(node) == false) {
logError(s"Killing executors on node $node failed.")
}
case None =>
logWarning(s"Not attempting to kill executors on blacklisted host $node " +
s"since allocation client is not defined.")
}
}
killExecutorsOnBlacklistedNode(node)
}
}
}
@@ -51,29 +51,21 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}
* acquire a lock on us, so we need to make sure that we don't try to lock the backend while
* we are holding a lock on ourselves.
*/
private[spark] class TaskSchedulerImpl private[scheduler](
private[spark] class TaskSchedulerImpl(
val sc: SparkContext,
val maxTaskFailures: Int,
private[scheduler] val blacklistTrackerOpt: Option[BlacklistTracker],
isLocal: Boolean = false)
extends TaskScheduler with Logging {

import TaskSchedulerImpl._

def this(sc: SparkContext) = {
this(
sc,
sc.conf.get(config.MAX_TASK_FAILURES),
TaskSchedulerImpl.maybeCreateBlacklistTracker(sc))
this(sc, sc.conf.get(config.MAX_TASK_FAILURES))
}

def this(sc: SparkContext, maxTaskFailures: Int, isLocal: Boolean) = {
this(
sc,
maxTaskFailures,
TaskSchedulerImpl.maybeCreateBlacklistTracker(sc),
isLocal = isLocal)
}
// Lazily initialize blacklistTrackerOpt to avoid getting an empty ExecutorAllocationClient,
// because the ExecutorAllocationClient is created after this TaskSchedulerImpl.
private[scheduler] lazy val blacklistTrackerOpt = maybeCreateBlacklistTracker(sc)

val conf = sc.conf

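The comment on the new lazy val above captures the ordering constraint: the ExecutorAllocationClient is created after the TaskSchedulerImpl, so the tracker cannot be built eagerly in the constructor. A minimal sketch of the same idea, outside of Spark and with a hypothetical createClient placeholder, might look like:

    // Sketch only (not Spark code): a lazy val defers construction until first access,
    // by which point a dependency created later can already be in place.
    class Scheduler(createClient: () => Option[AnyRef]) {
      // Not evaluated in the constructor; evaluated on first access.
      lazy val trackerOpt: Option[String] =
        createClient().map(client => s"tracker backed by $client")
    }

    // The client becomes available only after the Scheduler is constructed.
    var client: Option[AnyRef] = None
    val scheduler = new Scheduler(() => client)
    client = Some(new Object)
    assert(scheduler.trackerOpt.isDefined) // first access happens after the client exists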
@@ -774,6 +774,12 @@ private[spark] class TaskSetManager(
tasksSuccessful += 1
}
isZombie = true

if (fetchFailed.bmAddress != null) {
blacklistTracker.foreach(_.updateBlacklistForFetchFailure(
fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId))
}

None

case ef: ExceptionFailure =>
@@ -529,4 +529,59 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
verify(allocationClientMock).killExecutors(Seq("2"), true, true)
verify(allocationClientMock).killExecutorsOnHost("hostA")
}

test("fetch failure blacklisting kills executors, configured by BLACKLIST_KILL_ENABLED") {
val allocationClientMock = mock[ExecutorAllocationClient]
when(allocationClientMock.killExecutors(any(), any(), any())).thenReturn(Seq("called"))
when(allocationClientMock.killExecutorsOnHost("hostA")).thenAnswer(new Answer[Boolean] {
// To avoid a race between blacklisting and killing, it is important that the nodeBlacklist
// is updated before we ask the executor allocation client to kill all the executors
// on a particular host.
override def answer(invocation: InvocationOnMock): Boolean = {
if (blacklist.nodeBlacklist.contains("hostA") == false) {
throw new IllegalStateException("hostA should be on the blacklist")
}
true
}
})

conf.set(config.BLACKLIST_FETCH_FAILURE_ENABLED, true)
blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock)

// Disable auto-kill. Blacklist an executor and make sure killExecutors is not called.
conf.set(config.BLACKLIST_KILL_ENABLED, false)
blacklist.updateBlacklistForFetchFailure("hostA", exec = "1")

verify(allocationClientMock, never).killExecutors(any(), any(), any())
verify(allocationClientMock, never).killExecutorsOnHost(any())

// Enable auto-kill. Blacklist an executor and make sure killExecutors is called.
conf.set(config.BLACKLIST_KILL_ENABLED, true)
blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
clock.advance(1000)
blacklist.updateBlacklistForFetchFailure("hostA", exec = "1")

verify(allocationClientMock).killExecutors(Seq("1"), true, true)
verify(allocationClientMock, never).killExecutorsOnHost(any())

assert(blacklist.executorIdToBlacklistStatus.contains("1"))
assert(blacklist.executorIdToBlacklistStatus("1").node === "hostA")
assert(blacklist.executorIdToBlacklistStatus("1").expiryTime ===
1000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
assert(blacklist.nextExpiryTime === 1000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
assert(blacklist.nodeIdToBlacklistExpiryTime.isEmpty)

// Enable external shuffle service to see if all the executors on this node will be killed.
conf.set(config.SHUFFLE_SERVICE_ENABLED, true)
clock.advance(1000)
blacklist.updateBlacklistForFetchFailure("hostA", exec = "2")

verify(allocationClientMock, never).killExecutors(Seq("2"), true, true)
verify(allocationClientMock).killExecutorsOnHost("hostA")

assert(blacklist.nodeIdToBlacklistExpiryTime.contains("hostA"))
assert(blacklist.nodeIdToBlacklistExpiryTime("hostA") ===
2000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
assert(blacklist.nextExpiryTime === 1000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
}
Contributor: Should we also test with SHUFFLE_SERVICE_ENABLED=true and BLACKLIST_KILL_ENABLED=false?

Contributor (Author): If BLACKLIST_KILL_ENABLED=false, the scenario would be the same as here; it seems like unnecessary duplication.

Contributor: Makes sense.
}
@@ -87,7 +87,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
conf.set(config.BLACKLIST_ENABLED, true)
sc = new SparkContext(conf)
taskScheduler =
new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4), Some(blacklist)) {
new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4)) {
override def createTaskSetManager(taskSet: TaskSet, maxFailures: Int): TaskSetManager = {
val tsm = super.createTaskSetManager(taskSet, maxFailures)
// we need to create a spied tsm just so we can set the TaskSetBlacklist
@@ -98,6 +98,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
stageToMockTaskSetBlacklist(taskSet.stageId) = taskSetBlacklist
tsmSpy
}

override private[scheduler] lazy val blacklistTrackerOpt = Some(blacklist)
}
setupHelper()
}
@@ -1140,6 +1140,38 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
.updateBlacklistForFailedTask(anyString(), anyString(), anyInt())
}

test("update application blacklist for shuffle-fetch") {
// Set up a taskset, and fail one of its tasks with a fetch failure.
val conf = new SparkConf()
.set(config.BLACKLIST_ENABLED, true)
.set(config.SHUFFLE_SERVICE_ENABLED, true)
.set(config.BLACKLIST_FETCH_FAILURE_ENABLED, true)
sc = new SparkContext("local", "test", conf)
sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
val taskSet = FakeTask.createTaskSet(4)
val blacklistTracker = new BlacklistTracker(sc, None)
val tsm = new TaskSetManager(sched, taskSet, 4, Some(blacklistTracker))

// make some offers to our taskset, to get tasks we will fail
val taskDescs = Seq(
"exec1" -> "host1",
"exec2" -> "host2"
).flatMap { case (exec, host) =>
// offer each executor twice (simulating 2 cores per executor)
(0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)}
}
assert(taskDescs.size === 4)

assert(!blacklistTracker.isExecutorBlacklisted(taskDescs(0).executorId))
assert(!blacklistTracker.isNodeBlacklisted("host1"))

// Fail the task with fetch failure
tsm.handleFailedTask(taskDescs(0).taskId, TaskState.FAILED,
FetchFailed(BlockManagerId(taskDescs(0).executorId, "host1", 12345), 0, 0, 0, "ignored"))

assert(blacklistTracker.isNodeBlacklisted("host1"))
}

private def createTaskResult(
id: Int,
accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {
9 changes: 9 additions & 0 deletions docs/configuration.md
@@ -1448,6 +1448,15 @@ Apart from these, the following properties are also available, and may be useful
all of the executors on that node will be killed.
</td>
</tr>
<tr>
<td><code>spark.blacklist.application.fetchFailure.enabled</code></td>
<td>false</td>
<td>
(Experimental) If set to "true", Spark will blacklist the executor immediately when a fetch
failure happens. If the external shuffle service is enabled, then the whole node will be
blacklisted.
</td>
</tr>
<tr>
<td><code>spark.speculation</code></td>
<td>false</td>
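For reference, a minimal usage sketch (not part of this patch) showing how an application could opt in to the new behaviour; the app name and the surrounding options are illustrative assumptions about a typical setup rather than requirements:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("fetch-failure-blacklist-demo")                      // hypothetical app name
      .set("spark.blacklist.enabled", "true")                          // existing blacklisting machinery
      .set("spark.blacklist.application.fetchFailure.enabled", "true") // new flag added by this change
      .set("spark.shuffle.service.enabled", "true")                    // with this on, the whole node is blacklisted

With spark.shuffle.service.enabled left at false, only the executor that served the failed fetch would be blacklisted (and killed, if spark.blacklist.killBlacklistedExecutors is also enabled).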