From 6262c52f7eb4abcf06742e9afd9d1454f06cdf1f Mon Sep 17 00:00:00 2001
From: Charles Lewis
Date: Wed, 22 Mar 2017 13:33:55 -0700
Subject: [PATCH 01/11] report metrics for killed tasks

---
 .../org/apache/spark/TaskEndReason.scala      | 14 ++++++-
 .../org/apache/spark/executor/Executor.scala  | 39 +++++++++++++++++++
 .../apache/spark/scheduler/DAGScheduler.scala | 11 ++++--
 .../spark/scheduler/TaskSetManager.scala      |  8 +++-
 .../org/apache/spark/util/JsonProtocol.scala  |  7 +++-
 5 files changed, 72 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index a76283e33fa6..1d14fa183261 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -212,9 +212,19 @@ case object TaskResultLost extends TaskFailedReason {
  * Task was killed intentionally and needs to be rescheduled.
  */
 @DeveloperApi
-case class TaskKilled(reason: String) extends TaskFailedReason {
-  override def toErrorString: String = s"TaskKilled ($reason)"
+case class TaskKilled(
+    reason: String
+    accumUpdates: Seq[AccumulableInfo] = Seq.empty,
+    private[spark] var accums: Seq[AccumulatorV2[_, _]] = Nil)
+  extends TaskFailedReason {
+
+  override def toErrorString: String = "TaskKilled ($reason)"
   override def countTowardsTaskFailures: Boolean = false
+
+  private[spark] def withAccums(accums: Seq[AccumulatorV2[_, _]]): TaskKilled = {
+    this.accums = accums
+    this
+  }
 }
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index c325222b764b..6894849071b9 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -507,6 +507,45 @@ private[spark] class Executor(
         setTaskFinishedAndClearInterruptStatus()
         execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 
+      case t: TaskKilledException =>
+        logInfo(s"Executor killed $taskName (TID $taskId), reason: ${t.reason}")
+
+        // Collect latest accumulator values to report back to the driver
+        val accums: Seq[AccumulatorV2[_, _]] =
+          if (task != null) {
+            task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
+            task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
+            task.collectAccumulatorUpdates(taskFailed = true)
+          } else {
+            Seq.empty
+          }
+        val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
+
+        setTaskFinishedAndClearInterruptStatus()
+
+        val serializedTK = ser.serialize(TaskKilled(t.reason, accUpdates).withAccums(accums))
+        execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)
+
+      case _: InterruptedException if task.reasonIfKilled.isDefined =>
+        val killReason = task.reasonIfKilled.getOrElse("unknown reason")
+        logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")
+
+        // Collect latest accumulator values to report back to the driver
+        val accums: Seq[AccumulatorV2[_, _]] =
+          if (task != null) {
+            task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
+            task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
+            task.collectAccumulatorUpdates(taskFailed = true)
+          } else {
+            Seq.empty
+          }
+        val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
+
+        setTaskFinishedAndClearInterruptStatus()
+
+        val serializedTK = ser.serialize(TaskKilled(killReason, accUpdates).withAccums(accums))
+        execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)
+
       case CausedBy(cDE: CommitDeniedException) =>
         val reason = cDE.toTaskCommitDeniedReason
         setTaskFinishedAndClearInterruptStatus()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 78b6b34b5d2b..b9ce80f52549 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1414,13 +1414,18 @@ class DAGScheduler(
       case commitDenied: TaskCommitDenied =>
         // Do nothing here, left up to the TaskScheduler to decide how to handle denied commits
 
-      case exceptionFailure: ExceptionFailure =>
-        // Nothing left to do, already handled above for accumulator updates.
+      case _: ExceptionFailure =>
+        // Tasks killed or failed with exceptions might still have accumulator updates.
+        updateAccumulators(event)
+
+      case _: TaskKilled =>
+        // Tasks killed or failed with exceptions might still have accumulator updates.
+        updateAccumulators(event)
 
       case TaskResultLost =>
         // Do nothing here; the TaskScheduler handles these failures and resubmits the task.
 
-      case _: ExecutorLostFailure | _: TaskKilled | UnknownReason =>
+      case _: ExecutorLostFailure | UnknownReason =>
         // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
         // will abort the job.
     }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 8a96a7692f61..0570bcbef244 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -833,13 +833,19 @@ private[spark] class TaskSetManager(
         }
         ef.exception
 
+      case tk: TaskKilled =>
+        // TaskKilled might have accumulator updates
+        accumUpdates = tk.accums
+        logWarning(failureReason)
+        None
+
       case e: ExecutorLostFailure if !e.exitCausedByApp =>
         logInfo(s"Task $tid failed because while it was being computed, its executor " +
           "exited for a reason unrelated to the task. Not counting this failure towards the " +
          "maximum number of failures for the task.")
         None
 
-      case e: TaskFailedReason => // TaskResultLost, TaskKilled, and others
+      case e: TaskFailedReason => // TaskResultLost and others
         logWarning(failureReason)
         None
     }
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 40383fe05026..e8ea9b1c2d59 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -407,7 +407,9 @@ private[spark] object JsonProtocol {
         ("Exit Caused By App" -> exitCausedByApp) ~
         ("Loss Reason" -> reason.map(_.toString))
       case taskKilled: TaskKilled =>
+        val accumUpdates = JArray(taskKilled.accumUpdates.map(accumulableInfoToJson).toList)
         ("Kill Reason" -> taskKilled.reason)
+        ("Accumulator Updates" -> accumUpdates)
       case _ => emptyJson
     }
     ("Reason" -> reason) ~ json
