Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.executor

import org.apache.spark.{TaskCommitDenied, TaskEndReason}
import org.apache.spark.{TaskCommitDenied, TaskFailedReason}

/**
* Exception thrown when a task attempts to commit output to HDFS but is denied by the driver.
Expand All @@ -29,5 +29,5 @@ private[spark] class CommitDeniedException(
attemptNumber: Int)
extends Exception(msg) {

def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, attemptNumber)
def toTaskFailedReason: TaskFailedReason = TaskCommitDenied(jobID, splitID, attemptNumber)
}
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ private[spark] class Executor(

} catch {
case ffe: FetchFailedException =>
val reason = ffe.toTaskEndReason
val reason = ffe.toTaskFailedReason
setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

Expand All @@ -370,7 +370,7 @@ private[spark] class Executor(
execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))

case CausedBy(cDE: CommitDeniedException) =>
val reason = cDE.toTaskEndReason
val reason = cDE.toTaskFailedReason
setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,14 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul

def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState,
serializedData: ByteBuffer) {
var reason : TaskEndReason = UnknownReason
var reason : TaskFailedReason = UnknownReason
try {
getTaskResultExecutor.execute(new Runnable {
override def run(): Unit = Utils.logUncaughtExceptions {
val loader = Utils.getContextOrSparkClassLoader
try {
if (serializedData != null && serializedData.limit() > 0) {
reason = serializer.get().deserialize[TaskEndReason](
reason = serializer.get().deserialize[TaskFailedReason](
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this totally safe? Can you point me to the code where we serialize this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. its sent as an executor status update in various failure handling scenarios here, though its not so nicely grouped that its super-obvious they always go together, unfortunately:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L357

I walked through the various cases and convinced myself they went together. I made one more minor improvement to make that more clear, a couple of .toTaskEndReason methods could be renamed to .toTaskFailedReason (d991a3c).

Really, though there were two other things that convinced me this was OK:

  1. the only other possible thing a TaskEndReason could be is Success. But you'll notice that Success is never serialized as the msg -- its just implicit with TaskState.FINISHED, and then the Success part just gets dropped in on the driver-side here:

    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)

  2. The TaskEndReason was already getting blind-casted before anyway:

    reason.asInstanceOf[TaskFailedReason].toErrorString

    This was already in the same thread, without any try/ catch etc. checks anyway. Well, except for this:

which I decided wasn't a concern mostly b/c of reason (1) again.

There's more cleanup we could do here:

  • TaskState.isFailed is unused, instead there is a different hard-coded check here:
    } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
  • We could more directly fix the weak coupling between TaskState and TaskEndReason by replacing ExecutorBackend.statusUpdate with more particular methods, so that the coupling is more explicit.

and probably other related things too. If you have a strong preference, I could address those here, but figured it was best to just get in this small cleanup I felt confident in for now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great. It's always better to be more explicit

serializedData, loader)
}
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ private[spark] class TaskSchedulerImpl(
taskSetManager: TaskSetManager,
tid: Long,
taskState: TaskState,
reason: TaskEndReason): Unit = synchronized {
reason: TaskFailedReason): Unit = synchronized {
taskSetManager.handleFailedTask(tid, taskState, reason)
if (!taskSetManager.isZombie && taskState != TaskState.KILLED) {
// Need to revive offers again now that the task set manager state has been updated to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ private[spark] class TaskSetManager(
* Marks the task as failed, re-adds it to the list of pending tasks, and notifies the
* DAG Scheduler.
*/
def handleFailedTask(tid: Long, state: TaskState, reason: TaskEndReason) {
def handleFailedTask(tid: Long, state: TaskState, reason: TaskFailedReason) {
val info = taskInfos(tid)
if (info.failed || info.killed) {
return
Expand All @@ -707,7 +707,7 @@ private[spark] class TaskSetManager(
copiesRunning(index) -= 1
var accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty
val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}): " +
reason.asInstanceOf[TaskFailedReason].toErrorString
reason.toErrorString
val failureException: Option[Throwable] = reason match {
case fetchFailed: FetchFailed =>
logWarning(failureReason)
Expand Down Expand Up @@ -765,10 +765,6 @@ private[spark] class TaskSetManager(
case e: TaskFailedReason => // TaskResultLost, TaskKilled, and others
logWarning(failureReason)
None

case e: TaskEndReason =>
logError("Unknown TaskEndReason: " + e)
None
}
// always add to failed executors
failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
Expand All @@ -784,9 +780,7 @@ private[spark] class TaskSetManager(
addPendingTask(index)
}

if (!isZombie && state != TaskState.KILLED
&& reason.isInstanceOf[TaskFailedReason]
&& reason.asInstanceOf[TaskFailedReason].countTowardsTaskFailures) {
if (!isZombie && state != TaskState.KILLED && reason.countTowardsTaskFailures) {
assert (null != failureReason)
numFailures(index) += 1
if (numFailures(index) >= maxTaskFailures) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.shuffle

import org.apache.spark.{FetchFailed, TaskEndReason}
import org.apache.spark.{FetchFailed, TaskFailedReason}
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -45,7 +45,7 @@ private[spark] class FetchFailedException(
this(bmAddress, shuffleId, mapId, reduceId, cause.getMessage, cause)
}

def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId,
def toTaskFailedReason: TaskFailedReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId,
Utils.exceptionString(this))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class JsonProtocolSuite extends SparkFunSuite {
val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19,
"Some exception")
val fetchMetadataFailed = new MetadataFetchFailedException(17,
19, "metadata Fetch failed exception").toTaskEndReason
19, "metadata Fetch failed exception").toTaskFailedReason
val exceptionFailure = new ExceptionFailure(exception, Seq.empty[AccumulableInfo])
testTaskEndReason(Success)
testTaskEndReason(Resubmitted)
Expand Down