diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 4c1f92a1bcbf..9b62e4b1b715 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -165,11 +165,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     }
 
     if (notifyDriver && driver.nonEmpty) {
-      driver.get.ask[Boolean](
-        RemoveExecutor(executorId, new ExecutorLossReason(reason))
-      ).failed.foreach(e =>
-        logWarning(s"Unable to notify the driver due to " + e.getMessage, e)
-      )(ThreadUtils.sameThread)
+      driver.get.send(RemoveExecutor(executorId, new ExecutorLossReason(reason)))
     }
 
     System.exit(code)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 7bfb4d53c183..e6982f333d52 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -95,6 +95,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // The num of current max ExecutorId used to re-register appMaster
   @volatile protected var currentExecutorIdCounter = 0
 
+  private val reviveThread =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
+
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
 
@@ -103,9 +106,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
     protected val addressToExecutorId = new HashMap[RpcAddress, String]
 
-    private val reviveThread =
-      ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
-
     override def onStart() {
       // Periodically revive offers to allow delay scheduling to work
       val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")
@@ -154,6 +154,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         executorDataMap.values.foreach { ed =>
           ed.executorEndpoint.send(UpdateDelegationTokens(newDelegationTokens))
         }
+
+      case RemoveExecutor(executorId, reason) =>
+        // We will remove the executor's state and cannot restore it. However, the connection
+        // between the driver and the executor may be still alive so that the executor won't exit
+        // automatically, so try to tell the executor to stop itself. See SPARK-13519.
+        executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
+        removeExecutor(executorId, reason)
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -215,14 +222,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         }
         context.reply(true)
 
-      case RemoveExecutor(executorId, reason) =>
-        // We will remove the executor's state and cannot restore it. However, the connection
-        // between the driver and the executor may be still alive so that the executor won't exit
-        // automatically, so try to tell the executor to stop itself. See SPARK-13519.
-        executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
-        removeExecutor(executorId, reason)
-        context.reply(true)
-
       case RemoveWorker(workerId, host, message) =>
         removeWorker(workerId, host, message)
         context.reply(true)
@@ -373,10 +372,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
       shouldDisable
     }
-
-    override def onStop() {
-      reviveThread.shutdownNow()
-    }
   }
 
   var driverEndpoint: RpcEndpointRef = null
@@ -417,6 +412,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   }
 
   override def stop() {
+    reviveThread.shutdownNow()
     stopExecutors()
     try {
       if (driverEndpoint != null) {
@@ -465,9 +461,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * at once.
    */
   protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
-    // Only log the failure since we don't care about the result.
-    driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).failed.foreach(t =>
-      logError(t.getMessage, t))(ThreadUtils.sameThread)
+    driverEndpoint.send(RemoveExecutor(executorId, reason))
   }
 
   protected def removeWorker(workerId: String, host: String, message: String): Unit = {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index 3febb2f47cfd..13c09033a50e 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -282,7 +282,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     // No more deletion attempts of the executors.
     // This is graceful termination and should not be detected as a failure.
     verify(podOperations, times(1)).delete(resolvedPod)
-    verify(driverEndpointRef, times(1)).ask[Boolean](
+    verify(driverEndpointRef, times(1)).send(
       RemoveExecutor("1", ExecutorExited(
         0,
         exitCausedByApp = false,
@@ -318,7 +318,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     requestExecutorRunnable.getValue.run()
     allocatorRunnable.getAllValues.asScala.last.run()
     verify(podOperations, never()).delete(firstResolvedPod)
-    verify(driverEndpointRef).ask[Boolean](
+    verify(driverEndpointRef).send(
       RemoveExecutor("1", ExecutorExited(
         1,
         exitCausedByApp = true,
@@ -356,7 +356,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     val recreatedResolvedPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
     allocatorRunnable.getValue.run()
     verify(podOperations).delete(firstResolvedPod)
-    verify(driverEndpointRef).ask[Boolean](
+    verify(driverEndpointRef).send(
       RemoveExecutor("1", SlaveLost("Executor lost for unknown reasons.")))
   }
 
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 415a29fd887e..bb615c36cd97 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster
 import java.util.concurrent.atomic.{AtomicBoolean}
 
 import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.ExecutionContext.Implicits.global
 import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
 
@@ -245,14 +246,7 @@ private[spark] abstract class YarnSchedulerBackend(
         Future.successful(RemoveExecutor(executorId, SlaveLost("AM is not yet registered.")))
       }
 
-      removeExecutorMessage
-        .flatMap { message =>
-          driverEndpoint.ask[Boolean](message)
-        }(ThreadUtils.sameThread)
-        .onFailure {
-          case NonFatal(e) => logError(
-            s"Error requesting driver to remove executor $executorId after disconnection.", e)
-        }(ThreadUtils.sameThread)
+      removeExecutorMessage.foreach { message => driverEndpoint.send(message) }
     }
 
     override def receive: PartialFunction[Any, Unit] = {
@@ -265,12 +259,10 @@ private[spark] abstract class YarnSchedulerBackend(
         addWebUIFilter(filterName, filterParams, proxyBase)
 
       case r @ RemoveExecutor(executorId, reason) =>
-        logWarning(reason.toString)
-        driverEndpoint.ask[Boolean](r).onFailure {
-          case e =>
-            logError("Error requesting driver to remove executor" +
-              s" $executorId for reason $reason", e)
-        }(ThreadUtils.sameThread)
+        if (!stopped.get) {
+          logWarning(s"Requesting driver to remove executor $executorId for reason $reason")
+          driverEndpoint.send(r)
+        }
     }
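
For context (not part of the patch): the change above replaces request-reply ask[Boolean] calls with one-way send for RemoveExecutor, moving the handler from receiveAndReply to receive so callers no longer wait on, or log failures of, a reply Future. The sketch below is a minimal, hypothetical illustration of that contrast using plain Scala Futures; ToyEndpointRef, its methods, and the message strings are invented for illustration and are not Spark's actual RpcEndpointRef API.

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for a driver endpoint reference (illustration only, not a Spark API).
class ToyEndpointRef {
  // One-way delivery: returns Unit, so there is nothing to await and no failure to log at the call site.
  def send(message: Any): Unit =
    println(s"send (fire-and-forget): $message")

  // Request-reply: returns a Future the caller is expected to handle, e.g. by logging failures.
  def ask(message: Any): Future[Boolean] =
    Future { println(s"ask (expects reply): $message"); true }
}

object RemoveExecutorMessagingSketch extends App {
  val driver = new ToyEndpointRef

  // Pre-patch pattern: ask for a Boolean reply and log a failed reply Future.
  val reply = driver.ask("RemoveExecutor(executorId = 1)")
  reply.failed.foreach(e => println(s"Unable to notify the driver: ${e.getMessage}"))
  Await.ready(reply, 1.second)

  // Post-patch pattern: one-way send; the receiver handles the message without producing a reply.
  driver.send("RemoveExecutor(executorId = 1)")
}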