diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ede3c7d9f01ae..df166c87f8cc2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -753,6 +753,8 @@ class DAGScheduler(
       null
     }
 
+    stageToInfos(stage) = StageInfo.fromStage(stage)
+
     // must be run listener before possible NotSerializableException
     // should be "StageSubmitted" first and then "JobEnded"
     listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage), properties))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 480891550eb60..69345aa9114bf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -27,6 +27,7 @@ import org.apache.spark.storage.RDDInfo
 @DeveloperApi
 class StageInfo(
     val stageId: Int,
+    val attemptId: Int,
     val name: String,
     val numTasks: Int,
     val rddInfos: Seq[RDDInfo],
@@ -55,6 +56,6 @@ private[spark] object StageInfo {
   def fromStage(stage: Stage): StageInfo = {
     val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
     val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
-    new StageInfo(stage.id, stage.name, stage.numTasks, rddInfos, stage.details)
+    new StageInfo(stage.id, stage.attemptId, stage.name, stage.numTasks, rddInfos, stage.details)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 5f45c0ced5ec5..69ed6a690b92f 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -36,7 +36,7 @@ private[ui] class StageTableBase(
   protected def isFairScheduler = parent.isFairScheduler
 
   protected def columns: Seq[Node] = {
-    <th>Stage Id</th> ++
+    <th>ID</th> ++
    {if (isFairScheduler) {<th>Pool Name</th>} else Seq.empty} ++
    <th>Description</th>
    <th>Submitted</th>
@@ -141,7 +141,10 @@ private[ui] class StageTableBase(
     val shuffleWrite = stageData.shuffleWriteBytes
     val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else ""
 
-    <td>{s.stageId}</td> ++
+    <td>{s.stageId}{if (s.attemptId > 0) {
+      " (Attempt %d)".format(s.attemptId + 1)
+    }}</td>
+    ++
     {if (isFairScheduler) {
       <td>
         <a href={"%s/stages/pool?poolname=%s"
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -184,6 +184,7 @@ private[spark] object JsonProtocol {
     val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing)
     val failureReason = stageInfo.failureReason.map(JString(_)).getOrElse(JNothing)
     ("Stage ID" -> stageInfo.stageId) ~
+    ("Attempt ID" -> stageInfo.attemptId) ~
     ("Stage Name" -> stageInfo.name) ~
     ("Number of Tasks" -> stageInfo.numTasks) ~
     ("RDD Info" -> rddInfo) ~
@@ -478,6 +479,7 @@ private[spark] object JsonProtocol {
 
   def stageInfoFromJson(json: JValue): StageInfo = {
     val stageId = (json \ "Stage ID").extract[Int]
+    val attemptId = (json \ "Attempt ID").extractOpt[Int].getOrElse(0)
     val stageName = (json \ "Stage Name").extract[String]
     val numTasks = (json \ "Number of Tasks").extract[Int]
     val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson)
@@ -486,7 +488,7 @@ private[spark] object JsonProtocol {
     val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
     val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String])
 
-    val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfos, details)
+    val stageInfo = new StageInfo(stageId, attemptId, stageName, numTasks, rddInfos, details)
     stageInfo.submissionTime = submissionTime
     stageInfo.completionTime = completionTime
     stageInfo.failureReason = failureReason
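Reviewer note (not part of the patch): the `extractOpt[Int].getOrElse(0)` read path above is what keeps event logs written by older Spark versions parseable — an absent "Attempt ID" field simply decodes as attempt 0. A minimal, self-contained json4s sketch of that pattern; the object and method names here are hypothetical, only the "Attempt ID" key comes from the patch:

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

object AttemptIdCompatSketch {
  implicit val formats: Formats = DefaultFormats

  // Old logs (written before attemptId existed) lack the "Attempt ID" field;
  // default to 0, mirroring the extractOpt(...).getOrElse(0) pattern above.
  def attemptIdOf(json: JValue): Int =
    (json \ "Attempt ID").extractOpt[Int].getOrElse(0)

  def main(args: Array[String]): Unit = {
    val oldEvent = parse("""{"Stage ID": 1, "Stage Name": "map"}""")
    val newEvent = parse("""{"Stage ID": 1, "Attempt ID": 2, "Stage Name": "map"}""")
    assert(attemptIdOf(oldEvent) == 0) // missing field falls back to attempt 0
    assert(attemptIdOf(newEvent) == 2) // new logs round-trip the real attempt
    println("both log formats parse as expected")
  }
}
```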
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index a8556624804bb..0ee422e5f81d4 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -31,27 +31,27 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     conf.set("spark.ui.retainedStages", 5.toString)
     val listener = new JobProgressListener(conf)
 
-    def createStageStartEvent(stageId: Int) = {
-      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null, "")
+    def createStageStartEvent(stageId: Int, stageAttemptId: Int) = {
+      val stageInfo = new StageInfo(stageId, stageAttemptId, stageId.toString, 0, null, "")
       SparkListenerStageSubmitted(stageInfo)
     }
 
-    def createStageEndEvent(stageId: Int) = {
-      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null, "")
+    def createStageEndEvent(stageId: Int, stageAttemptId: Int) = {
+      val stageInfo = new StageInfo(stageId, stageAttemptId, stageId.toString, 0, null, "")
       SparkListenerStageCompleted(stageInfo)
     }
 
     for (i <- 1 to 50) {
-      listener.onStageSubmitted(createStageStartEvent(i))
-      listener.onStageCompleted(createStageEndEvent(i))
+      listener.onStageSubmitted(createStageStartEvent(i, 50-i))
+      listener.onStageCompleted(createStageEndEvent(i, 50-i))
     }
 
     listener.completedStages.size should be (5)
-    listener.completedStages.count(_.stageId == 50) should be (1)
-    listener.completedStages.count(_.stageId == 49) should be (1)
-    listener.completedStages.count(_.stageId == 48) should be (1)
-    listener.completedStages.count(_.stageId == 47) should be (1)
-    listener.completedStages.count(_.stageId == 46) should be (1)
+    listener.completedStages.filter(_.stageId == 50).filter(_.attemptId == 0).size should be (1)
+    listener.completedStages.filter(_.stageId == 49).filter(_.attemptId == 1).size should be (1)
+    listener.completedStages.filter(_.stageId == 48).filter(_.attemptId == 2).size should be (1)
+    listener.completedStages.filter(_.stageId == 47).filter(_.attemptId == 3).size should be (1)
+    listener.completedStages.filter(_.stageId == 46).filter(_.attemptId == 4).size should be (1)
   }
 
   test("test executor id to summary") {
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 058d31453081a..44d7173f29d7f 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -33,8 +33,8 @@ class JsonProtocolSuite extends FunSuite {
 
   test("SparkListenerEvent") {
     val stageSubmitted =
-      SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties)
-    val stageCompleted = SparkListenerStageCompleted(makeStageInfo(101, 201, 301, 401L, 501L))
+      SparkListenerStageSubmitted(makeStageInfo(100, 150, 200, 300, 400L, 500L), properties)
+    val stageCompleted = SparkListenerStageCompleted(makeStageInfo(101, 151, 201, 301, 401L, 501L))
     val taskStart = SparkListenerTaskStart(111, makeTaskInfo(222L, 333, 1, 444L, false))
     val taskGettingResult =
       SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true))
@@ -78,7 +78,7 @@ class JsonProtocolSuite extends FunSuite {
 
   test("Dependent Classes") {
     testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
-    testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
+    testStageInfo(makeStageInfo(10, 15, 20, 30, 40L, 50L))
     testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
     testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false))
     testBlockManagerId(BlockManagerId("Hong", "Kong", 500, 1000))
@@ -125,7 +125,7 @@ class JsonProtocolSuite extends FunSuite {
 
   test("StageInfo.details backward compatibility") {
     // StageInfo.details was added after 1.0.0.
-    val info = makeStageInfo(1, 2, 3, 4L, 5L)
+    val info = makeStageInfo(1, 11, 2, 3, 4L, 5L)
     assert(info.details.nonEmpty)
     val newJson = JsonProtocol.stageInfoToJson(info)
     val oldJson = newJson.removeField { case (field, _) => field == "Details" }
@@ -134,6 +134,17 @@ class JsonProtocolSuite extends FunSuite {
     assert("" === newInfo.details)
   }
 
+  test("StageInfo.attemptId backward compatibility") {
+    // StageInfo.attemptId was added after 1.1.0.
+    val info = makeStageInfo(1, 11, 2, 3, 4L, 5L)
+    assert(info.details.nonEmpty)
+    val newJson = JsonProtocol.stageInfoToJson(info)
+    val oldJson = newJson.removeField { case (field, _) => field == "Attempt ID" }
+    val newInfo = JsonProtocol.stageInfoFromJson(oldJson)
+    assert(info.name === newInfo.name)
+    assert(0 === newInfo.attemptId)
+  }
+
   test("InputMetrics backward compatibility") {
     // InputMetrics were added after 1.0.1.
     val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true)
@@ -253,6 +264,7 @@ class JsonProtocolSuite extends FunSuite {
 
   private def assertEquals(info1: StageInfo, info2: StageInfo) {
     assert(info1.stageId === info2.stageId)
+    assert(info1.attemptId === info2.attemptId)
     assert(info1.name === info2.name)
     assert(info1.numTasks === info2.numTasks)
     assert(info1.submissionTime === info2.submissionTime)
@@ -475,9 +487,9 @@ class JsonProtocolSuite extends FunSuite {
     r
   }
 
-  private def makeStageInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
+  private def makeStageInfo(a: Int, attemptId: Int, b: Int, c: Int, d: Long, e: Long) = {
     val rddInfos = (0 until a % 5).map { i => makeRddInfo(a + i, b + i, c + i, d + i, e + i) }
-    new StageInfo(a, "greetings", b, rddInfos, "details")
+    new StageInfo(a, attemptId, "greetings", b, rddInfos, "details")
   }
 
   private def makeTaskInfo(a: Long, b: Int, c: Int, d: Long, speculative: Boolean) = {
@@ -537,14 +549,14 @@ class JsonProtocolSuite extends FunSuite {
 
   private val stageSubmittedJsonString =
     """
-      {"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":100,"Stage Name":
+      {"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":100,"Attempt ID":150,"Stage Name":
       "greetings","Number of Tasks":200,"RDD Info":[],"Details":"details"},"Properties":
       {"France":"Paris","Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}}
     """
 
   private val stageCompletedJsonString =
     """
-      {"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":101,"Stage Name":
+      {"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":101,"Attempt ID":151,"Stage Name":
       "greetings","Number of Tasks":201,"RDD Info":[{"RDD ID":101,"Name":"mayor","Storage Level":
       {"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":true,
       "Replication":1},"Number of Partitions":201,"Number of Cached Partitions":301,
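Reviewer note (not part of the patch): in the StageTable change, `attemptId` is zero-based, so the first attempt renders as a bare stage ID while a retry with `attemptId = 1` renders as "(Attempt 2)". A tiny standalone sketch of that display rule; the object and helper names are hypothetical:

```scala
// Hypothetical mirror of the StageTable rendering rule above:
// attemptId is zero-based, shown only for re-attempts, and rendered 1-based.
object StageLabelSketch {
  def stageLabel(stageId: Int, attemptId: Int): String =
    if (attemptId > 0) "%d (Attempt %d)".format(stageId, attemptId + 1)
    else stageId.toString

  def main(args: Array[String]): Unit = {
    assert(stageLabel(7, 0) == "7")             // first attempt: bare stage ID
    assert(stageLabel(7, 1) == "7 (Attempt 2)") // attemptId 1 shows as "Attempt 2"
    println("labels render as expected")
  }
}
```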