diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index b1837c9c0c9ea..1fe901a83af33 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -54,6 +54,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     resourcesFileOpt: Option[String])
   extends IsolatedRpcEndpoint with ExecutorBackend with Logging {

+  import CoarseGrainedExecutorBackend._
+
   private implicit val formats = DefaultFormats

   private[this] val stopping = new AtomicBoolean(false)
@@ -80,9 +82,8 @@ private[spark] class CoarseGrainedExecutorBackend(
       ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
         extractAttributes, resources))
     }(ThreadUtils.sameThread).onComplete {
-      // This is a very fast action so we can use "ThreadUtils.sameThread"
-      case Success(msg) =>
-        // Always receive `true`. Just ignore it
+      case Success(_) =>
+        self.send(RegisteredExecutor)
       case Failure(e) =>
         exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
     }(ThreadUtils.sameThread)
@@ -133,9 +134,6 @@ private[spark] class CoarseGrainedExecutorBackend(
           exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
       }

-    case RegisterExecutorFailed(message) =>
-      exitExecutor(1, "Slave registration failed: " + message)
-
     case LaunchTask(data) =>
       if (executor == null) {
         exitExecutor(1, "Received LaunchTask command but executor was null")
@@ -226,6 +224,10 @@

 private[spark] object CoarseGrainedExecutorBackend extends Logging {

+  // Message used internally to start the executor when the driver successfully accepted the
+  // registration request.
+  case object RegisteredExecutor
+
   case class Arguments(
       driverUrl: String,
       executorId: String,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 9ce23683245eb..57317e7f6af00 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -48,13 +48,6 @@ private[spark] object CoarseGrainedClusterMessages {
   case class KillExecutorsOnHost(host: String)
     extends CoarseGrainedClusterMessage

-  sealed trait RegisterExecutorResponse
-
-  case object RegisteredExecutor extends CoarseGrainedClusterMessage with RegisterExecutorResponse
-
-  case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
-    with RegisterExecutorResponse
-
   case class UpdateDelegationTokens(tokens: Array[Byte])
     extends CoarseGrainedClusterMessage

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 7d9c1c6f96f6c..55e7b2827d8ff 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -207,15 +207,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
           attributes, resources) =>
         if (executorDataMap.contains(executorId)) {
-          executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
-          context.reply(true)
-        } else if (scheduler.nodeBlacklist.contains(hostname)) {
+          context.sendFailure(new IllegalStateException(s"Duplicate executor ID: $executorId"))
+        } else if (scheduler.nodeBlacklist.contains(hostname) ||
+            isBlacklisted(executorId, hostname)) {
           // If the cluster manager gives us an executor on a blacklisted node (because it
           // already started allocating those resources before we informed it of our blacklist,
           // or if it ignored our blacklist), then we reject that executor immediately.
           logInfo(s"Rejecting $executorId as it has been blacklisted.")
-          executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
-          context.reply(true)
+          context.sendFailure(new IllegalStateException(s"Executor is blacklisted: $executorId"))
         } else {
           // If the executor's rpc env is not listening for incoming connections, `hostPort`
           // will be null, and the client connection should be used to contact the executor.
@@ -250,7 +249,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
           }
         }
-        executorRef.send(RegisteredExecutor)
         // Note: some tests expect the reply to come after we put the executor in the map
         context.reply(true)
         listenerBus.post(
@@ -779,6 +777,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

   protected def currentDelegationTokens: Array[Byte] = delegationTokens.get()

+  /**
+   * Checks whether the executor is blacklisted. This is called when the executor tries to
+   * register with the scheduler, and will deny registration if this method returns true.
+   *
+   * This is in addition to the blacklist kept by the task scheduler, so custom implementations
+   * don't need to check there.
+   */
+  protected def isBlacklisted(executorId: String, hostname: String): Boolean = false
+
   // SPARK-27112: We need to ensure that there is ordering of lock acquisition
   // between TaskSchedulerImpl and CoarseGrainedSchedulerBackend objects in order to fix
   // the deadlock issue exposed in SPARK-27112
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index dd790b8dbb853..e316da738440e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.{BeforeAndAfterAll, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._

 import org.apache.spark._
-import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
+import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.ApplicationInfo
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.Worker
@@ -34,7 +34,7 @@ import org.apache.spark.internal.config
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster._
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{LaunchedExecutor, RegisterExecutor, RegisterExecutorFailed}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{LaunchedExecutor, RegisterExecutor}
 /**
  * End-to-end tests for dynamic allocation in standalone mode.
  */
@@ -482,12 +482,16 @@ class StandaloneDynamicAllocationSuite
       assert(apps.head.getExecutorLimit === Int.MaxValue)
     }
     val beforeList = getApplications().head.executors.keys.toSet
-    assert(killExecutorsOnHost(sc, "localhost").equals(true))

-    syncExecutors(sc)
-    val afterList = getApplications().head.executors.keys.toSet
+
+    sc.schedulerBackend match {
+      case b: CoarseGrainedSchedulerBackend =>
+        b.killExecutorsOnHost("localhost")
+      case _ => fail("expected coarse grained scheduler")
+    }

     eventually(timeout(10.seconds), interval(100.millis)) {
+      val afterList = getApplications().head.executors.keys.toSet
       assert(beforeList.intersect(afterList).size == 0)
     }
   }
@@ -514,10 +518,11 @@ class StandaloneDynamicAllocationSuite
     val scheduler = new CoarseGrainedSchedulerBackend(taskScheduler, rpcEnv)
     try {
       scheduler.start()
-      scheduler.driverEndpoint.ask[Boolean](message)
-      eventually(timeout(10.seconds), interval(100.millis)) {
-        verify(endpointRef).send(RegisterExecutorFailed(any()))
+      val e = intercept[SparkException] {
+        scheduler.driverEndpoint.askSync[Boolean](message)
       }
+      assert(e.getCause().isInstanceOf[IllegalStateException])
+      assert(scheduler.getExecutorIds().isEmpty)
     } finally {
       scheduler.stop()
     }
@@ -536,6 +541,11 @@ class StandaloneDynamicAllocationSuite
       .setMaster(masterRpcEnv.address.toSparkURL)
       .setAppName("test")
       .set(config.EXECUTOR_MEMORY.key, "256m")
+      // Because we're faking executor launches in the Worker, set the config so that the driver
+      // will not timeout anything related to executors.
+      .set(config.Network.NETWORK_TIMEOUT.key, "2h")
+      .set(config.EXECUTOR_HEARTBEAT_INTERVAL.key, "1h")
+      .set(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT.key, "1h")
   }

   /** Make a master to which our application will send executor requests. */
@@ -549,8 +559,7 @@ class StandaloneDynamicAllocationSuite
   private def makeWorkers(cores: Int, memory: Int): Seq[Worker] = {
     (0 until numWorkers).map { i =>
       val rpcEnv = workerRpcEnvs(i)
-      val worker = new Worker(rpcEnv, 0, cores, memory, Array(masterRpcEnv.address),
-        Worker.ENDPOINT_NAME, null, conf, securityManager)
+      val worker = new TestWorker(rpcEnv, cores, memory)
       rpcEnv.setupEndpoint(Worker.ENDPOINT_NAME, worker)
       worker
     }
@@ -588,16 +597,6 @@ class StandaloneDynamicAllocationSuite
     }
   }

-  /** Kill the executors on a given host. */
-  private def killExecutorsOnHost(sc: SparkContext, host: String): Boolean = {
-    syncExecutors(sc)
-    sc.schedulerBackend match {
-      case b: CoarseGrainedSchedulerBackend =>
-        b.killExecutorsOnHost(host)
-      case _ => fail("expected coarse grained scheduler")
-    }
-  }
-
   /**
    * Return a list of executor IDs belonging to this application.
    *
@@ -620,9 +619,8 @@ class StandaloneDynamicAllocationSuite
    * we submit a request to kill them. This must be called before each kill request.
    */
   private def syncExecutors(sc: SparkContext): Unit = {
-    val driverExecutors = sc.env.blockManager.master.getStorageStatus
-      .map(_.blockManagerId.executorId)
-      .filter { _ != SparkContext.DRIVER_IDENTIFIER}
+    val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
+    val driverExecutors = backend.getExecutorIds()
     val masterExecutors = getExecutorIds(sc)
     val missingExecutors = masterExecutors.toSet.diff(driverExecutors.toSet).toSeq.sorted
     missingExecutors.foreach { id =>
@@ -632,10 +630,29 @@ class StandaloneDynamicAllocationSuite
       when(endpointRef.address).thenReturn(mockAddress)
       val message = RegisterExecutor(id, endpointRef, "localhost", 10, Map.empty, Map.empty,
         Map.empty)
-      val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
       backend.driverEndpoint.askSync[Boolean](message)
       backend.driverEndpoint.send(LaunchedExecutor(id))
     }
   }

+  /**
+   * Worker implementation that does not actually launch any executors, but reports them as
+   * running so the Master keeps track of them. This requires that `syncExecutors` be used
+   * to make sure the Master instance and the SparkContext under test agree about what
+   * executors are running.
+   */
+  private class TestWorker(rpcEnv: RpcEnv, cores: Int, memory: Int)
+    extends Worker(
+      rpcEnv, 0, cores, memory, Array(masterRpcEnv.address), Worker.ENDPOINT_NAME,
+      null, conf, securityManager) {
+
+    override def receive: PartialFunction[Any, Unit] = testReceive.orElse(super.receive)
+
+    private def testReceive: PartialFunction[Any, Unit] = synchronized {
+      case LaunchExecutor(_, appId, execId, _, _, _, _) =>
+        self.send(ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None))
+    }
+
+  }
+
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 8a16ae64e9372..6f727374ba0bf 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -189,6 +189,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     val conf = new SparkConf()
       .set(EXECUTOR_CORES, 3)
       .set(SCHEDULER_REVIVE_INTERVAL.key, "1m") // don't let it auto revive during test
+      .set(EXECUTOR_INSTANCES, 0) // avoid errors about duplicate executor registrations
       .setMaster(
       "coarseclustermanager[org.apache.spark.scheduler.TestCoarseGrainedSchedulerBackend]")
       .setAppName("test")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 2201bf91d3905..b394f35b15111 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -72,6 +72,11 @@ private[spark] class ExecutorPodsAllocator(

   private var lastSnapshot = ExecutorPodsSnapshot(Nil)

+  // Executors that have been deleted by this allocator but not yet detected as deleted in
+  // a snapshot from the API server. This is used to deny registration from these executors
+  // if they happen to come up before the deletion takes effect.
+  @volatile private var deletedExecutorIds = Set.empty[Long]
+
   def start(applicationId: String): Unit = {
     snapshotsStore.addSubscriber(podAllocationDelay) {
       onNewSnapshots(applicationId, _)
@@ -85,6 +90,8 @@ private[spark] class ExecutorPodsAllocator(
     }
   }

+  def isDeleted(executorId: String): Boolean = deletedExecutorIds.contains(executorId.toLong)
+
   private def onNewSnapshots(
       applicationId: String,
       snapshots: Seq[ExecutorPodsSnapshot]): Unit = synchronized {
@@ -141,10 +148,17 @@ private[spark] class ExecutorPodsAllocator(
       }
       .map { case (id, _) => id }

+    // Make a local, non-volatile copy of the reference since it's used multiple times. This
+    // is the only method that modifies the list, so this is safe.
+    var _deletedExecutorIds = deletedExecutorIds
+
     if (snapshots.nonEmpty) {
       logDebug(s"Pod allocation status: $currentRunningCount running, " +
         s"${currentPendingExecutors.size} pending, " +
         s"${newlyCreatedExecutors.size} unacknowledged.")
+
+      val existingExecs = lastSnapshot.executorPods.keySet
+      _deletedExecutorIds = _deletedExecutorIds.filter(existingExecs.contains)
     }

     val currentTotalExpectedExecutors = totalExpectedExecutors.get
@@ -169,6 +183,8 @@ private[spark] class ExecutorPodsAllocator(

       if (toDelete.nonEmpty) {
         logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).")
+        _deletedExecutorIds = _deletedExecutorIds ++ toDelete
+
         Utils.tryLogNonFatalError {
           kubernetesClient
             .pods()
@@ -209,6 +225,8 @@ private[spark] class ExecutorPodsAllocator(
       }
     }

+    deletedExecutorIds = _deletedExecutorIds
+
     // Update the flag that helps the setTotalExpectedExecutors() callback avoid triggering this
     // update method when not needed.
     hasPendingPods.set(knownPendingCount + newlyCreatedExecutors.size > 0)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index e221a926daca8..105841ac834b3 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -181,6 +181,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
     Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint))
   }

+  override protected def isBlacklisted(executorId: String, hostname: String): Boolean = {
+    podAllocator.isDeleted(executorId)
+  }
+
   private class KubernetesDriverEndpoint extends DriverEndpoint {

     override def onDisconnected(rpcAddress: RpcAddress): Unit = {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
index 1b6dfe5443377..9ac7e0222054a 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
@@ -48,4 +48,13 @@ class DeterministicExecutorPodsSnapshotsStore extends ExecutorPodsSnapshotsStore
     currentSnapshot = ExecutorPodsSnapshot(newSnapshot)
     snapshotsBuffer += currentSnapshot
   }
+
+  def removeDeletedExecutors(): Unit = {
+    val nonDeleted = currentSnapshot.executorPods.filter {
+      case (_, PodDeleted(_)) => false
+      case _ => true
+    }
+    currentSnapshot = ExecutorPodsSnapshot(nonDeleted)
+    snapshotsBuffer += currentSnapshot
+  }
 }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index 4475d5db6f03a..a0abded3823bb 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -189,6 +189,17 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     verify(podOperations, times(4)).create(any())
     verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "3", "4")
     verify(podOperations).delete()
+    assert(podsAllocatorUnderTest.isDeleted("3"))
+    assert(podsAllocatorUnderTest.isDeleted("4"))
+
+    // Update the snapshot to not contain the deleted executors, make sure the
+    // allocator cleans up internal state.
+    snapshotsStore.updatePod(deletedExecutor(3))
+    snapshotsStore.updatePod(deletedExecutor(4))
+    snapshotsStore.removeDeletedExecutors()
+    snapshotsStore.notifySubscribers()
+    assert(!podsAllocatorUnderTest.isDeleted("3"))
+    assert(!podsAllocatorUnderTest.isDeleted("4"))
   }

   private def executorPodAnswer(): Answer[SparkPod] =
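
Note (not part of the patch): a minimal sketch of how a cluster-manager integration other than Kubernetes could plug into the new isBlacklisted hook, mirroring what KubernetesClusterSchedulerBackend does with podAllocator.isDeleted above. The package, class name, and recentlyDeleted callback below are hypothetical illustrations only.

// Hypothetical sketch: a backend that vetoes registrations from executors its cluster
// manager has already asked to delete, in addition to the node blacklist check that
// CoarseGrainedSchedulerBackend performs on its own.
package org.apache.spark.scheduler.cluster.custom // hypothetical package inside Spark's namespace

import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

private[spark] class ExternalDeletionTrackingBackend(
    scheduler: TaskSchedulerImpl,
    rpcEnv: RpcEnv,
    recentlyDeleted: () => Set[String]) // supplied by the cluster-manager integration
  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {

  // Called while RegisterExecutor is handled; returning true makes the driver answer the
  // registration ask with a failure instead of adding the executor to its executor map.
  override protected def isBlacklisted(executorId: String, hostname: String): Boolean = {
    recentlyDeleted().contains(executorId)
  }
}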