diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 625596885faa1..5743648f48ef6 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -271,7 +271,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { ) { jobData.numActiveStages -= 1 if (stage.failureReason.isEmpty) { - jobData.completedStageIndices.add(stage.stageId) + if (!stage.submissionTime.isEmpty) { + jobData.completedStageIndices.add(stage.stageId) + } } else { jobData.numFailedStages += 1 } @@ -304,6 +306,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { jobData <- jobIdToData.get(jobId) ) { jobData.numActiveStages += 1 + + // If a stage retries again, it should be removed from completedStageIndices set + jobData.completedStageIndices.remove(stage.stageId) } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index 711a3697bda15..a9bd39db7170e 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -22,6 +22,7 @@ import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo} import org.apache.spark.util.collection.OpenHashSet +import scala.collection.mutable import scala.collection.mutable.HashMap private[jobs] object UIData { @@ -63,7 +64,7 @@ private[jobs] object UIData { /* Stages */ var numActiveStages: Int = 0, // This needs to be a set instead of a simple count to prevent double-counting of rerun stages: - var completedStageIndices: OpenHashSet[Int] = new OpenHashSet[Int](), + var completedStageIndices: mutable.HashSet[Int] = new mutable.HashSet[Int](), var numSkippedStages: Int = 0, var numFailedStages: Int = 0 )