diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 83ae57b7f151..2e52dd239c39 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -22,6 +22,9 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.scheduler.TaskScheduler
 import org.apache.spark.util.ActorLogReceive
+import org.apache.spark.scheduler.ExecutorLossReason
+import scala.collection.mutable
+import scala.concurrent.duration._
 
 /**
  * A heartbeat from executors to the driver. This is a shared message used by several internal
@@ -32,18 +35,56 @@ private[spark] case class Heartbeat(
     taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
     blockManagerId: BlockManagerId)
 
+private[spark] case object ExpireDeadHosts
+
 private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
 
 /**
  * Lives in the driver to receive heartbeats from executors..
  */
-private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
+private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
   extends Actor with ActorLogReceive with Logging {
 
+  val executorLastSeen = new mutable.HashMap[String, Long]
+
+  import context.dispatcher
+  var timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
+    10.milliseconds, self, ExpireDeadHosts)
+
+  val slaveTimeout = sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs",
+    math.max(sc.conf.getInt("spark.executor.heartbeatInterval", 10000) * 3, 120000))
+
   override def receiveWithLogging = {
     case Heartbeat(executorId, taskMetrics, blockManagerId) =>
       val response = HeartbeatResponse(
         !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
+      heartbeatReceived(executorId)
       sender ! response
+    case ExpireDeadHosts =>
+      expireDeadHosts()
+
+  }
+
+  private def heartbeatReceived(executorId: String) = {
+    executorLastSeen(executorId) = System.currentTimeMillis()
+  }
+
+  private def expireDeadHosts() {
+    logTrace("Checking for hosts with no recent heart beats in HeartbeatReceiver.")
+    val now = System.currentTimeMillis()
+    val minSeenTime = now - slaveTimeout
+    for ((executorId, lastSeenMs) <- executorLastSeen) {
+      if (lastSeenMs < minSeenTime) {
+        val msg = "Removing Executor " + executorId + " with no recent heart beats: " +
+          (now - lastSeenMs) + "ms exceeds " + slaveTimeout + "ms"
+        logWarning(msg)
+        if (scheduler.isInstanceOf[org.apache.spark.scheduler.TaskSchedulerImpl]) {
+          scheduler.asInstanceOf[org.apache.spark.scheduler.TaskSchedulerImpl]
+            .executorLost(executorId, new ExecutorLossReason(""))
+        }
+        sc.killExecutor(executorId)
+        executorLastSeen.remove(executorId)
+      }
+    }
   }
 }