diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 69bcda82f708..0cf573c2490b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -18,12 +18,12 @@ package org.apache.spark.deploy.master
 
 import java.util.Date
-import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.collection.mutable.{HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.duration._
 import scala.io.Source
 import scala.reflect.ClassTag
 
@@ -97,13 +97,40 @@ class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extend
   }
 }
 
-class MockExecutorLaunchFailWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf)
-  extends MockWorker(master, conf) {
+// This class is designed to handle the lifecycle of only one application.
+class MockExecutorLaunchFailWorker(master: Master, conf: SparkConf = new SparkConf)
+  extends MockWorker(master.self, conf) with Eventually {
+
+  val appRegistered = new CountDownLatch(1)
+  val launchExecutorReceived = new CountDownLatch(1)
+  val appIdsToLaunchExecutor = new mutable.HashSet[String]
   var failedCnt = 0
+
   override def receive: PartialFunction[Any, Unit] = {
+    case LaunchDriver(driverId, _, _) =>
+      master.self.send(RegisterApplication(appDesc, newDriver(driverId)))
+
+      // Below code doesn't make driver stuck, as newDriver opens another rpc endpoint for
+      // handling driver related messages. To simplify logic, we will block handling
+      // LaunchExecutor message until we validate registering app succeeds.
+      eventually(timeout(5.seconds)) {
+        // an app would be registered with Master once Driver set up
+        assert(apps.nonEmpty)
+        assert(master.idToApp.keySet.intersect(apps.keySet) == apps.keySet)
+      }
+
+      appRegistered.countDown()
     case LaunchExecutor(_, appId, execId, _, _, _, _) =>
+      assert(appRegistered.await(10, TimeUnit.SECONDS))
+
+      if (failedCnt == 0) {
+        launchExecutorReceived.countDown()
+      }
+      assert(master.idToApp.contains(appId))
+      appIdsToLaunchExecutor += appId
       failedCnt += 1
-      master.send(ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None))
+      master.self.send(ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None))
+    case otherMsg => super.receive(otherMsg)
   }
 }
 
@@ -662,7 +689,7 @@ class MasterSuite extends SparkFunSuite
     val master = makeAliveMaster()
     var worker: MockExecutorLaunchFailWorker = null
     try {
-      worker = new MockExecutorLaunchFailWorker(master.self)
+      worker = new MockExecutorLaunchFailWorker(master)
       worker.rpcEnv.setupEndpoint("worker", worker)
       val workerRegMsg = RegisterWorker(
         worker.id,
@@ -677,19 +704,16 @@ class MasterSuite extends SparkFunSuite
       val driver = DeployTestUtils.createDriverDesc()
       // mimic DriverClient to send RequestSubmitDriver to master
      master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))
-      var appId: String = null
-      eventually(timeout(10.seconds)) {
-        // an app would be registered with Master once Driver set up
-        assert(worker.apps.nonEmpty)
-        appId = worker.apps.head._1
-        assert(master.idToApp.contains(appId))
-      }
+
+      // LaunchExecutor message should have been received in worker side
+      assert(worker.launchExecutorReceived.await(10, TimeUnit.SECONDS))
 
       eventually(timeout(10.seconds)) {
+        val appIds = worker.appIdsToLaunchExecutor
         // Master would continually launch executors until reach MAX_EXECUTOR_RETRIES
         assert(worker.failedCnt == master.conf.get(MAX_EXECUTOR_RETRIES))
         // Master would remove the app if no executor could be launched for it
-        assert(!master.idToApp.contains(appId))
+        assert(master.idToApp.keySet.intersect(appIds).isEmpty)
       }
     } finally {
       if (worker != null) {