diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
index f5beb403555e9..d0337b6e34962 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
@@ -182,19 +182,19 @@ private[spark] class ExecutorMonitor(
     if (updateExecutors) {
       val activeShuffleIds = shuffleStages.map(_._2).toSeq
       var needTimeoutUpdate = false
-      val activatedExecs = new mutable.ArrayBuffer[String]()
+      val activatedExecs = new ExecutorIdCollector()
       executors.asScala.foreach { case (id, exec) =>
         if (!exec.hasActiveShuffle) {
           exec.updateActiveShuffles(activeShuffleIds)
           if (exec.hasActiveShuffle) {
             needTimeoutUpdate = true
-            activatedExecs += id
+            activatedExecs.add(id)
           }
         }
       }
 
-      logDebug(s"Activated executors ${activatedExecs.mkString(",")} due to shuffle data " +
-        s"needed by new job ${event.jobId}.")
+      logDebug(s"Activated executors $activatedExecs due to shuffle data needed by new job" +
+        s"${event.jobId}.")
 
       if (needTimeoutUpdate) {
         nextTimeout.set(Long.MinValue)
@@ -233,18 +233,18 @@ private[spark] class ExecutorMonitor(
         }
       }
 
-      val deactivatedExecs = new mutable.ArrayBuffer[String]()
+      val deactivatedExecs = new ExecutorIdCollector()
       executors.asScala.foreach { case (id, exec) =>
         if (exec.hasActiveShuffle) {
           exec.updateActiveShuffles(activeShuffles)
           if (!exec.hasActiveShuffle) {
-            deactivatedExecs += id
+            deactivatedExecs.add(id)
           }
         }
       }
 
-      logDebug(s"Executors ${deactivatedExecs.mkString(",")} do not have active shuffle data " +
-        s"after job ${event.jobId} finished.")
+      logDebug(s"Executors $deactivatedExecs do not have active shuffle data after job " +
+        s"${event.jobId} finished.")
     }
 
     jobToStageIDs.remove(event.jobId).foreach { stages =>
@@ -448,7 +448,8 @@ private[spark] class ExecutorMonitor(
       } else {
         idleTimeoutMs
       }
-      idleStart + timeout
+      val deadline = idleStart + timeout
+      if (deadline >= 0) deadline else Long.MaxValue
     } else {
       Long.MaxValue
     }
@@ -491,4 +492,22 @@ private[spark] class ExecutorMonitor(
   private case class ShuffleCleanedEvent(id: Int) extends SparkListenerEvent {
     override protected[spark] def logEvent: Boolean = false
   }
+
+  /** Used to collect executor IDs for debug messages (and avoid too long messages). */
+  private class ExecutorIdCollector {
+    private val ids = if (log.isDebugEnabled) new mutable.ArrayBuffer[String]() else null
+    private var excess = 0
+
+    def add(id: String): Unit = if (log.isDebugEnabled) {
+      if (ids.size < 10) {
+        ids += id
+      } else {
+        excess += 1
+      }
+    }
+
+    override def toString(): String = {
+      ids.mkString(",") + (if (excess > 0) s" (and $excess more)" else "")
+    }
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
index e11ee97469b00..6a25754fcbe5a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
@@ -367,6 +367,26 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     assert(monitor.timedOutExecutors(idleDeadline).toSet === Set("1", "2"))
   }
 
+  test("SPARK-28455: avoid overflow in timeout calculation") {
+    conf
+      .set(DYN_ALLOCATION_SHUFFLE_TIMEOUT, Long.MaxValue)
+      .set(DYN_ALLOCATION_SHUFFLE_TRACKING, true)
+      .set(SHUFFLE_SERVICE_ENABLED, false)
+    monitor = new ExecutorMonitor(conf, client, null, clock)
+
+    // Generate events that will make executor 1 be idle, while still holding shuffle data.
+    // The executor should not be eligible for removal since the timeout is basically "infinite".
+    val stage = stageInfo(1, shuffleId = 0)
+    monitor.onJobStart(SparkListenerJobStart(1, clock.getTimeMillis(), Seq(stage)))
+    clock.advance(1000L)
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
+    monitor.onTaskStart(SparkListenerTaskStart(1, 0, taskInfo("1", 1)))
+    monitor.onTaskEnd(SparkListenerTaskEnd(1, 0, "foo", Success, taskInfo("1", 1), null))
+    monitor.onJobEnd(SparkListenerJobEnd(1, clock.getTimeMillis(), JobSucceeded))
+
+    assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+  }
+
   private def idleDeadline: Long = clock.getTimeMillis() + idleTimeoutMs + 1
   private def storageDeadline: Long = clock.getTimeMillis() + storageTimeoutMs + 1
   private def shuffleDeadline: Long = clock.getTimeMillis() + shuffleTimeoutMs + 1
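For reference, the overflow that the new `deadline >= 0` guard handles can be reproduced in isolation. The sketch below is not part of the patch; `TimeoutOverflowSketch` and `deadlineFor` are illustrative names standing in for the patched expression, assuming the same semantics as the diff above: a wrapped (negative) sum of `idleStart + timeout` means the configured timeout is effectively infinite, so it is clamped to `Long.MaxValue` instead of being treated as an already-passed deadline.

// Standalone Scala sketch (not part of the patch); names are illustrative.
object TimeoutOverflowSketch {
  // Mirrors the guarded expression: clamp a wrapped (negative) sum to Long.MaxValue.
  def deadlineFor(idleStartMs: Long, timeoutMs: Long): Long = {
    val deadline = idleStartMs + timeoutMs
    if (deadline >= 0) deadline else Long.MaxValue
  }

  def main(args: Array[String]): Unit = {
    // Long addition silently wraps on overflow, producing a negative "deadline"
    // that would make the executor look as if it had already timed out.
    println(1000L + Long.MaxValue)              // negative (wrapped)
    println(deadlineFor(1000L, 30000L))         // 31000: normal case unchanged
    println(deadlineFor(1000L, Long.MaxValue))  // Long.MaxValue: never times out
  }
}

This is the behavior exercised by the new SPARK-28455 test, which sets DYN_ALLOCATION_SHUFFLE_TIMEOUT to Long.MaxValue and asserts that the idle executor holding shuffle data is not reported by timedOutExecutors.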