diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 59680139e7af..9843eab4f134 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -70,11 +70,13 @@ class TaskInfo(
 
   var killed = false
 
-  private[spark] def markGettingResult(time: Long = System.currentTimeMillis) {
+  private[spark] def markGettingResult(time: Long) {
     gettingResultTime = time
   }
 
-  private[spark] def markFinished(state: TaskState, time: Long = System.currentTimeMillis) {
+  private[spark] def markFinished(state: TaskState, time: Long) {
+    // finishTime should be set larger than 0, otherwise "finished" below will return false.
+    assert(time > 0)
     finishTime = time
     if (state == TaskState.FAILED) {
       failed = true
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 3b25513bea05..2762144095d6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -667,7 +667,7 @@ private[spark] class TaskSetManager(
    */
   def handleTaskGettingResult(tid: Long): Unit = {
     val info = taskInfos(tid)
-    info.markGettingResult()
+    info.markGettingResult(clock.getTimeMillis())
     sched.dagScheduler.taskGettingResult(info)
   }
 
@@ -695,7 +695,7 @@ private[spark] class TaskSetManager(
   def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
     val info = taskInfos(tid)
     val index = info.index
-    info.markFinished(TaskState.FINISHED)
+    info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
     removeRunningTask(tid)
     // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
     // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
@@ -739,7 +739,7 @@ private[spark] class TaskSetManager(
       return
     }
     removeRunningTask(tid)
-    info.markFinished(state)
+    info.markFinished(state, clock.getTimeMillis())
     val index = info.index
     copiesRunning(index) -= 1
     var accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index d03a0c990a02..2773db4f2840 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -191,6 +191,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
     assert(taskOption.isDefined)
 
+    clock.advance(1)
     // Tell it the task has finished
     manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdates))
     assert(sched.endedTasks(0) === Success)
@@ -376,6 +377,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(1)
     val clock = new ManualClock
+    clock.advance(1)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
 
     assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
@@ -393,6 +395,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(1)
     val clock = new ManualClock
+    clock.advance(1)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
 
     // Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
@@ -426,6 +429,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     // affinity to exec1 on host1 - which we will fail.
     val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1")))
     val clock = new ManualClock
+    clock.advance(1)
     // We don't directly use the application blacklist, but its presence triggers blacklisting
     // within the taskset.
     val mockListenerBus = mock(classOf[LiveListenerBus])
@@ -550,7 +554,9 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       Seq(TaskLocation("host1", "execB")),
       Seq(TaskLocation("host2", "execC")),
       Seq())
-    val manager = new TaskSetManager(sched, taskSet, 1, clock = new ManualClock)
+    val clock = new ManualClock()
+    clock.advance(1)
+    val manager = new TaskSetManager(sched, taskSet, 1, clock = clock)
     sched.addExecutor("execA", "host1")
     manager.executorAdded()
     sched.addExecutor("execC", "host2")
@@ -842,6 +848,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       assert(task.executorId === k)
     }
     assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+    clock.advance(1)
     // Complete the 3 tasks and leave 1 task in running
     for (id <- Set(0, 1, 2)) {
       manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
@@ -899,6 +906,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       tasks += task
     }
     assert(sched.startedTasks.toSet === (0 until 5).toSet)
+    clock.advance(1)
     // Complete 3 tasks and leave 2 tasks in running
     for (id <- Set(0, 1, 2)) {
       manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
index 11482d187aec..38030e066080 100644
--- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -77,7 +77,7 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
       val taskInfo = new TaskInfo(taskId, taskId, 0, 0, "0", "localhost", TaskLocality.ANY, false)
       jobListener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
       jobListener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
-      taskInfo.markFinished(TaskState.FINISHED)
+      taskInfo.markFinished(TaskState.FINISHED, System.currentTimeMillis())
      val taskMetrics = TaskMetrics.empty
      taskMetrics.incPeakExecutionMemory(peakExecutionMemory)
      jobListener.onTaskEnd(
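
The reason every test above calls clock.advance(1) before finishing a task: markFinished now asserts that the supplied timestamp is positive, and a ManualClock starts at 0, so finishing a task at time 0 would trip the new assert. The sketch below is a minimal, self-contained illustration of this clock-injection pattern; all class names in it are hypothetical simplifications, not the actual org.apache.spark.util.Clock/ManualClock or TaskInfo implementations.

// Illustrative sketch only: production code passes a real clock's time,
// tests inject a manually advanced clock instead of System.currentTimeMillis.
trait Clock {
  def getTimeMillis(): Long
}

class SystemClock extends Clock {
  override def getTimeMillis(): Long = System.currentTimeMillis()
}

class ManualClock(private var time: Long = 0L) extends Clock {
  override def getTimeMillis(): Long = time
  def advance(millis: Long): Unit = { time += millis }
}

// Simplified stand-in for TaskInfo: a timestamp of 0 would make "finished"
// report false, hence the assert and the clock.advance(1) calls in the tests.
class TaskRecord {
  var finishTime: Long = 0L
  def markFinished(time: Long): Unit = {
    assert(time > 0)
    finishTime = time
  }
  def finished: Boolean = finishTime != 0
}

object ClockInjectionExample extends App {
  val clock = new ManualClock()
  clock.advance(1)                           // as the tests do before finishing a task
  val record = new TaskRecord
  record.markFinished(clock.getTimeMillis()) // the call site now supplies the time explicitly
  assert(record.finished)
}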