Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.scheduler.TaskScheduler
import org.apache.spark.util.ActorLogReceive
import org.apache.spark.scheduler.ExecutorLossReason

/**
* A heartbeat from executors to the driver. This is a shared message used by several internal
Expand All @@ -32,18 +33,56 @@ private[spark] case class Heartbeat(
taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
blockManagerId: BlockManagerId)

/**
 * Message the driver's timeout-checking task periodically sends to [[HeartbeatReceiver]]
 * (via `context.system.scheduler.schedule`) to trigger a scan for executors whose
 * heartbeats have stopped.
 */
private[spark] case object ExpireDeadHosts

/**
 * Driver's reply to a [[Heartbeat]]. `reregisterBlockManager` is set when
 * `TaskScheduler.executorHeartbeatReceived` returns false, telling the executor
 * to re-register its BlockManager with the driver.
 */
private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)

/**
 * Lives in the driver to receive heartbeats from executors. Tracks the last
 * heartbeat time per executor and periodically expires (and kills) executors
 * that have not reported within the configured timeout.
 */
private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
  extends Actor with ActorLogReceive with Logging {

  // executorId -> wall-clock time (System.currentTimeMillis) of the most recent heartbeat.
  val executorLastSeen = new mutable.HashMap[String, Long]

  // Milliseconds without a heartbeat before an executor is declared dead:
  // 3x the heartbeat interval, floored at 2 minutes, unless explicitly overridden.
  val slaveTimeout = sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs",
    math.max(sc.conf.getInt("spark.executor.heartbeatInterval", 10000) * 3, 120000))

  import context.dispatcher
  // Scan for dead executors once per heartbeat interval. Scanning more often
  // than heartbeats can arrive is wasted work (the previous 10 ms period woke
  // up 100x/second for a timeout measured in minutes).
  var timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
    sc.conf.getInt("spark.executor.heartbeatInterval", 10000).milliseconds,
    self, ExpireDeadHosts)

  override def receiveWithLogging = {
    case Heartbeat(executorId, taskMetrics, blockManagerId) =>
      // Ask the executor to re-register its BlockManager when the scheduler
      // reports the heartbeat was not accepted.
      val response = HeartbeatResponse(
        !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
      heartbeatReceived(executorId)
      sender ! response
    case ExpireDeadHosts =>
      expireDeadHosts()
  }

  /** Record the current wall-clock time as the last-seen time for this executor. */
  private def heartbeatReceived(executorId: String): Unit = {
    executorLastSeen(executorId) = System.currentTimeMillis()
  }

  /** Kill and forget every executor whose last heartbeat is older than `slaveTimeout`. */
  private def expireDeadHosts(): Unit = {
    logTrace("Checking for hosts with no recent heart beats in HeartbeatReceiver.")
    val now = System.currentTimeMillis()
    val minSeenTime = now - slaveTimeout
    // Snapshot the expired entries before removing: mutating a mutable.HashMap
    // while iterating over it is not safe.
    val expired = executorLastSeen.filter { case (_, lastSeenMs) => lastSeenMs < minSeenTime }
    for ((executorId, lastSeenMs) <- expired) {
      // NOTE: the original built this message with a leading '+' on the next
      // line, which Scala parses as a discarded unary-plus expression, so the
      // timing details were silently dropped from the log.
      logWarning(s"Removing Executor $executorId with no recent heart beats: " +
        s"${now - lastSeenMs}ms exceeds ${slaveTimeout}ms")
      scheduler match {
        case impl: org.apache.spark.scheduler.TaskSchedulerImpl =>
          impl.executorLost(executorId, new ExecutorLossReason("Executor heartbeat timed out"))
        case _ => // other TaskScheduler implementations handle executor loss elsewhere
      }
      sc.killExecutor(executorId)
      executorLastSeen.remove(executorId)
    }
  }

  /** Cancel the periodic expiry task so it does not keep firing after the actor stops. */
  override def postStop(): Unit = {
    timeoutCheckingTask.cancel()
    super.postStop()
  }
}