Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/TaskEndReason.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@

package org.apache.spark

import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
import java.io.{ObjectInputStream, ObjectOutputStream}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.Utils

// ==============================================================================================
// NOTE: new task end reasons MUST be accompanied with serialization logic in util.JsonProtocol!
// ==============================================================================================

/**
* :: DeveloperApi ::
* Various possible reasons why a task ended. The low-level TaskScheduler is supposed to retry
Expand Down
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,10 @@ private[spark] object JsonProtocol {
("Stack Trace" -> stackTrace) ~
("Full Stack Trace" -> exceptionFailure.fullStackTrace) ~
("Metrics" -> metrics)
case taskCommitDenied: TaskCommitDenied =>
("Job ID" -> taskCommitDenied.jobID) ~
("Partition ID" -> taskCommitDenied.partitionID) ~
("Attempt Number" -> taskCommitDenied.attemptNumber)
case ExecutorLostFailure(executorId, isNormalExit) =>
("Executor ID" -> executorId) ~
("Normal Exit" -> isNormalExit)
Expand Down Expand Up @@ -770,6 +774,7 @@ private[spark] object JsonProtocol {
val exceptionFailure = Utils.getFormattedClassName(ExceptionFailure)
val taskResultLost = Utils.getFormattedClassName(TaskResultLost)
val taskKilled = Utils.getFormattedClassName(TaskKilled)
val taskCommitDenied = Utils.getFormattedClassName(TaskCommitDenied)
val executorLostFailure = Utils.getFormattedClassName(ExecutorLostFailure)
val unknownReason = Utils.getFormattedClassName(UnknownReason)

Expand All @@ -794,6 +799,14 @@ private[spark] object JsonProtocol {
ExceptionFailure(className, description, stackTrace, fullStackTrace, metrics, None)
case `taskResultLost` => TaskResultLost
case `taskKilled` => TaskKilled
case `taskCommitDenied` =>
// Unfortunately, the `TaskCommitDenied` message was introduced in 1.3.0 but the JSON
// de/serialization logic was not added until 1.5.1. To provide backward compatibility
// for reading those logs, we need to provide default values for all the fields.
val jobId = Utils.jsonOption(json \ "Job ID").map(_.extract[Int]).getOrElse(-1)
val partitionId = Utils.jsonOption(json \ "Partition ID").map(_.extract[Int]).getOrElse(-1)
val attemptNo = Utils.jsonOption(json \ "Attempt Number").map(_.extract[Int]).getOrElse(-1)
TaskCommitDenied(jobId, partitionId, attemptNo)
case `executorLostFailure` =>
val isNormalExit = Utils.jsonOption(json \ "Normal Exit").
map(_.extract[Boolean])
Expand Down
17 changes: 17 additions & 0 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ class JsonProtocolSuite extends SparkFunSuite {
testTaskEndReason(exceptionFailure)
testTaskEndReason(TaskResultLost)
testTaskEndReason(TaskKilled)
testTaskEndReason(TaskCommitDenied(2, 3, 4))
testTaskEndReason(ExecutorLostFailure("100", true))
testTaskEndReason(UnknownReason)

Expand Down Expand Up @@ -352,6 +353,17 @@ class JsonProtocolSuite extends SparkFunSuite {
assertEquals(expectedStageInfo, JsonProtocol.stageInfoFromJson(oldStageInfo))
}

// `TaskCommitDenied` was added in 1.3.0 but JSON de/serialization logic was added in 1.5.1
test("TaskCommitDenied backward compatibility") {
val denied = TaskCommitDenied(1, 2, 3)
val oldDenied = JsonProtocol.taskEndReasonToJson(denied)
.removeField({ _._1 == "Job ID" })
.removeField({ _._1 == "Partition ID" })
.removeField({ _._1 == "Attempt Number" })
val expectedDenied = TaskCommitDenied(-1, -1, -1)
assertEquals(expectedDenied, JsonProtocol.taskEndReasonFromJson(oldDenied))
}

/** -------------------------- *
| Helper test running methods |
* --------------------------- */
Expand Down Expand Up @@ -577,6 +589,11 @@ class JsonProtocolSuite extends SparkFunSuite {
assertOptionEquals(r1.metrics, r2.metrics, assertTaskMetricsEquals)
case (TaskResultLost, TaskResultLost) =>
case (TaskKilled, TaskKilled) =>
case (TaskCommitDenied(jobId1, partitionId1, attemptNumber1),
TaskCommitDenied(jobId2, partitionId2, attemptNumber2)) =>
assert(jobId1 === jobId2)
assert(partitionId1 === partitionId2)
assert(attemptNumber1 === attemptNumber2)
case (ExecutorLostFailure(execId1, isNormalExit1),
ExecutorLostFailure(execId2, isNormalExit2)) =>
assert(execId1 === execId2)
Expand Down