Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -182,19 +182,19 @@ private[spark] class ExecutorMonitor(
if (updateExecutors) {
val activeShuffleIds = shuffleStages.map(_._2).toSeq
var needTimeoutUpdate = false
val activatedExecs = new mutable.ArrayBuffer[String]()
val activatedExecs = new ExecutorIdCollector()
executors.asScala.foreach { case (id, exec) =>
if (!exec.hasActiveShuffle) {
exec.updateActiveShuffles(activeShuffleIds)
if (exec.hasActiveShuffle) {
needTimeoutUpdate = true
activatedExecs += id
activatedExecs.add(id)
}
}
}

logDebug(s"Activated executors ${activatedExecs.mkString(",")} due to shuffle data " +
s"needed by new job ${event.jobId}.")
logDebug(s"Activated executors $activatedExecs due to shuffle data needed by new job" +
s"${event.jobId}.")

if (needTimeoutUpdate) {
nextTimeout.set(Long.MinValue)
Expand Down Expand Up @@ -233,18 +233,18 @@ private[spark] class ExecutorMonitor(
}
}

val deactivatedExecs = new mutable.ArrayBuffer[String]()
val deactivatedExecs = new ExecutorIdCollector()
executors.asScala.foreach { case (id, exec) =>
if (exec.hasActiveShuffle) {
exec.updateActiveShuffles(activeShuffles)
if (!exec.hasActiveShuffle) {
deactivatedExecs += id
deactivatedExecs.add(id)
}
}
}

logDebug(s"Executors ${deactivatedExecs.mkString(",")} do not have active shuffle data " +
s"after job ${event.jobId} finished.")
logDebug(s"Executors $deactivatedExecs do not have active shuffle data after job " +
s"${event.jobId} finished.")
}

jobToStageIDs.remove(event.jobId).foreach { stages =>
Expand Down Expand Up @@ -448,7 +448,8 @@ private[spark] class ExecutorMonitor(
} else {
idleTimeoutMs
}
idleStart + timeout
val deadline = idleStart + timeout
if (deadline >= 0) deadline else Long.MaxValue
} else {
Long.MaxValue
}
Expand Down Expand Up @@ -491,4 +492,22 @@ private[spark] class ExecutorMonitor(
private case class ShuffleCleanedEvent(id: Int) extends SparkListenerEvent {
override protected[spark] def logEvent: Boolean = false
}

/** Used to collect executor IDs for debug messages (and avoid too long messages). */
private class ExecutorIdCollector {
private val ids = if (log.isDebugEnabled) new mutable.ArrayBuffer[String]() else null
private var excess = 0

def add(id: String): Unit = if (log.isDebugEnabled) {
if (ids.size < 10) {
ids += id
} else {
excess += 1
}
}

override def toString(): String = {
ids.mkString(",") + (if (excess > 0) s" (and $excess more)" else "")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,26 @@ class ExecutorMonitorSuite extends SparkFunSuite {
assert(monitor.timedOutExecutors(idleDeadline).toSet === Set("1", "2"))
}

test("SPARK-28455: avoid overflow in timeout calculation") {
conf
.set(DYN_ALLOCATION_SHUFFLE_TIMEOUT, Long.MaxValue)
.set(DYN_ALLOCATION_SHUFFLE_TRACKING, true)
.set(SHUFFLE_SERVICE_ENABLED, false)
monitor = new ExecutorMonitor(conf, client, null, clock)

// Generate events that will make executor 1 be idle, while still holding shuffle data.
// The executor should not be eligible for removal since the timeout is basically "infinite".
val stage = stageInfo(1, shuffleId = 0)
monitor.onJobStart(SparkListenerJobStart(1, clock.getTimeMillis(), Seq(stage)))
clock.advance(1000L)
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
monitor.onTaskStart(SparkListenerTaskStart(1, 0, taskInfo("1", 1)))
monitor.onTaskEnd(SparkListenerTaskEnd(1, 0, "foo", Success, taskInfo("1", 1), null))
monitor.onJobEnd(SparkListenerJobEnd(1, clock.getTimeMillis(), JobSucceeded))

assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
}

private def idleDeadline: Long = clock.getTimeMillis() + idleTimeoutMs + 1
private def storageDeadline: Long = clock.getTimeMillis() + storageTimeoutMs + 1
private def shuffleDeadline: Long = clock.getTimeMillis() + shuffleTimeoutMs + 1
Expand Down