Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import scala.reflect.ClassTag

import org.scalactic.TripleEquals
import org.scalatest.Assertions.AssertionsHelper
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

import org.apache.spark._
import org.apache.spark.TaskState._
Expand Down Expand Up @@ -157,8 +159,16 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
}
// When a job fails, we terminate before waiting for all the task end events to come in,
// so there might still be a running task set. So we only check these conditions
// when the job succeeds
assert(taskScheduler.runningTaskSets.isEmpty)
// when the job succeeds.
// When the final task of a taskset completes, we post
// the event to the DAGScheduler event loop before we finish processing in the taskscheduler
// thread. It's possible the DAGScheduler thread processes the event, finishes the job,
// and notifies the job waiter before our original thread in the task scheduler finishes
// handling the event and marks the taskset as complete. So its ok if we need to wait a
// *little* bit longer for the original taskscheduler thread to finish up to deal w/ the race.
eventually(timeout(1 second), interval(10 millis)) {
assert(taskScheduler.runningTaskSets.isEmpty)
}
assert(!backend.hasTasks)
} else {
assert(failure != null)
Expand Down