Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,13 @@ class TaskInfo(

var killed = false

private[spark] def markGettingResult(time: Long = System.currentTimeMillis) {
private[spark] def markGettingResult(time: Long) {
gettingResultTime = time
}

private[spark] def markFinished(state: TaskState, time: Long = System.currentTimeMillis) {
private[spark] def markFinished(state: TaskState, time: Long) {
// finishTime should be set larger than 0, otherwise "finished" below will return false.
assert(time > 0)
finishTime = time
if (state == TaskState.FAILED) {
failed = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ private[spark] class TaskSetManager(
*/
def handleTaskGettingResult(tid: Long): Unit = {
val info = taskInfos(tid)
info.markGettingResult()
info.markGettingResult(clock.getTimeMillis())
sched.dagScheduler.taskGettingResult(info)
}

Expand Down Expand Up @@ -695,7 +695,7 @@ private[spark] class TaskSetManager(
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
val info = taskInfos(tid)
val index = info.index
info.markFinished(TaskState.FINISHED)
info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
removeRunningTask(tid)
// This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
// "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
Expand Down Expand Up @@ -739,7 +739,7 @@ private[spark] class TaskSetManager(
return
}
removeRunningTask(tid)
info.markFinished(state)
info.markFinished(state, clock.getTimeMillis())
val index = info.index
copiesRunning(index) -= 1
var accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption.isDefined)

clock.advance(1)
// Tell it the task has finished
manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdates))
assert(sched.endedTasks(0) === Success)
Expand Down Expand Up @@ -376,6 +377,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
val clock = new ManualClock
clock.advance(1)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)

assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
Expand All @@ -393,6 +395,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
val clock = new ManualClock
clock.advance(1)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)

// Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
Expand Down Expand Up @@ -426,6 +429,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
// affinity to exec1 on host1 - which we will fail.
val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1")))
val clock = new ManualClock
clock.advance(1)
// We don't directly use the application blacklist, but its presence triggers blacklisting
// within the taskset.
val mockListenerBus = mock(classOf[LiveListenerBus])
Expand Down Expand Up @@ -550,7 +554,9 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq(TaskLocation("host1", "execB")),
Seq(TaskLocation("host2", "execC")),
Seq())
val manager = new TaskSetManager(sched, taskSet, 1, clock = new ManualClock)
val clock = new ManualClock()
clock.advance(1)
val manager = new TaskSetManager(sched, taskSet, 1, clock = clock)
sched.addExecutor("execA", "host1")
manager.executorAdded()
sched.addExecutor("execC", "host2")
Expand Down Expand Up @@ -842,6 +848,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(task.executorId === k)
}
assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
clock.advance(1)
// Complete the 3 tasks and leave 1 task in running
for (id <- Set(0, 1, 2)) {
manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
Expand Down Expand Up @@ -899,6 +906,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
tasks += task
}
assert(sched.startedTasks.toSet === (0 until 5).toSet)
clock.advance(1)
// Complete 3 tasks and leave 2 tasks in running
for (id <- Set(0, 1, 2)) {
manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
val taskInfo = new TaskInfo(taskId, taskId, 0, 0, "0", "localhost", TaskLocality.ANY, false)
jobListener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
jobListener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
taskInfo.markFinished(TaskState.FINISHED)
taskInfo.markFinished(TaskState.FINISHED, System.currentTimeMillis())
val taskMetrics = TaskMetrics.empty
taskMetrics.incPeakExecutionMemory(peakExecutionMemory)
jobListener.onTaskEnd(
Expand Down