From 11b33c3bb65e62858ca13ab97c958332f101e60d Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 13 Dec 2016 15:25:06 -0600 Subject: [PATCH 1/4] [SPARK-18846][Scheduler] Fix flakiness in SchedulerIntegrationSuite There is a small race in SchedulerIntegrationSuite. The test assumes that the taskscheduler thread processing that last task will finish before the DAGScheduler processes the task event and notifies the job waiter, but that is not 100% guaranteed. --- .../scheduler/SchedulerIntegrationSuite.scala | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index c28aa06623a6..8d5ff41e56b5 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -27,6 +27,9 @@ import scala.language.existentials import scala.reflect.ClassTag import org.scalactic.TripleEquals +import org.scalatest.concurrent.Eventually._ +import org.scalatest.concurrent.PatienceConfiguration +import org.scalatest.time.SpanSugar._ import org.scalatest.Assertions.AssertionsHelper import org.apache.spark._ @@ -157,8 +160,16 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa } // When a job fails, we terminate before waiting for all the task end events to come in, // so there might still be a running task set. So we only check these conditions - // when the job succeeds - assert(taskScheduler.runningTaskSets.isEmpty) + // when the job succeeds. + // When the final task of a taskset completes, we post + // the event to the DAGScheduler event loop before we finish processing in the taskscheduler + // thread. Its possible the DAGScheduler thread processes the event, finishes the job, + // and notifies the job waiter before our original thread in the task scheduler finishes + // handling the event and marks the taskset as complete. So its ok if we need to wait a + // *little* bit longer for the original taskscheduler thread to finish up to deal w/ the race. + eventually(timeout(1 second), interval(100 millis)) { + assert(taskScheduler.runningTaskSets.isEmpty) + } assert(!backend.hasTasks) } else { assert(failure != null) From 28a8bf16c70f3d20c50fe64865dac441f42cb142 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 13 Dec 2016 15:35:02 -0600 Subject: [PATCH 2/4] punctuation --- .../org/apache/spark/scheduler/SchedulerIntegrationSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index 8d5ff41e56b5..f7482d0c9dec 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -163,7 +163,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa // when the job succeeds. // When the final task of a taskset completes, we post // the event to the DAGScheduler event loop before we finish processing in the taskscheduler - // thread. Its possible the DAGScheduler thread processes the event, finishes the job, + // thread. It's possible the DAGScheduler thread processes the event, finishes the job, // and notifies the job waiter before our original thread in the task scheduler finishes // handling the event and marks the taskset as complete. So its ok if we need to wait a // *little* bit longer for the original taskscheduler thread to finish up to deal w/ the race. From 9cc5d587180d62807bb0a0a40744a4243746dac5 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 13 Dec 2016 16:18:19 -0600 Subject: [PATCH 3/4] review feedback --- .../org/apache/spark/scheduler/SchedulerIntegrationSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index f7482d0c9dec..fc3d58c1b077 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -28,7 +28,6 @@ import scala.reflect.ClassTag import org.scalactic.TripleEquals import org.scalatest.concurrent.Eventually._ -import org.scalatest.concurrent.PatienceConfiguration import org.scalatest.time.SpanSugar._ import org.scalatest.Assertions.AssertionsHelper @@ -167,7 +166,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa // and notifies the job waiter before our original thread in the task scheduler finishes // handling the event and marks the taskset as complete. So its ok if we need to wait a // *little* bit longer for the original taskscheduler thread to finish up to deal w/ the race. - eventually(timeout(1 second), interval(100 millis)) { + eventually(timeout(1 second), interval(10 millis)) { assert(taskScheduler.runningTaskSets.isEmpty) } assert(!backend.hasTasks) From af6ea550daa492222f614d84093c0cd1f688531e Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 13 Dec 2016 16:44:14 -0600 Subject: [PATCH 4/4] import order --- .../org/apache/spark/scheduler/SchedulerIntegrationSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index fc3d58c1b077..2ba63da881be 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -27,9 +27,9 @@ import scala.language.existentials import scala.reflect.ClassTag import org.scalactic.TripleEquals +import org.scalatest.Assertions.AssertionsHelper import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ -import org.scalatest.Assertions.AssertionsHelper import org.apache.spark._ import org.apache.spark.TaskState._