@@ -917,7 +919,10 @@ private[spark] object JsonProtocol {
       case `taskKilled` =>
         val killReason = jsonOption(json \ "Kill Reason")
           .map(_.extract[String]).getOrElse("unknown reason")
-        TaskKilled(killReason)
+        val accumUpdates = Utils.jsonOption(json \ "Accumulator Updates")
+          .map(_.extract[List[JValue]].map(accumulableInfoFromJson))
+          .getOrElse(Seq[AccumulableInfo]())
+        TaskKilled(killReason, accumUpdates)
       case `taskCommitDenied` =>
         // Unfortunately, the `TaskCommitDenied` message was introduced in 1.3.0 but the JSON
         // de/serialization logic was not added until 1.5.1. To provide backward compatibility

From 18308895ad07c9c757e96e08f5c40b5dcaaf3455 Mon Sep 17 00:00:00 2001
From: Charles Lewis
Date: Fri, 24 Mar 2017 12:06:46 -0700
Subject: [PATCH 02/11] add task killed to exception accum test

---
 .../spark/scheduler/DAGSchedulerSuite.scala   | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 8b6ec37625ee..4a827682744e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1852,7 +1852,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assertDataStructuresEmpty()
   }
 
-  test("accumulators are updated on exception failures") {
+  test("accumulators are updated on exception failures and task killed") {
     val acc1 = AccumulatorSuite.createLongAccum("ingenieur")
     val acc2 = AccumulatorSuite.createLongAccum("boulanger")
     val acc3 = AccumulatorSuite.createLongAccum("agriculteur")
@@ -1868,15 +1868,25 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     val accUpdate3 = new LongAccumulator
     accUpdate3.metadata = acc3.metadata
     accUpdate3.setValue(18)
-    val accumUpdates = Seq(accUpdate1, accUpdate2, accUpdate3)
-    val accumInfo = accumUpdates.map(AccumulatorSuite.makeInfo)
+
+    val accumUpdates1 = Seq(accUpdate1, accUpdate2)
+    val accumInfo1 = accumUpdates1.map(AccumulatorSuite.makeInfo)
     val exceptionFailure = new ExceptionFailure(
       new SparkException("fondue?"),
       accumInfo1).copy(accums = accumUpdates1)
     submit(new MyRDD(sc, 1, Nil), Array(0))
     runEvent(makeCompletionEvent(taskSets.head.tasks.head, exceptionFailure, "result"))
+
     assert(AccumulatorContext.get(acc1.id).get.value === 15L)
     assert(AccumulatorContext.get(acc2.id).get.value === 13L)
+
+    val accumUpdates2 = Seq(accUpdate3)
+    val accumInfo2 = accumUpdates2.map(AccumulatorSuite.makeInfo)
+
+    val taskKilled = new TaskKilled(
+      accumInfo2).copy(accums = accumUpdates2)
+    runEvent(makeCompletionEvent(taskSets.head.tasks.head, taskKilled, "result"))
+
     assert(AccumulatorContext.get(acc3.id).get.value === 18L)
   }
 
@@ -2497,6 +2507,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     val accumUpdates = reason match {
       case Success => task.metrics.accumulators()
       case ef: ExceptionFailure => ef.accums
+      case tk: TaskKilled => tk.accums
       case _ => Seq.empty
     }
     CompletionEvent(task, reason, result, accumUpdates ++ extraAccumUpdates, taskInfo)

From cb276bc1f1583ca7dcf44b8e80a9fc2cd09953cf Mon Sep 17 00:00:00 2001
From: Charles Lewis
Date: Fri, 24 Mar 2017 16:20:59 -0700
Subject: [PATCH 03/11] extra fixes for task killed reason merge

---
 core/src/main/scala/org/apache/spark/TaskEndReason.scala     | 2 +-
 core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 2 +-
 .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 1 +
 3 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 1d14fa183261..8e1327e01ada 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -213,7 +213,7 @@ case object TaskResultLost extends TaskFailedReason {
  */
 @DeveloperApi
 case class TaskKilled(
-    reason: String
+    reason: String,
     accumUpdates: Seq[AccumulableInfo] = Seq.empty,
     private[spark] var accums: Seq[AccumulatorV2[_, _]] = Nil)
   extends TaskFailedReason {
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index e8ea9b1c2d59..072a2841d6fa 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -408,7 +408,7 @@ private[spark] object JsonProtocol {
         ("Loss Reason" -> reason.map(_.toString))
       case taskKilled: TaskKilled =>
         val accumUpdates = JArray(taskKilled.accumUpdates.map(accumulableInfoToJson).toList)
-        ("Kill Reason" -> taskKilled.reason)
+        ("Kill Reason" -> taskKilled.reason) ~
         ("Accumulator Updates" -> accumUpdates)
       case _ => emptyJson
     }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 4a827682744e..99df0d42813e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1884,6 +1884,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     val accumInfo2 = accumUpdates2.map(AccumulatorSuite.makeInfo)
 
     val taskKilled = new TaskKilled(
+      "test",
       accumInfo2).copy(accums = accumUpdates2)
     runEvent(makeCompletionEvent(taskSets.head.tasks.head, taskKilled, "result"))
 

From 30ae1457afd9899a5fa937c2fefac31f6a0752ed Mon Sep 17 00:00:00 2001
From: Xianjin YE
Date: Thu, 26 Apr 2018 15:34:54 +0800
Subject: [PATCH 04/11] Fix merge conflict and semantic difference

---
 .../org/apache/spark/executor/Executor.scala  | 44 +++++++------------
 .../apache/spark/scheduler/DAGScheduler.scala | 10 ++---
 .../org/apache/spark/util/JsonProtocol.scala  |  2 +-
 3 files changed, 21 insertions(+), 35 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 6894849071b9..bccbf5755796 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -480,33 +480,6 @@ private[spark] class Executor(
         execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
 
     } catch {
-      case t: TaskKilledException =>
-        logInfo(s"Executor killed $taskName (TID $taskId), reason: ${t.reason}")
-        setTaskFinishedAndClearInterruptStatus()
-        execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason)))
-
-      case _: InterruptedException | NonFatal(_) if
-          task != null && task.reasonIfKilled.isDefined =>
-        val killReason = task.reasonIfKilled.getOrElse("unknown reason")
-        logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")
-        setTaskFinishedAndClearInterruptStatus()
-        execBackend.statusUpdate(
-          taskId, TaskState.KILLED, ser.serialize(TaskKilled(killReason)))
-
-      case t: Throwable if hasFetchFailure && !Utils.isFatalError(t) =>
-        val reason = task.context.fetchFailed.get.toTaskFailedReason
-        if (!t.isInstanceOf[FetchFailedException]) {
-          // there was a fetch failure in the task, but some user code wrapped that exception
-          // and threw something else. Regardless, we treat it as a fetch failure.
-          val fetchFailedCls = classOf[FetchFailedException].getName
-          logWarning(s"TID ${taskId} encountered a ${fetchFailedCls} and " +
-            s"failed, but the ${fetchFailedCls} was hidden by another " +
-            s"exception. Spark is handling this like a fetch failure and ignoring the " +
-            s"other exception: $t")
-        }
-        setTaskFinishedAndClearInterruptStatus()
-        execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
-
       case t: TaskKilledException =>
         logInfo(s"Executor killed $taskName (TID $taskId), reason: ${t.reason}")
 
@@ -526,7 +499,8 @@ private[spark] class Executor(
         val serializedTK = ser.serialize(TaskKilled(t.reason, accUpdates).withAccums(accums))
         execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)
 
-      case _: InterruptedException if task.reasonIfKilled.isDefined =>
+      case _: InterruptedException | NonFatal(_) if
+          task != null && task.reasonIfKilled.isDefined =>
         val killReason = task.reasonIfKilled.getOrElse("unknown reason")
         logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")
 
@@ -546,6 +520,20 @@ private[spark] class Executor(
         val serializedTK = ser.serialize(TaskKilled(killReason, accUpdates).withAccums(accums))
         execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)
 
+      case t: Throwable if hasFetchFailure && !Utils.isFatalError(t) =>
+        val reason = task.context.fetchFailed.get.toTaskFailedReason
+        if (!t.isInstanceOf[FetchFailedException]) {
+          // there was a fetch failure in the task, but some user code wrapped that exception
+          // and threw something else. Regardless, we treat it as a fetch failure.
+          val fetchFailedCls = classOf[FetchFailedException].getName
+          logWarning(s"TID ${taskId} encountered a ${fetchFailedCls} and " +
+            s"failed, but the ${fetchFailedCls} was hidden by another " +
+            s"exception. Spark is handling this like a fetch failure and ignoring the " +
+            s"other exception: $t")
+        }
+        setTaskFinishedAndClearInterruptStatus()
+        execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
+
       case CausedBy(cDE: CommitDeniedException) =>
         val reason = cDE.toTaskCommitDeniedReason
         setTaskFinishedAndClearInterruptStatus()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b9ce80f52549..aa1f8353d382 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1210,7 +1210,7 @@ class DAGScheduler(
           case _ => updateAccumulators(event)
         }
 
-      case _: ExceptionFailure => updateAccumulators(event)
+      case _: ExceptionFailure | _: TaskKilled => updateAccumulators(event)
       case _ =>
     }
     postTaskEnd(event)
