CoarseGrainedExecutorBackend.scala
@@ -54,6 +54,8 @@ private[spark] class CoarseGrainedExecutorBackend(
resourcesFileOpt: Option[String])
extends IsolatedRpcEndpoint with ExecutorBackend with Logging {

import CoarseGrainedExecutorBackend._

private implicit val formats = DefaultFormats

private[this] val stopping = new AtomicBoolean(false)
@@ -80,9 +82,8 @@ private[spark] class CoarseGrainedExecutorBackend(
ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
extractAttributes, resources))
}(ThreadUtils.sameThread).onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
// Always receive `true`. Just ignore it
case Success(_) =>
self.send(RegisteredExecutor)
case Failure(e) =>
exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
}(ThreadUtils.sameThread)
@@ -133,9 +134,6 @@ private[spark] class CoarseGrainedExecutorBackend(
exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
}

case RegisterExecutorFailed(message) =>
exitExecutor(1, "Slave registration failed: " + message)

case LaunchTask(data) =>
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
@@ -226,6 +224,10 @@ private[spark] class CoarseGrainedExecutorBackend(

private[spark] object CoarseGrainedExecutorBackend extends Logging {

// Message used internally to start the executor when the driver successfully accepted the
// registration request.
case object RegisteredExecutor

case class Arguments(
driverUrl: String,
executorId: String,
CoarseGrainedClusterMessages.scala
@@ -48,13 +48,6 @@ private[spark] object CoarseGrainedClusterMessages {
case class KillExecutorsOnHost(host: String)
extends CoarseGrainedClusterMessage

sealed trait RegisterExecutorResponse

case object RegisteredExecutor extends CoarseGrainedClusterMessage with RegisterExecutorResponse

case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
with RegisterExecutorResponse

case class UpdateDelegationTokens(tokens: Array[Byte])
extends CoarseGrainedClusterMessage

CoarseGrainedSchedulerBackend.scala
@@ -207,15 +207,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
attributes, resources) =>
if (executorDataMap.contains(executorId)) {
executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
context.reply(true)
} else if (scheduler.nodeBlacklist.contains(hostname)) {
context.sendFailure(new IllegalStateException(s"Duplicate executor ID: $executorId"))
} else if (scheduler.nodeBlacklist.contains(hostname) ||
isBlacklisted(executorId, hostname)) {
// If the cluster manager gives us an executor on a blacklisted node (because it
// already started allocating those resources before we informed it of our blacklist,
// or if it ignored our blacklist), then we reject that executor immediately.
logInfo(s"Rejecting $executorId as it has been blacklisted.")
executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
context.reply(true)
context.sendFailure(new IllegalStateException(s"Executor is blacklisted: $executorId"))
Contributor:

Is there a reason we would rather sendFailure(_) instead of exiting the executor with a RegisterExecutorFailed message?

Contributor (Author):

Explained in the PR description.

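A minimal, self-contained sketch of the behavior the change relies on, using plain scala.concurrent futures instead of Spark's RPC layer (the object name and the println placeholders are made up for illustration): a sendFailure() on the driver completes the executor's ask Future with a Failure, and that branch already exits the executor, so a separate RegisterExecutorFailed message is no longer needed.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object RegistrationSketch extends App {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Stand-in for ref.ask[Boolean](RegisterExecutor(...)): the driver either
  // replies true, or calls context.sendFailure(...), which surfaces here as a
  // failed Future carrying the driver's exception.
  def register(duplicateOrBlacklisted: Boolean): Future[Boolean] =
    if (duplicateOrBlacklisted) {
      Future.failed(new IllegalStateException("Executor is blacklisted: 1"))
    } else {
      Future.successful(true)
    }

  register(duplicateOrBlacklisted = true).onComplete {
    case Success(_) => println("self.send(RegisteredExecutor)") // start the executor
    case Failure(e) => println(s"exitExecutor(1, 'Cannot register with driver', $e)")
  }

  Thread.sleep(500) // give the callback time to run before the JVM exits
}
```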
} else {
// If the executor's rpc env is not listening for incoming connections, `hostPort`
// will be null, and the client connection should be used to contact the executor.
@@ -250,7 +249,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
}
executorRef.send(RegisteredExecutor)
// Note: some tests expect the reply to come after we put the executor in the map
context.reply(true)
listenerBus.post(
@@ -779,6 +777,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

protected def currentDelegationTokens: Array[Byte] = delegationTokens.get()

/**
* Checks whether the executor is blacklisted. This is called when the executor tries to
* register with the scheduler, and will deny registration if this method returns true.
*
* This is in addition to the blacklist kept by the task scheduler, so custom implementations
* don't need to check there.
*/
protected def isBlacklisted(executorId: String, hostname: String): Boolean = false

// SPARK-27112: We need to ensure that there is ordering of lock acquisition
// between TaskSchedulerImpl and CoarseGrainedSchedulerBackend objects in order to fix
// the deadlock issue exposed in SPARK-27112
StandaloneDynamicAllocationSuite.scala
@@ -26,15 +26,15 @@ import org.scalatest.{BeforeAndAfterAll, PrivateMethodTester}
import org.scalatest.concurrent.Eventually._

import org.apache.spark._
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.ApplicationInfo
import org.apache.spark.deploy.master.Master
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.internal.config
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{LaunchedExecutor, RegisterExecutor, RegisterExecutorFailed}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{LaunchedExecutor, RegisterExecutor}

/**
* End-to-end tests for dynamic allocation in standalone mode.
@@ -482,12 +482,16 @@ class StandaloneDynamicAllocationSuite
assert(apps.head.getExecutorLimit === Int.MaxValue)
}
val beforeList = getApplications().head.executors.keys.toSet
assert(killExecutorsOnHost(sc, "localhost").equals(true))

syncExecutors(sc)
val afterList = getApplications().head.executors.keys.toSet

sc.schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutorsOnHost("localhost")
case _ => fail("expected coarse grained scheduler")
}

eventually(timeout(10.seconds), interval(100.millis)) {
val afterList = getApplications().head.executors.keys.toSet
assert(beforeList.intersect(afterList).size == 0)
}
}
@@ -514,10 +518,11 @@
val scheduler = new CoarseGrainedSchedulerBackend(taskScheduler, rpcEnv)
try {
scheduler.start()
scheduler.driverEndpoint.ask[Boolean](message)
eventually(timeout(10.seconds), interval(100.millis)) {
verify(endpointRef).send(RegisterExecutorFailed(any()))
val e = intercept[SparkException] {
scheduler.driverEndpoint.askSync[Boolean](message)
}
assert(e.getCause().isInstanceOf[IllegalStateException])
assert(scheduler.getExecutorIds().isEmpty)
} finally {
scheduler.stop()
}
@@ -536,6 +541,11 @@
.setMaster(masterRpcEnv.address.toSparkURL)
.setAppName("test")
.set(config.EXECUTOR_MEMORY.key, "256m")
// Because we're faking executor launches in the Worker, set the config so that the driver
// will not timeout anything related to executors.
.set(config.Network.NETWORK_TIMEOUT.key, "2h")
.set(config.EXECUTOR_HEARTBEAT_INTERVAL.key, "1h")
.set(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT.key, "1h")
}

/** Make a master to which our application will send executor requests. */
@@ -549,8 +559,7 @@
private def makeWorkers(cores: Int, memory: Int): Seq[Worker] = {
(0 until numWorkers).map { i =>
val rpcEnv = workerRpcEnvs(i)
val worker = new Worker(rpcEnv, 0, cores, memory, Array(masterRpcEnv.address),
Worker.ENDPOINT_NAME, null, conf, securityManager)
val worker = new TestWorker(rpcEnv, cores, memory)
rpcEnv.setupEndpoint(Worker.ENDPOINT_NAME, worker)
worker
}
@@ -588,16 +597,6 @@
}
}

/** Kill the executors on a given host. */
private def killExecutorsOnHost(sc: SparkContext, host: String): Boolean = {
syncExecutors(sc)
sc.schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutorsOnHost(host)
case _ => fail("expected coarse grained scheduler")
}
}

/**
* Return a list of executor IDs belonging to this application.
*
@@ -620,9 +619,8 @@
* we submit a request to kill them. This must be called before each kill request.
*/
private def syncExecutors(sc: SparkContext): Unit = {
val driverExecutors = sc.env.blockManager.master.getStorageStatus
.map(_.blockManagerId.executorId)
.filter { _ != SparkContext.DRIVER_IDENTIFIER}
val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
val driverExecutors = backend.getExecutorIds()
val masterExecutors = getExecutorIds(sc)
val missingExecutors = masterExecutors.toSet.diff(driverExecutors.toSet).toSeq.sorted
missingExecutors.foreach { id =>
Expand All @@ -632,10 +630,29 @@ class StandaloneDynamicAllocationSuite
when(endpointRef.address).thenReturn(mockAddress)
val message = RegisterExecutor(id, endpointRef, "localhost", 10, Map.empty, Map.empty,
Map.empty)
val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
backend.driverEndpoint.askSync[Boolean](message)
backend.driverEndpoint.send(LaunchedExecutor(id))
}
}

/**
* Worker implementation that does not actually launch any executors, but reports them as
* running so the Master keeps track of them. This requires that `syncExecutors` be used
* to make sure the Master instance and the SparkContext under test agree about what
* executors are running.
*/
private class TestWorker(rpcEnv: RpcEnv, cores: Int, memory: Int)
extends Worker(
rpcEnv, 0, cores, memory, Array(masterRpcEnv.address), Worker.ENDPOINT_NAME,
null, conf, securityManager) {

override def receive: PartialFunction[Any, Unit] = testReceive.orElse(super.receive)

private def testReceive: PartialFunction[Any, Unit] = synchronized {
case LaunchExecutor(_, appId, execId, _, _, _, _) =>
self.send(ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None))
}

}

}
CoarseGrainedSchedulerBackendSuite.scala
@@ -189,6 +189,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
val conf = new SparkConf()
.set(EXECUTOR_CORES, 3)
.set(SCHEDULER_REVIVE_INTERVAL.key, "1m") // don't let it auto revive during test
.set(EXECUTOR_INSTANCES, 0) // avoid errors about duplicate executor registrations
.setMaster(
"coarseclustermanager[org.apache.spark.scheduler.TestCoarseGrainedSchedulerBackend]")
.setAppName("test")
ExecutorPodsAllocator.scala
@@ -72,6 +72,11 @@ private[spark] class ExecutorPodsAllocator(

private var lastSnapshot = ExecutorPodsSnapshot(Nil)

// Executors that have been deleted by this allocator but not yet detected as deleted in
// a snapshot from the API server. This is used to deny registration from these executors
// if they happen to come up before the deletion takes effect.
@volatile private var deletedExecutorIds = Set.empty[Long]

def start(applicationId: String): Unit = {
snapshotsStore.addSubscriber(podAllocationDelay) {
onNewSnapshots(applicationId, _)
@@ -85,6 +90,8 @@
}
}

def isDeleted(executorId: String): Boolean = deletedExecutorIds.contains(executorId.toLong)

private def onNewSnapshots(
applicationId: String,
snapshots: Seq[ExecutorPodsSnapshot]): Unit = synchronized {
@@ -141,10 +148,17 @@
}
.map { case (id, _) => id }

// Make a local, non-volatile copy of the reference since it's used multiple times. This
// is the only method that modifies the list, so this is safe.
var _deletedExecutorIds = deletedExecutorIds

if (snapshots.nonEmpty) {
logDebug(s"Pod allocation status: $currentRunningCount running, " +
s"${currentPendingExecutors.size} pending, " +
s"${newlyCreatedExecutors.size} unacknowledged.")

val existingExecs = lastSnapshot.executorPods.keySet
_deletedExecutorIds = _deletedExecutorIds.filter(existingExecs.contains)
}

val currentTotalExpectedExecutors = totalExpectedExecutors.get
@@ -169,6 +183,8 @@

if (toDelete.nonEmpty) {
logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).")
_deletedExecutorIds = _deletedExecutorIds ++ toDelete

Utils.tryLogNonFatalError {
kubernetesClient
.pods()
Expand Down Expand Up @@ -209,6 +225,8 @@ private[spark] class ExecutorPodsAllocator(
}
}

deletedExecutorIds = _deletedExecutorIds

// Update the flag that helps the setTotalExpectedExecutors() callback avoid triggering this
// update method when not needed.
hasPendingPods.set(knownPendingCount + newlyCreatedExecutors.size > 0)
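A condensed sketch of the copy-then-publish pattern used above for deletedExecutorIds (the class and method names here are invented for illustration): readers only dereference the @volatile field, while the single writer mutates a local copy and publishes it with one volatile write at the end of the update.

```scala
// Toy stand-in for the allocator's bookkeeping: only one thread calls update(),
// while any thread may call isDeleted().
class DeletedIdTracker {
  @volatile private var deletedIds = Set.empty[Long]

  def isDeleted(executorId: String): Boolean = deletedIds.contains(executorId.toLong)

  def update(newlyDeleted: Set[Long], stillInSnapshot: Set[Long]): Unit = {
    var ids = deletedIds                       // local, non-volatile copy
    ids = ids.filter(stillInSnapshot.contains) // forget ids the API server no longer reports
    ids ++= newlyDeleted                       // remember pods we just asked to delete
    deletedIds = ids                           // single volatile write publishes the new set
  }
}
```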
KubernetesClusterSchedulerBackend.scala
@@ -181,6 +181,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint))
}

override protected def isBlacklisted(executorId: String, hostname: String): Boolean = {
podAllocator.isDeleted(executorId)
}

private class KubernetesDriverEndpoint extends DriverEndpoint {

override def onDisconnected(rpcAddress: RpcAddress): Unit = {
DeterministicExecutorPodsSnapshotsStore.scala
@@ -48,4 +48,13 @@ class DeterministicExecutorPodsSnapshotsStore extends ExecutorPodsSnapshotsStore
currentSnapshot = ExecutorPodsSnapshot(newSnapshot)
snapshotsBuffer += currentSnapshot
}

def removeDeletedExecutors(): Unit = {
val nonDeleted = currentSnapshot.executorPods.filter {
case (_, PodDeleted(_)) => false
case _ => true
}
currentSnapshot = ExecutorPodsSnapshot(nonDeleted)
Contributor:

You could call replaceSnapshot() here.

Contributor:

I know it is unrelated to this change, but it is strange to me that notifySubscribers clears the snapshotsBuffer but does not set currentSnapshot to an empty ExecutorPodsSnapshot(). This way, a notifySubscribers call followed by an updatePod could keep some pods that would be notified again with the next notifySubscribers.

Contributor (Author):

replaceSnapshot takes a seq; nonDeleted is a map.

As for not clearing the current snapshot, that's because snapshots are cumulative. Each update from the k8s server just adds to the previous snapshot (until a periodic full sync replaces it with replaceSnapshot).

snapshotsBuffer += currentSnapshot
}
}
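To make the cumulative-snapshot behavior discussed in the thread above concrete, here is a self-contained toy model (not the real store; plain maps stand in for pod snapshots): updatePod merges into the current snapshot, notifySubscribers drains the buffer without resetting that snapshot, and removeDeletedExecutors drops entries marked as deleted.

```scala
object SnapshotStoreModel extends App {
  type Snapshot = Map[Long, String] // executor id -> pod state (toy stand-in)

  var currentSnapshot: Snapshot = Map.empty
  val snapshotsBuffer = scala.collection.mutable.Buffer.empty[Snapshot]

  def updatePod(id: Long, state: String): Unit = {
    currentSnapshot += (id -> state) // cumulative: merge into the current view
    snapshotsBuffer += currentSnapshot
  }

  def notifySubscribers(): Seq[Snapshot] = {
    val delivered = snapshotsBuffer.toList // hand buffered snapshots to subscribers...
    snapshotsBuffer.clear()                // ...but keep currentSnapshot as-is
    delivered
  }

  def removeDeletedExecutors(): Unit = {
    currentSnapshot = currentSnapshot.filter { case (_, state) => state != "deleted" }
    snapshotsBuffer += currentSnapshot
  }

  updatePod(3, "running")
  println(notifySubscribers()) // List(Map(3 -> running))
  updatePod(3, "deleted")
  removeDeletedExecutors()
  println(notifySubscribers()) // the last snapshot no longer mentions executor 3
}
```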
ExecutorPodsAllocatorSuite.scala
@@ -189,6 +189,17 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter
verify(podOperations, times(4)).create(any())
verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "3", "4")
verify(podOperations).delete()
assert(podsAllocatorUnderTest.isDeleted("3"))
assert(podsAllocatorUnderTest.isDeleted("4"))

// Update the snapshot to not contain the deleted executors, make sure the
// allocator cleans up internal state.
snapshotsStore.updatePod(deletedExecutor(3))
snapshotsStore.updatePod(deletedExecutor(4))
snapshotsStore.removeDeletedExecutors()
snapshotsStore.notifySubscribers()
assert(!podsAllocatorUnderTest.isDeleted("3"))
assert(!podsAllocatorUnderTest.isDeleted("4"))
}

private def executorPodAnswer(): Answer[SparkPod] =