Commits (71)
9a6aaed  enhance blacklist mechanism (wei-mao-intel, Dec 29, 2015)
5bfe941  Update for new design (squito, May 10, 2016)
d7adc67  (node,task) blacklisting (squito, Jul 7, 2016)
a34e9ae  go back to having the blacklist tracker as an option, rather than the… (squito, Jul 7, 2016)
cf58374  dont count shuffle-fetch failures (squito, Jul 7, 2016)
7fcb266  make sure we clear the (node, task) blacklist on stage completion, ad… (squito, Jul 8, 2016)
487eb66  review feedback (squito, Jul 11, 2016)
c22aaad  fix (squito, Jul 12, 2016)
dc2b3ed  all taskset specific blacklisting is now in TaskSetManager (squito, Jul 13, 2016)
338db65  fix (squito, Jul 13, 2016)
fa3e34a  Merge branch 'master' into blacklist-SPARK-8425 (squito, Jul 13, 2016)
16afb43  review feedback (squito, Jul 14, 2016)
7aff08a  review feedback (squito, Jul 14, 2016)
e181546  rename conf (squito, Jul 14, 2016)
351a9a7  use typed confs consistently (squito, Jul 14, 2016)
572c777  docs (squito, Jul 20, 2016)
8cebb01  api simplification (squito, Jul 20, 2016)
dbf904e  review feedback (squito, Jul 20, 2016)
f0de0db  fix for config name change (squito, Jul 20, 2016)
8a12adf  exclude killed tasks and preempted tasks from blacklist (squito, Jul 22, 2016)
c9e3662  combine imports (squito, Jul 26, 2016)
497e626  review feedback (squito, Aug 11, 2016)
515b18a  add task timeouts (squito, Aug 18, 2016)
f0428b4  separate datastructure to track blacklisted execs in a tsm, to simpli… (squito, Aug 18, 2016)
a5fbce7  Merge branch 'master' into blacklist-SPARK-8425 (squito, Aug 18, 2016)
b582d8e  fix missing import (squito, Aug 18, 2016)
cec36c9  fix line wrapping (squito, Aug 18, 2016)
290b315  fix test by turning off blacklist (squito, Aug 18, 2016)
8c58ad9  unused import (squito, Aug 18, 2016)
f012780  review feedback (squito, Aug 22, 2016)
fc45f5b  fix some typos (squito, Aug 22, 2016)
f8b1bff  add validation for blacklist confs (squito, Aug 22, 2016)
e56bb90  update test to turn off blacklist (squito, Aug 22, 2016)
cc3b968  fix timeout of individual tasks (squito, Aug 26, 2016)
5fdfe49  simplify task expiry by doing it lazily (squito, Aug 26, 2016)
e10fa10  review feedback (squito, Aug 31, 2016)
1297788  Merge branch 'master' into blacklist-SPARK-8425 (squito, Aug 31, 2016)
c78964f  fix bad merge (squito, Aug 31, 2016)
b679953  Merge branch 'master' into blacklist-SPARK-8425 (squito, Sep 21, 2016)
463b837  more cleanup of TaskEndReason -> TaskFailedReason (squito, Sep 21, 2016)
9a2cf84  review feedback (squito, Sep 21, 2016)
d0f43c7  review feedback (squito, Sep 21, 2016)
cfb653e  Merge branch 'master' into blacklist-SPARK-8425 (squito, Sep 22, 2016)
18ef5c6  review feedback (squito, Sep 22, 2016)
0c3ceba  pull out TaskSetBlacklist helper (squito, Sep 26, 2016)
2381b25  oops, put class in the right place (squito, Sep 26, 2016)
3ca2f79  more refactor for TaskSetBlacklist (squito, Sep 26, 2016)
27b4bde  fix logging (squito, Sep 26, 2016)
278fff3  undo some un-intentional changes (squito, Sep 26, 2016)
1a467f0  Merge branch 'master' into blacklist-SPARK-8425 (squito, Oct 17, 2016)
0ff7d16  fix merge (squito, Oct 17, 2016)
ff49a62  fix merge (squito, Oct 19, 2016)
21907a5  more cleanup from merge (squito, Oct 19, 2016)
0c57d9d  bit more cleanup (squito, Oct 19, 2016)
162cb0d  fix serializability of RequestExecutors, add test (squito, Oct 20, 2016)
cb658dd  cleanup (squito, Oct 20, 2016)
6b3babc  cleanup after test (squito, Oct 21, 2016)
37f1573  Merge branch 'master' into blacklist-SPARK-8425 (squito, Oct 26, 2016)
45f42eb  fix merge (squito, Oct 27, 2016)
cdd9f33  rename (squito, Oct 26, 2016)
255e9b6  remove some unnecessary changes (squito, Oct 27, 2016)
d431f26  review feedback (squito, Nov 16, 2016)
cc3faaf  review feedback (squito, Nov 16, 2016)
5d8500a  protect against race condition in test asserts (squito, Nov 17, 2016)
72036f4  Merge branch 'master' into blacklist-SPARK-8425 (squito, Nov 28, 2016)
fd57d86  Merge branch 'master' into blacklist-SPARK-8425 (squito, Nov 30, 2016)
35978e2  minor cleanup (squito, Nov 30, 2016)
555039d  review feedback (squito, Dec 13, 2016)
c422dd4  Merge branch 'master' into blacklist-SPARK-8425 (squito, Dec 13, 2016)
c95462f  check executor id (squito, Dec 14, 2016)
f249b00  Merge branch 'master' into blacklist-SPARK-8425 (squito, Dec 15, 2016)
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -114,11 +114,21 @@ package object config {
.intConf
.createWithDefault(2)

private[spark] val MAX_FAILURES_PER_EXEC =
ConfigBuilder("spark.blacklist.application.maxFailedTasksPerExecutor")
.intConf
.createWithDefault(2)

private[spark] val MAX_FAILURES_PER_EXEC_STAGE =
ConfigBuilder("spark.blacklist.stage.maxFailedTasksPerExecutor")
.intConf
.createWithDefault(2)

private[spark] val MAX_FAILED_EXEC_PER_NODE =
ConfigBuilder("spark.blacklist.application.maxFailedExecutorsPerNode")
.intConf
.createWithDefault(2)

private[spark] val MAX_FAILED_EXEC_PER_NODE_STAGE =
ConfigBuilder("spark.blacklist.stage.maxFailedExecutorsPerNode")
.intConf
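For orientation, a hedged sketch of how an application might set the thresholds added above; this is editorial and not part of the diff. The four long keys are taken verbatim from the diff, while "spark.blacklist.enabled" is assumed to be the feature's on/off switch defined elsewhere in this patch.

import org.apache.spark.SparkConf

object BlacklistConfExample {
  // All of these confs default to 2 per the diff above; set here only to make the knobs explicit.
  val conf: SparkConf = new SparkConf()
    .set("spark.blacklist.enabled", "true")  // assumption: on/off switch defined elsewhere in the patch
    // Blacklist an executor for the whole application after 2 task failures in successful task sets:
    .set("spark.blacklist.application.maxFailedTasksPerExecutor", "2")
    // Blacklist a node for the whole application once 2 of its executors are blacklisted:
    .set("spark.blacklist.application.maxFailedExecutorsPerNode", "2")
    // Per-stage analogues; all of these are checked by BlacklistTracker.validateBlacklistConfs:
    .set("spark.blacklist.stage.maxFailedTasksPerExecutor", "2")
    .set("spark.blacklist.stage.maxFailedExecutorsPerNode", "2")
}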
272 changes: 270 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -17,10 +17,274 @@

package org.apache.spark.scheduler

import java.util.concurrent.atomic.AtomicReference

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.util.Utils
import org.apache.spark.util.{Clock, SystemClock, Utils}

/**
* BlacklistTracker is designed to track problematic executors and nodes. It supports blacklisting
* executors and nodes across an entire application (with a periodic expiry). TaskSetManagers add
* additional blacklisting of executors and nodes for individual tasks and stages which works in
* concert with the blacklisting here.
*
* The tracker needs to deal with a variety of workloads, eg.:
*
* * bad user code -- this may lead to many task failures, but that should not count against
* individual executors
* * many small stages -- this may prevent a bad executor for having many failures within one
* stage, but still many failures over the entire application
* * "flaky" executors -- they don't fail every task, but are still faulty enough to merit
* blacklisting
*
* See the design doc on SPARK-8425 for a more in-depth discussion.
*
* THREADING: As with most helpers of TaskSchedulerImpl, this is not thread-safe. Though it is
* called by multiple threads, callers must already have a lock on the TaskSchedulerImpl. The
* one exception is [[nodeBlacklist()]], which can be called without holding a lock.
*/
private[scheduler] class BlacklistTracker (
conf: SparkConf,
clock: Clock = new SystemClock()) extends Logging {

BlacklistTracker.validateBlacklistConfs(conf)
private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC)
private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE)
val BLACKLIST_TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf)

/**
* A map from executorId to information on task failures. Tracks the time of each task failure,
* so that we can avoid blacklisting executors due to failures that are very far apart. We do not
* actively remove from this as soon as tasks hit their timeouts, to avoid the time it would take
* to do so. But it will not grow too large, because as soon as an executor gets too many
* failures, we blacklist the executor and remove its entry here.
*/
private val executorIdToFailureList = new HashMap[String, ExecutorFailureList]()
val executorIdToBlacklistStatus = new HashMap[String, BlacklistedExecutor]()
val nodeIdToBlacklistExpiryTime = new HashMap[String, Long]()
/**
* An immutable copy of the set of nodes that are currently blacklisted. Kept in an
Review comment (Contributor):
Can you change this to have the first sentence say "An immutable copy of the set of nodes that are currently blacklisted (i.e., of the keys in nodeIdToBlacklistExpiryTime). Kept..."?
(I keep forgetting why this is necessary)
* AtomicReference to make [[nodeBlacklist()]] thread-safe.
*/
private val _nodeBlacklist = new AtomicReference[Set[String]](Set())
/**
* Time when the next blacklist will expire. Used as a
* shortcut to avoid iterating over all entries in the blacklist when none will have expired.
*/
var nextExpiryTime: Long = Long.MaxValue
/**
* Mapping from nodes to all of the executors that have been blacklisted on that node. We do *not*
* remove from this when executors are removed from spark, so we can track when we get multiple
* successive blacklisted executors on one node. Nonetheless, it will not grow too large because
* there cannot be many blacklisted executors on one node, before we stop requesting more
* executors on that node, and we clean up the list of blacklisted executors once an executor has
* been blacklisted for BLACKLIST_TIMEOUT_MILLIS.
*/
val nodeToBlacklistedExecs = new HashMap[String, HashSet[String]]()

/**
* Un-blacklists executors and nodes that have been blacklisted for at least
* BLACKLIST_TIMEOUT_MILLIS
*/
def applyBlacklistTimeout(): Unit = {
Review comment (Contributor):
Can you add a docstring here? "Un-blacklists executors and nodes that have been blacklisted for at least BLACKLIST_TIMEOUT"?
val now = clock.getTimeMillis()
// quickly check if we've got anything to expire from blacklist -- if not, avoid doing any work
if (now > nextExpiryTime) {
// Apply the timeout to blacklisted nodes and executors
val execsToUnblacklist = executorIdToBlacklistStatus.filter(_._2.expiryTime < now).keys
if (execsToUnblacklist.nonEmpty) {
// Un-blacklist any executors that have been blacklisted longer than the blacklist timeout.
logInfo(s"Removing executors $execsToUnblacklist from blacklist because the blacklist " +
s"for those executors has timed out")
execsToUnblacklist.foreach { exec =>
val status = executorIdToBlacklistStatus.remove(exec).get
val failedExecsOnNode = nodeToBlacklistedExecs(status.node)
failedExecsOnNode.remove(exec)
if (failedExecsOnNode.isEmpty) {
nodeToBlacklistedExecs.remove(status.node)
}
}
}
val nodesToUnblacklist = nodeIdToBlacklistExpiryTime.filter(_._2 < now).keys
if (nodesToUnblacklist.nonEmpty) {
// Un-blacklist any nodes that have been blacklisted longer than the blacklist timeout.
logInfo(s"Removing nodes $nodesToUnblacklist from blacklist because the blacklist " +
s"has timed out")
nodeIdToBlacklistExpiryTime --= nodesToUnblacklist
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
}
updateNextExpiryTime()
}
}

private def updateNextExpiryTime(): Unit = {
val execMinExpiry = if (executorIdToBlacklistStatus.nonEmpty) {
executorIdToBlacklistStatus.map{_._2.expiryTime}.min
} else {
Long.MaxValue
}
val nodeMinExpiry = if (nodeIdToBlacklistExpiryTime.nonEmpty) {
nodeIdToBlacklistExpiryTime.values.min
} else {
Long.MaxValue
}
nextExpiryTime = math.min(execMinExpiry, nodeMinExpiry)
}


def updateBlacklistForSuccessfulTaskSet(
stageId: Int,
stageAttemptId: Int,
failuresByExec: HashMap[String, ExecutorFailuresInTaskSet]): Unit = {
// if any tasks failed, we count them towards the overall failure count for the executor at
// this point.
val now = clock.getTimeMillis()
Review comment (Contributor):
What happens for super long task sets (e.g., where the duration of the task set is longer than the blacklist timeout)? In that case, we could be adding things to the blacklist that have already expired / is that handled correctly?

Reply (Contributor Author):
Well, I think it depends what you mean by "handled correctly". We use the time the taskset completes, so it's OK if the failures happened long ago when the taskset started; we still count those failures in the app blacklist, so later failures can trickle in and push us over the limit.

OTOH, this also means that if we were already close to the limit on failures for the application when this taskset started, then a really long-running taskset will fail to push us over the limit -- by the time the latest task set finishes, we've expired the old failures, so we only get failures from the new taskset. So if your taskset time is longer than the blacklist timeout, you're unlikely to ever get application-level blacklisting.

Clearly this is not great, but it's not that bad. After all, even if it were app-level blacklisted, we'd still hit the timeout and remove the bad resources from the blacklist, so we'd need to rediscover them in future blacklists. One of the main reasons for the app-level blacklist is to avoid lots of failures when the tasksets are short. If you really want an application-level blacklist which is useful across really long tasksets, then you've got to crank up your timeout.

We could change this slightly by first updating the application-level blacklist, and then expiring failures past the timeout. But to me that behavior seems much less intuitive, for a pretty questionable gain.

Does that make sense? What do you think?

Reply (Contributor Author):
Oh gosh, I lied completely about the way the time handling works -- sorry, that is somewhat embarrassing. But the overall comment about what happens with long tasksets still applies: it's not so clear what the right thing to do is in that case. If you really want to handle long tasksets and have app-level blacklisting, you need to increase that timeout.

Reply (Contributor):
Here's what I was concerned about:

Suppose BLACKLIST_TIMEOUT_MILLIS is 5 and MAX_FAILURES_PER_EXEC is 2.

In a long-running task set, task set 0, a task fails on executor A at time 8, but the blacklist tracker doesn't find out about this until much later, when the task set finishes (for the sake of example, time 100).

In the meantime task set 1 runs, has a task that fails on executor A at time 98, and then completes shortly thereafter at time 99.

At this point, there have been two failures on executor A: one at time 8 and one at time 98. These are so far apart that they shouldn't cause A to be blacklisted. But it looks like when task set 0 finishes, we'll still add the entry at time 8 to ExecutorFailureList, and then hit MAX_FAILURES_PER_EXEC and blacklist executor A. This seems overly aggressive (i.e., it seems like long-running task sets can "unfairly" get executors blacklisted that actually had very spread-out failures, potentially far in the past).

It looks like this could be fixed by swapping lines 150 and 151 (with a comment that this is to handle long task sets)? I think this is what you were saying seems confusing, but I think it is necessary to avoid blacklisting behavior that seems inconsistent with the timeout. Let me know if I'm misinterpreting the behavior here!

Reply (Contributor Author):
Yeah, I think you are right; thanks for walking me through this example. I can't think of any behavior which is clearly "right" for long-running task sets, but I think what you are proposing is clearer.

The other situation I was worried about is when the failures occur at the beginning of the taskset, but by the time the taskset finishes we're already past the timeout. You'd never get app-level blacklisting. But thinking about this again, I think that is the best thing to do.

One other alternative would be to completely ignore the time that individual tasks fail, and instead use the time the tasksets complete for the timeout. But I think that would be overall more confusing.
failuresByExec.foreach { case (exec, failuresInTaskSet) =>
val appFailuresOnExecutor =
executorIdToFailureList.getOrElseUpdate(exec, new ExecutorFailureList)
appFailuresOnExecutor.addFailures(stageId, stageAttemptId, failuresInTaskSet)
appFailuresOnExecutor.dropFailuresWithTimeoutBefore(now)
val newTotal = appFailuresOnExecutor.numUniqueTaskFailures

val expiryTimeForNewBlacklists = now + BLACKLIST_TIMEOUT_MILLIS
// If this pushes the total number of failures over the threshold, blacklist the executor.
// If its already blacklisted, we avoid "re-blacklisting" (which can happen if there were
// other tasks already running in another taskset when it got blacklisted), because it makes
// some of the logic around expiry times a little more confusing. But it also wouldn't be a
// problem to re-blacklist, with a later expiry time.
if (newTotal >= MAX_FAILURES_PER_EXEC && !executorIdToBlacklistStatus.contains(exec)) {
logInfo(s"Blacklisting executor id: $exec because it has $newTotal" +
s" task failures in successful task sets")
val node = failuresInTaskSet.node
executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(node, expiryTimeForNewBlacklists))
updateNextExpiryTime()

// In addition to blacklisting the executor, we also update the data for failures on the
// node, and potentially put the entire node into a blacklist as well.
val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(node, HashSet[String]())
blacklistedExecsOnNode += exec
// If the node is already in the blacklist, we avoid adding it again with a later expiry
// time.
if (blacklistedExecsOnNode.size >= MAX_FAILED_EXEC_PER_NODE &&
!nodeIdToBlacklistExpiryTime.contains(node)) {
logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " +
s"executors blacklisted: ${blacklistedExecsOnNode}")
nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
}
}
}
}

def isExecutorBlacklisted(executorId: String): Boolean = {
executorIdToBlacklistStatus.contains(executorId)
}

/**
* Get the full set of nodes that are blacklisted. Unlike other methods in this class, this *IS*
* thread-safe -- no lock required on a taskScheduler.
*/
def nodeBlacklist(): Set[String] = {
_nodeBlacklist.get()
}

def isNodeBlacklisted(node: String): Boolean = {
nodeIdToBlacklistExpiryTime.contains(node)
}

def handleRemovedExecutor(executorId: String): Unit = {
// We intentionally do not clean up executors that are already blacklisted in
// nodeToBlacklistedExecs, so that if another executor on the same node gets blacklisted, we can
// blacklist the entire node. We also can't clean up executorIdToBlacklistStatus, so we can
// eventually remove the executor after the timeout. Despite not clearing those structures
// here, we don't expect they will grow too big since you won't get too many executors on one
// node, and the timeout will clear it up periodically in any case.
executorIdToFailureList -= executorId
}


/**
* Tracks all failures for one executor (that have not passed the timeout).
*
* In general we actually expect this to be extremely small, since it won't contain more than the
* maximum number of task failures before an executor is failed (default 2).
*/
private[scheduler] final class ExecutorFailureList extends Logging {

private case class TaskId(stage: Int, stageAttempt: Int, taskIndex: Int)

/**
* All failures on this executor in successful task sets.
*/
private var failuresAndExpiryTimes = ArrayBuffer[(TaskId, Long)]()
/**
* As an optimization, we track the min expiry time over all entries in failuresAndExpiryTimes
* so its quick to tell if there are any failures with expiry before the current time.
*/
private var minExpiryTime = Long.MaxValue

def addFailures(
stage: Int,
stageAttempt: Int,
failuresInTaskSet: ExecutorFailuresInTaskSet): Unit = {
failuresInTaskSet.taskToFailureCountAndFailureTime.foreach {
case (taskIdx, (_, failureTime)) =>
val expiryTime = failureTime + BLACKLIST_TIMEOUT_MILLIS
failuresAndExpiryTimes += ((TaskId(stage, stageAttempt, taskIdx), expiryTime))
if (expiryTime < minExpiryTime) {
minExpiryTime = expiryTime
}
}
}

/**
* The number of unique tasks that failed on this executor. Only counts failures within the
* timeout, and in successful tasksets.
*/
def numUniqueTaskFailures: Int = failuresAndExpiryTimes.size

def isEmpty: Boolean = failuresAndExpiryTimes.isEmpty

/**
* Apply the timeout to individual tasks. This is to prevent one-off failures that are very
* spread out in time (and likely have nothing to do with problems on the executor) from
* triggering blacklisting. However, note that we do *not* remove executors and nodes from
* the blacklist as we expire individual task failures -- each have their own timeout. Eg.,
* suppose:
* * timeout = 10, maxFailuresPerExec = 2
* * Task 1 fails on exec 1 at time 0
* * Task 2 fails on exec 1 at time 5
* --> exec 1 is blacklisted from time 5 - 15.
* This is to simplify the implementation, as well as keep the behavior easier to understand
* for the end user.
*/
def dropFailuresWithTimeoutBefore(dropBefore: Long): Unit = {
if (minExpiryTime < dropBefore) {
var newMinExpiry = Long.MaxValue
val newFailures = new ArrayBuffer[(TaskId, Long)]
failuresAndExpiryTimes.foreach { case (task, expiryTime) =>
if (expiryTime >= dropBefore) {
newFailures += ((task, expiryTime))
if (expiryTime < newMinExpiry) {
newMinExpiry = expiryTime
}
}
}
failuresAndExpiryTimes = newFailures
minExpiryTime = newMinExpiry
}
}

override def toString(): String = {
s"failures = $failuresAndExpiryTimes"
}
}

}
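Both nextExpiryTime in the tracker and minExpiryTime in ExecutorFailureList rely on the same shortcut: cache the earliest expiry so the common case can bail out without scanning any entries. Below is a minimal, self-contained sketch of that pattern; it is editorial, and ExpiringSet and its members are hypothetical names, not part of the patch.

import scala.collection.mutable.HashMap

// Editorial sketch of the "cache the minimum expiry" shortcut used above.
class ExpiringSet[K] {
  private val expiryById = HashMap[K, Long]()
  private var nextExpiry = Long.MaxValue  // min over expiryById.values, or MaxValue when empty

  def add(id: K, expiresAt: Long): Unit = {
    expiryById(id) = expiresAt
    nextExpiry = math.min(nextExpiry, expiresAt)
  }

  def expire(now: Long): Unit = {
    // Cheap bail-out: if even the earliest expiry is still in the future, skip the scan.
    if (now > nextExpiry) {
      val expired = expiryById.filter { case (_, t) => t < now }.keys
      expiryById --= expired
      nextExpiry = if (expiryById.nonEmpty) expiryById.values.min else Long.MaxValue
    }
  }

  def contains(id: K): Boolean = expiryById.contains(id)
}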

private[scheduler] object BlacklistTracker extends Logging {

@@ -80,7 +344,9 @@ private[scheduler] object BlacklistTracker extends Logging {
config.MAX_TASK_ATTEMPTS_PER_EXECUTOR,
config.MAX_TASK_ATTEMPTS_PER_NODE,
config.MAX_FAILURES_PER_EXEC_STAGE,
config.MAX_FAILED_EXEC_PER_NODE_STAGE
config.MAX_FAILED_EXEC_PER_NODE_STAGE,
config.MAX_FAILURES_PER_EXEC,
config.MAX_FAILED_EXEC_PER_NODE
).foreach { config =>
val v = conf.get(config)
if (v <= 0) {
@@ -112,3 +378,5 @@
}
}
}

private final case class BlacklistedExecutor(node: String, expiryTime: Long)
Review comment (Contributor):
How about BlacklistStatus? "Executor" is confusing, since the class doesn't contain anything about an executor.

Review comment (Contributor):
Move this class inside BlacklistTracker?

Or I wonder if it would be better to rename the map to executorIdToNodeAndExpiryTime and then just put a 2-item tuple in the map rather than this simple datastructure (I don't have strong feelings, though, if you prefer the class).

Reply (Contributor Author):
Good point, moved this and ExecutorFailureList as well. I'd prefer to keep it as a case class, since there are uses like executorIdToBlacklistStatus.filter(_._2.expiryTime < now).keys and I think that is much clearer with the expiryTime in there.

Reply (Contributor Author):
Ah, actually there is a minor reason why I can't put it inside BlacklistTracker -- I expose executorIdToBlacklistStatus just for tests, but then the compiler complains that BlacklistedExecutor escapes its defining scope.
@@ -25,26 +25,30 @@ import scala.collection.mutable.HashMap
private[scheduler] class ExecutorFailuresInTaskSet(val node: String) {
/**
* Mapping from index of the tasks in the taskset, to the number of times it has failed on this
* executor.
* executor and the most recent failure time.
*/
val taskToFailureCount = HashMap[Int, Int]()
val taskToFailureCountAndFailureTime = HashMap[Int, (Int, Long)]()

def updateWithFailure(taskIndex: Int): Unit = {
val prevFailureCount = taskToFailureCount.getOrElse(taskIndex, 0)
taskToFailureCount(taskIndex) = prevFailureCount + 1
def updateWithFailure(taskIndex: Int, failureTime: Long): Unit = {
val (prevFailureCount, prevFailureTime) =
taskToFailureCountAndFailureTime.getOrElse(taskIndex, (0, -1L))
// these times always come from the driver, so we don't need to worry about skew, but might
// as well still be defensive in case there is non-monotonicity in the clock
val newFailureTime = math.max(prevFailureTime, failureTime)
taskToFailureCountAndFailureTime(taskIndex) = (prevFailureCount + 1, newFailureTime)
}

def numUniqueTasksWithFailures: Int = taskToFailureCount.size
def numUniqueTasksWithFailures: Int = taskToFailureCountAndFailureTime.size

/**
* Return the number of times this executor has failed on the given task index.
*/
def getNumTaskFailures(index: Int): Int = {
taskToFailureCount.getOrElse(index, 0)
taskToFailureCountAndFailureTime.getOrElse(index, (0, 0))._1
}

override def toString(): String = {
s"numUniqueTasksWithFailures = $numUniqueTasksWithFailures; " +
s"tasksToFailureCount = $taskToFailureCount"
s"tasksToFailureCount = $taskToFailureCountAndFailureTime"
}
}
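A brief editorial usage sketch of the updated class. It assumes the snippet compiles inside the org.apache.spark.scheduler package (the class is private[scheduler]); the timestamps are arbitrary driver-clock milliseconds.

package org.apache.spark.scheduler

// Editorial usage sketch of ExecutorFailuresInTaskSet as changed above.
object ExecutorFailuresInTaskSetExample extends App {
  val failures = new ExecutorFailuresInTaskSet(node = "hostA")
  failures.updateWithFailure(taskIndex = 3, failureTime = 1000L)
  failures.updateWithFailure(taskIndex = 3, failureTime = 950L)   // stale clock value: count bumps, latest time stays 1000
  failures.updateWithFailure(taskIndex = 7, failureTime = 1200L)

  assert(failures.getNumTaskFailures(3) == 2)
  assert(failures.getNumTaskFailures(5) == 0)   // unseen task index defaults to zero failures
  assert(failures.numUniqueTasksWithFailures == 2)
  println(failures)
}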