From 7808a0122289eaa9549bad0870e14e47ccb074c5 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Fri, 6 Sep 2019 09:50:51 +0900
Subject: [PATCH 1/3] [SPARK-26989][CORE][TEST] DAGSchedulerSuite: ensure
 listeners are fully processed before checking failedStages

---
 .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 2b3423f9a4d4..b2bbff6c4b38 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1141,6 +1141,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       taskSets(0).tasks(1),
       TaskKilled("test"),
       null))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(failedStages === Seq(0))
     assert(mapOutputTracker.findMissingPartitions(shuffleId) === Some(Seq(0, 1)))
     scheduler.resubmitFailedStages()
@@ -2653,6 +2654,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       taskSets(0).tasks(0),
       TaskKilled("test"),
       null))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(failedStages === Seq(0))
 
     // The second map task fails with TaskKilled.
@@ -2689,6 +2691,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       taskSets(0).tasks(0),
       TaskKilled("test"),
       null))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(failedStages === Seq(0))
 
     // Trigger resubmission of the failed map stage.
@@ -2813,6 +2816,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       null))
 
     assert(failure == null, "job should not fail")
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     val failedStages = scheduler.failedStages.toSeq
     assert(failedStages.length == 2)
     // Shuffle blocks of "hostA" is lost, so first task of the `shuffleMapRdd2` needs to retry.

From ea3bc108bf2aa1f3d2526955f3787be3d1c496f2 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Fri, 6 Sep 2019 11:00:30 +0900
Subject: [PATCH 2/3] Force waiting for listener to process all events before
 accessing recorded values in listener

---
 .../spark/scheduler/DAGSchedulerSuite.scala | 147 +++++++++---------
 1 file changed, 77 insertions(+), 70 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index b2bbff6c4b38..fd5974d90dbc 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -174,31 +174,72 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
   /** Length of time to wait while draining listener events. */
   val WAIT_TIMEOUT_MILLIS = 10000
 
-  val submittedStageInfos = new HashSet[StageInfo]
-  val successfulStages = new HashSet[Int]
-  val failedStages = new ArrayBuffer[Int]
-  val stageByOrderOfExecution = new ArrayBuffer[Int]
-  val endedTasks = new HashSet[Long]
-  val sparkListener = new SparkListener() {
+  /**
+   * Listener which records some information to verify in UTs. Getter-kind methods in this class
+   * ensure the value is returned only after there are no more events left to process, and that
+   * the returned value is immutable, preventing odd results caused by race conditions.
+ */ + class EventInfoRecordingListener extends SparkListener { + private val _submittedStageInfos = new HashSet[StageInfo] + private val _successfulStages = new HashSet[Int] + private val _failedStages = new ArrayBuffer[Int] + private val _stageByOrderOfExecution = new ArrayBuffer[Int] + private val _endedTasks = new HashSet[Long] + override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { - submittedStageInfos += stageSubmitted.stageInfo + _submittedStageInfos += stageSubmitted.stageInfo } override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { val stageInfo = stageCompleted.stageInfo - stageByOrderOfExecution += stageInfo.stageId + _stageByOrderOfExecution += stageInfo.stageId if (stageInfo.failureReason.isEmpty) { - successfulStages += stageInfo.stageId + _successfulStages += stageInfo.stageId } else { - failedStages += stageInfo.stageId + _failedStages += stageInfo.stageId } } override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { - endedTasks += taskEnd.taskInfo.taskId + _endedTasks += taskEnd.taskInfo.taskId + } + + def submittedStageInfos: Set[StageInfo] = withWaitingListenerUntilEmpty { + _submittedStageInfos.toSet + } + + def successfulStages: Set[Int] = withWaitingListenerUntilEmpty { + _successfulStages.toSet + } + + def failedStages: List[Int] = withWaitingListenerUntilEmpty { + _failedStages.toList + } + + def stageByOrderOfExecution: List[Int] = withWaitingListenerUntilEmpty { + _stageByOrderOfExecution.toList + } + + def endedTask: Set[Long] = withWaitingListenerUntilEmpty { + _endedTasks.toSet + } + + def clear(): Unit = { + _submittedStageInfos.clear() + _successfulStages.clear() + _failedStages.clear() + _stageByOrderOfExecution.clear() + _endedTasks.clear() + } + + private def withWaitingListenerUntilEmpty[T](fn: => T): T = { + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + fn } } + val sparkListener = new EventInfoRecordingListener() + var mapOutputTracker: MapOutputTrackerMaster = null var broadcastManager: BroadcastManager = null var securityMgr: SecurityManager = null @@ -247,10 +288,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi private def init(testConf: SparkConf): Unit = { sc = new SparkContext("local[2]", "DAGSchedulerSuite", testConf) - submittedStageInfos.clear() - successfulStages.clear() - failedStages.clear() - endedTasks.clear() + sparkListener.clear() failure = null sc.addSparkListener(sparkListener) taskSets.clear() @@ -373,9 +411,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } test("[SPARK-3353] parent stage should have lower stage id") { - stageByOrderOfExecution.clear() sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count() - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + val stageByOrderOfExecution = sparkListener.stageByOrderOfExecution assert(stageByOrderOfExecution.length === 2) assert(stageByOrderOfExecution(0) < stageByOrderOfExecution(1)) } @@ -618,9 +655,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi submit(unserializableRdd, Array(0)) assert(failure.getMessage.startsWith( "Job aborted due to stage failure: Task not serializable:")) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - assert(failedStages.contains(0)) - assert(failedStages.size === 1) + assert(sparkListener.failedStages === Seq(0)) assertDataStructuresEmpty() } @@ -628,9 +663,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi submit(new MyRDD(sc, 1, 
Nil), Array(0)) failed(taskSets(0), "some failure") assert(failure.getMessage === "Job aborted due to stage failure: some failure") - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - assert(failedStages.contains(0)) - assert(failedStages.size === 1) + assert(sparkListener.failedStages === Seq(0)) assertDataStructuresEmpty() } @@ -639,9 +672,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi val jobId = submit(rdd, Array(0)) cancel(jobId) assert(failure.getMessage === s"Job $jobId cancelled ") - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - assert(failedStages.contains(0)) - assert(failedStages.size === 1) + assert(sparkListener.failedStages === Seq(0)) assertDataStructuresEmpty() } @@ -699,9 +730,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(results === Map(0 -> 42)) assertDataStructuresEmpty() - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - assert(failedStages.isEmpty) - assert(successfulStages.contains(0)) + assert(sparkListener.failedStages.isEmpty) + assert(sparkListener.successfulStages.contains(0)) } test("run trivial shuffle") { @@ -1084,8 +1114,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi taskSets(1).tasks(0), FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - assert(failedStages.contains(1)) + assert(sparkListener.failedStages.contains(1)) // The second ResultTask fails, with a fetch failure for the output from the second mapper. runEvent(makeCompletionEvent( @@ -1093,8 +1122,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1, "ignored"), null)) // The SparkListener should not receive redundant failure events. - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - assert(failedStages.size == 1) + assert(sparkListener.failedStages.size === 1) } test("Retry all the tasks on a resubmitted attempt of a barrier stage caused by FetchFailure") { @@ -1141,8 +1169,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi taskSets(0).tasks(1), TaskKilled("test"), null)) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - assert(failedStages === Seq(0)) + assert(sparkListener.failedStages === Seq(0)) assert(mapOutputTracker.findMissingPartitions(shuffleId) === Some(Seq(0, 1))) scheduler.resubmitFailedStages() @@ -1196,11 +1223,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi val mapStageId = 0 def countSubmittedMapStageAttempts(): Int = { - submittedStageInfos.count(_.stageId == mapStageId) + sparkListener.submittedStageInfos.count(_.stageId == mapStageId) } // The map stage should have been submitted. - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedMapStageAttempts() === 1) complete(taskSets(0), Seq( @@ -1217,12 +1243,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi taskSets(1).tasks(0), FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - assert(failedStages.contains(1)) + assert(sparkListener.failedStages.contains(1)) // Trigger resubmission of the failed map stage. runEvent(ResubmitFailedStages) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) // Another attempt for the map stage should have been submitted, resulting in 2 total attempts. 
assert(countSubmittedMapStageAttempts() === 2) @@ -1239,7 +1263,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // shouldn't effect anything -- our calling it just makes *SURE* it gets called between the // desired event and our check. runEvent(ResubmitFailedStages) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedMapStageAttempts() === 2) } @@ -1257,14 +1280,13 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi submit(reduceRdd, Array(0, 1)) def countSubmittedReduceStageAttempts(): Int = { - submittedStageInfos.count(_.stageId == 1) + sparkListener.submittedStageInfos.count(_.stageId == 1) } def countSubmittedMapStageAttempts(): Int = { - submittedStageInfos.count(_.stageId == 0) + sparkListener.submittedStageInfos.count(_.stageId == 0) } // The map stage should have been submitted. - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedMapStageAttempts() === 1) // Complete the map stage. @@ -1273,7 +1295,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi (Success, makeMapStatus("hostB", 2)))) // The reduce stage should have been submitted. - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedReduceStageAttempts() === 1) // The first result task fails, with a fetch failure for the output from the first mapper. @@ -1288,7 +1309,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // Because the map stage finished, another attempt for the reduce stage should have been // submitted, resulting in 2 total attempts for each the map and the reduce stage. - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedMapStageAttempts() === 2) assert(countSubmittedReduceStageAttempts() === 2) @@ -1318,10 +1338,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi runEvent(makeCompletionEvent( taskSets(0).tasks(1), Success, 42, Seq.empty, Array.empty, createFakeTaskInfoWithId(1))) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) // verify stage exists assert(scheduler.stageIdToStage.contains(0)) - assert(endedTasks.size == 2) + assert(sparkListener.endedTask.size === 2) // finish other 2 tasks runEvent(makeCompletionEvent( @@ -1330,8 +1349,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi runEvent(makeCompletionEvent( taskSets(0).tasks(3), Success, 42, Seq.empty, Array.empty, createFakeTaskInfoWithId(3))) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - assert(endedTasks.size == 4) + assert(sparkListener.endedTask.size === 4) // verify the stage is done assert(!scheduler.stageIdToStage.contains(0)) @@ -1341,15 +1359,13 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi runEvent(makeCompletionEvent( taskSets(0).tasks(3), Success, 42, Seq.empty, Array.empty, createFakeTaskInfoWithId(5))) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - assert(endedTasks.size == 5) + assert(sparkListener.endedTask.size === 5) // make sure non successful tasks also send out event runEvent(makeCompletionEvent( taskSets(0).tasks(3), UnknownReason, 42, Seq.empty, Array.empty, createFakeTaskInfoWithId(6))) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - assert(endedTasks.size == 6) + assert(sparkListener.endedTask.size === 6) } test("ignore late map task completions") { @@ -1422,8 +1438,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // Listener bus 
should get told about the map stage failing, but not the reduce stage // (since the reduce stage hasn't been started yet). - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - assert(failedStages.toSet === Set(0)) + assert(sparkListener.failedStages.toSet === Set(0)) assertDataStructuresEmpty() } @@ -1666,9 +1681,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(cancelledStages.toSet === Set(0, 2)) // Make sure the listeners got told about both failed stages. - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - assert(successfulStages.isEmpty) - assert(failedStages.toSet === Set(0, 2)) + assert(sparkListener.successfulStages.isEmpty) + assert(sparkListener.failedStages.toSet === Set(0, 2)) assert(listener1.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage") assert(listener2.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage") @@ -2642,11 +2656,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi val mapStageId = 0 def countSubmittedMapStageAttempts(): Int = { - submittedStageInfos.count(_.stageId == mapStageId) + sparkListener.submittedStageInfos.count(_.stageId == mapStageId) } // The map stage should have been submitted. - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedMapStageAttempts() === 1) // The first map task fails with TaskKilled. @@ -2654,8 +2667,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi taskSets(0).tasks(0), TaskKilled("test"), null)) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - assert(failedStages === Seq(0)) + assert(sparkListener.failedStages === Seq(0)) // The second map task fails with TaskKilled. runEvent(makeCompletionEvent( @@ -2665,7 +2677,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // Trigger resubmission of the failed map stage. runEvent(ResubmitFailedStages) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) // Another attempt for the map stage should have been submitted, resulting in 2 total attempts. assert(countSubmittedMapStageAttempts() === 2) @@ -2679,11 +2690,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi val mapStageId = 0 def countSubmittedMapStageAttempts(): Int = { - submittedStageInfos.count(_.stageId == mapStageId) + sparkListener.submittedStageInfos.count(_.stageId == mapStageId) } // The map stage should have been submitted. - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(countSubmittedMapStageAttempts() === 1) // The first map task fails with TaskKilled. @@ -2691,12 +2701,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi taskSets(0).tasks(0), TaskKilled("test"), null)) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - assert(failedStages === Seq(0)) + assert(sparkListener.failedStages === Seq(0)) // Trigger resubmission of the failed map stage. runEvent(ResubmitFailedStages) - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) // Another attempt for the map stage should have been submitted, resulting in 2 total attempts. assert(countSubmittedMapStageAttempts() === 2) @@ -2709,7 +2717,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // The second map task failure doesn't trigger stage retry. 
     runEvent(ResubmitFailedStages)
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 2)
   }
 
From 3d999e9f722f382e1e7d26287317b5e7752524f7 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Sat, 7 Sep 2019 07:36:03 +0900
Subject: [PATCH 3/3] Reflect review comments

---
 .../spark/scheduler/DAGSchedulerSuite.scala | 41 ++++++++-----------
 1 file changed, 17 insertions(+), 24 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index fd5974d90dbc..44a4eadef630 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -204,41 +204,35 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     _endedTasks += taskEnd.taskInfo.taskId
   }
 
-  def submittedStageInfos: Set[StageInfo] = withWaitingListenerUntilEmpty {
+  def submittedStageInfos: Set[StageInfo] = {
+    waitForListeners()
     _submittedStageInfos.toSet
   }
 
-  def successfulStages: Set[Int] = withWaitingListenerUntilEmpty {
+  def successfulStages: Set[Int] = {
+    waitForListeners()
     _successfulStages.toSet
   }
 
-  def failedStages: List[Int] = withWaitingListenerUntilEmpty {
+  def failedStages: List[Int] = {
+    waitForListeners()
     _failedStages.toList
   }
 
-  def stageByOrderOfExecution: List[Int] = withWaitingListenerUntilEmpty {
+  def stageByOrderOfExecution: List[Int] = {
+    waitForListeners()
     _stageByOrderOfExecution.toList
   }
 
-  def endedTask: Set[Long] = withWaitingListenerUntilEmpty {
+  def endedTasks: Set[Long] = {
+    waitForListeners()
     _endedTasks.toSet
   }
 
-  def clear(): Unit = {
-    _submittedStageInfos.clear()
-    _successfulStages.clear()
-    _failedStages.clear()
-    _stageByOrderOfExecution.clear()
-    _endedTasks.clear()
-  }
-
-  private def withWaitingListenerUntilEmpty[T](fn: => T): T = {
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    fn
-  }
+  private def waitForListeners(): Unit = sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
   }
 
-  val sparkListener = new EventInfoRecordingListener()
+  var sparkListener: EventInfoRecordingListener = null
 
   var mapOutputTracker: MapOutputTrackerMaster = null
   var broadcastManager: BroadcastManager = null
@@ -288,7 +282,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 
   private def init(testConf: SparkConf): Unit = {
     sc = new SparkContext("local[2]", "DAGSchedulerSuite", testConf)
-    sparkListener.clear()
+    sparkListener = new EventInfoRecordingListener
     failure = null
     sc.addSparkListener(sparkListener)
     taskSets.clear()
@@ -1340,7 +1334,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       Seq.empty, Array.empty, createFakeTaskInfoWithId(1)))
     // verify stage exists
     assert(scheduler.stageIdToStage.contains(0))
-    assert(sparkListener.endedTask.size === 2)
+    assert(sparkListener.endedTasks.size === 2)
 
     // finish other 2 tasks
     runEvent(makeCompletionEvent(
@@ -1349,7 +1343,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(3), Success, 42,
       Seq.empty, Array.empty, createFakeTaskInfoWithId(3)))
-    assert(sparkListener.endedTask.size === 4)
+    assert(sparkListener.endedTasks.size === 4)
 
     // verify the stage is done
     assert(!scheduler.stageIdToStage.contains(0))
@@ -1359,13 +1353,13 @@
runEvent(makeCompletionEvent( taskSets(0).tasks(3), Success, 42, Seq.empty, Array.empty, createFakeTaskInfoWithId(5))) - assert(sparkListener.endedTask.size === 5) + assert(sparkListener.endedTasks.size === 5) // make sure non successful tasks also send out event runEvent(makeCompletionEvent( taskSets(0).tasks(3), UnknownReason, 42, Seq.empty, Array.empty, createFakeTaskInfoWithId(6))) - assert(sparkListener.endedTask.size === 6) + assert(sparkListener.endedTasks.size === 6) } test("ignore late map task completions") { @@ -2823,7 +2817,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi null)) assert(failure == null, "job should not fail") - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) val failedStages = scheduler.failedStages.toSeq assert(failedStages.length == 2) // Shuffle blocks of "hostA" is lost, so first task of the `shuffleMapRdd2` needs to retry.
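
Taken together, the three patches converge on one pattern: record events in private mutable buffers, and have every getter first drain the listener bus and then return an immutable snapshot. The standalone sketch below distills that pattern; the names (FailedStagesListener, drainTimeoutMillis) are illustrative and not part of the PR, and the file is assumed to sit under the org.apache.spark package, since SparkContext.listenerBus and LiveListenerBus.waitUntilEmpty are private[spark] (DAGSchedulerSuite relies on the same access).

    package org.apache.spark

    import scala.collection.mutable.ArrayBuffer

    import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

    // Records failed stage ids as events arrive on the listener bus thread.
    // The getter drains the bus before returning an immutable snapshot, so a
    // test can never observe a partially delivered event stream or a buffer
    // that mutates underneath it -- the race this PR fixes.
    class FailedStagesListener(sc: SparkContext, drainTimeoutMillis: Long = 10000L)
      extends SparkListener {

      private val _failedStages = new ArrayBuffer[Int]

      override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
        if (event.stageInfo.failureReason.isDefined) {
          _failedStages += event.stageInfo.stageId
        }
      }

      // Wait until every posted event has been handled, then snapshot.
      def failedStages: List[Int] = {
        sc.listenerBus.waitUntilEmpty(drainTimeoutMillis)
        _failedStages.toList
      }
    }

A test would register the listener with sc.addSparkListener(...) and assert on failedStages; because every read implies a drained bus, the scattered per-test sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) calls that patch 1 had to add become unnecessary, which is exactly what patches 2 and 3 delete.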