Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,15 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)

private var timeoutCheckingTask: ScheduledFuture[_] = null

private val timeoutCheckingThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-timeout-checking-thread")
// "eventLoopThread" is used to run some pretty fast actions. The actions running in it should not
// block the thread for a long time.
private val eventLoopThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-receiver-event-loop-thread")

private val killExecutorThread = ThreadUtils.newDaemonSingleThreadExecutor("kill-executor-thread")

override def onStart(): Unit = {
timeoutCheckingTask = timeoutCheckingThread.scheduleAtFixedRate(new Runnable {
timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
Option(self).foreach(_.send(ExpireDeadHosts))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not part of this change, but can we change this to

if (self != null) {
  self.send(ExpireDeadHosts)
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wrote this line because I was trying to avoid writing the following 3 lines code:

var _self = self
if (_self != null) {
  _self.send(ExpireDeadHosts)
}

self is a method actually. In your code, the second self may return null if it's stopping.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah ok that makes sense.

}
Expand All @@ -99,11 +101,15 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
if (scheduler != null) {
val unknownExecutor = !scheduler.executorHeartbeatReceived(
executorId, taskMetrics, blockManagerId)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
executorLastSeen(executorId) = System.currentTimeMillis()
context.reply(response)
eventLoopThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
val unknownExecutor = !scheduler.executorHeartbeatReceived(
executorId, taskMetrics, blockManagerId)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
context.reply(response)
}
})
} else {
// Because Executor will sleep several seconds before sending the first "Heartbeat", this
// case rarely happens. However, if it really happens, log it and ask the executor to
Expand All @@ -125,7 +131,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
if (sc.supportDynamicAllocation) {
// Asynchronously kill the executor to avoid blocking the current thread
killExecutorThread.submit(new Runnable {
override def run(): Unit = sc.killExecutor(executorId)
override def run(): Unit = Utils.tryLogNonFatalError {
sc.killExecutor(executorId)
}
})
}
executorLastSeen.remove(executorId)
Expand All @@ -137,7 +145,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
if (timeoutCheckingTask != null) {
timeoutCheckingTask.cancel(true)
}
timeoutCheckingThread.shutdownNow()
eventLoopThread.shutdownNow()
killExecutorThread.shutdownNow()
}
}
Expand Down