From 91b8aeff8adca4454b9631a0bfa01876de71bb53 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Sat, 4 Mar 2017 15:47:36 -0800 Subject: [PATCH 01/16] Allow reason to be specified for task kill fix compile really fix compile --- .../org/apache/spark/TaskEndReason.scala | 7 ++- .../CoarseGrainedExecutorBackend.scala | 4 +- .../org/apache/spark/executor/Executor.scala | 45 +++++++++++++------ .../apache/spark/scheduler/DAGScheduler.scala | 2 +- .../spark/scheduler/SchedulerBackend.scala | 14 +++++- .../spark/scheduler/TaskSchedulerImpl.scala | 5 ++- .../spark/scheduler/TaskSetManager.scala | 4 +- .../cluster/CoarseGrainedClusterMessage.scala | 4 +- .../CoarseGrainedSchedulerBackend.scala | 11 +++-- .../local/LocalSchedulerBackend.scala | 13 +++--- .../scala/org/apache/spark/ui/UIUtils.scala | 4 +- .../apache/spark/ui/jobs/ExecutorTable.scala | 4 +- .../spark/ui/jobs/JobProgressListener.scala | 17 ++++--- .../org/apache/spark/ui/jobs/StageTable.scala | 3 +- .../org/apache/spark/ui/jobs/UIData.scala | 6 +-- .../org/apache/spark/util/JsonProtocol.scala | 10 ++++- .../apache/spark/executor/ExecutorSuite.scala | 2 +- .../OutputCommitCoordinatorSuite.scala | 4 +- .../scheduler/SchedulerIntegrationSuite.scala | 4 +- .../spark/scheduler/TaskSetManagerSuite.scala | 16 ++++--- .../org/apache/spark/ui/UIUtilsSuite.scala | 2 +- .../ui/jobs/JobProgressListenerSuite.scala | 5 ++- .../apache/spark/util/JsonProtocolSuite.scala | 7 ++- .../spark/executor/MesosExecutorBackend.scala | 4 +- .../MesosFineGrainedSchedulerBackend.scala | 4 +- .../spark/streaming/ui/AllBatchesTable.scala | 2 +- 26 files changed, 137 insertions(+), 66 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index 8c1b5f7bf0d9..2556bfb40634 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -61,6 +61,9 @@ sealed trait TaskFailedReason extends TaskEndReason { * on was killed. */ def countTowardsTaskFailures: Boolean = true + + /** Whether this task should be retried by the scheduler. */ + def shouldRetry: Boolean = false } /** @@ -212,8 +215,8 @@ case object TaskResultLost extends TaskFailedReason { * Task was killed intentionally and needs to be rescheduled. 
*/ @DeveloperApi -case object TaskKilled extends TaskFailedReason { - override def toErrorString: String = "TaskKilled (killed intentionally)" +case class TaskKilled(reason: String, override val shouldRetry: Boolean) extends TaskFailedReason { + override def toErrorString: String = s"TaskKilled ($reason)" override def countTowardsTaskFailures: Boolean = false } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index b376ecd301ea..955a6d09f998 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -97,11 +97,11 @@ private[spark] class CoarseGrainedExecutorBackend( executor.launchTask(this, taskDesc) } - case KillTask(taskId, _, interruptThread) => + case KillTask(taskId, _, interruptThread, reason, retryTask) => if (executor == null) { exitExecutor(1, "Received KillTask command but executor was null") } else { - executor.killTask(taskId, interruptThread) + executor.killTask(taskId, interruptThread, reason, retryTask) } case StopExecutor => diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 790c1ae94247..f92c37a07f11 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -158,7 +158,8 @@ private[spark] class Executor( threadPool.execute(tr) } - def killTask(taskId: Long, interruptThread: Boolean): Unit = { + def killTask( + taskId: Long, interruptThread: Boolean, reason: String, shouldRetry: Boolean): Unit = { val taskRunner = runningTasks.get(taskId) if (taskRunner != null) { if (taskReaperEnabled) { @@ -168,7 +169,9 @@ private[spark] class Executor( case Some(existingReaper) => interruptThread && !existingReaper.interruptThread } if (shouldCreateReaper) { - val taskReaper = new TaskReaper(taskRunner, interruptThread = interruptThread) + val taskReaper = new TaskReaper( + taskRunner, interruptThread = interruptThread, reason = reason, + shouldRetry = shouldRetry) taskReaperForTask(taskId) = taskReaper Some(taskReaper) } else { @@ -178,7 +181,8 @@ private[spark] class Executor( // Execute the TaskReaper from outside of the synchronized block. maybeNewTaskReaper.foreach(taskReaperPool.execute) } else { - taskRunner.kill(interruptThread = interruptThread) + taskRunner.kill( + interruptThread = interruptThread, reason = reason, shouldRetry = shouldRetry) } } } @@ -189,8 +193,9 @@ private[spark] class Executor( * tasks instead of taking the JVM down. * @param interruptThread whether to interrupt the task thread */ - def killAllTasks(interruptThread: Boolean) : Unit = { - runningTasks.keys().asScala.foreach(t => killTask(t, interruptThread = interruptThread)) + def killAllTasks(interruptThread: Boolean, reason: String) : Unit = { + runningTasks.keys().asScala.foreach(t => + killTask(t, interruptThread = interruptThread, reason = reason, shouldRetry = false)) } def stop(): Unit = { @@ -220,6 +225,12 @@ private[spark] class Executor( /** Whether this task has been killed. */ @volatile private var killed = false + /** The reason this task was killed. */ + @volatile private var killReason: String = null + + /** Whether to retry this killed task. 
*/ + @volatile private var retryIfKilled: Boolean = false + @volatile private var threadId: Long = -1 def getThreadId: Long = threadId @@ -239,8 +250,10 @@ private[spark] class Executor( */ @volatile var task: Task[Any] = _ - def kill(interruptThread: Boolean): Unit = { - logInfo(s"Executor is trying to kill $taskName (TID $taskId)") + def kill(interruptThread: Boolean, reason: String, shouldRetry: Boolean): Unit = { + logInfo(s"Executor is trying to kill $taskName (TID $taskId), reason: $reason") + retryIfKilled = shouldRetry + killReason = reason killed = true if (task != null) { synchronized { @@ -427,14 +440,17 @@ private[spark] class Executor( execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) case _: TaskKilledException => - logInfo(s"Executor killed $taskName (TID $taskId)") + logInfo(s"Executor killed $taskName (TID $taskId), reason: $killReason") setTaskFinishedAndClearInterruptStatus() - execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled)) + execBackend.statusUpdate( + taskId, TaskState.KILLED, ser.serialize(TaskKilled(killReason, retryIfKilled))) case _: InterruptedException if task.killed => - logInfo(s"Executor interrupted and killed $taskName (TID $taskId)") + logInfo( + s"Executor interrupted and preempted $taskName (TID $taskId), reason: $killReason") setTaskFinishedAndClearInterruptStatus() - execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled)) + execBackend.statusUpdate( + taskId, TaskState.KILLED, ser.serialize(TaskKilled(killReason, retryIfKilled))) case CausedBy(cDE: CommitDeniedException) => val reason = cDE.toTaskFailedReason @@ -512,7 +528,9 @@ private[spark] class Executor( */ private class TaskReaper( taskRunner: TaskRunner, - val interruptThread: Boolean) + val interruptThread: Boolean, + val reason: String, + val shouldRetry: Boolean) extends Runnable { private[this] val taskId: Long = taskRunner.taskId @@ -533,7 +551,8 @@ private[spark] class Executor( // Only attempt to kill the task once. If interruptThread = false then a second kill // attempt would be a no-op and if interruptThread = true then it may not be safe or // effective to interrupt multiple times: - taskRunner.kill(interruptThread = interruptThread) + taskRunner.kill( + interruptThread = interruptThread, reason = reason, shouldRetry = shouldRetry) // Monitor the killed task until it exits. The synchronization logic here is complicated // because we don't want to synchronize on the taskRunner while possibly taking a thread // dump, but we also need to be careful to avoid races between checking whether the task diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 692ed8083475..58b73240c742 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1345,7 +1345,7 @@ class DAGScheduler( case TaskResultLost => // Do nothing here; the TaskScheduler handles these failures and resubmits the task. - case _: ExecutorLostFailure | TaskKilled | UnknownReason => + case _: ExecutorLostFailure | _: TaskKilled | UnknownReason => // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler // will abort the job. 
} diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index 8801a761afae..c2026f374ab0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -30,8 +30,20 @@ private[spark] trait SchedulerBackend { def reviveOffers(): Unit def defaultParallelism(): Int - def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = + /** + * Requests that an executor kills a running task. + * + * @param taskId Id of the task. + * @param executorId Id of the executor the task is running on. + * @param interruptThread Whether the executor should interrupt the task thread. + * @param reason The reason for the task kill. + * @param shouldRetry Whether the scheduler should retry the task. + */ + def killTask( + taskId: Long, executorId: String, interruptThread: Boolean, reason: String, + shouldRetry: Boolean): Unit = throw new UnsupportedOperationException + def isReady(): Boolean = true /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index bfbcfa1aa386..5e215d09a6a6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -239,7 +239,8 @@ private[spark] class TaskSchedulerImpl private[scheduler]( // simply abort the stage. tsm.runningTasksSet.foreach { tid => val execId = taskIdToExecutorId(tid) - backend.killTask(tid, execId, interruptThread) + backend.killTask( + tid, execId, interruptThread, reason = "stage cancelled", shouldRetry = false) } tsm.abort("Stage %s cancelled".format(stageId)) logInfo("Stage %d was cancelled".format(stageId)) @@ -467,7 +468,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( taskState: TaskState, reason: TaskFailedReason): Unit = synchronized { taskSetManager.handleFailedTask(tid, taskState, reason) - if (!taskSetManager.isZombie && taskState != TaskState.KILLED) { + if (!taskSetManager.isZombie && reason.shouldRetry) { // Need to revive offers again now that the task set manager state has been updated to // reflect failed tasks that need to be re-run. 
backend.reviveOffers() diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 19ebaf817e24..4201018bc1a9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -710,7 +710,9 @@ private[spark] class TaskSetManager( logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " + s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " + s"as the attempt ${info.attemptNumber} succeeded on ${info.host}") - sched.backend.killTask(attemptInfo.taskId, attemptInfo.executorId, true) + sched.backend.killTask( + attemptInfo.taskId, attemptInfo.executorId, interruptThread = true, + reason = "another attempt succeeded", shouldRetry = false) } if (!successful(index)) { tasksSuccessful += 1 diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 2898cd7d17ca..6bd98c772ca6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -40,7 +40,9 @@ private[spark] object CoarseGrainedClusterMessages { // Driver to executors case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage - case class KillTask(taskId: Long, executor: String, interruptThread: Boolean) + case class KillTask( + taskId: Long, executor: String, interruptThread: Boolean, reason: String, + shouldRetry: Boolean) extends CoarseGrainedClusterMessage case class KillExecutorsOnHost(host: String) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 94abe30bb12f..91f40e173d3e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -132,10 +132,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp case ReviveOffers => makeOffers() - case KillTask(taskId, executorId, interruptThread) => + case KillTask(taskId, executorId, interruptThread, reason, shouldRetry) => executorDataMap.get(executorId) match { case Some(executorInfo) => - executorInfo.executorEndpoint.send(KillTask(taskId, executorId, interruptThread)) + executorInfo.executorEndpoint.send( + KillTask(taskId, executorId, interruptThread, reason, shouldRetry)) case None => // Ignoring the task kill since the executor is not registered. 
logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.") @@ -414,8 +415,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp driverEndpoint.send(ReviveOffers) } - override def killTask(taskId: Long, executorId: String, interruptThread: Boolean) { - driverEndpoint.send(KillTask(taskId, executorId, interruptThread)) + override def killTask( + taskId: Long, executorId: String, interruptThread: Boolean, reason: String, + shouldRetry: Boolean) { + driverEndpoint.send(KillTask(taskId, executorId, interruptThread, reason, shouldRetry)) } override def defaultParallelism(): Int = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala index 625f998cd460..fb47cd70d5b9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala @@ -34,7 +34,8 @@ private case class ReviveOffers() private case class StatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) -private case class KillTask(taskId: Long, interruptThread: Boolean) +private case class KillTask( + taskId: Long, interruptThread: Boolean, reason: String, shouldRetry: Boolean) private case class StopExecutor() @@ -70,8 +71,8 @@ private[spark] class LocalEndpoint( reviveOffers() } - case KillTask(taskId, interruptThread) => - executor.killTask(taskId, interruptThread) + case KillTask(taskId, interruptThread, reason, shouldRetry) => + executor.killTask(taskId, interruptThread, reason, shouldRetry) } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { @@ -143,8 +144,10 @@ private[spark] class LocalSchedulerBackend( override def defaultParallelism(): Int = scheduler.conf.getInt("spark.default.parallelism", totalCores) - override def killTask(taskId: Long, executorId: String, interruptThread: Boolean) { - localEndpoint.send(KillTask(taskId, interruptThread)) + override def killTask( + taskId: Long, executorId: String, interruptThread: Boolean, reason: String, + shouldRetry: Boolean) { + localEndpoint.send(KillTask(taskId, interruptThread, reason, shouldRetry)) } override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) { diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index d161843dd223..8c84b31f12cd 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -342,7 +342,7 @@ private[spark] object UIUtils extends Logging { completed: Int, failed: Int, skipped: Int, - killed: Int, + killed: Map[String, Int], total: Int): Seq[Node] = { val completeWidth = "width: %s%%".format((completed.toDouble/total)*100) // started + completed can be > total when there are speculative tasks @@ -354,7 +354,7 @@ private[spark] object UIUtils extends Logging { {completed}/{total} { if (failed > 0) s"($failed failed)" } { if (skipped > 0) s"($skipped skipped)" } - { if (killed > 0) s"($killed killed)" } + { killed.map { case (reason, count) => s"($count killed: $reason)" } }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index cd1b02addc78..fa098ea4689a 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -133,9 +133,9 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage {executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")} {UIUtils.formatDuration(v.taskTime)} - {v.failedTasks + v.succeededTasks + v.killedTasks} + {v.failedTasks + v.succeededTasks + v.killedTasks.map(_._2).sum} {v.failedTasks} - {v.killedTasks} + {v.killedTasks.map(_._2).sum} {v.succeededTasks} {if (stageData.hasInput) { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index e87caff42643..66773d526e26 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -371,8 +371,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { taskEnd.reason match { case Success => execSummary.succeededTasks += 1 - case TaskKilled => - execSummary.killedTasks += 1 + case kill: TaskKilled => + execSummary.killedTasks = execSummary.killedTasks.updated( + kill.reason, execSummary.killedTasks.getOrElse(kill.reason, 0) + 1) case _ => execSummary.failedTasks += 1 } @@ -385,9 +386,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { stageData.completedIndices.add(info.index) stageData.numCompleteTasks += 1 None - case TaskKilled => - stageData.numKilledTasks += 1 - Some(TaskKilled.toErrorString) + case kill: TaskKilled => + stageData.numKilledTasks = stageData.numKilledTasks.updated( + kill.reason, stageData.numKilledTasks.getOrElse(kill.reason, 0) + 1) + Some(kill.toErrorString) case e: ExceptionFailure => // Handle ExceptionFailure because we might have accumUpdates stageData.numFailedTasks += 1 Some(e.toErrorString) @@ -422,8 +424,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { taskEnd.reason match { case Success => jobData.numCompletedTasks += 1 - case TaskKilled => - jobData.numKilledTasks += 1 + case kill: TaskKilled => + jobData.numKilledTasks = jobData.numKilledTasks.updated( + kill.reason, jobData.numKilledTasks.getOrElse(kill.reason, 0) + 1) case _ => jobData.numFailedTasks += 1 } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index e1fa9043b6a1..b3f850ebc0b9 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -300,7 +300,8 @@ private[ui] class StagePagedTable( {UIUtils.makeProgressBar(started = stageData.numActiveTasks, completed = stageData.completedIndices.size, failed = stageData.numFailedTasks, - skipped = 0, killed = stageData.numKilledTasks, total = info.numTasks)} + skipped = 0, killed = stageData.numKilledTasks, + total = info.numTasks)} {data.inputReadWithUnit} {data.outputWriteWithUnit} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index 073f7edfc2fe..de03a9f51548 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala 
@@ -32,7 +32,7 @@ private[spark] object UIData { var taskTime : Long = 0 var failedTasks : Int = 0 var succeededTasks : Int = 0 - var killedTasks : Int = 0 + var killedTasks : Map[String, Int] = Map.empty var inputBytes : Long = 0 var inputRecords : Long = 0 var outputBytes : Long = 0 @@ -64,7 +64,7 @@ private[spark] object UIData { var numCompletedTasks: Int = 0, var numSkippedTasks: Int = 0, var numFailedTasks: Int = 0, - var numKilledTasks: Int = 0, + var numKilledTasks: Map[String, Int] = Map.empty, /* Stages */ var numActiveStages: Int = 0, // This needs to be a set instead of a simple count to prevent double-counting of rerun stages: @@ -78,7 +78,7 @@ private[spark] object UIData { var numCompleteTasks: Int = _ var completedIndices = new OpenHashSet[Int]() var numFailedTasks: Int = _ - var numKilledTasks: Int = _ + var numKilledTasks: Map[String, Int] = Map.empty var executorRunTime: Long = _ var executorCpuTime: Long = _ diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 4b4d2d10cbf8..8dc871dc1b2d 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -390,6 +390,9 @@ private[spark] object JsonProtocol { ("Executor ID" -> executorId) ~ ("Exit Caused By App" -> exitCausedByApp) ~ ("Loss Reason" -> reason.map(_.toString)) + case taskKilled: TaskKilled => + ("Kill Reason" -> taskKilled.reason) ~ + ("Should Retry" -> taskKilled.shouldRetry) case _ => Utils.emptyJson } ("Reason" -> reason) ~ json @@ -877,7 +880,12 @@ private[spark] object JsonProtocol { })) ExceptionFailure(className, description, stackTrace, fullStackTrace, None, accumUpdates) case `taskResultLost` => TaskResultLost - case `taskKilled` => TaskKilled + case `taskKilled` => + val killReason = Utils.jsonOption(json \ "Kill Reason") + .map(_.extract[String]).getOrElse("unknown reason") + val shouldRetry = Utils.jsonOption(json \ "Should Retry") + .map(_.extract[Boolean]).getOrElse(false) + TaskKilled(killReason, shouldRetry) case `taskCommitDenied` => // Unfortunately, the `TaskCommitDenied` message was introduced in 1.3.0 but the JSON // de/serialization logic was not added until 1.5.1. To provide backward compatibility diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala index 8150fff2d018..d2c1a52e61cb 100644 --- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala @@ -110,7 +110,7 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug } // we know the task will be started, but not yet deserialized, because of the latches we // use in mockExecutorBackend. 
- executor.killAllTasks(true) + executor.killAllTasks(true, "test") executorSuiteHelper.latch2.countDown() if (!executorSuiteHelper.latch3.await(5, TimeUnit.SECONDS)) { fail("executor did not send second status update in time") diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index 83ed12752074..b24836b02b43 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -175,13 +175,13 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { assert(!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter)) // The non-authorized committer fails outputCommitCoordinator.taskCompleted( - stage, partition, attemptNumber = nonAuthorizedCommitter, reason = TaskKilled) + stage, partition, attemptNumber = nonAuthorizedCommitter, reason = TaskKilled("test", false)) // New tasks should still not be able to commit because the authorized committer has not failed assert( !outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 1)) // The authorized committer now fails, clearing the lock outputCommitCoordinator.taskCompleted( - stage, partition, attemptNumber = authorizedCommitter, reason = TaskKilled) + stage, partition, attemptNumber = authorizedCommitter, reason = TaskKilled("test", false)) // A new task should now be allowed to become the authorized committer assert( outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 2)) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index 398ac3d6202d..54edf121ce0d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -410,7 +410,9 @@ private[spark] abstract class MockBackend( } } - override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = { + override def killTask( + taskId: Long, executorId: String, interruptThread: Boolean, reason: String, + shouldRetry: Boolean): Unit = { // We have to implement this b/c of SPARK-15385. // Its OK for this to be a no-op, because even if a backend does implement killTask, // it really can only be "best-effort" in any case, and the scheduler should be robust to that. 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 2c2cda9f318e..0aacd4a623dd 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -671,7 +671,9 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2")) sched.initialize(new FakeSchedulerBackend() { - override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {} + override def killTask( + taskId: Long, executorId: String, interruptThread: Boolean, reason: String, + shouldRetry: Boolean): Unit = {} }) // Keep track of the number of tasks that are resubmitted, @@ -927,7 +929,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // Complete the speculative attempt for the running task manager.handleSuccessfulTask(4, createTaskResult(3, accumUpdatesByTask(3))) // Verify that it kills other running attempt - verify(sched.backend).killTask(3, "exec2", true) + verify(sched.backend).killTask(3, "exec2", true, "another attempt succeeded", false) // Because the SchedulerBackend was a mock, the 2nd copy of the task won't actually be // killed, so the FakeTaskScheduler is only told about the successful completion // of the speculated task. @@ -1013,14 +1015,15 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg manager.handleSuccessfulTask(speculativeTask.taskId, createTaskResult(3, accumUpdatesByTask(3))) // Verify that it kills other running attempt val origTask = originalTasks(speculativeTask.index) - verify(sched.backend).killTask(origTask.taskId, "exec2", true) + verify(sched.backend).killTask( + origTask.taskId, "exec2", true, "another attempt succeeded", false) // Because the SchedulerBackend was a mock, the 2nd copy of the task won't actually be // killed, so the FakeTaskScheduler is only told about the successful completion // of the speculated task. 
assert(sched.endedTasks(3) === Success) // also because the scheduler is a mock, our manager isn't notified about the task killed event, // so we do that manually - manager.handleFailedTask(origTask.taskId, TaskState.KILLED, TaskKilled) + manager.handleFailedTask(origTask.taskId, TaskState.KILLED, TaskKilled("test", false)) // this task has "failed" 4 times, but one of them doesn't count, so keep running the stage assert(manager.tasksSuccessful === 4) assert(!manager.isZombie) @@ -1037,7 +1040,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg createTaskResult(3, accumUpdatesByTask(3))) // Verify that it kills other running attempt val origTask2 = originalTasks(speculativeTask2.index) - verify(sched.backend).killTask(origTask2.taskId, "exec2", true) + verify(sched.backend).killTask( + origTask2.taskId, "exec2", true, "another attempt succeeded", false) assert(manager.tasksSuccessful === 5) assert(manager.isZombie) } @@ -1093,7 +1097,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg tsmSpy.handleFailedTask(taskDescs(2).taskId, TaskState.FAILED, TaskCommitDenied(0, 2, 0)) tsmSpy.handleFailedTask(taskDescs(3).taskId, TaskState.KILLED, - TaskKilled) + TaskKilled("test", false)) // Make sure that the blacklist ignored all of the task failures above, since they aren't // the fault of the executor where the task was running. diff --git a/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala b/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala index 6335d905c0fb..c770fd5da76f 100644 --- a/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala @@ -110,7 +110,7 @@ class UIUtilsSuite extends SparkFunSuite { } test("SPARK-11906: Progress bar should not overflow because of speculative tasks") { - val generated = makeProgressBar(2, 3, 0, 0, 0, 4).head.child.filter(_.label == "div") + val generated = makeProgressBar(2, 3, 0, 0, Map.empty, 4).head.child.filter(_.label == "div") val expected = Seq(
,
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index e3127da9a6b2..783b26975166 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -274,8 +274,9 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with // Make sure killed tasks are accounted for correctly. listener.onTaskEnd( - SparkListenerTaskEnd(task.stageId, 0, taskType, TaskKilled, taskInfo, metrics)) - assert(listener.stageIdToData((task.stageId, 0)).numKilledTasks === 1) + SparkListenerTaskEnd( + task.stageId, 0, taskType, TaskKilled("test", false), taskInfo, metrics)) + assert(listener.stageIdToData((task.stageId, 0)).numKilledTasks === Map("test" -> 1)) // Make sure we count success as success. listener.onTaskEnd( diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 9f76c74bce89..3c928139e7ca 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -164,7 +164,8 @@ class JsonProtocolSuite extends SparkFunSuite { testTaskEndReason(fetchMetadataFailed) testTaskEndReason(exceptionFailure) testTaskEndReason(TaskResultLost) - testTaskEndReason(TaskKilled) + testTaskEndReason(TaskKilled("test", false)) + testTaskEndReason(TaskKilled("test", true)) testTaskEndReason(TaskCommitDenied(2, 3, 4)) testTaskEndReason(ExecutorLostFailure("100", true, Some("Induced failure"))) testTaskEndReason(UnknownReason) @@ -676,7 +677,9 @@ private[spark] object JsonProtocolSuite extends Assertions { assert(r1.fullStackTrace === r2.fullStackTrace) assertSeqEquals[AccumulableInfo](r1.accumUpdates, r2.accumUpdates, (a, b) => a.equals(b)) case (TaskResultLost, TaskResultLost) => - case (TaskKilled, TaskKilled) => + case (r1: TaskKilled, r2: TaskKilled) => + assert(r1.reason == r2.reason) + assert(r1.shouldRetry == r2.shouldRetry) case (TaskCommitDenied(jobId1, partitionId1, attemptNumber1), TaskCommitDenied(jobId2, partitionId2, attemptNumber2)) => assert(jobId1 === jobId2) diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala index b25253978258..701ce211ffec 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala @@ -104,7 +104,9 @@ private[spark] class MesosExecutorBackend logError("Received KillTask but executor was null") } else { // TODO: Determine the 'interruptOnCancel' property set for the given job. 
- executor.killTask(t.getValue.toLong, interruptThread = false) + executor.killTask( + t.getValue.toLong, interruptThread = false, reason = "killed intentionally", + shouldRetry = false) } } diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala index 7e561916a71e..3e274b103eb0 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala @@ -425,7 +425,9 @@ private[spark] class MesosFineGrainedSchedulerBackend( recordSlaveLost(d, slaveId, ExecutorExited(status, exitCausedByApp = true)) } - override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = { + override def killTask( + taskId: Long, executorId: String, interruptThread: Boolean, reason: String, + shouldRetry: Boolean): Unit = { mesosDriver.killTask( TaskID.newBuilder() .setValue(taskId.toString).build() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala index 1352ca1c4c95..06f458f2bdc6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala @@ -97,7 +97,7 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) completed = batch.numCompletedOutputOp, failed = batch.numFailedOutputOp, skipped = 0, - killed = 0, + killed = Map.empty, total = batch.outputOperations.size) } From a58d391ecf444a6f575da13df6b2a2fbe32de861 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 6 Mar 2017 16:13:12 -0800 Subject: [PATCH 02/16] add public api and update mima --- .../scala/org/apache/spark/SparkContext.scala | 19 +++++++++++++ .../apache/spark/scheduler/DAGScheduler.scala | 7 +++++ .../spark/scheduler/TaskScheduler.scala | 3 +++ .../spark/scheduler/TaskSchedulerImpl.scala | 8 ++++++ .../org/apache/spark/SparkContextSuite.scala | 27 +++++++++++++++++++ .../spark/scheduler/DAGSchedulerSuite.scala | 6 +++++ .../ExternalClusterManagerSuite.scala | 2 ++ project/MimaExcludes.scala | 14 +++++++++- 8 files changed, 85 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 0e36a30c933d..34c6b07f60e8 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2249,6 +2249,25 @@ class SparkContext(config: SparkConf) extends Logging { dagScheduler.cancelStage(stageId, None) } + /** + * Kill a given task. It will be retried. + * + * @param taskId the task ID to kill + */ + def killTask(taskId: Long): Unit = { + killTask(taskId, "cancelled") + } + + /** + * Kill a given task. It will be retried. 
+ * + * @param taskId the task ID to kill + * @param reason the reason for killing the task, which should be a short string + */ + def killTask(taskId: Long, reason: String): Unit = { + dagScheduler.killTask(taskId, reason) + } + /** * Clean a closure to make it ready to serialized and send to tasks * (removes unreferenced variables in $outer's, updates REPL variables) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 58b73240c742..2d87f87bdca8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -731,6 +731,13 @@ class DAGScheduler( eventProcessLoop.post(StageCancelled(stageId, reason)) } + /** + * Kill a given task. It will be retried. + */ + def killTask(taskId: Long, reason: String): Unit = { + taskScheduler.killTask(taskId, interruptThread = true, reason, shouldRetry = true) + } + /** * Resubmit any failed stages. Ordinarily called after a small amount of time has passed since * the last fetch failure. diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index cd13eebe74a9..478ccaccd22b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -54,6 +54,9 @@ private[spark] trait TaskScheduler { // Cancel a stage. def cancelTasks(stageId: Int, interruptThread: Boolean): Unit + // Kill a task. + def killTask(taskId: Long, interruptThread: Boolean, reason: String, shouldRetry: Boolean): Unit + // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called. 
def setDAGScheduler(dagScheduler: DAGScheduler): Unit diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 5e215d09a6a6..b52fa08b9f4a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -248,6 +248,14 @@ private[spark] class TaskSchedulerImpl private[scheduler]( } } + override def killTask( + taskId: Long, interruptThread: Boolean, reason: String, shouldRetry: Boolean): Unit = { + logInfo(s"Killing task ($reason): $taskId") + val execId = taskIdToExecutorId.getOrElse( + taskId, throw new IllegalArgumentException("Task not found: " + taskId)) + backend.killTask(taskId, execId, interruptThread, reason, shouldRetry) + } + /** * Called to indicate that all task attempts (including speculated tasks) associated with the * given TaskSetManager have completed, so state associated with the TaskSetManager should be diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index f97a112ec127..77e13fe7bc78 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -538,10 +538,37 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu } } + test("Killing tasks") { + sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) + + SparkContextSuite.isTaskStarted = false + SparkContextSuite.taskKilled = false + + val listener = new SparkListener { + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { + eventually(timeout(10.seconds)) { + assert(SparkContextSuite.isTaskStarted) + } + if (!SparkContextSuite.taskKilled) { + SparkContextSuite.taskKilled = true + sc.killTask(taskStart.taskInfo.taskId, "first attempt will hang") + } + } + } + sc.addSparkListener(listener) + sc.parallelize(1 to 1).foreach { x => + if (!SparkContextSuite.isTaskStarted) { + SparkContextSuite.isTaskStarted = true + Thread.sleep(9999999) + } + // second try succeeds immediately + } + } } object SparkContextSuite { @volatile var cancelJob = false @volatile var cancelStage = false @volatile var isTaskStarted = false + @volatile var taskKilled = false } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 8eaf9dfcf49b..2cca00edb102 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -126,6 +126,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou override def cancelTasks(stageId: Int, interruptThread: Boolean) { cancelledStages += stageId } + override def killTask( + taskId: Long, interruptThread: Boolean, reason: String, shouldRetry: Boolean): Unit = {} override def setDAGScheduler(dagScheduler: DAGScheduler) = {} override def defaultParallelism() = 2 override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} @@ -552,6 +554,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou override def cancelTasks(stageId: Int, interruptThread: Boolean) { throw new UnsupportedOperationException } + override def killTask( + taskId: Long, interruptThread: Boolean, reason: String, shouldRetry: 
Boolean): Unit = { + throw new UnsupportedOperationException + } override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {} override def defaultParallelism(): Int = 2 override def executorHeartbeatReceived( diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index e87cebf0cf35..d909ef365db7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -79,6 +79,8 @@ private class DummyTaskScheduler extends TaskScheduler { override def stop(): Unit = {} override def submitTasks(taskSet: TaskSet): Unit = {} override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = {} + override def killTask( + taskId: Long, interruptThread: Boolean, reason: String, shouldRetry: Boolean): Unit = {} override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {} override def defaultParallelism(): Int = 2 override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 56b8c0b95e8a..1b8e93d29799 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -64,7 +64,19 @@ object MimaExcludes { ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.TaskData.$default$11"), // [SPARK-17161] Removing Python-friendly constructors not needed - ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.OneVsRestModel.this") + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.OneVsRestModel.this"), + + // [SPARK-19820] Allow reason to be specified to task kill + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskFailedReason.shouldRetry"), + ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.TaskKilled$"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.TaskKilled.productElement"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.TaskKilled.productArity"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.TaskKilled.canEqual"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.TaskKilled.productIterator"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.TaskKilled.countTowardsTaskFailures"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.TaskKilled.productPrefix"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.TaskKilled.toErrorString"), + ProblemFilters.exclude[FinalMethodProblem]("org.apache.spark.TaskKilled.toString") ) // Exclude rules for 2.1.x From 02d81b53cb7ea8a5383c800fb98c9a90c968cb8f Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 6 Mar 2017 17:08:10 -0800 Subject: [PATCH 03/16] chop down indent --- .../spark/executor/CoarseGrainedExecutorBackend.scala | 4 ++-- .../org/apache/spark/scheduler/SchedulerBackend.scala | 5 ++++- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 7 +++++-- .../scheduler/cluster/CoarseGrainedClusterMessage.scala | 5 ++++- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 5 ++++- .../main/scala/org/apache/spark/ui/jobs/StageTable.scala | 3 +-- .../apache/spark/scheduler/SchedulerIntegrationSuite.scala | 5 ++++- .../org/apache/spark/scheduler/TaskSetManagerSuite.scala | 5 ++++- 
.../cluster/mesos/MesosFineGrainedSchedulerBackend.scala | 5 ++++- 9 files changed, 32 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 955a6d09f998..2b0777fb4644 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -97,11 +97,11 @@ private[spark] class CoarseGrainedExecutorBackend( executor.launchTask(this, taskDesc) } - case KillTask(taskId, _, interruptThread, reason, retryTask) => + case KillTask(taskId, _, interruptThread, reason, shouldRetry) => if (executor == null) { exitExecutor(1, "Received KillTask command but executor was null") } else { - executor.killTask(taskId, interruptThread, reason, retryTask) + executor.killTask(taskId, interruptThread, reason, shouldRetry) } case StopExecutor => diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index c2026f374ab0..6e6c83940b7c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -40,7 +40,10 @@ private[spark] trait SchedulerBackend { * @param shouldRetry Whether the scheduler should retry the task. */ def killTask( - taskId: Long, executorId: String, interruptThread: Boolean, reason: String, + taskId: Long, + executorId: String, + interruptThread: Boolean, + reason: String, shouldRetry: Boolean): Unit = throw new UnsupportedOperationException diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 4201018bc1a9..8ad02e0bb002 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -711,8 +711,11 @@ private[spark] class TaskSetManager( s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " + s"as the attempt ${info.attemptNumber} succeeded on ${info.host}") sched.backend.killTask( - attemptInfo.taskId, attemptInfo.executorId, interruptThread = true, - reason = "another attempt succeeded", shouldRetry = false) + attemptInfo.taskId, + attemptInfo.executorId, + interruptThread = true, + reason = "another attempt succeeded", + shouldRetry = false) } if (!successful(index)) { tasksSuccessful += 1 diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 6bd98c772ca6..16231c631e29 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -41,7 +41,10 @@ private[spark] object CoarseGrainedClusterMessages { case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage case class KillTask( - taskId: Long, executor: String, interruptThread: Boolean, reason: String, + taskId: Long, + executor: String, + interruptThread: Boolean, + reason: String, shouldRetry: Boolean) extends CoarseGrainedClusterMessage diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 91f40e173d3e..8c1431d7a0fb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -416,7 +416,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } override def killTask( - taskId: Long, executorId: String, interruptThread: Boolean, reason: String, + taskId: Long, + executorId: String, + interruptThread: Boolean, + reason: String, shouldRetry: Boolean) { driverEndpoint.send(KillTask(taskId, executorId, interruptThread, reason, shouldRetry)) } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index b3f850ebc0b9..e1fa9043b6a1 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -300,8 +300,7 @@ private[ui] class StagePagedTable( {UIUtils.makeProgressBar(started = stageData.numActiveTasks, completed = stageData.completedIndices.size, failed = stageData.numFailedTasks, - skipped = 0, killed = stageData.numKilledTasks, - total = info.numTasks)} + skipped = 0, killed = stageData.numKilledTasks, total = info.numTasks)} {data.inputReadWithUnit} {data.outputWriteWithUnit} diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index 54edf121ce0d..d7a998c02b94 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -411,7 +411,10 @@ private[spark] abstract class MockBackend( } override def killTask( - taskId: Long, executorId: String, interruptThread: Boolean, reason: String, + taskId: Long, + executorId: String, + interruptThread: Boolean, + reason: String, shouldRetry: Boolean): Unit = { // We have to implement this b/c of SPARK-15385. 
// Its OK for this to be a no-op, because even if a backend does implement killTask, diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 0aacd4a623dd..0b639ae5cb17 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -672,7 +672,10 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2")) sched.initialize(new FakeSchedulerBackend() { override def killTask( - taskId: Long, executorId: String, interruptThread: Boolean, reason: String, + taskId: Long, + executorId: String, + interruptThread: Boolean, + reason: String, shouldRetry: Boolean): Unit = {} }) diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala index 3e274b103eb0..b303e17661a1 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala @@ -426,7 +426,10 @@ private[spark] class MesosFineGrainedSchedulerBackend( } override def killTask( - taskId: Long, executorId: String, interruptThread: Boolean, reason: String, + taskId: Long, + executorId: String, + interruptThread: Boolean, + reason: String, shouldRetry: Boolean): Unit = { mesosDriver.killTask( TaskID.newBuilder() From 170fa34cdc607285a315cbb5c6fd98461ac10fe8 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 6 Mar 2017 17:47:07 -0800 Subject: [PATCH 04/16] remove shouldRetry param --- .../org/apache/spark/TaskEndReason.scala | 5 +--- .../CoarseGrainedExecutorBackend.scala | 4 +-- .../org/apache/spark/executor/Executor.scala | 26 +++++++------------ .../apache/spark/scheduler/DAGScheduler.scala | 2 +- .../spark/scheduler/SchedulerBackend.scala | 4 +-- .../spark/scheduler/TaskScheduler.scala | 2 +- .../spark/scheduler/TaskSchedulerImpl.scala | 10 +++---- .../spark/scheduler/TaskSetManager.scala | 3 +-- .../cluster/CoarseGrainedClusterMessage.scala | 6 +---- .../CoarseGrainedSchedulerBackend.scala | 12 +++------ .../local/LocalSchedulerBackend.scala | 12 ++++----- .../org/apache/spark/util/JsonProtocol.scala | 7 ++--- .../apache/spark/executor/ExecutorSuite.scala | 2 +- .../spark/scheduler/DAGSchedulerSuite.scala | 6 ++--- .../ExternalClusterManagerSuite.scala | 3 +-- .../OutputCommitCoordinatorSuite.scala | 4 +-- .../scheduler/SchedulerIntegrationSuite.scala | 6 +---- .../spark/scheduler/TaskSetManagerSuite.scala | 16 +++++------- .../ui/jobs/JobProgressListenerSuite.scala | 2 +- .../apache/spark/util/JsonProtocolSuite.scala | 4 +-- project/MimaExcludes.scala | 1 - .../spark/executor/MesosExecutorBackend.scala | 3 +-- .../MesosFineGrainedSchedulerBackend.scala | 6 +---- 23 files changed, 49 insertions(+), 97 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index 2556bfb40634..a76283e33fa6 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -61,9 +61,6 @@ sealed trait TaskFailedReason 
extends TaskEndReason { * on was killed. */ def countTowardsTaskFailures: Boolean = true - - /** Whether this task should be retried by the scheduler. */ - def shouldRetry: Boolean = false } /** @@ -215,7 +212,7 @@ case object TaskResultLost extends TaskFailedReason { * Task was killed intentionally and needs to be rescheduled. */ @DeveloperApi -case class TaskKilled(reason: String, override val shouldRetry: Boolean) extends TaskFailedReason { +case class TaskKilled(reason: String) extends TaskFailedReason { override def toErrorString: String = s"TaskKilled ($reason)" override def countTowardsTaskFailures: Boolean = false } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 2b0777fb4644..ba0096d87456 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -97,11 +97,11 @@ private[spark] class CoarseGrainedExecutorBackend( executor.launchTask(this, taskDesc) } - case KillTask(taskId, _, interruptThread, reason, shouldRetry) => + case KillTask(taskId, _, interruptThread, reason) => if (executor == null) { exitExecutor(1, "Received KillTask command but executor was null") } else { - executor.killTask(taskId, interruptThread, reason, shouldRetry) + executor.killTask(taskId, interruptThread, reason) } case StopExecutor => diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index f92c37a07f11..8421c34556a8 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -159,7 +159,7 @@ private[spark] class Executor( } def killTask( - taskId: Long, interruptThread: Boolean, reason: String, shouldRetry: Boolean): Unit = { + taskId: Long, interruptThread: Boolean, reason: String): Unit = { val taskRunner = runningTasks.get(taskId) if (taskRunner != null) { if (taskReaperEnabled) { @@ -170,8 +170,7 @@ private[spark] class Executor( } if (shouldCreateReaper) { val taskReaper = new TaskReaper( - taskRunner, interruptThread = interruptThread, reason = reason, - shouldRetry = shouldRetry) + taskRunner, interruptThread = interruptThread, reason = reason) taskReaperForTask(taskId) = taskReaper Some(taskReaper) } else { @@ -181,8 +180,7 @@ private[spark] class Executor( // Execute the TaskReaper from outside of the synchronized block. maybeNewTaskReaper.foreach(taskReaperPool.execute) } else { - taskRunner.kill( - interruptThread = interruptThread, reason = reason, shouldRetry = shouldRetry) + taskRunner.kill(interruptThread = interruptThread, reason = reason) } } } @@ -195,7 +193,7 @@ private[spark] class Executor( */ def killAllTasks(interruptThread: Boolean, reason: String) : Unit = { runningTasks.keys().asScala.foreach(t => - killTask(t, interruptThread = interruptThread, reason = reason, shouldRetry = false)) + killTask(t, interruptThread = interruptThread, reason = reason)) } def stop(): Unit = { @@ -228,9 +226,6 @@ private[spark] class Executor( /** The reason this task was killed. */ @volatile private var killReason: String = null - /** Whether to retry this killed task. 
*/ - @volatile private var retryIfKilled: Boolean = false - @volatile private var threadId: Long = -1 def getThreadId: Long = threadId @@ -250,9 +245,8 @@ private[spark] class Executor( */ @volatile var task: Task[Any] = _ - def kill(interruptThread: Boolean, reason: String, shouldRetry: Boolean): Unit = { + def kill(interruptThread: Boolean, reason: String): Unit = { logInfo(s"Executor is trying to kill $taskName (TID $taskId), reason: $reason") - retryIfKilled = shouldRetry killReason = reason killed = true if (task != null) { @@ -443,14 +437,14 @@ private[spark] class Executor( logInfo(s"Executor killed $taskName (TID $taskId), reason: $killReason") setTaskFinishedAndClearInterruptStatus() execBackend.statusUpdate( - taskId, TaskState.KILLED, ser.serialize(TaskKilled(killReason, retryIfKilled))) + taskId, TaskState.KILLED, ser.serialize(TaskKilled(killReason))) case _: InterruptedException if task.killed => logInfo( s"Executor interrupted and preempted $taskName (TID $taskId), reason: $killReason") setTaskFinishedAndClearInterruptStatus() execBackend.statusUpdate( - taskId, TaskState.KILLED, ser.serialize(TaskKilled(killReason, retryIfKilled))) + taskId, TaskState.KILLED, ser.serialize(TaskKilled(killReason))) case CausedBy(cDE: CommitDeniedException) => val reason = cDE.toTaskFailedReason @@ -529,8 +523,7 @@ private[spark] class Executor( private class TaskReaper( taskRunner: TaskRunner, val interruptThread: Boolean, - val reason: String, - val shouldRetry: Boolean) + val reason: String) extends Runnable { private[this] val taskId: Long = taskRunner.taskId @@ -551,8 +544,7 @@ private[spark] class Executor( // Only attempt to kill the task once. If interruptThread = false then a second kill // attempt would be a no-op and if interruptThread = true then it may not be safe or // effective to interrupt multiple times: - taskRunner.kill( - interruptThread = interruptThread, reason = reason, shouldRetry = shouldRetry) + taskRunner.kill(interruptThread = interruptThread, reason = reason) // Monitor the killed task until it exits. The synchronization logic here is complicated // because we don't want to synchronize on the taskRunner while possibly taking a thread // dump, but we also need to be careful to avoid races between checking whether the task diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 2d87f87bdca8..fc5ecb3798cc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -735,7 +735,7 @@ class DAGScheduler( * Kill a given task. It will be retried. */ def killTask(taskId: Long, reason: String): Unit = { - taskScheduler.killTask(taskId, interruptThread = true, reason, shouldRetry = true) + taskScheduler.killTask(taskId, true, reason) } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index 6e6c83940b7c..22db3350abfa 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -37,14 +37,12 @@ private[spark] trait SchedulerBackend { * @param executorId Id of the executor the task is running on. * @param interruptThread Whether the executor should interrupt the task thread. * @param reason The reason for the task kill. 
- * @param shouldRetry Whether the scheduler should retry the task. */ def killTask( taskId: Long, executorId: String, interruptThread: Boolean, - reason: String, - shouldRetry: Boolean): Unit = + reason: String): Unit = throw new UnsupportedOperationException def isReady(): Boolean = true diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 478ccaccd22b..e5594636a581 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -55,7 +55,7 @@ private[spark] trait TaskScheduler { def cancelTasks(stageId: Int, interruptThread: Boolean): Unit // Kill a task. - def killTask(taskId: Long, interruptThread: Boolean, reason: String, shouldRetry: Boolean): Unit + def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called. def setDAGScheduler(dagScheduler: DAGScheduler): Unit diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index b52fa08b9f4a..31f94b91a014 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -239,8 +239,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( // simply abort the stage. tsm.runningTasksSet.foreach { tid => val execId = taskIdToExecutorId(tid) - backend.killTask( - tid, execId, interruptThread, reason = "stage cancelled", shouldRetry = false) + backend.killTask(tid, execId, interruptThread, reason = "stage cancelled") } tsm.abort("Stage %s cancelled".format(stageId)) logInfo("Stage %d was cancelled".format(stageId)) @@ -248,12 +247,11 @@ private[spark] class TaskSchedulerImpl private[scheduler]( } } - override def killTask( - taskId: Long, interruptThread: Boolean, reason: String, shouldRetry: Boolean): Unit = { + override def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit = { logInfo(s"Killing task ($reason): $taskId") val execId = taskIdToExecutorId.getOrElse( taskId, throw new IllegalArgumentException("Task not found: " + taskId)) - backend.killTask(taskId, execId, interruptThread, reason, shouldRetry) + backend.killTask(taskId, execId, interruptThread, reason) } /** @@ -476,7 +474,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( taskState: TaskState, reason: TaskFailedReason): Unit = synchronized { taskSetManager.handleFailedTask(tid, taskState, reason) - if (!taskSetManager.isZombie && reason.shouldRetry) { + if (!taskSetManager.isZombie) { // Need to revive offers again now that the task set manager state has been updated to // reflect failed tasks that need to be re-run. 
backend.reviveOffers() diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 8ad02e0bb002..e4c33e963b1d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -714,8 +714,7 @@ private[spark] class TaskSetManager( attemptInfo.taskId, attemptInfo.executorId, interruptThread = true, - reason = "another attempt succeeded", - shouldRetry = false) + reason = "another attempt succeeded") } if (!successful(index)) { tasksSuccessful += 1 diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 16231c631e29..86b602c2ab77 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -41,11 +41,7 @@ private[spark] object CoarseGrainedClusterMessages { case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage case class KillTask( - taskId: Long, - executor: String, - interruptThread: Boolean, - reason: String, - shouldRetry: Boolean) + taskId: Long, executor: String, interruptThread: Boolean, reason: String) extends CoarseGrainedClusterMessage case class KillExecutorsOnHost(host: String) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 8c1431d7a0fb..b5cb69f9c0e4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -132,11 +132,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp case ReviveOffers => makeOffers() - case KillTask(taskId, executorId, interruptThread, reason, shouldRetry) => + case KillTask(taskId, executorId, interruptThread, reason) => executorDataMap.get(executorId) match { case Some(executorInfo) => executorInfo.executorEndpoint.send( - KillTask(taskId, executorId, interruptThread, reason, shouldRetry)) + KillTask(taskId, executorId, interruptThread, reason)) case None => // Ignoring the task kill since the executor is not registered. 
logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.") @@ -416,12 +416,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } override def killTask( - taskId: Long, - executorId: String, - interruptThread: Boolean, - reason: String, - shouldRetry: Boolean) { - driverEndpoint.send(KillTask(taskId, executorId, interruptThread, reason, shouldRetry)) + taskId: Long, executorId: String, interruptThread: Boolean, reason: String) { + driverEndpoint.send(KillTask(taskId, executorId, interruptThread, reason)) } override def defaultParallelism(): Int = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala index fb47cd70d5b9..35509bc2f85b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala @@ -34,8 +34,7 @@ private case class ReviveOffers() private case class StatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) -private case class KillTask( - taskId: Long, interruptThread: Boolean, reason: String, shouldRetry: Boolean) +private case class KillTask(taskId: Long, interruptThread: Boolean, reason: String) private case class StopExecutor() @@ -71,8 +70,8 @@ private[spark] class LocalEndpoint( reviveOffers() } - case KillTask(taskId, interruptThread, reason, shouldRetry) => - executor.killTask(taskId, interruptThread, reason, shouldRetry) + case KillTask(taskId, interruptThread, reason) => + executor.killTask(taskId, interruptThread, reason) } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { @@ -145,9 +144,8 @@ private[spark] class LocalSchedulerBackend( scheduler.conf.getInt("spark.default.parallelism", totalCores) override def killTask( - taskId: Long, executorId: String, interruptThread: Boolean, reason: String, - shouldRetry: Boolean) { - localEndpoint.send(KillTask(taskId, interruptThread, reason, shouldRetry)) + taskId: Long, executorId: String, interruptThread: Boolean, reason: String) { + localEndpoint.send(KillTask(taskId, interruptThread, reason)) } override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) { diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 8dc871dc1b2d..2cb88919c8c8 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -391,8 +391,7 @@ private[spark] object JsonProtocol { ("Exit Caused By App" -> exitCausedByApp) ~ ("Loss Reason" -> reason.map(_.toString)) case taskKilled: TaskKilled => - ("Kill Reason" -> taskKilled.reason) ~ - ("Should Retry" -> taskKilled.shouldRetry) + ("Kill Reason" -> taskKilled.reason) case _ => Utils.emptyJson } ("Reason" -> reason) ~ json @@ -883,9 +882,7 @@ private[spark] object JsonProtocol { case `taskKilled` => val killReason = Utils.jsonOption(json \ "Kill Reason") .map(_.extract[String]).getOrElse("unknown reason") - val shouldRetry = Utils.jsonOption(json \ "Should Retry") - .map(_.extract[Boolean]).getOrElse(false) - TaskKilled(killReason, shouldRetry) + TaskKilled(killReason) case `taskCommitDenied` => // Unfortunately, the `TaskCommitDenied` message was introduced in 1.3.0 but the JSON // de/serialization logic was not added until 1.5.1. 
To provide backward compatibility diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala index d2c1a52e61cb..f47e574b4fc4 100644 --- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala @@ -117,7 +117,7 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug } // `testFailedReason` should be `TaskKilled`; `taskState` should be `KILLED` - assert(executorSuiteHelper.testFailedReason === TaskKilled) + assert(executorSuiteHelper.testFailedReason === TaskKilled("test")) assert(executorSuiteHelper.taskState === TaskState.KILLED) } finally { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 2cca00edb102..38e826451e7e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -126,8 +126,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou override def cancelTasks(stageId: Int, interruptThread: Boolean) { cancelledStages += stageId } - override def killTask( - taskId: Long, interruptThread: Boolean, reason: String, shouldRetry: Boolean): Unit = {} + override def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit = {} override def setDAGScheduler(dagScheduler: DAGScheduler) = {} override def defaultParallelism() = 2 override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} @@ -554,8 +553,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou override def cancelTasks(stageId: Int, interruptThread: Boolean) { throw new UnsupportedOperationException } - override def killTask( - taskId: Long, interruptThread: Boolean, reason: String, shouldRetry: Boolean): Unit = { + override def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit = { throw new UnsupportedOperationException } override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {} diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index d909ef365db7..65c8788f2201 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -79,8 +79,7 @@ private class DummyTaskScheduler extends TaskScheduler { override def stop(): Unit = {} override def submitTasks(taskSet: TaskSet): Unit = {} override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = {} - override def killTask( - taskId: Long, interruptThread: Boolean, reason: String, shouldRetry: Boolean): Unit = {} + override def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit = {} override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {} override def defaultParallelism(): Int = 2 override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index b24836b02b43..aed93bad87e2 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -175,13 +175,13 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { assert(!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter)) // The non-authorized committer fails outputCommitCoordinator.taskCompleted( - stage, partition, attemptNumber = nonAuthorizedCommitter, reason = TaskKilled("test", false)) + stage, partition, attemptNumber = nonAuthorizedCommitter, reason = TaskKilled("test")) // New tasks should still not be able to commit because the authorized committer has not failed assert( !outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 1)) // The authorized committer now fails, clearing the lock outputCommitCoordinator.taskCompleted( - stage, partition, attemptNumber = authorizedCommitter, reason = TaskKilled("test", false)) + stage, partition, attemptNumber = authorizedCommitter, reason = TaskKilled("test")) // A new task should now be allowed to become the authorized committer assert( outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 2)) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index d7a998c02b94..8103983c4392 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -411,11 +411,7 @@ private[spark] abstract class MockBackend( } override def killTask( - taskId: Long, - executorId: String, - interruptThread: Boolean, - reason: String, - shouldRetry: Boolean): Unit = { + taskId: Long, executorId: String, interruptThread: Boolean, reason: String): Unit = { // We have to implement this b/c of SPARK-15385. // Its OK for this to be a no-op, because even if a backend does implement killTask, // it really can only be "best-effort" in any case, and the scheduler should be robust to that. diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 0b639ae5cb17..d4819dfae987 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -675,8 +675,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg taskId: Long, executorId: String, interruptThread: Boolean, - reason: String, - shouldRetry: Boolean): Unit = {} + reason: String): Unit = {} }) // Keep track of the number of tasks that are resubmitted, @@ -932,7 +931,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // Complete the speculative attempt for the running task manager.handleSuccessfulTask(4, createTaskResult(3, accumUpdatesByTask(3))) // Verify that it kills other running attempt - verify(sched.backend).killTask(3, "exec2", true, "another attempt succeeded", false) + verify(sched.backend).killTask(3, "exec2", true, "another attempt succeeded") // Because the SchedulerBackend was a mock, the 2nd copy of the task won't actually be // killed, so the FakeTaskScheduler is only told about the successful completion // of the speculated task. 
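A minimal sketch, not taken from the diffs above, of the TaskKilled semantics these tests rely on after the case-object-to-case-class change: the reason flows into the error string, and a kill still does not count toward the task-failure limit.

    import org.apache.spark.{TaskFailedReason, TaskKilled}

    val reason: TaskFailedReason = TaskKilled("another attempt succeeded")
    // The reason is embedded in the error string shown to users.
    assert(reason.toErrorString == "TaskKilled (another attempt succeeded)")
    // Kills are intentional, so they do not count toward spark.task.maxFailures.
    assert(!reason.countTowardsTaskFailures)
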
@@ -1018,15 +1017,14 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg manager.handleSuccessfulTask(speculativeTask.taskId, createTaskResult(3, accumUpdatesByTask(3))) // Verify that it kills other running attempt val origTask = originalTasks(speculativeTask.index) - verify(sched.backend).killTask( - origTask.taskId, "exec2", true, "another attempt succeeded", false) + verify(sched.backend).killTask(origTask.taskId, "exec2", true, "another attempt succeeded") // Because the SchedulerBackend was a mock, the 2nd copy of the task won't actually be // killed, so the FakeTaskScheduler is only told about the successful completion // of the speculated task. assert(sched.endedTasks(3) === Success) // also because the scheduler is a mock, our manager isn't notified about the task killed event, // so we do that manually - manager.handleFailedTask(origTask.taskId, TaskState.KILLED, TaskKilled("test", false)) + manager.handleFailedTask(origTask.taskId, TaskState.KILLED, TaskKilled("test")) // this task has "failed" 4 times, but one of them doesn't count, so keep running the stage assert(manager.tasksSuccessful === 4) assert(!manager.isZombie) @@ -1043,8 +1041,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg createTaskResult(3, accumUpdatesByTask(3))) // Verify that it kills other running attempt val origTask2 = originalTasks(speculativeTask2.index) - verify(sched.backend).killTask( - origTask2.taskId, "exec2", true, "another attempt succeeded", false) + verify(sched.backend).killTask(origTask2.taskId, "exec2", true, "another attempt succeeded") assert(manager.tasksSuccessful === 5) assert(manager.isZombie) } @@ -1099,8 +1096,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg ExecutorLostFailure(taskDescs(1).executorId, exitCausedByApp = false, reason = None)) tsmSpy.handleFailedTask(taskDescs(2).taskId, TaskState.FAILED, TaskCommitDenied(0, 2, 0)) - tsmSpy.handleFailedTask(taskDescs(3).taskId, TaskState.KILLED, - TaskKilled("test", false)) + tsmSpy.handleFailedTask(taskDescs(3).taskId, TaskState.KILLED, TaskKilled("test")) // Make sure that the blacklist ignored all of the task failures above, since they aren't // the fault of the executor where the task was running. diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 783b26975166..0f654bf930ff 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -275,7 +275,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with // Make sure killed tasks are accounted for correctly. listener.onTaskEnd( SparkListenerTaskEnd( - task.stageId, 0, taskType, TaskKilled("test", false), taskInfo, metrics)) + task.stageId, 0, taskType, TaskKilled("test"), taskInfo, metrics)) assert(listener.stageIdToData((task.stageId, 0)).numKilledTasks === Map("test" -> 1)) // Make sure we count success as success. 
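A small sketch, assuming the JsonProtocol changes earlier in this patch, of how the kill reason round-trips through event-log JSON; JsonProtocol is internal to Spark, so this only compiles from Spark's own code, and event logs written before this change, which lack a "Kill Reason" field, deserialize to TaskKilled("unknown reason").

    import org.apache.spark.TaskKilled
    import org.apache.spark.util.JsonProtocol

    val json = JsonProtocol.taskEndReasonToJson(TaskKilled("another attempt succeeded"))
    // json now carries "Kill Reason" -> "another attempt succeeded" alongside "Reason" -> "TaskKilled".
    assert(JsonProtocol.taskEndReasonFromJson(json) == TaskKilled("another attempt succeeded"))
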
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 3c928139e7ca..a64dbeae4729 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -164,8 +164,7 @@ class JsonProtocolSuite extends SparkFunSuite { testTaskEndReason(fetchMetadataFailed) testTaskEndReason(exceptionFailure) testTaskEndReason(TaskResultLost) - testTaskEndReason(TaskKilled("test", false)) - testTaskEndReason(TaskKilled("test", true)) + testTaskEndReason(TaskKilled("test")) testTaskEndReason(TaskCommitDenied(2, 3, 4)) testTaskEndReason(ExecutorLostFailure("100", true, Some("Induced failure"))) testTaskEndReason(UnknownReason) @@ -679,7 +678,6 @@ private[spark] object JsonProtocolSuite extends Assertions { case (TaskResultLost, TaskResultLost) => case (r1: TaskKilled, r2: TaskKilled) => assert(r1.reason == r2.reason) - assert(r1.shouldRetry == r2.shouldRetry) case (TaskCommitDenied(jobId1, partitionId1, attemptNumber1), TaskCommitDenied(jobId2, partitionId2, attemptNumber2)) => assert(jobId1 === jobId2) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 1b8e93d29799..0fee39fe0363 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -67,7 +67,6 @@ object MimaExcludes { ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.OneVsRestModel.this"), // [SPARK-19820] Allow reason to be specified to task kill - ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskFailedReason.shouldRetry"), ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.TaskKilled$"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.TaskKilled.productElement"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.TaskKilled.productArity"), diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala index 701ce211ffec..c4e6e295358c 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala @@ -105,8 +105,7 @@ private[spark] class MesosExecutorBackend } else { // TODO: Determine the 'interruptOnCancel' property set for the given job. 
executor.killTask( - t.getValue.toLong, interruptThread = false, reason = "killed intentionally", - shouldRetry = false) + t.getValue.toLong, interruptThread = false, reason = "killed intentionally") } } diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala index b303e17661a1..63fd6c13f674 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala @@ -426,11 +426,7 @@ private[spark] class MesosFineGrainedSchedulerBackend( } override def killTask( - taskId: Long, - executorId: String, - interruptThread: Boolean, - reason: String, - shouldRetry: Boolean): Unit = { + taskId: Long, executorId: String, interruptThread: Boolean, reason: String): Unit = { mesosDriver.killTask( TaskID.newBuilder() .setValue(taskId.toString).build() From 72b28cb0dc7aacf7cde1b5e49f05da49cb5de276 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 14 Mar 2017 17:24:08 -0700 Subject: [PATCH 05/16] comments Update Executor.scala Update CoarseGrainedClusterMessage.scala minor indent fixes --- .../scala/org/apache/spark/SparkContext.scala | 29 +++++++++---------- .../org/apache/spark/executor/Executor.scala | 12 +++----- .../apache/spark/scheduler/DAGScheduler.scala | 4 +-- .../spark/scheduler/TaskScheduler.scala | 3 +- .../spark/scheduler/TaskSchedulerImpl.scala | 2 +- .../cluster/CoarseGrainedClusterMessage.scala | 3 +- .../scala/org/apache/spark/ui/UIUtils.scala | 4 +-- .../apache/spark/ui/jobs/AllJobsPage.scala | 4 +-- .../apache/spark/ui/jobs/ExecutorTable.scala | 4 +-- .../spark/ui/jobs/JobProgressListener.scala | 12 ++++---- .../org/apache/spark/ui/jobs/StageTable.scala | 2 +- .../org/apache/spark/ui/jobs/UIData.scala | 6 ++-- .../org/apache/spark/SparkContextSuite.scala | 18 ++++++++---- .../spark/scheduler/DAGSchedulerSuite.scala | 4 +-- .../ExternalClusterManagerSuite.scala | 2 +- .../ui/jobs/JobProgressListenerSuite.scala | 2 +- .../spark/executor/MesosExecutorBackend.scala | 2 +- .../spark/streaming/ui/AllBatchesTable.scala | 2 +- .../apache/spark/streaming/ui/BatchPage.scala | 2 +- 19 files changed, 57 insertions(+), 60 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 34c6b07f60e8..60fd567d5147 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2250,22 +2250,19 @@ class SparkContext(config: SparkConf) extends Logging { } /** - * Kill a given task. It will be retried. - * - * @param taskId the task ID to kill - */ - def killTask(taskId: Long): Unit = { - killTask(taskId, "cancelled") - } - - /** - * Kill a given task. It will be retried. - * - * @param taskId the task ID to kill - * @param reason the reason for killing the task, which should be a short string - */ - def killTask(taskId: Long, reason: String): Unit = { - dagScheduler.killTask(taskId, reason) + * Kill and reschedule the given task attempt. Task ids can be obtained from the Spark UI + * or through SparkListener.onTaskStart. + * + * @param taskId the task ID to kill. This id uniquely identifies the task attempt. 
+ * @param interruptThread whether to interrupt the thread running the task. + * @param reason the reason for killing the task, which should be a short string. If a task + * is killed multiple times with different reasons, only one reason will be reported. + */ + def killTaskAttempt( + taskId: Long, + interruptThread: Boolean = true, + reason: String = "cancelled"): Unit = { + dagScheduler.killTaskAttempt(taskId, interruptThread, reason) } /** diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 8421c34556a8..86a895f6df26 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -158,8 +158,7 @@ private[spark] class Executor( threadPool.execute(tr) } - def killTask( - taskId: Long, interruptThread: Boolean, reason: String): Unit = { + def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit = { val taskRunner = runningTasks.get(taskId) if (taskRunner != null) { if (taskReaperEnabled) { @@ -436,15 +435,12 @@ private[spark] class Executor( case _: TaskKilledException => logInfo(s"Executor killed $taskName (TID $taskId), reason: $killReason") setTaskFinishedAndClearInterruptStatus() - execBackend.statusUpdate( - taskId, TaskState.KILLED, ser.serialize(TaskKilled(killReason))) + execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(killReason))) case _: InterruptedException if task.killed => - logInfo( - s"Executor interrupted and preempted $taskName (TID $taskId), reason: $killReason") + logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason") setTaskFinishedAndClearInterruptStatus() - execBackend.statusUpdate( - taskId, TaskState.KILLED, ser.serialize(TaskKilled(killReason))) + execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(killReason))) case CausedBy(cDE: CommitDeniedException) => val reason = cDE.toTaskFailedReason diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index fc5ecb3798cc..4a94c9c0d20c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -734,8 +734,8 @@ class DAGScheduler( /** * Kill a given task. It will be retried. */ - def killTask(taskId: Long, reason: String): Unit = { - taskScheduler.killTask(taskId, true, reason) + def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Unit = { + taskScheduler.killTaskAttempt(taskId, interruptThread, reason) } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index e5594636a581..96e816346263 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -54,8 +54,7 @@ private[spark] trait TaskScheduler { // Cancel a stage. def cancelTasks(stageId: Int, interruptThread: Boolean): Unit - // Kill a task. - def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit + def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Unit // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called. 
def setDAGScheduler(dagScheduler: DAGScheduler): Unit diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 31f94b91a014..8982e63f2284 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -247,7 +247,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( } } - override def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit = { + override def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Unit = { logInfo(s"Killing task ($reason): $taskId") val execId = taskIdToExecutorId.getOrElse( taskId, throw new IllegalArgumentException("Task not found: " + taskId)) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 86b602c2ab77..6b49bd699a13 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -40,8 +40,7 @@ private[spark] object CoarseGrainedClusterMessages { // Driver to executors case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage - case class KillTask( - taskId: Long, executor: String, interruptThread: Boolean, reason: String) + case class KillTask(taskId: Long, executor: String, interruptThread: Boolean, reason: String) extends CoarseGrainedClusterMessage case class KillExecutorsOnHost(host: String) diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 8c84b31f12cd..7b98a5015666 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -342,7 +342,7 @@ private[spark] object UIUtils extends Logging { completed: Int, failed: Int, skipped: Int, - killed: Map[String, Int], + reasonToNumKilled: Map[String, Int], total: Int): Seq[Node] = { val completeWidth = "width: %s%%".format((completed.toDouble/total)*100) // started + completed can be > total when there are speculative tasks @@ -354,7 +354,7 @@ private[spark] object UIUtils extends Logging { {completed}/{total} { if (failed > 0) s"($failed failed)" } { if (skipped > 0) s"($skipped skipped)" } - { killed.map { case (reason, count) => s"($count killed: $reason)" } } + { reasonToNumKilled.map { case (reason, count) => s"($count killed: $reason)" } }
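An illustrative sketch, with invented counts, of what the per-reason map in the progress-bar hunk above renders to; the map expression mirrors the one in UIUtils.makeProgressBar.

    val reasonToNumKilled = Map("stage cancelled" -> 2, "another attempt succeeded" -> 1)
    val labels = reasonToNumKilled.map { case (reason, count) => s"($count killed: $reason)" }
    // labels: "(2 killed: stage cancelled)", "(1 killed: another attempt succeeded)"
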
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index d217f558045f..18be0870746e 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -630,8 +630,8 @@ private[ui] class JobPagedTable( {UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks, - failed = job.numFailedTasks, skipped = job.numSkippedTasks, killed = job.numKilledTasks, - total = job.numTasks - job.numSkippedTasks)} + failed = job.numFailedTasks, skipped = job.numSkippedTasks, + reasonToNumKilled = job.reasonToNumKilled, total = job.numTasks - job.numSkippedTasks)} } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index fa098ea4689a..52f41298a172 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -133,9 +133,9 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage {executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")} {UIUtils.formatDuration(v.taskTime)} - {v.failedTasks + v.succeededTasks + v.killedTasks.map(_._2).sum} + {v.failedTasks + v.succeededTasks + v.reasonToNumKilled.map(_._2).sum} {v.failedTasks} - {v.killedTasks.map(_._2).sum} + {v.reasonToNumKilled.map(_._2).sum} {v.succeededTasks} {if (stageData.hasInput) { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 66773d526e26..1cf03e1541d1 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -372,8 +372,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { case Success => execSummary.succeededTasks += 1 case kill: TaskKilled => - execSummary.killedTasks = execSummary.killedTasks.updated( - kill.reason, execSummary.killedTasks.getOrElse(kill.reason, 0) + 1) + execSummary.reasonToNumKilled = execSummary.reasonToNumKilled.updated( + kill.reason, execSummary.reasonToNumKilled.getOrElse(kill.reason, 0) + 1) case _ => execSummary.failedTasks += 1 } @@ -387,8 +387,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { stageData.numCompleteTasks += 1 None case kill: TaskKilled => - stageData.numKilledTasks = stageData.numKilledTasks.updated( - kill.reason, stageData.numKilledTasks.getOrElse(kill.reason, 0) + 1) + stageData.reasonToNumKilled = stageData.reasonToNumKilled.updated( + kill.reason, stageData.reasonToNumKilled.getOrElse(kill.reason, 0) + 1) Some(kill.toErrorString) case e: ExceptionFailure => // Handle ExceptionFailure because we might have accumUpdates stageData.numFailedTasks += 1 @@ -425,8 +425,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { case Success => jobData.numCompletedTasks += 1 case kill: TaskKilled => - jobData.numKilledTasks = jobData.numKilledTasks.updated( - kill.reason, jobData.numKilledTasks.getOrElse(kill.reason, 0) + 1) + jobData.reasonToNumKilled = jobData.reasonToNumKilled.updated( + kill.reason, jobData.reasonToNumKilled.getOrElse(kill.reason, 0) + 1) case _ => jobData.numFailedTasks += 1 } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index e1fa9043b6a1..f4caad0f5871 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -300,7 +300,7 @@ private[ui] class StagePagedTable( {UIUtils.makeProgressBar(started = stageData.numActiveTasks, completed = stageData.completedIndices.size, failed = stageData.numFailedTasks, - skipped = 0, killed = stageData.numKilledTasks, total = info.numTasks)} + skipped = 0, reasonToNumKilled = stageData.reasonToNumKilled, total = info.numTasks)} {data.inputReadWithUnit} {data.outputWriteWithUnit} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index de03a9f51548..ac1a74ad8029 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -32,7 +32,7 @@ private[spark] object UIData { var taskTime : Long = 0 var failedTasks : Int = 0 var succeededTasks : Int = 0 - var killedTasks : Map[String, Int] = Map.empty + var reasonToNumKilled : Map[String, Int] = Map.empty var inputBytes : Long = 0 var inputRecords : Long = 0 var outputBytes : Long = 0 @@ -64,7 +64,7 @@ private[spark] object UIData { var numCompletedTasks: Int = 0, var numSkippedTasks: Int = 0, var numFailedTasks: Int = 0, - var numKilledTasks: Map[String, Int] = Map.empty, + var reasonToNumKilled: Map[String, Int] = Map.empty, /* Stages */ var numActiveStages: Int = 0, // This needs to be a set instead of a simple count to prevent double-counting of rerun stages: @@ -78,7 +78,7 @@ private[spark] object UIData { var numCompleteTasks: Int = _ var completedIndices = new OpenHashSet[Int]() var numFailedTasks: Int = _ - var numKilledTasks: Map[String, Int] = Map.empty + var reasonToNumKilled: Map[String, Int] = Map.empty var executorRunTime: Long = _ var executorCpuTime: Long = _ diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 77e13fe7bc78..ff68f9d52b5b 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -538,6 +538,9 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu } } + // Launches one task that will run forever. Once the SparkListener detects the task has + // started, kill and re-schedule it. The second run of the task will complete immediately. + // If this test times out, then the first version of the task wasn't killed successfully. 
test("Killing tasks") { sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) @@ -551,17 +554,20 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu } if (!SparkContextSuite.taskKilled) { SparkContextSuite.taskKilled = true - sc.killTask(taskStart.taskInfo.taskId, "first attempt will hang") + sc.killTaskAttempt(taskStart.taskInfo.taskId, true, "first attempt will hang") } } } sc.addSparkListener(listener) - sc.parallelize(1 to 1).foreach { x => - if (!SparkContextSuite.isTaskStarted) { - SparkContextSuite.isTaskStarted = true - Thread.sleep(9999999) + eventually(timeout(20.seconds)) { + sc.parallelize(1 to 1).foreach { x => + // first attempt will hang + if (!SparkContextSuite.isTaskStarted) { + SparkContextSuite.isTaskStarted = true + Thread.sleep(9999999) + } + // second attempt succeeds immediately } - // second try succeeds immediately } } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 38e826451e7e..49fb7923f9cf 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -126,7 +126,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou override def cancelTasks(stageId: Int, interruptThread: Boolean) { cancelledStages += stageId } - override def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit = {} + override def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Unit = {} override def setDAGScheduler(dagScheduler: DAGScheduler) = {} override def defaultParallelism() = 2 override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} @@ -553,7 +553,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou override def cancelTasks(stageId: Int, interruptThread: Boolean) { throw new UnsupportedOperationException } - override def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit = { + override def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Unit = { throw new UnsupportedOperationException } override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {} diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index 65c8788f2201..6c963c142154 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -79,7 +79,7 @@ private class DummyTaskScheduler extends TaskScheduler { override def stop(): Unit = {} override def submitTasks(taskSet: TaskSet): Unit = {} override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = {} - override def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit = {} + override def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Unit = {} override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {} override def defaultParallelism(): Int = 2 override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 0f654bf930ff..93964a2d5674 
100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -276,7 +276,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with listener.onTaskEnd( SparkListenerTaskEnd( task.stageId, 0, taskType, TaskKilled("test"), taskInfo, metrics)) - assert(listener.stageIdToData((task.stageId, 0)).numKilledTasks === Map("test" -> 1)) + assert(listener.stageIdToData((task.stageId, 0)).reasonToNumKilled === Map("test" -> 1)) // Make sure we count success as success. listener.onTaskEnd( diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala index c4e6e295358c..a086ec7ea2da 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala @@ -105,7 +105,7 @@ private[spark] class MesosExecutorBackend } else { // TODO: Determine the 'interruptOnCancel' property set for the given job. executor.killTask( - t.getValue.toLong, interruptThread = false, reason = "killed intentionally") + t.getValue.toLong, interruptThread = false, reason = "killed by mesos") } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala index 06f458f2bdc6..70b4bb466c46 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala @@ -97,7 +97,7 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) completed = batch.numCompletedOutputOp, failed = batch.numFailedOutputOp, skipped = 0, - killed = Map.empty, + reasonToNumKilled = Map.empty, total = batch.outputOperations.size) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala index 1a87fc790f91..f55af6a5cc35 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala @@ -146,7 +146,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") { completed = sparkJob.numCompletedTasks, failed = sparkJob.numFailedTasks, skipped = sparkJob.numSkippedTasks, - killed = sparkJob.numKilledTasks, + reasonToNumKilled = sparkJob.reasonToNumKilled, total = sparkJob.numTasks - sparkJob.numSkippedTasks) } From 8f7ffb395cae9ae7aa24a14dcdb908aaee30b710 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 16 Mar 2017 16:31:37 -0700 Subject: [PATCH 06/16] update --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 60fd567d5147..f1af8e75504d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2261,7 +2261,7 @@ class SparkContext(config: SparkConf) extends Logging { def killTaskAttempt( taskId: Long, interruptThread: Boolean = true, - reason: String = "cancelled"): Unit = { + reason: String = "killed via SparkContext.killTaskAttempt"): Unit = { 
dagScheduler.killTaskAttempt(taskId, interruptThread, reason) } From 348e97abe2ab129e33db3161c9ee5aab0e357d81 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 20 Mar 2017 21:22:12 -0700 Subject: [PATCH 07/16] comments 2 --- .../unsafe/sort/UnsafeInMemorySorter.java | 5 ++- .../unsafe/sort/UnsafeSorterSpillReader.java | 5 ++- .../apache/spark/InterruptibleIterator.scala | 7 ++-- .../scala/org/apache/spark/TaskContext.scala | 5 +++ .../org/apache/spark/TaskContextImpl.scala | 16 ++++++--- .../apache/spark/TaskKilledException.scala | 2 +- .../apache/spark/api/python/PythonRDD.scala | 3 +- .../org/apache/spark/executor/Executor.scala | 33 +++++++++---------- .../org/apache/spark/scheduler/Task.scala | 20 +++++++---- .../scala/org/apache/spark/ui/UIUtils.scala | 5 ++- .../org/apache/spark/SparkContextSuite.scala | 11 ++++++- .../execution/datasources/FileScanRDD.scala | 4 +-- 12 files changed, 68 insertions(+), 48 deletions(-) diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index f219c5605b64..c14c12664f5a 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -23,7 +23,6 @@ import org.apache.avro.reflect.Nullable; import org.apache.spark.TaskContext; -import org.apache.spark.TaskKilledException; import org.apache.spark.memory.MemoryConsumer; import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.unsafe.Platform; @@ -291,8 +290,8 @@ public void loadNext() { // to avoid performance overhead. This check is added here in `loadNext()` instead of in // `hasNext()` because it's technically possible for the caller to be relying on // `getNumRecords()` instead of `hasNext()` to know when to stop. - if (taskContext != null && taskContext.isInterrupted()) { - throw new TaskKilledException(); + if (taskContext != null) { + taskContext.killTaskIfInterrupted(); } // This pointer points to a 4-byte record length, followed by the record's bytes final long recordPointer = array.get(offset + position); diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java index b6323c624b7b..9521ab86a12d 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java @@ -24,7 +24,6 @@ import org.apache.spark.SparkEnv; import org.apache.spark.TaskContext; -import org.apache.spark.TaskKilledException; import org.apache.spark.io.NioBufferedFileInputStream; import org.apache.spark.serializer.SerializerManager; import org.apache.spark.storage.BlockId; @@ -102,8 +101,8 @@ public void loadNext() throws IOException { // to avoid performance overhead. This check is added here in `loadNext()` instead of in // `hasNext()` because it's technically possible for the caller to be relying on // `getNumRecords()` instead of `hasNext()` to know when to stop. 
- if (taskContext != null && taskContext.isInterrupted()) { - throw new TaskKilledException(); + if (taskContext != null) { + taskContext.killTaskIfInterrupted(); } recordLength = din.readInt(); keyPrefix = din.readLong(); diff --git a/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala b/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala index 5c262bcbddf7..7f2c0068174b 100644 --- a/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala +++ b/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala @@ -33,11 +33,8 @@ class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator // is allowed. The assumption is that Thread.interrupted does not have a memory fence in read // (just a volatile field in C), while context.interrupted is a volatile in the JVM, which // introduces an expensive read fence. - if (context.isInterrupted) { - throw new TaskKilledException - } else { - delegate.hasNext - } + context.killTaskIfInterrupted() + delegate.hasNext } def next(): T = delegate.next() diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index 5acfce17593b..f29a55b1d8ac 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -184,6 +184,11 @@ abstract class TaskContext extends Serializable { @DeveloperApi def getMetricsSources(sourceName: String): Seq[Source] + /** + * If the task is interrupted, throws TaskKilledException with the reason for the interrupt. + */ + private[spark] def killTaskIfInterrupted(): Unit + /** * Returns the manager for this task's managed memory. */ diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala index ea8dcdfd5d7d..cb2429a5ce97 100644 --- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala @@ -59,8 +59,8 @@ private[spark] class TaskContextImpl( /** List of callback functions to execute when the task fails. */ @transient private val onFailureCallbacks = new ArrayBuffer[TaskFailureListener] - // Whether the corresponding task has been killed. - @volatile private var interrupted: Boolean = false + // If defined, the corresponding task has been killed for the contained reason. + @volatile private var maybeKillReason: Option[String] = None // Whether the task has completed. private var completed: Boolean = false @@ -140,8 +140,14 @@ private[spark] class TaskContextImpl( } /** Marks the task for interruption, i.e. cancellation. 
*/ - private[spark] def markInterrupted(): Unit = { - interrupted = true + private[spark] def markInterrupted(reason: String): Unit = { + maybeKillReason = Some(reason) + } + + private[spark] override def killTaskIfInterrupted(): Unit = { + if (maybeKillReason.isDefined) { + throw new TaskKilledException(maybeKillReason.get) + } } @GuardedBy("this") @@ -149,7 +155,7 @@ private[spark] class TaskContextImpl( override def isRunningLocally(): Boolean = false - override def isInterrupted(): Boolean = interrupted + override def isInterrupted(): Boolean = maybeKillReason.isDefined override def getLocalProperty(key: String): String = localProperties.getProperty(key) diff --git a/core/src/main/scala/org/apache/spark/TaskKilledException.scala b/core/src/main/scala/org/apache/spark/TaskKilledException.scala index ad487c4efb87..74b9577d03f8 100644 --- a/core/src/main/scala/org/apache/spark/TaskKilledException.scala +++ b/core/src/main/scala/org/apache/spark/TaskKilledException.scala @@ -24,4 +24,4 @@ import org.apache.spark.annotation.DeveloperApi * Exception thrown when a task is explicitly killed (i.e., task failure is expected). */ @DeveloperApi -class TaskKilledException extends RuntimeException +class TaskKilledException(val reason: String) extends RuntimeException diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 04ae97ed3ccb..ee1280171689 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -215,7 +215,8 @@ private[spark] class PythonRunner( case e: Exception if context.isInterrupted => logDebug("Exception thrown after task interruption", e) - throw new TaskKilledException + context.killTaskIfInterrupted() + null // not reached case e: Exception if env.isStopped => logDebug("Exception thrown after context is stopped", e) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 86a895f6df26..ed3fb00509a6 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -219,11 +219,8 @@ private[spark] class Executor( val threadName = s"Executor task launch worker for task $taskId" private val taskName = taskDescription.name - /** Whether this task has been killed. */ - @volatile private var killed = false - - /** The reason this task was killed. */ - @volatile private var killReason: String = null + /** If specified, this task has been killed and this option contains the reason. */ + @volatile private var maybeKillReason: Option[String] = None @volatile private var threadId: Long = -1 @@ -246,12 +243,11 @@ private[spark] class Executor( def kill(interruptThread: Boolean, reason: String): Unit = { logInfo(s"Executor is trying to kill $taskName (TID $taskId), reason: $reason") - killReason = reason - killed = true + maybeKillReason = Some(reason) if (task != null) { synchronized { if (!finished) { - task.kill(interruptThread) + task.kill(interruptThread, reason) } } } @@ -302,12 +298,12 @@ private[spark] class Executor( // If this task has been killed before we deserialized it, let's quit now. Otherwise, // continue executing the task. - if (killed) { + if (maybeKillReason.isDefined) { // Throw an exception rather than returning, because returning within a try{} block // causes a NonLocalReturnControl exception to be thrown. 
The NonLocalReturnControl // exception will be caught by the catch block, leading to an incorrect ExceptionFailure // for the task. - throw new TaskKilledException + throw new TaskKilledException(maybeKillReason.get) } logDebug("Task " + taskId + "'s epoch is " + task.epoch) @@ -364,9 +360,7 @@ private[spark] class Executor( } else 0L // If the task has been killed, let's fail it. - if (task.killed) { - throw new TaskKilledException - } + task.context.killTaskIfInterrupted() val resultSer = env.serializer.newInstance() val beforeSerialization = System.currentTimeMillis() @@ -432,15 +426,18 @@ private[spark] class Executor( setTaskFinishedAndClearInterruptStatus() execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) - case _: TaskKilledException => - logInfo(s"Executor killed $taskName (TID $taskId), reason: $killReason") + case t: TaskKilledException => + logInfo(s"Executor killed $taskName (TID $taskId), reason: ${t.reason}") setTaskFinishedAndClearInterruptStatus() - execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(killReason))) + execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason))) case _: InterruptedException if task.killed => - logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason") + logInfo( + s"Executor interrupted and killed $taskName (TID $taskId)," + + s" reason: ${task.maybeKillReason.get}") setTaskFinishedAndClearInterruptStatus() - execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(killReason))) + execBackend.statusUpdate( + taskId, TaskState.KILLED, ser.serialize(TaskKilled(task.maybeKillReason.get))) case CausedBy(cDE: CommitDeniedException) => val reason = cDE.toTaskFailedReason diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 70213722aae4..47dc755c72e4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -89,8 +89,8 @@ private[spark] abstract class Task[T]( TaskContext.setTaskContext(context) taskThread = Thread.currentThread() - if (_killed) { - kill(interruptThread = false) + if (_maybeKillReason != null) { + kill(interruptThread = false, _maybeKillReason) } new CallerContext( @@ -160,7 +160,7 @@ private[spark] abstract class Task[T]( // A flag to indicate whether the task is killed. This is used in case context is not yet // initialized when kill() is invoked. - @volatile @transient private var _killed = false + @volatile @transient private var _maybeKillReason: String = null protected var _executorDeserializeTime: Long = 0 protected var _executorDeserializeCpuTime: Long = 0 @@ -168,7 +168,12 @@ private[spark] abstract class Task[T]( /** * Whether the task has been killed. */ - def killed: Boolean = _killed + def killed: Boolean = _maybeKillReason != null + + /** + * If this task has been killed, contains the reason for the kill. + */ + def maybeKillReason: Option[String] = Option(_maybeKillReason) /** * Returns the amount of time spent deserializing the RDD and function to be run. @@ -201,10 +206,11 @@ private[spark] abstract class Task[T]( * be called multiple times. * If interruptThread is true, we will also call Thread.interrupt() on the Task's executor thread. 
*/ - def kill(interruptThread: Boolean) { - _killed = true + def kill(interruptThread: Boolean, reason: String) { + require(reason != null) + _maybeKillReason = reason if (context != null) { - context.markInterrupted() + context.markInterrupted(reason) } if (interruptThread && taskThread != null) { taskThread.interrupt() diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 7b98a5015666..e53d6907bc40 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -354,7 +354,10 @@ private[spark] object UIUtils extends Logging { {completed}/{total} { if (failed > 0) s"($failed failed)" } { if (skipped > 0) s"($skipped skipped)" } - { reasonToNumKilled.map { case (reason, count) => s"($count killed: $reason)" } } + { reasonToNumKilled.toSeq.sortBy(-_._2).map { + case (reason, count) => s"($count killed: $reason)" + } + }
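For orientation before the SparkContextSuite diff that follows: a minimal end-to-end sketch of the reason-aware kill path in local mode, assuming the `killTaskAttempt(taskId, interruptThread, reason)` signature and the `TaskKilled(reason)` propagation introduced by these diffs. The object name, the `local[2]` master, and the app name are illustrative assumptions only; this is a sketch rather than the patch's own test.

```scala
// Hypothetical sketch (not part of this patch series): kill the first attempt of a task
// with an explicit reason and let the scheduler retry it. killTaskAttempt and the
// TaskKilled(reason) behavior come from the diffs above; everything else is illustrative.
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}

object KillTaskWithReasonExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("kill-task-with-reason"))
    try {
      sc.addSparkListener(new SparkListener {
        override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
          if (taskStart.taskInfo.attemptNumber == 0) {
            // Kill the first attempt with a human-readable reason. The reason propagates
            // into TaskKilled(reason) and the per-reason kill counts rendered by the UI.
            sc.killTaskAttempt(taskStart.taskInfo.taskId, interruptThread = true,
              reason = "first attempt intentionally killed")
          }
        }
      })
      sc.parallelize(1 to 1, 1).foreach { _ =>
        if (TaskContext.get().attemptNumber() == 0) {
          // The first attempt hangs until the kill above interrupts it. Because TaskKilled
          // does not count towards task failures, the scheduler simply reschedules the task.
          Thread.sleep(Long.MaxValue)
        }
        // The retried attempt falls through and completes immediately.
      }
    } finally {
      sc.stop()
    }
  }
}
```

Unlike the suite test below, this sketch omits the flag/`eventually` handshake that ensures the first attempt has actually started before it is killed, so treat it as illustrative rather than a robust regression test. The reason string supplied here is what surfaces in the sorted `reasonToNumKilled` counts in the UIUtils change above.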
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 382288a41912..a97864ca91ee 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFor import org.scalatest.concurrent.Eventually import org.scalatest.Matchers._ -import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskStart} +import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskEnd, SparkListenerTaskStart} import org.apache.spark.util.Utils @@ -548,6 +548,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu SparkContextSuite.isTaskStarted = false SparkContextSuite.taskKilled = false + SparkContextSuite.taskSucceeded = false val listener = new SparkListener { override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { @@ -559,6 +560,11 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu sc.killTaskAttempt(taskStart.taskInfo.taskId, true, "first attempt will hang") } } + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + if (taskEnd.taskInfo.attemptNumber == 1 && taskEnd.reason == Success) { + SparkContextSuite.taskSucceeded = true + } + } } sc.addSparkListener(listener) eventually(timeout(20.seconds)) { @@ -569,8 +575,10 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu Thread.sleep(9999999) } // second attempt succeeds immediately + SparkContextSuite.taskSucceeded = true } } + assert(SparkContextSuite.taskSucceeded) } test("SPARK-19446: DebugFilesystem.assertNoOpenStreams should report " + @@ -595,4 +603,5 @@ object SparkContextSuite { @volatile var cancelStage = false @volatile var isTaskStarted = false @volatile var taskKilled = false + @volatile var taskSucceeded = false } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index a89d172a911a..9df20731c71d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -101,9 +101,7 @@ class FileScanRDD( // Kill the task in case it has been marked as killed. This logic is from // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order // to avoid performance overhead. 
- if (context.isInterrupted()) { - throw new TaskKilledException - } + context.killTaskIfInterrupted() (currentIterator != null && currentIterator.hasNext) || nextIterator() } def next(): Object = { From f5069f7932193a1190e454bb211587e6fc84ecae Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 20 Mar 2017 21:23:18 -0700 Subject: [PATCH 08/16] add default reason back --- core/src/main/scala/org/apache/spark/TaskKilledException.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/TaskKilledException.scala b/core/src/main/scala/org/apache/spark/TaskKilledException.scala index 74b9577d03f8..0d95b80adcdb 100644 --- a/core/src/main/scala/org/apache/spark/TaskKilledException.scala +++ b/core/src/main/scala/org/apache/spark/TaskKilledException.scala @@ -24,4 +24,4 @@ import org.apache.spark.annotation.DeveloperApi * Exception thrown when a task is explicitly killed (i.e., task failure is expected). */ @DeveloperApi -class TaskKilledException(val reason: String) extends RuntimeException +class TaskKilledException(val reason: String = "unknown reason") extends RuntimeException From 884a3ad7308e69c0ca010c344133bcce6582920d Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 20 Mar 2017 21:46:31 -0700 Subject: [PATCH 09/16] fix mima --- .../src/main/scala/org/apache/spark/TaskKilledException.scala | 4 +++- project/MimaExcludes.scala | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TaskKilledException.scala b/core/src/main/scala/org/apache/spark/TaskKilledException.scala index 0d95b80adcdb..9dbf0d493be1 100644 --- a/core/src/main/scala/org/apache/spark/TaskKilledException.scala +++ b/core/src/main/scala/org/apache/spark/TaskKilledException.scala @@ -24,4 +24,6 @@ import org.apache.spark.annotation.DeveloperApi * Exception thrown when a task is explicitly killed (i.e., task failure is expected). 
*/ @DeveloperApi -class TaskKilledException(val reason: String = "unknown reason") extends RuntimeException +class TaskKilledException(val reason: String) extends RuntimeException { + def this() = this("unknown reason") +} diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 47fc72b0a5bb..1b45426a650d 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -75,7 +75,8 @@ object MimaExcludes { ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.TaskKilled.countTowardsTaskFailures"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.TaskKilled.productPrefix"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.TaskKilled.toErrorString"), - ProblemFilters.exclude[FinalMethodProblem]("org.apache.spark.TaskKilled.toString") + ProblemFilters.exclude[FinalMethodProblem]("org.apache.spark.TaskKilled.toString"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.killTaskIfInterrupted") ) // Exclude rules for 2.1.x From 203a90020031b71d976f60491d757c4d78b85517 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 21 Mar 2017 14:03:32 -0700 Subject: [PATCH 10/16] comments 3 --- .../main/scala/org/apache/spark/TaskContext.scala | 5 +++++ .../scala/org/apache/spark/TaskContextImpl.scala | 9 +++++++-- .../org/apache/spark/api/python/PythonRDD.scala | 3 +-- .../scala/org/apache/spark/executor/Executor.scala | 12 ++++++------ 4 files changed, 19 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index f29a55b1d8ac..0b87cd503d4f 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -189,6 +189,11 @@ abstract class TaskContext extends Serializable { */ private[spark] def killTaskIfInterrupted(): Unit + /** + * If the task is interrupted, the reason this task was killed, otherwise None. + */ + private[spark] def getKillReason(): Option[String] + /** * Returns the manager for this task's managed memory. 
*/ diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala index cb2429a5ce97..892e6541beba 100644 --- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala @@ -145,11 +145,16 @@ private[spark] class TaskContextImpl( } private[spark] override def killTaskIfInterrupted(): Unit = { - if (maybeKillReason.isDefined) { - throw new TaskKilledException(maybeKillReason.get) + val reason = maybeKillReason + if (reason.isDefined) { + throw new TaskKilledException(reason.get) } } + private[spark] override def getKillReason(): Option[String] = { + maybeKillReason + } + @GuardedBy("this") override def isCompleted(): Boolean = synchronized(completed) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index ee1280171689..b0dd2fc187ba 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -215,8 +215,7 @@ private[spark] class PythonRunner( case e: Exception if context.isInterrupted => logDebug("Exception thrown after task interruption", e) - context.killTaskIfInterrupted() - null // not reached + throw new TaskKilledException(context.getKillReason().getOrElse("unknown reason")) case e: Exception if env.isStopped => logDebug("Exception thrown after context is stopped", e) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index ed3fb00509a6..1e14535e61c5 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -298,12 +298,13 @@ private[spark] class Executor( // If this task has been killed before we deserialized it, let's quit now. Otherwise, // continue executing the task. - if (maybeKillReason.isDefined) { + val killReason = maybeKillReason + if (killReason.isDefined) { // Throw an exception rather than returning, because returning within a try{} block // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl // exception will be caught by the catch block, leading to an incorrect ExceptionFailure // for the task. 
- throw new TaskKilledException(maybeKillReason.get) + throw new TaskKilledException(killReason.get) } logDebug("Task " + taskId + "'s epoch is " + task.epoch) @@ -432,12 +433,11 @@ private[spark] class Executor( execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason))) case _: InterruptedException if task.killed => - logInfo( - s"Executor interrupted and killed $taskName (TID $taskId)," + - s" reason: ${task.maybeKillReason.get}") + val killReason = task.maybeKillReason.getOrElse("unknown reason") + logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason") setTaskFinishedAndClearInterruptStatus() execBackend.statusUpdate( - taskId, TaskState.KILLED, ser.serialize(TaskKilled(task.maybeKillReason.get))) + taskId, TaskState.KILLED, ser.serialize(TaskKilled(killReason))) case CausedBy(cDE: CommitDeniedException) => val reason = cDE.toTaskFailedReason From 6e8593b9bb88a2b0bf90e39887368cc4535480b6 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 21 Mar 2017 14:10:24 -0700 Subject: [PATCH 11/16] clean up test --- core/src/test/scala/org/apache/spark/SparkContextSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index a97864ca91ee..ce1a401601c6 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -575,7 +575,6 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu Thread.sleep(9999999) } // second attempt succeeds immediately - SparkContextSuite.taskSucceeded = true } } assert(SparkContextSuite.taskSucceeded) From 570771555c877fc0b7a8c989e14fdaf4aa79c217 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 21 Mar 2017 15:07:46 -0700 Subject: [PATCH 12/16] fix mima again --- project/MimaExcludes.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 1b45426a650d..dcb90abe775e 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -76,7 +76,8 @@ object MimaExcludes { ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.TaskKilled.productPrefix"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.TaskKilled.toErrorString"), ProblemFilters.exclude[FinalMethodProblem]("org.apache.spark.TaskKilled.toString"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.killTaskIfInterrupted") + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.killTaskIfInterrupted"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.getKillReason") ) // Exclude rules for 2.1.x From a37c09b78ab5362e3464e8201f1839cacef8a382 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Wed, 22 Mar 2017 15:04:27 -0700 Subject: [PATCH 13/16] comments 5 --- .../scala/org/apache/spark/SparkContext.scala | 4 +++- .../org/apache/spark/TaskContextImpl.scala | 12 +++++------ .../org/apache/spark/executor/Executor.scala | 10 ++++----- .../apache/spark/scheduler/DAGScheduler.scala | 4 +++- .../org/apache/spark/scheduler/Task.scala | 21 +++++++------------ .../spark/scheduler/TaskScheduler.scala | 7 ++++++- .../spark/scheduler/TaskSchedulerImpl.scala | 15 ++++++++----- .../spark/scheduler/DAGSchedulerSuite.scala | 6 ++++-- .../ExternalClusterManagerSuite.scala | 3 ++- 9 files changed, 47 insertions(+), 35 
deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f1af8e75504d..0225fd605607 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2257,11 +2257,13 @@ class SparkContext(config: SparkConf) extends Logging { * @param interruptThread whether to interrupt the thread running the task. * @param reason the reason for killing the task, which should be a short string. If a task * is killed multiple times with different reasons, only one reason will be reported. + * + * @return Whether the task was successfully killed. */ def killTaskAttempt( taskId: Long, interruptThread: Boolean = true, - reason: String = "killed via SparkContext.killTaskAttempt"): Unit = { + reason: String = "killed via SparkContext.killTaskAttempt"): Boolean = { dagScheduler.killTaskAttempt(taskId, interruptThread, reason) } diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala index 892e6541beba..56e87d12748d 100644 --- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala @@ -59,8 +59,8 @@ private[spark] class TaskContextImpl( /** List of callback functions to execute when the task fails. */ @transient private val onFailureCallbacks = new ArrayBuffer[TaskFailureListener] - // If defined, the corresponding task has been killed for the contained reason. - @volatile private var maybeKillReason: Option[String] = None + // If defined, the corresponding task has been killed and this option contains the reason. + @volatile private var reasonIfKilled: Option[String] = None // Whether the task has completed. private var completed: Boolean = false @@ -141,18 +141,18 @@ private[spark] class TaskContextImpl( /** Marks the task for interruption, i.e. cancellation. */ private[spark] def markInterrupted(reason: String): Unit = { - maybeKillReason = Some(reason) + reasonIfKilled = Some(reason) } private[spark] override def killTaskIfInterrupted(): Unit = { - val reason = maybeKillReason + val reason = reasonIfKilled if (reason.isDefined) { throw new TaskKilledException(reason.get) } } private[spark] override def getKillReason(): Option[String] = { - maybeKillReason + reasonIfKilled } @GuardedBy("this") @@ -160,7 +160,7 @@ private[spark] class TaskContextImpl( override def isRunningLocally(): Boolean = false - override def isInterrupted(): Boolean = maybeKillReason.isDefined + override def isInterrupted(): Boolean = reasonIfKilled.isDefined override def getLocalProperty(key: String): String = localProperties.getProperty(key) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 1e14535e61c5..99b1608010dd 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -220,7 +220,7 @@ private[spark] class Executor( private val taskName = taskDescription.name /** If specified, this task has been killed and this option contains the reason. 
*/ - @volatile private var maybeKillReason: Option[String] = None + @volatile private var reasonIfKilled: Option[String] = None @volatile private var threadId: Long = -1 @@ -243,7 +243,7 @@ private[spark] class Executor( def kill(interruptThread: Boolean, reason: String): Unit = { logInfo(s"Executor is trying to kill $taskName (TID $taskId), reason: $reason") - maybeKillReason = Some(reason) + reasonIfKilled = Some(reason) if (task != null) { synchronized { if (!finished) { @@ -298,7 +298,7 @@ private[spark] class Executor( // If this task has been killed before we deserialized it, let's quit now. Otherwise, // continue executing the task. - val killReason = maybeKillReason + val killReason = reasonIfKilled if (killReason.isDefined) { // Throw an exception rather than returning, because returning within a try{} block // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl @@ -432,8 +432,8 @@ private[spark] class Executor( setTaskFinishedAndClearInterruptStatus() execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason))) - case _: InterruptedException if task.killed => - val killReason = task.maybeKillReason.getOrElse("unknown reason") + case _: InterruptedException if task.reasonIfKilled.isDefined => + val killReason = task.reasonIfKilled.getOrElse("unknown reason") logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason") setTaskFinishedAndClearInterruptStatus() execBackend.statusUpdate( diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 4a94c9c0d20c..244fea5c0c23 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -733,8 +733,10 @@ class DAGScheduler( /** * Kill a given task. It will be retried. + * + * @return Whether the task was successfully killed. */ - def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Unit = { + def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean = { taskScheduler.killTaskAttempt(taskId, interruptThread, reason) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 47dc755c72e4..46ef23f316a6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -89,8 +89,8 @@ private[spark] abstract class Task[T]( TaskContext.setTaskContext(context) taskThread = Thread.currentThread() - if (_maybeKillReason != null) { - kill(interruptThread = false, _maybeKillReason) + if (_reasonIfKilled != null) { + kill(interruptThread = false, _reasonIfKilled) } new CallerContext( @@ -158,22 +158,17 @@ private[spark] abstract class Task[T]( // The actual Thread on which the task is running, if any. Initialized in run(). @volatile @transient private var taskThread: Thread = _ - // A flag to indicate whether the task is killed. This is used in case context is not yet - // initialized when kill() is invoked. - @volatile @transient private var _maybeKillReason: String = null + // If non-null, this task has been killed and the reason is as specified. This is used in case + // context is not yet initialized when kill() is invoked. 
+ @volatile @transient private var _reasonIfKilled: String = null protected var _executorDeserializeTime: Long = 0 protected var _executorDeserializeCpuTime: Long = 0 /** - * Whether the task has been killed. + * If defined, this task has been killed and this option contains the reason. */ - def killed: Boolean = _maybeKillReason != null - - /** - * If this task has been killed, contains the reason for the kill. - */ - def maybeKillReason: Option[String] = Option(_maybeKillReason) + def reasonIfKilled: Option[String] = Option(_reasonIfKilled) /** * Returns the amount of time spent deserializing the RDD and function to be run. @@ -208,7 +203,7 @@ private[spark] abstract class Task[T]( */ def kill(interruptThread: Boolean, reason: String) { require(reason != null) - _maybeKillReason = reason + _reasonIfKilled = reason if (context != null) { context.markInterrupted(reason) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 96e816346263..3de7d1f7de22 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -54,7 +54,12 @@ private[spark] trait TaskScheduler { // Cancel a stage. def cancelTasks(stageId: Int, interruptThread: Boolean): Unit - def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Unit + /** + * Kills a task attempt. + * + * @return Whether the task was successfully killed. + */ + def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called. def setDAGScheduler(dagScheduler: DAGScheduler): Unit diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 8982e63f2284..15e0db18816a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -247,11 +247,16 @@ private[spark] class TaskSchedulerImpl private[scheduler]( } } - override def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Unit = { - logInfo(s"Killing task ($reason): $taskId") - val execId = taskIdToExecutorId.getOrElse( - taskId, throw new IllegalArgumentException("Task not found: " + taskId)) - backend.killTask(taskId, execId, interruptThread, reason) + override def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean = { + logInfo(s"Killing task $taskId: $reason") + val execId = taskIdToExecutorId.get(taskId) + if (execId.isDefined) { + backend.killTask(taskId, execId.get, interruptThread, reason) + true + } else { + logInfo(s"Could not kill task $taskId because no task with that ID was found.") + false + } } /** diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 49fb7923f9cf..dc7db7ec538f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -126,7 +126,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou override def cancelTasks(stageId: Int, interruptThread: Boolean) { cancelledStages += stageId } - override def killTaskAttempt(taskId: Long, interruptThread: Boolean, 
reason: String): Unit = {} + override def killTaskAttempt( + taskId: Long, interruptThread: Boolean, reason: String): Boolean = false override def setDAGScheduler(dagScheduler: DAGScheduler) = {} override def defaultParallelism() = 2 override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} @@ -553,7 +554,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou override def cancelTasks(stageId: Int, interruptThread: Boolean) { throw new UnsupportedOperationException } - override def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Unit = { + override def killTaskAttempt( + taskId: Long, interruptThread: Boolean, reason: String): Boolean = { throw new UnsupportedOperationException } override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {} diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index 6c963c142154..7ef0dd999f25 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -79,7 +79,8 @@ private class DummyTaskScheduler extends TaskScheduler { override def stop(): Unit = {} override def submitTasks(taskSet: TaskSet): Unit = {} override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = {} - override def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Unit = {} + override def killTaskAttempt( + taskId: Long, interruptThread: Boolean, reason: String): Boolean = false override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {} override def defaultParallelism(): Int = 2 override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} From 71b41b3ea11d4d3490fdc1ac9061e501ae0f8589 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Wed, 22 Mar 2017 16:40:45 -0700 Subject: [PATCH 14/16] Log warn --- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 15e0db18816a..86f43022d742 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -254,7 +254,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( backend.killTask(taskId, execId.get, interruptThread, reason) true } else { - logInfo(s"Could not kill task $taskId because no task with that ID was found.") + logWarning(s"Could not kill task $taskId because no task with that ID was found.") false } } From 3ec3633c3362f8c5a5d1703f21b0569d58179201 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 23 Mar 2017 13:51:18 -0700 Subject: [PATCH 15/16] skip reviveoffers if the task is successful --- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 86f43022d742..db393b96c2d2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -479,7 +479,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( taskState: TaskState, reason: TaskFailedReason): Unit = synchronized { taskSetManager.handleFailedTask(tid, taskState, reason) - if (!taskSetManager.isZombie) { + if (!taskSetManager.isZombie && !taskSetManager.someAttemptSucceeded(tid)) { // Need to revive offers again now that the task set manager state has been updated to // reflect failed tasks that need to be re-run. backend.reviveOffers() diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 85c127d029dc..0f86f5f604f9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -99,6 +99,10 @@ private[spark] class TaskSetManager( override def runningTasks: Int = runningTasksSet.size + def someAttemptSucceeded(tid: Long): Boolean = { + successful(taskInfos(tid).index) + } + // True once no more tasks should be launched for this task set manager. TaskSetManagers enter // the zombie state once at least one attempt of each task has completed successfully, or if the // task set is aborted (for example, because it was killed). TaskSetManagers remain in the zombie From 145c78ac5205ec66c90977898c4c6154da46f6b4 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 23 Mar 2017 16:39:23 -0700 Subject: [PATCH 16/16] fix flaky test --- core/src/test/scala/org/apache/spark/SparkContextSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index ce1a401601c6..2c947556dfd3 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -577,7 +577,9 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu // second attempt succeeds immediately } } - assert(SparkContextSuite.taskSucceeded) + eventually(timeout(10.seconds)) { + assert(SparkContextSuite.taskSucceeded) + } } test("SPARK-19446: DebugFilesystem.assertNoOpenStreams should report " +