@@ -852,97 +852,6 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
852852 assert(resubmittedTasks === 0 )
853853 }
854854
855- test(" Fetch failed task should not have success completion event" ) {
856- sc = new SparkContext (" local" , " test" )
857- // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
858- sc.conf.set(" spark.speculation.multiplier" , " 0.0" )
859- sc.conf.set(" spark.speculation" , " true" )
860-
861- sched = new FakeTaskScheduler (sc, (" exec1" , " host1" ), (" exec2" , " host2" ))
862- var killTaskCalled = false
863- sched.initialize(new FakeSchedulerBackend () {
864- override def killTask (taskId : Long ,
865- executorId : String ,
866- interruptThread : Boolean ,
867- reason : String ): Unit = {
868- // Check the only one killTask event in this case, which triggered by
869- // task 3.1 fetch failed.
870- assert(taskId === 3 )
871- assert(executorId === " exec2" )
872- assert(interruptThread)
873- assert(reason === " another attempt fetch failed" )
874- killTaskCalled = true
875- }
876- })
877-
878- // Keep track of the original tasks will not has SUCCESS event.
879- var originTaskSuccess = false
880- val dagScheduler = new FakeDAGScheduler (sc, sched) {
881- override def taskEnded (task : Task [_],
882- reason : TaskEndReason ,
883- result : Any ,
884- accumUpdates : Seq [AccumulatorV2 [_, _]],
885- taskInfo : TaskInfo ): Unit = {
886- super .taskEnded(task, reason, result, accumUpdates, taskInfo)
887- reason match {
888- case Success if taskInfo.taskId == 3 &&
889- taskInfo.attemptNumber == 0 && taskInfo.index == 3 =>
890- originTaskSuccess = true
891- case _ =>
892- }
893- }
894- }
895- sched.dagScheduler.stop()
896- sched.setDAGScheduler(dagScheduler)
897-
898- val taskSet = FakeTask .createTaskSet(4 )
899- val clock = new ManualClock ()
900- val manager = new TaskSetManager (sched, taskSet, MAX_TASK_FAILURES , clock = clock)
901- val accumUpdatesByTask : Array [Seq [AccumulatorV2 [_, _]]] = taskSet.tasks.map { task =>
902- task.metrics.internalAccums
903- }
904- // Offer resources for 4 tasks to start
905- for ((k, v) <- List (
906- " exec1" -> " host1" ,
907- " exec1" -> " host1" ,
908- " exec2" -> " host2" ,
909- " exec2" -> " host2" )) {
910- val taskOption = manager.resourceOffer(k, v, NO_PREF )
911- assert(taskOption.isDefined)
912- val task = taskOption.get
913- assert(task.executorId === k)
914- }
915- assert(sched.startedTasks.toSet === Set (0 , 1 , 2 , 3 ))
916- clock.advance(1 )
917- // Complete the 3 tasks and leave 1 task in running
918- for (id <- Set (0 , 1 , 2 )) {
919- manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
920- assert(sched.endedTasks(id) === Success )
921- }
922-
923- // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
924- // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
925- // > 0ms, so advance the clock by 1ms here.
926- clock.advance(1 )
927- assert(manager.checkSpeculatableTasks(0 ))
928- assert(sched.speculativeTasks.toSet === Set (3 ))
929-
930- // Offer resource to start the speculative attempt for the running task
931- val taskOption5 = manager.resourceOffer(" exec1" , " host1" , NO_PREF )
932- assert(taskOption5.isDefined)
933- val task5 = taskOption5.get
934- assert(task5.index === 3 )
935- assert(task5.taskId === 4 )
936- assert(task5.executorId === " exec1" )
937- assert(task5.attemptNumber === 1 )
938- // sched.backend = mock(classOf[SchedulerBackend])
939- // The speculative task raise a FetchFailed
940- manager.handleFailedTask(4 , TaskState .FAILED ,
941- FetchFailed (BlockManagerId (" exec1" , " host1" , 12345 ), 0 , 0 , 0 , " ignored" ))
942- // Check the original task killed by FetchFailed.
943- assert(! originTaskSuccess && killTaskCalled)
944- }
945-
946855 test(" speculative and noPref task should be scheduled after node-local" ) {
947856 sc = new SparkContext (" local" , " test" )
948857 sched = new FakeTaskScheduler (
0 commit comments