@@ -1414,13 +1414,18 @@ class DAGScheduler(
       case commitDenied: TaskCommitDenied =>
         // Do nothing here, left up to the TaskScheduler to decide how to handle denied commits
 
-      case _: ExceptionFailure =>
-        // Tasks killed or failed with exceptions might still have accumulator updates.
-        updateAccumulators(event)
+      case exceptionFailure: ExceptionFailure =>
+        // Nothing left to do, already handled above for accumulator updates.
+
       case _: TaskKilled =>
-        // Tasks killed or failed with exceptions might still have accumulator updates.
-        updateAccumulators(event)
+        // Nothing left to do, already handled above for accumulator updates.
 
       case TaskResultLost =>
         // Do nothing here; the TaskScheduler handles these failures and resubmits the task.
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 072a2841d6fa..50c6461373de 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -919,7 +919,7 @@ private[spark] object JsonProtocol {
       case `taskKilled` =>
         val killReason = jsonOption(json \ "Kill Reason")
           .map(_.extract[String]).getOrElse("unknown reason")
-        val accumUpdates = Utils.jsonOption(json \ "Accumulator Updates")
+        val accumUpdates = jsonOption(json \ "Accumulator Updates")
           .map(_.extract[List[JValue]].map(accumulableInfoFromJson))
           .getOrElse(Seq[AccumulableInfo]())
         TaskKilled(killReason, accumUpdates)

From 88b1cebd76d7414d4bbdb99e03fe10f74e25029a Mon Sep 17 00:00:00 2001
From: Xianjin YE
Date: Thu, 26 Apr 2018 17:22:20 +0800
Subject: [PATCH 05/11] Make accums in TaskKilled immutable and extract common logic in TaskRunner to reduce duplicate code

---
 .../org/apache/spark/TaskEndReason.scala     |  6 +-
 .../org/apache/spark/executor/Executor.scala | 71 +++++++++----------
 2 files changed, 33 insertions(+), 44 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 8e1327e01ada..db95caa5eedc 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -215,16 +215,12 @@ case object TaskResultLost extends TaskFailedReason {
 case class TaskKilled(
     reason: String,
     accumUpdates: Seq[AccumulableInfo] = Seq.empty,
-    private[spark] var accums: Seq[AccumulatorV2[_, _]] = Nil)
+    private[spark] val accums: Seq[AccumulatorV2[_, _]] = Nil)
   extends TaskFailedReason {
 
   override def toErrorString: String = "TaskKilled ($reason)"
 
   override def countTowardsTaskFailures: Boolean = false
-  private[spark] def withAccums(accums: Seq[AccumulatorV2[_, _]]): TaskKilled = {
-    this.accums = accums
-    this
-  }
 }
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index bccbf5755796..a9d300c9c67a 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -287,6 +287,33 @@ private[spark] class Executor(
     notifyAll()
   }
 
+  /**
+   * Set executor runtime and JVM gc time if task instance is still valid
+   */
+  private def reportGCAndExecutorTimeIfPossible(taskStart: Long): Unit = {
+    if (task != null) {
+      task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
+      task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
+    }
+  }
+
+  /**
+   * Utility function to:
+   * 1. Report executor runtime and JVM gc time if possible
+   * 2. Collect accumulator updates
+   * 3. Set the finished flag to true and clear current thread's interrupt status
+   */
+  private def collectAccumulatorsAndResetStatusOnFailure(taskStart: Long) = {
+    reportGCAndExecutorTimeIfPossible(taskStart)
+    // Collect latest accumulator values to report back to the driver
+    val accums: Seq[AccumulatorV2[_, _]] =
+      Option(task).map(_.collectAccumulatorUpdates(taskFailed = true)).getOrElse(Seq.empty)
+    val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
+
+    setTaskFinishedAndClearInterruptStatus()
+    (accums, accUpdates)
+  }
+
   override def run(): Unit = {
     threadId = Thread.currentThread.getId
     Thread.currentThread.setName(threadName)
@@ -483,20 +510,8 @@ private[spark] class Executor(
       case t: TaskKilledException =>
         logInfo(s"Executor killed $taskName (TID $taskId), reason: ${t.reason}")
 
-        // Collect latest accumulator values to report back to the driver
-        val accums: Seq[AccumulatorV2[_, _]] =
-          if (task != null) {
-            task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
-            task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
-            task.collectAccumulatorUpdates(taskFailed = true)
-          } else {
-            Seq.empty
-          }
-        val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
-
-        setTaskFinishedAndClearInterruptStatus()
-
-        val serializedTK = ser.serialize(TaskKilled(t.reason, accUpdates).withAccums(accums))
+        val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStart)
+        val serializedTK = ser.serialize(TaskKilled(t.reason, accUpdates, accums))
         execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)
 
       case _: InterruptedException | NonFatal(_) if
@@ -504,20 +519,8 @@ private[spark] class Executor(
         val killReason = task.reasonIfKilled.getOrElse("unknown reason")
         logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")
 
-        // Collect latest accumulator values to report back to the driver
-        val accums: Seq[AccumulatorV2[_, _]] =
-          if (task != null) {
-            task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
-            task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
-            task.collectAccumulatorUpdates(taskFailed = true)
-          } else {
-            Seq.empty
-          }
-        val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
-
-        setTaskFinishedAndClearInterruptStatus()
-
-        val serializedTK = ser.serialize(TaskKilled(killReason, accUpdates).withAccums(accums))
+        val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStart)
+        val serializedTK = ser.serialize(TaskKilled(killReason, accUpdates, accums))
         execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)
 
       case t: Throwable if hasFetchFailure && !Utils.isFatalError(t) =>
@@ -551,17 +554,7 @@ private[spark] class Executor(
         // the task failure would not be ignored if the shutdown happened because of premption,
         // instead of an app issue).
         if (!ShutdownHookManager.inShutdown()) {
-          // Collect latest accumulator values to report back to the driver
-          val accums: Seq[AccumulatorV2[_, _]] =
-            if (task != null) {
-              task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
-              task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
-              task.collectAccumulatorUpdates(taskFailed = true)
-            } else {
-              Seq.empty
-            }
-
-          val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
+          val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStart)
 
           val serializedTaskEndReason = {
             try {

From ea16b1c65122057e3a0f8c7b9abd06658e8bc1e7 Mon Sep 17 00:00:00 2001
From: Xianjin YE
Date: Thu, 26 Apr 2018 23:06:47 +0800
Subject: [PATCH 06/11] Inline reportGcAndExecutorTimeIfPossible

---
 .../org/apache/spark/executor/Executor.scala | 17 ++++++-----------
 1 file changed, 6 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a9d300c9c67a..fc3de623ebff 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -287,16 +287,6 @@ private[spark] class Executor(
     notifyAll()
   }
 
-  /**
-   * Set executor runtime and JVM gc time if task instance is still valid
-   */
-  private def reportGCAndExecutorTimeIfPossible(taskStart: Long): Unit = {
-    if (task != null) {
-      task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
-      task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
-    }
-  }
-
   /**
    * Utility function to:
    * 1. Report executor runtime and JVM gc time if possible
   * 2. Collect accumulator updates
   * 3. Set the finished flag to true and clear current thread's interrupt status
   */
   private def collectAccumulatorsAndResetStatusOnFailure(taskStart: Long) = {
-    reportGCAndExecutorTimeIfPossible(taskStart)
+    // Report executor runtime and JVM gc time
+    Option(task).foreach(t => {
+      t.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
+      t.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
+    })
+
     // Collect latest accumulator values to report back to the driver
     val accums: Seq[AccumulatorV2[_, _]] =
       Option(task).map(_.collectAccumulatorUpdates(taskFailed = true)).getOrElse(Seq.empty)
     val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))

From 5de0bcc66686161e49871bdba7f9753f5d7438f1 Mon Sep 17 00:00:00 2001
From: Xianjin YE
Date: Thu, 26 Apr 2018 23:56:45 +0800
Subject: [PATCH 07/11] Minor fixes to address comments

---
 core/src/main/scala/org/apache/spark/TaskEndReason.scala     | 2 +-
 .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 5 +----
 2 files changed, 2 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index db95caa5eedc..33901bc8380e 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -218,7 +218,7 @@ case class TaskKilled(
     private[spark] val accums: Seq[AccumulatorV2[_, _]] = Nil)
   extends TaskFailedReason {
 
-  override def toErrorString: String = "TaskKilled ($reason)"
+  override def toErrorString: String = s"TaskKilled ($reason)"
 
   override def countTowardsTaskFailures: Boolean = false
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index aa1f8353d382..c0da30a5dca1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1414,10 +1414,7 @@ class DAGScheduler(
       case commitDenied: TaskCommitDenied =>
         // Do nothing here, left up to the TaskScheduler to decide how to handle denied commits
 
-      case exceptionFailure: ExceptionFailure =>
-        // Nothing left to do, already handled above for accumulator updates.
-
-      case _: TaskKilled =>
+      case exceptionFailure: ExceptionFailure | taskKilled: TaskKilled =>
         // Nothing left to do, already handled above for accumulator updates.
 
       case TaskResultLost =>

From 05d1d9cad761bb09e1131162458fecd5e34f02d2 Mon Sep 17 00:00:00 2001
From: Xianjin YE
Date: Fri, 27 Apr 2018 11:19:07 +0800
Subject: [PATCH 08/11] Add document for semantic change of accumulator.

---
 docs/rdd-programming-guide.md | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/docs/rdd-programming-guide.md b/docs/rdd-programming-guide.md
index b6424090d2fe..5685b1738735 100644
--- a/docs/rdd-programming-guide.md
+++ b/docs/rdd-programming-guide.md
@@ -1548,6 +1548,9 @@ data.map(g)
 
+In new version of Spark(> 2.3), the semantic of Accumulator has been changed a bit: it now includes updates from
+killed task if available for internal metrics.
+
 # Deploying to a Cluster
 
 The [application submission guide](submitting-applications.html) describes how to submit applications to a cluster.
From 945c1d5945e2cf836e36f4c670b5b80dc83b1c76 Mon Sep 17 00:00:00 2001
From: Xianjin YE
Date: Mon, 14 May 2018 22:42:41 +0800
Subject: [PATCH 09/11] Revert document and fix compile error

---
 .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +-
 docs/rdd-programming-guide.md                                | 3 ---
 2 files changed, 1 insertion(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c0da30a5dca1..06b92fcdcc44 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1414,7 +1414,7 @@ class DAGScheduler(
       case commitDenied: TaskCommitDenied =>
         // Do nothing here, left up to the TaskScheduler to decide how to handle denied commits
 
-      case exceptionFailure: ExceptionFailure | taskKilled: TaskKilled =>
+      case _: ExceptionFailure | _: TaskKilled =>
         // Nothing left to do, already handled above for accumulator updates.
 
      case TaskResultLost =>
diff --git a/docs/rdd-programming-guide.md b/docs/rdd-programming-guide.md
index 5685b1738735..b6424090d2fe 100644
--- a/docs/rdd-programming-guide.md
+++ b/docs/rdd-programming-guide.md
@@ -1548,9 +1548,6 @@ data.map(g)
 
-In new version of Spark(> 2.3), the semantic of Accumulator has been changed a bit: it now includes updates from
-killed task if available for internal metrics.
-
 # Deploying to a Cluster
 
 The [application submission guide](submitting-applications.html) describes how to submit applications to a cluster.

From 59c2807dca5cc1c9332a1b3e7fcc73214df436ec Mon Sep 17 00:00:00 2001
From: Xianjin YE
Date: Tue, 15 May 2018 21:11:49 +0800
Subject: [PATCH 10/11] Update MimaExcludes to ignore TaskKilled incompatible change. Rename taskStart -> taskStartTime in executor

---
 .../org/apache/spark/executor/Executor.scala | 18 +++++++++---------
 project/MimaExcludes.scala                   |  5 +++++
 2 files changed, 14 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index fc3de623ebff..b1856ff0f324 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -293,10 +293,10 @@ private[spark] class Executor(
    * 2. Collect accumulator updates
    * 3. Set the finished flag to true and clear current thread's interrupt status
    */
-  private def collectAccumulatorsAndResetStatusOnFailure(taskStart: Long) = {
+  private def collectAccumulatorsAndResetStatusOnFailure(taskStartTime: Long) = {
     // Report executor runtime and JVM gc time
     Option(task).foreach(t => {
-      t.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
+      t.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStartTime)
       t.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
     })
 
@@ -322,7 +322,7 @@ private[spark] class Executor(
     val ser = env.closureSerializer.newInstance()
     logInfo(s"Running $taskName (TID $taskId)")
     execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
-    var taskStart: Long = 0
+    var taskStartTime: Long = 0
     var taskStartCpu: Long = 0
     startGCTime = computeTotalGcTime()
 
@@ -358,7 +358,7 @@ private[spark] class Executor(
       }
 
       // Run the actual task and measure its runtime.
-      taskStart = System.currentTimeMillis()
+      taskStartTime = System.currentTimeMillis()
       taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
         threadMXBean.getCurrentThreadCpuTime
       } else 0L
@@ -418,11 +418,11 @@ private[spark] class Executor(
         // Deserialization happens in two parts: first, we deserialize a Task object, which
         // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
         task.metrics.setExecutorDeserializeTime(
-          (taskStart - deserializeStartTime) + task.executorDeserializeTime)
+          (taskStartTime - deserializeStartTime) + task.executorDeserializeTime)
         task.metrics.setExecutorDeserializeCpuTime(
           (taskStartCpu - deserializeStartCpuTime) + task.executorDeserializeCpuTime)
         // We need to subtract Task.run()'s deserialization time to avoid double-counting
-        task.metrics.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
+        task.metrics.setExecutorRunTime((taskFinish - taskStartTime) - task.executorDeserializeTime)
         task.metrics.setExecutorCpuTime(
           (taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime)
         task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
@@ -505,7 +505,7 @@ private[spark] class Executor(
       case t: TaskKilledException =>
         logInfo(s"Executor killed $taskName (TID $taskId), reason: ${t.reason}")
 
-        val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStart)
+        val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTime)
         val serializedTK = ser.serialize(TaskKilled(t.reason, accUpdates, accums))
         execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)
 
@@ -514,7 +514,7 @@ private[spark] class Executor(
         val killReason = task.reasonIfKilled.getOrElse("unknown reason")
         logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")
 
-        val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStart)
+        val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTime)
         val serializedTK = ser.serialize(TaskKilled(killReason, accUpdates, accums))
         execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)
 
@@ -549,7 +549,7 @@ private[spark] class Executor(
         // the task failure would not be ignored if the shutdown happened because of premption,
         // instead of an app issue).
         if (!ShutdownHookManager.inShutdown()) {
-          val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStart)
+          val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTime)
 
           val serializedTaskEndReason = {
             try {
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 7d0e88ee20c3..f4451a777668 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,6 +36,11 @@ object MimaExcludes {
 
   // Exclude rules for 2.4.x
   lazy val v24excludes = v23excludes ++ Seq(
+    // [SPARK-20087][CORE] Attach accumulators / metrics to 'TaskKilled' end reason
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.TaskKilled.apply"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.TaskKilled.copy"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.TaskKilled.this"),
+
     // [SPARK-22941][core] Do not exit JVM when submit fails with in-process launcher.
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkSubmit.printWarning"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkSubmit.parseSparkConfProperty"), From 74911b7a8d7714618ab060b3227e33505b0c5d05 Mon Sep 17 00:00:00 2001 From: Xianjin YE Date: Mon, 21 May 2018 16:55:24 +0800 Subject: [PATCH 11/11] Avoid copy call when constructing TaskKilled. --- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 99df0d42813e..2987170bf502 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1883,9 +1883,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi val accumUpdates2 = Seq(accUpdate3) val accumInfo2 = accumUpdates2.map(AccumulatorSuite.makeInfo) - val taskKilled = new TaskKilled( - "test", - accumInfo2).copy(accums = accumUpdates2) + val taskKilled = new TaskKilled( "test", accumInfo2, accums = accumUpdates2) runEvent(makeCompletionEvent(taskSets.head.tasks.head, taskKilled, "result")) assert(AccumulatorContext.get(acc3.id).get.value === 18L)