@@ -18,12 +18,12 @@
 package org.apache.spark.deploy.master
 
 import java.util.Date
-import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.collection.mutable.{HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.duration._
 import scala.io.Source
 import scala.reflect.ClassTag
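
Note: the added CountDownLatch/TimeUnit imports support the latch-based handshake used further down: one side counts the latch down once its setup is done, the other blocks on await with a timeout. A minimal self-contained sketch of that pattern, independent of this suite (names are illustrative):

import java.util.concurrent.{CountDownLatch, TimeUnit}

object LatchHandshake {
  def main(args: Array[String]): Unit = {
    val ready = new CountDownLatch(1)
    new Thread(() => {
      // ... perform setup on another thread ...
      ready.countDown() // signal completion
    }).start()
    // await returns true iff the latch reached zero within the timeout
    assert(ready.await(10, TimeUnit.SECONDS))
  }
}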
@@ -97,13 +97,40 @@ class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extend
 }
 }
 
-class MockExecutorLaunchFailWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf)
-  extends MockWorker(master, conf) {
+// This class is designed to handle the lifecycle of only one application.
+class MockExecutorLaunchFailWorker(master: Master, conf: SparkConf = new SparkConf)
+  extends MockWorker(master.self, conf) with Eventually {
+
+  val appRegistered = new CountDownLatch(1)
+  val launchExecutorReceived = new CountDownLatch(1)
+  val appIdsToLaunchExecutor = new mutable.HashSet[String]
   var failedCnt = 0
+
   override def receive: PartialFunction[Any, Unit] = {
     case LaunchDriver(driverId, _, _) =>
-      master.send(RegisterApplication(appDesc, newDriver(driverId)))
+      master.self.send(RegisterApplication(appDesc, newDriver(driverId)))
+
+      // The code below doesn't leave the driver stuck, as newDriver opens another RPC endpoint
+      // for handling driver-related messages. To simplify the logic, we block handling of the
+      // LaunchExecutor message until we have validated that registering the app succeeded.
+      eventually(timeout(5.seconds)) {
+        // an app should be registered with the Master once the driver is set up
+        assert(apps.nonEmpty)
+        assert(master.idToApp.keySet.intersect(apps.keySet) == apps.keySet)
+      }
+
+      appRegistered.countDown()
     case LaunchExecutor(_, appId, execId, _, _, _, _) =>
+      assert(appRegistered.await(10, TimeUnit.SECONDS))
+
+      if (failedCnt == 0) {
+        launchExecutorReceived.countDown()
+      }
+      assert(master.idToApp.contains(appId))
+      appIdsToLaunchExecutor += appId
       failedCnt += 1
-      master.send(ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None))
+      master.self.send(ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None))
+
     case otherMsg => super.receive(otherMsg)
   }
 }
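
Note: the mock worker now mixes in ScalaTest's Eventually so the receive handler itself can poll until the app shows up in the Master's state. For reference, a minimal standalone sketch of eventually, assuming only ScalaTest on the classpath (the flag and thread are illustrative):

import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar._

object EventuallyDemo extends Eventually {
  def main(args: Array[String]): Unit = {
    @volatile var registered = false
    new Thread(() => { Thread.sleep(200); registered = true }).start()
    // retries the block until it stops throwing or the timeout expires
    eventually(timeout(5.seconds), interval(50.millis)) {
      assert(registered)
    }
  }
}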
@@ -662,7 +689,7 @@ class MasterSuite extends SparkFunSuite
     val master = makeAliveMaster()
     var worker: MockExecutorLaunchFailWorker = null
     try {
-      worker = new MockExecutorLaunchFailWorker(master.self)
+      worker = new MockExecutorLaunchFailWorker(master)
       worker.rpcEnv.setupEndpoint("worker", worker)
       val workerRegMsg = RegisterWorker(
         worker.id,
@@ -677,19 +704,16 @@
       val driver = DeployTestUtils.createDriverDesc()
       // mimic DriverClient sending RequestSubmitDriver to the master
       master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))
-      var appId: String = null
-      eventually(timeout(10.seconds)) {
-        // an app should be registered with the Master once the driver is set up
-        assert(worker.apps.nonEmpty)
-        appId = worker.apps.head._1
-        assert(master.idToApp.contains(appId))
-      }
 
       // the LaunchExecutor message should have been received on the worker side
       assert(worker.launchExecutorReceived.await(10, TimeUnit.SECONDS))
 
       eventually(timeout(10.seconds)) {
+        val appIds = worker.appIdsToLaunchExecutor
         // Master keeps launching executors until it reaches MAX_EXECUTOR_RETRIES
         assert(worker.failedCnt == master.conf.get(MAX_EXECUTOR_RETRIES))
         // Master removes the app once no executor can be launched for it
-        assert(!master.idToApp.contains(appId))
+        assert(master.idToApp.keySet.intersect(appIds).isEmpty)
       }
     } finally {
       if (worker != null) {
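
Note: the final assertion holds because the Master gives up on an app once executor launches keep failing. A toy standalone model of that bookkeeping, where maxRetries stands in for spark.deploy.maxExecutorRetries (default 10) and the app id is illustrative:

object ExecutorRetryModel {
  def main(args: Array[String]): Unit = {
    val maxRetries = 10
    var failedCnt = 0
    var registeredApps = Set("app-20200101000000-0000")
    // each failed executor launch bumps the counter; past the limit the app is dropped
    while (registeredApps.nonEmpty) {
      failedCnt += 1
      if (failedCnt >= maxRetries) registeredApps -= "app-20200101000000-0000"
    }
    assert(failedCnt == maxRetries)
    assert(registeredApps.intersect(Set("app-20200101000000-0000")).isEmpty)
  }
}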