@@ -165,11 +165,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     }
 
     if (notifyDriver && driver.nonEmpty) {
-      driver.get.ask[Boolean](
-        RemoveExecutor(executorId, new ExecutorLossReason(reason))
-      ).failed.foreach(e =>
-        logWarning(s"Unable to notify the driver due to " + e.getMessage, e)
-      )(ThreadUtils.sameThread)
+      driver.get.send(RemoveExecutor(executorId, new ExecutorLossReason(reason)))
     }
 
     System.exit(code)
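For context on the call-site change above: ask returns a Future that has to be handled on some execution context before even its failure can be logged, while send is strictly one-way. The sketch below is a minimal illustration of that difference, not Spark's actual RpcEndpointRef API; ToyEndpointRef and both of its methods are illustrative stand-ins. The old shape waited on an acknowledgement only to log its failure, which buys little when System.exit follows immediately; the new shape fires the notification and moves on.

import scala.concurrent.{ExecutionContext, Future}

// Illustrative stand-in for an RPC endpoint reference; not Spark's RpcEndpointRef.
trait ToyEndpointRef {
  def send(msg: Any): Unit          // one-way: enqueue and return immediately
  def ask[T](msg: Any): Future[T]   // request/reply: completes when the peer answers
}

object ExitNotification {
  // Old shape: ask for an acknowledgement purely so its failure can be logged.
  def notifyWithAck(driver: ToyEndpointRef, msg: Any)(implicit ec: ExecutionContext): Unit =
    driver.ask[Boolean](msg).failed.foreach(e => println(s"Unable to notify the driver: $e"))

  // New shape: best-effort, one-way notification; the caller never blocks or waits.
  def notifyOneWay(driver: ToyEndpointRef, msg: Any): Unit =
    driver.send(msg)
}

Dropping the acknowledgement also removes the ThreadUtils.sameThread callback plumbing from the exit path.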
@@ -95,6 +95,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // The num of current max ExecutorId used to re-register appMaster
   @volatile protected var currentExecutorIdCounter = 0
 
+  private val reviveThread =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
+
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
 
@@ -103,9 +106,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
     protected val addressToExecutorId = new HashMap[RpcAddress, String]
 
-    private val reviveThread =
-      ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
-
     override def onStart() {
       // Periodically revive offers to allow delay scheduling to work
       val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")
@@ -154,6 +154,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         executorDataMap.values.foreach { ed =>
           ed.executorEndpoint.send(UpdateDelegationTokens(newDelegationTokens))
         }
+
+      case RemoveExecutor(executorId, reason) =>
+        // We will remove the executor's state and cannot restore it. However, the connection
+        // between the driver and the executor may be still alive so that the executor won't exit
+        // automatically, so try to tell the executor to stop itself. See SPARK-13519.
+        executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
+        removeExecutor(executorId, reason)
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -215,14 +222,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         }
         context.reply(true)
 
-      case RemoveExecutor(executorId, reason) =>
-        // We will remove the executor's state and cannot restore it. However, the connection
-        // between the driver and the executor may be still alive so that the executor won't exit
-        // automatically, so try to tell the executor to stop itself. See SPARK-13519.
-        executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
-        removeExecutor(executorId, reason)
-        context.reply(true)
-
       case RemoveWorker(workerId, host, message) =>
         removeWorker(workerId, host, message)
         context.reply(true)
@@ -373,10 +372,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
       shouldDisable
     }
-
-    override def onStop() {
-      reviveThread.shutdownNow()
-    }
   }
 
   var driverEndpoint: RpcEndpointRef = null
@@ -417,6 +412,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   }
 
   override def stop() {
+    reviveThread.shutdownNow()
    stopExecutors()
    try {
      if (driverEndpoint != null) {
@@ -465,9 +461,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * at once.
    */
   protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
-    // Only log the failure since we don't care about the result.
-    driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).failed.foreach(t =>
-      logError(t.getMessage, t))(ThreadUtils.sameThread)
+    driverEndpoint.send(RemoveExecutor(executorId, reason))
   }
 
   protected def removeWorker(workerId: String, host: String, message: String): Unit = {
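Two related changes run through the scheduler-backend hunks above: RemoveExecutor moves from receiveAndReply to the one-way receive handler, and reviveThread is hoisted out of DriverEndpoint into the enclosing backend, created at construction and shut down in stop() rather than in the endpoint's onStop(). The sketch below illustrates only the lifecycle-ownership part, using plain JDK executors instead of Spark's ThreadUtils helper; ToyBackend and its methods are illustrative names, not Spark's.

import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

// Illustrative sketch: the revive timer is owned by the backend itself, so a single
// stop() call tears it down even if the RPC endpoint's own lifecycle hooks never run.
class ToyBackend(reviveIntervalMs: Long) {
  private val reviveThread: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor { (r: Runnable) =>
      val t = new Thread(r, "driver-revive-thread")
      t.setDaemon(true)   // daemon thread, mirroring newDaemonSingleThreadScheduledExecutor
      t
    }

  def start(): Unit =
    reviveThread.scheduleAtFixedRate(
      () => println("ReviveOffers"), 0, reviveIntervalMs, TimeUnit.MILLISECONDS)

  def stop(): Unit =
    reviveThread.shutdownNow()   // created here, shut down here
}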
@@ -282,7 +282,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     // No more deletion attempts of the executors.
     // This is graceful termination and should not be detected as a failure.
     verify(podOperations, times(1)).delete(resolvedPod)
-    verify(driverEndpointRef, times(1)).ask[Boolean](
+    verify(driverEndpointRef, times(1)).send(
       RemoveExecutor("1", ExecutorExited(
         0,
         exitCausedByApp = false,
@@ -318,7 +318,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     requestExecutorRunnable.getValue.run()
     allocatorRunnable.getAllValues.asScala.last.run()
     verify(podOperations, never()).delete(firstResolvedPod)
-    verify(driverEndpointRef).ask[Boolean](
+    verify(driverEndpointRef).send(
       RemoveExecutor("1", ExecutorExited(
         1,
         exitCausedByApp = true,
@@ -356,7 +356,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     val recreatedResolvedPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
     allocatorRunnable.getValue.run()
     verify(podOperations).delete(firstResolvedPod)
-    verify(driverEndpointRef).ask[Boolean](
+    verify(driverEndpointRef).send(
       RemoveExecutor("1", SlaveLost("Executor lost for unknown reasons.")))
   }
 
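Because the driver is now notified with send, the Kubernetes suite hunks above verify a one-way call rather than an ask[Boolean]. A minimal sketch of that Mockito verification style follows; ToyEndpointRef and ToyRemoveExecutor are stand-ins rather than the Spark test fixtures, and the real suite builds the expected message from ExecutorExited or SlaveLost as shown in the diff.

import org.mockito.Mockito.{mock, verify}

// Stand-ins for the mocked driver endpoint and the message; not Spark classes.
trait ToyEndpointRef {
  def send(msg: Any): Unit
}
case class ToyRemoveExecutor(executorId: String, reason: String)

object SendVerificationSketch {
  def main(args: Array[String]): Unit = {
    val driverEndpointRef = mock(classOf[ToyEndpointRef])

    // Code under test would do this once it decides the executor is gone.
    driverEndpointRef.send(ToyRemoveExecutor("1", "Executor lost for unknown reasons."))

    // A one-way message leaves no Future to inspect, so the assertion is simply
    // that the exact message was sent (once, by default).
    verify(driverEndpointRef).send(ToyRemoveExecutor("1", "Executor lost for unknown reasons."))
  }
}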
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster
 import java.util.concurrent.atomic.{AtomicBoolean}
 
 import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.ExecutionContext.Implicits.global
 import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
 
@@ -245,14 +246,7 @@ private[spark] abstract class YarnSchedulerBackend(
         Future.successful(RemoveExecutor(executorId, SlaveLost("AM is not yet registered.")))
       }
 
-      removeExecutorMessage
-        .flatMap { message =>
-          driverEndpoint.ask[Boolean](message)
-        }(ThreadUtils.sameThread)
-        .onFailure {
-          case NonFatal(e) => logError(
-            s"Error requesting driver to remove executor $executorId after disconnection.", e)
-        }(ThreadUtils.sameThread)
+      removeExecutorMessage.foreach { message => driverEndpoint.send(message) }
     }
 
     override def receive: PartialFunction[Any, Unit] = {
@@ -265,12 +259,10 @@
         addWebUIFilter(filterName, filterParams, proxyBase)
 
       case r @ RemoveExecutor(executorId, reason) =>
-        logWarning(reason.toString)
-        driverEndpoint.ask[Boolean](r).onFailure {
-          case e =>
-            logError("Error requesting driver to remove executor" +
-              s" $executorId for reason $reason", e)
-        }(ThreadUtils.sameThread)
+        if (!stopped.get) {
+          logWarning(s"Requesting driver to remove executor $executorId for reason $reason")
+          driverEndpoint.send(r)
+        }
     }
 
 
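The YARN hunks above make two adjustments: the reply handling (flatMap/onFailure on ThreadUtils.sameThread) collapses into a foreach over the Future using the newly imported global execution context, and the RemoveExecutor case is guarded by stopped.get so a message arriving after shutdown is dropped rather than forwarded to a driver endpoint that may already be gone. The sketch below illustrates just the guard; ToyYarnEndpoint and forward are illustrative stand-ins, not Spark's YarnSchedulerEndpoint or RpcEndpointRef.

import java.util.concurrent.atomic.AtomicBoolean

// Illustrative sketch of the stopped-flag guard.
class ToyYarnEndpoint(forward: Any => Unit) {
  private val stopped = new AtomicBoolean(false)

  def markStopped(): Unit = stopped.set(true)

  def handleRemoveExecutor(executorId: String, reason: String): Unit = {
    // After shutdown the driver endpoint may already be gone, so late messages
    // are dropped instead of being forwarded.
    if (!stopped.get) {
      println(s"Requesting driver to remove executor $executorId for reason $reason")
      forward((executorId, reason))
    }
  }
}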