1 change: 1 addition & 0 deletions conf/log4j.properties.template
@@ -10,3 +10,4 @@ log4j.logger.org.spark-project.jetty=WARN
log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.akka.remote.ReliableDeliverySupervisor=ERROR
88 changes: 67 additions & 21 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -24,8 +24,8 @@ import scala.collection.mutable
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcEnv, RpcCallContext}
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.scheduler.{SlaveLost, TaskScheduler}
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.scheduler._
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}

/**
* A heartbeat from executors to the driver. This is a shared message used by several internal
@@ -43,15 +43,25 @@ private[spark] case class Heartbeat(
*/
private[spark] case object TaskSchedulerIsSet

private[spark] case object ExpireDeadHosts

private case class ExecutorRegistered(executorId: String)

private case class ExecutorRemoved(executorId: String)

private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)

/**
* Lives in the driver to receive heartbeats from executors.
*/
private[spark] class HeartbeatReceiver(sc: SparkContext)
extends ThreadSafeRpcEndpoint with Logging {
private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
extends ThreadSafeRpcEndpoint with SparkListener with Logging {

def this(sc: SparkContext) {
this(sc, new SystemClock)
Reviewer comment (Member): Could you make SystemClock a default value to save one constructor, please? (A sketch of this suggestion follows the constructor below.)

}
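
A minimal standalone sketch of the reviewer's suggestion, assuming a plain default argument would be acceptable here. The Clock/SystemClock stubs and receiver names below are hypothetical stand-ins rather than Spark's actual types:

// Illustrative only: toy Clock types standing in for org.apache.spark.util.Clock.
trait Clock { def getTimeMillis(): Long }
class SystemClock extends Clock { override def getTimeMillis(): Long = System.currentTimeMillis() }

// Shape used in the diff: a primary constructor plus an auxiliary one.
class ReceiverWithAuxCtor(val name: String, val clock: Clock) {
  def this(name: String) = this(name, new SystemClock)
}

// Reviewer's suggestion: a single constructor with a default value.
class ReceiverWithDefault(val name: String, val clock: Clock = new SystemClock)

object ConstructorStyles extends App {
  val prod = new ReceiverWithDefault("driver")             // falls back to SystemClock
  val test = new ReceiverWithDefault("test", new Clock {   // tests inject a fake clock
    override def getTimeMillis(): Long = 0L
  })
  println(s"prod=${prod.clock.getTimeMillis()} test=${test.clock.getTimeMillis()}")
}

One reason to keep the auxiliary constructor anyway is Java interop: Scala default arguments are not directly usable from Java call sites.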

sc.addSparkListener(this)

override val rpcEnv: RpcEnv = sc.env.rpcEnv

@@ -86,30 +96,45 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
override def onStart(): Unit = {
timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
Option(self).foreach(_.send(ExpireDeadHosts))
Option(self).foreach(_.ask[Boolean](ExpireDeadHosts))
}
}, 0, checkTimeoutIntervalMs, TimeUnit.MILLISECONDS)
}

override def receive: PartialFunction[Any, Unit] = {
case ExpireDeadHosts =>
expireDeadHosts()
case TaskSchedulerIsSet =>
scheduler = sc.taskScheduler
case ExecutorRegistered(executorId) =>
executorLastSeen(executorId) = clock.getTimeMillis()
case ExecutorRemoved(executorId) =>
executorLastSeen.remove(executorId)
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case TaskSchedulerIsSet =>
scheduler = sc.taskScheduler
context.reply(true)
case ExpireDeadHosts =>
expireDeadHosts()
context.reply(true)
Author comment (Contributor): I moved these here to increase test determinism.
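
A rough, self-contained sketch of the determinism point, using a toy endpoint rather than Spark's RpcEndpointRef API (all names below are hypothetical): a fire-and-forget send gives a test nothing to wait on, whereas an ask-style call returns a future the test can block on before asserting.

import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

// Toy endpoint: messages are handled on a single background thread,
// mimicking an event loop that processes them asynchronously.
class ToyEndpoint {
  private val ec = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())
  @volatile var expireChecks = 0

  // Fire-and-forget: the caller cannot tell when (or whether) the handler ran.
  def send(msg: Any): Unit = { Future { expireChecks += 1 }(ec) }

  // Ask-style: the future completes only after the handler ran, so a test
  // can await it and then assert on the endpoint's state deterministically.
  def ask(msg: Any): Future[Boolean] = Future { expireChecks += 1; true }(ec)

  def shutdown(): Unit = ec.shutdown()
}

object AskDeterminism extends App {
  val endpoint = new ToyEndpoint
  Await.result(endpoint.ask("ExpireDeadHosts"), 1.second)
  assert(endpoint.expireChecks == 1)   // safe: the handler has definitely run
  endpoint.shutdown()
}

The diff applies the same idea in two places: the timeout task now calls ask[Boolean](ExpireDeadHosts) instead of send, and SparkContext calls ask[Boolean](TaskSchedulerIsSet), so callers (and tests) can wait until the messages have actually been processed.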

case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
if (scheduler != null) {
executorLastSeen(executorId) = System.currentTimeMillis()
eventLoopThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
val unknownExecutor = !scheduler.executorHeartbeatReceived(
executorId, taskMetrics, blockManagerId)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
context.reply(response)
}
})
if (executorLastSeen.contains(executorId)) {
executorLastSeen(executorId) = clock.getTimeMillis()
eventLoopThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
val unknownExecutor = !scheduler.executorHeartbeatReceived(
executorId, taskMetrics, blockManagerId)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
context.reply(response)
}
})
} else {
// This may happen if we get an executor's in-flight heartbeat immediately
// after we just removed it. It's not really an error condition, so we should
// not log a warning here. Otherwise there may be a lot of noise, especially
// if we explicitly remove executors (SPARK-4134).
logDebug(s"Received heartbeat from unknown executor $executorId")
context.reply(HeartbeatResponse(reregisterBlockManager = true))
}
} else {
// Because Executor will sleep several seconds before sending the first "Heartbeat", this
// case rarely happens. However, if it really happens, log it and ask the executor to
@@ -119,9 +144,30 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
}
}

/**
* If the heartbeat receiver is not stopped, notify it of executor registrations.
*/
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
Option(self).foreach(_.send(ExecutorRegistered(executorAdded.executorId)))
}

/**
* If the heartbeat receiver is not stopped, notify it of executor removals so it doesn't
* log superfluous errors.
*
* Note that we must do this after the executor is actually removed to guard against the
* following race condition: if we remove an executor's metadata from our data structure
* prematurely, we may get an in-flight heartbeat from the executor before the executor is
* actually removed, in which case we will still mark the executor as a dead host later
* and expire it with loud error messages.
*/
override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
Option(self).foreach(_.send(ExecutorRemoved(executorRemoved.executorId)))
}

private def expireDeadHosts(): Unit = {
logTrace("Checking for hosts with no recent heartbeats in HeartbeatReceiver.")
val now = System.currentTimeMillis()
val now = clock.getTimeMillis()
for ((executorId, lastSeenMs) <- executorLastSeen) {
if (now - lastSeenMs > executorTimeoutMs) {
logWarning(s"Removing executor $executorId with no recent heartbeats: " +
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -490,7 +490,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.send(TaskSchedulerIsSet)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
16 changes: 10 additions & 6 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -174,8 +174,8 @@ class DAGScheduler(
}

// Called by TaskScheduler when an executor fails.
def executorLost(execId: String): Unit = {
eventProcessLoop.post(ExecutorLost(execId))
def executorLost(execId: String, isError: Boolean = true): Unit = {
eventProcessLoop.post(ExecutorLost(execId, isError))
}

// Called by TaskScheduler when a host is added
@@ -1130,7 +1130,8 @@

// TODO: mark the executor as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch))
handleExecutorLost(
bmAddress.executorId, fetchFailed = true, isError = true, Some(task.epoch))
}

case commitDenied: TaskCommitDenied =>
@@ -1163,11 +1164,14 @@
private[scheduler] def handleExecutorLost(
execId: String,
fetchFailed: Boolean,
isError: Boolean,
maybeEpoch: Option[Long] = None) {
val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
failedEpoch(execId) = currentEpoch
logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
if (isError) {
logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
}
blockManagerMaster.removeExecutor(execId)

if (!env.blockManager.externalShuffleServiceEnabled || fetchFailed) {
@@ -1434,8 +1438,8 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
case ExecutorAdded(execId, host) =>
dagScheduler.handleExecutorAdded(execId, host)

case ExecutorLost(execId) =>
dagScheduler.handleExecutorLost(execId, fetchFailed = false)
case ExecutorLost(execId, isError) =>
dagScheduler.handleExecutorLost(execId, fetchFailed = false, isError = isError)

case BeginEvent(task, taskInfo) =>
dagScheduler.handleBeginEvent(task, taskInfo)
@@ -71,7 +71,8 @@ private[scheduler] case class CompletionEvent(

private[scheduler] case class ExecutorAdded(execId: String, host: String) extends DAGSchedulerEvent

private[scheduler] case class ExecutorLost(execId: String) extends DAGSchedulerEvent
private[scheduler] case class ExecutorLost(execId: String, isError: Boolean = true)
extends DAGSchedulerEvent

private[scheduler]
case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent
@@ -23,7 +23,7 @@ import org.apache.spark.executor.ExecutorExitCode
* Represents an explanation for an executor or whole slave failing or exiting.
*/
private[spark]
class ExecutorLossReason(val message: String) {
class ExecutorLossReason(val message: String, val isError: Boolean = true) {
override def toString: String = message
}

@@ -440,7 +440,11 @@ private[spark] class TaskSchedulerImpl(
synchronized {
if (activeExecutorIds.contains(executorId)) {
val hostPort = executorIdToHost(executorId)
logError("Lost executor %s on %s: %s".format(executorId, hostPort, reason))
if (reason.isError) {
logError("Lost executor %s on %s: %s".format(executorId, hostPort, reason))
} else {
logInfo("Removed executor %s on %s: %s".format(executorId, hostPort, reason))
}
removeExecutor(executorId)
failedExecutor = Some(executorId)
} else {
@@ -453,7 +457,7 @@
}
// Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
if (failedExecutor.isDefined) {
dagScheduler.executorLost(failedExecutor.get)
dagScheduler.executorLost(failedExecutor.get, reason.isError)
backend.reviveOffers()
}
}
@@ -70,7 +70,8 @@ private[spark] object CoarseGrainedClusterMessages {

case object StopExecutors extends CoarseGrainedClusterMessage

case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
case class RemoveExecutor(executorId: String, reason: String, isError: Boolean)
extends CoarseGrainedClusterMessage

case class SetupDriver(driver: RpcEndpointRef) extends CoarseGrainedClusterMessage

@@ -154,8 +154,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
context.reply(true)

case RemoveExecutor(executorId, reason) =>
removeExecutor(executorId, reason)
case RemoveExecutor(executorId, reason, isError) =>
removeExecutor(executorId, reason, isError)
context.reply(true)

case RetrieveSparkProps =>
@@ -210,7 +210,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}

// Remove a disconnected slave from the cluster
def removeExecutor(executorId: String, reason: String): Unit = {
def removeExecutor(executorId: String, reason: String, isError: Boolean = true): Unit = {
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
// This must be synchronized because variables mutated
@@ -222,7 +222,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
totalCoreCount.addAndGet(-executorInfo.totalCores)
totalRegisteredExecutors.addAndGet(-1)
scheduler.executorLost(executorId, SlaveLost(reason))
scheduler.executorLost(executorId, new ExecutorLossReason(reason, isError))
listenerBus.post(
SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason))
case None => logError(s"Asked to remove non-existent executor $executorId")
@@ -287,9 +287,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}

// Called by subclasses when notified of a lost worker
def removeExecutor(executorId: String, reason: String) {
def removeExecutor(executorId: String, reason: String, isError: Boolean = true) {
try {
driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason, isError))
} catch {
case e: Exception =>
throw new SparkException("Error notifying standalone scheduler's driver endpoint", e)
@@ -369,24 +369,47 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* Request that the cluster manager kill the specified executors.
* Return whether the kill request is acknowledged.
*/
final override def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
val filteredExecutorIds = new ArrayBuffer[String]
executorIds.foreach { id =>
if (executorDataMap.contains(id)) {
filteredExecutorIds += id
final override def killExecutors(executorIds: Seq[String]): Boolean = {
val executorIdsToKill = new ArrayBuffer[String]

// New target total after all pending requests have been granted
val newTargetTotal = synchronized {
if (executorIds.size == 1) {
logInfo(s"Requesting cluster manager to kill executor ${executorIds.head}.")
} else if (executorIds.size > 1) {
logInfo(s"Requesting cluster manager to kill executors ${executorIds.mkString(", ")}")
} else {
logWarning(s"Executor to kill $id does not exist!")
// No executors to kill
return false
}

executorIds.foreach { id =>
if (executorDataMap.contains(id)) {
executorIdsToKill += id
} else {
logWarning(s"Executor to kill $id does not exist!")
}
}

executorsPendingToRemove ++= executorIdsToKill

numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
Author comment (Contributor): @vanzin I believe these two lines are functionally equivalent to what you had in the old code (L384); the only difference is that I moved the askWithReply out of the synchronized block. Please let me know if this is not the case.

What it used to look like:

val newTotal = (numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
  - filteredExecutorIds.size)
...
executorsPendingToRemove ++= filteredExecutorIds
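
A small self-contained sketch (hypothetical names, not the CoarseGrainedSchedulerBackend code) of why the blocking request is kept outside the synchronized block: holding the backend's lock across an RPC round trip would stall every other thread that needs the same lock for the whole duration of the reply.

object LockScopeSketch {
  private val lock = new Object
  @volatile private var pendingToRemove = Set.empty[String]

  // Pretend RPC: blocks the calling thread until the "cluster manager" replies.
  private def askClusterManager(ids: Seq[String]): Boolean = { Thread.sleep(100); true }

  // Risky shape: the blocking ask happens while the lock is held, so any other
  // thread that needs `lock` (e.g. to register a new executor) stalls too.
  def killHoldingLock(ids: Seq[String]): Boolean = lock.synchronized {
    pendingToRemove ++= ids
    askClusterManager(ids)
  }

  // Shape used in the diff: mutate shared state under the lock, compute what the
  // request needs, then release the lock before the blocking ask.
  def killOutsideLock(ids: Seq[String]): Boolean = {
    lock.synchronized { pendingToRemove ++= ids }
    askClusterManager(ids)
  }
}

This is why newTargetTotal is computed inside the synchronized block while doRequestTotalExecutors and doKillExecutors are called only after it returns.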

}

// Killing executors means effectively that we want fewer executors than before, so also
// update the target number of executors to avoid having the backend allocate new ones.
// We should do this outside of the synchronized block to avoid holding on to a lock while
// awaiting a reply.
val acked = doRequestTotalExecutors(newTargetTotal) && doKillExecutors(executorIdsToKill)
if (acked) {
// Remove executors from various data structures in advance
// so we do not generate a bunch of unexpected error messages
executorIdsToKill.foreach { id =>
removeExecutor(id, "manually killed", isError = false)
}
}
// Killing executors means effectively that we want less executors than before, so also update
// the target number of executors to avoid having the backend allocate new ones.
val newTotal = (numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
- filteredExecutorIds.size)
doRequestTotalExecutors(newTotal)

executorsPendingToRemove ++= filteredExecutorIds
doKillExecutors(filteredExecutorIds)
acked
}

/**
@@ -38,7 +38,7 @@ class BlockManagerMaster(
/** Remove a dead executor from the driver endpoint. This is only called on the driver side. */
def removeExecutor(execId: String) {
tell(RemoveExecutor(execId))
logInfo("Removed " + execId + " successfully in removeExecutor")
logDebug(s"Removed executor $execId successfully")
}

/** Register the BlockManager's id with the driver. */
@@ -182,11 +182,11 @@ class BlockManagerMasterEndpoint(
}
}
listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))
logInfo(s"Removing block manager $blockManagerId")
logDebug(s"Successfully removed BlockManager $blockManagerId")
}

private def removeExecutor(execId: String) {
logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
logDebug("Trying to remove executor " + execId + " from BlockManagerMaster.")
blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
}
