@@ -473,6 +473,7 @@ private[spark] class TaskSchedulerImpl(
// If the host mapping still exists, it means we don't know the loss reason for the
// executor. So call removeExecutor() to update tasks running on that executor when
// the real loss reason is finally known.
logError(s"Actual reason for lost executor $executorId: ${reason.message}")
removeExecutor(executorId, reason)

case None =>
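For context on this hunk, a minimal sketch of the flow it sits in, using simplified stand-ins (LossReason and the host map below are not Spark's real classes): the executor was earlier marked lost with its reason still pending, and once the real reason arrives it is logged and removeExecutor is invoked to update the tasks that were running on it.

import scala.collection.mutable

// Simplified stand-in for ExecutorLossReason: only the `message` field used by the log line.
case class LossReason(message: String)

class SchedulerSketch {
  // Host mapping that still exists while the loss reason is pending.
  private val executorIdToHost = mutable.HashMap[String, String]()

  def executorLost(executorId: String, reason: LossReason): Unit = synchronized {
    if (executorIdToHost.contains(executorId)) {
      // The real loss reason has finally arrived for an executor marked lost earlier.
      Console.err.println(s"Actual reason for lost executor $executorId: ${reason.message}")
      removeExecutor(executorId, reason)
    }
  }

  private def removeExecutor(executorId: String, reason: LossReason): Unit = {
    executorIdToHost.remove(executorId)
    // ... update or re-queue the tasks that were running on this executor ...
  }
}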
@@ -269,15 +269,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* Stop making resource offers for the given executor. The executor is marked as lost with
* the loss reason still pending.
*
* @return Whether executor was alive.
* @return Whether the executor should be disabled.
*/
protected def disableExecutor(executorId: String): Boolean = {
val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
if (executorIsAlive(executorId)) {
executorsPendingLossReason += executorId
true
} else {
false
// Return true for explicitly killed executors, since we still need their pending
// loss reasons; return false for all others.
executorsPendingToRemove.contains(executorId)
}
}

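A hedged sketch of the revised return contract of disableExecutor; the field names mirror the ones in the hunk above, but the class is a simplified stand-in rather than the real CoarseGrainedSchedulerBackend.

import scala.collection.mutable

class BackendSketch {
  private val aliveExecutors = mutable.HashSet[String]()            // currently registered executors
  private val executorsPendingToRemove = mutable.HashSet[String]()  // explicitly killed executors
  private val executorsPendingLossReason = mutable.HashSet[String]()

  /** Returns true when offers should stop and a loss reason is still expected for this executor. */
  def disableExecutor(executorId: String): Boolean = synchronized {
    if (aliveExecutors.contains(executorId)) {
      executorsPendingLossReason += executorId
      true
    } else {
      // An explicitly killed executor is no longer alive, but its pending loss reason should
      // still be fetched, so it is also treated as "should be disabled" here.
      executorsPendingToRemove.contains(executorId)
    }
  }
}

The point the new comment makes is that returning false for explicitly killed executors would let the caller skip the loss-reason lookup entirely, so the check against executorsPendingToRemove keeps that path alive.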
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.util.RackResolver

import org.apache.log4j.{Level, Logger}

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
@@ -96,6 +96,10 @@ private[yarn] class YarnAllocator(
// was lost.
private val pendingLossReasonRequests = new HashMap[String, mutable.Buffer[RpcCallContext]]

// Maintain loss reasons for executors that have already been released. A reason is added
// when it is reported through the AM-RM call and removed once it has been queried.
private val releasedExecutorLossReasons = new HashMap[String, ExecutorLossReason]

// Keep track of which container is running which executor to remove the executors later
// Visible for testing.
private[yarn] val executorIdToContainer = new HashMap[String, Container]
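The new releasedExecutorLossReasons map has a simple store-then-consume lifecycle; below is a minimal sketch of just that lifecycle, with simplified types and a hypothetical class name (not the real YarnAllocator).

import scala.collection.mutable

class LossReasonCache {
  // Reasons for executors whose containers completed before the driver asked about them.
  private val releasedExecutorLossReasons = mutable.HashMap[String, String]()

  // Called from the AM-RM heartbeat path once the container's exit reason is known.
  def onContainerCompleted(executorId: String, exitReason: String): Unit = synchronized {
    releasedExecutorLossReasons.put(executorId, exitReason)
  }

  // Called when the driver later queries the reason; the entry is consumed on first use.
  def answerQuery(executorId: String): Option[String] = synchronized {
    releasedExecutorLossReasons.remove(executorId)
  }
}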
@@ -202,8 +206,7 @@
*/
def killExecutor(executorId: String): Unit = synchronized {
if (executorIdToContainer.contains(executorId)) {
val container = executorIdToContainer.remove(executorId).get
containerIdToExecutorId.remove(container.getId)
val container = executorIdToContainer.get(executorId).get
internalReleaseContainer(container)
numExecutorsRunning -= 1
} else {
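A hedged sketch of why killExecutor now leaves the executorId/container mappings in place (every name here is a simplified stand-in, with container ids as plain strings): if the mappings were removed at kill time, the later completed-container event could not be mapped back to an executor id, and no loss reason could be recorded for the driver's query.

import scala.collection.mutable

class KillOrderingSketch {
  private val executorIdToContainer = mutable.HashMap[String, String]()
  private val containerIdToExecutorId = mutable.HashMap[String, String]()
  private val recordedLossReasons = mutable.HashMap[String, String]()

  def killExecutor(executorId: String): Unit = synchronized {
    // Release the container but keep both mappings for the completion callback below.
    executorIdToContainer.get(executorId).foreach(releaseContainer)
  }

  def onContainerCompleted(containerId: String, exitReason: String): Unit = synchronized {
    // The mappings are only cleaned up here, once the exit reason is actually known.
    containerIdToExecutorId.remove(containerId).foreach { eid =>
      executorIdToContainer.remove(eid)
      recordedLossReasons.put(eid, exitReason)
    }
  }

  private def releaseContainer(containerId: String): Unit = ()  // stand-in for the RM release call
}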
@@ -514,9 +517,18 @@

containerIdToExecutorId.remove(containerId).foreach { eid =>
executorIdToContainer.remove(eid)
pendingLossReasonRequests.remove(eid).foreach { pendingRequests =>
// Notify application of executor loss reasons so it can decide whether it should abort
pendingRequests.foreach(_.reply(exitReason))
pendingLossReasonRequests.remove(eid) match {
case Some(pendingRequests) =>
// Notify application of executor loss reasons so it can decide whether it should abort
pendingRequests.foreach(_.reply(exitReason))

case None =>
// There is no pending loss-reason request for this executor because the completed
// container was processed before the query arrived, so store the reason for a later
// query. This typically happens when a container is explicitly killed: the result is
// returned in the same AM-RM communication, so the query RPC arrives after this
// completed container has been processed.
releasedExecutorLossReasons.put(eid, exitReason)
}
if (!alreadyReleased) {
// The executor could have gone away (like no route to host, node failure, etc)
@@ -538,8 +550,14 @@
if (executorIdToContainer.contains(eid)) {
pendingLossReasonRequests
.getOrElseUpdate(eid, new ArrayBuffer[RpcCallContext]) += context
} else if (releasedExecutorLossReasons.contains(eid)) {
// The executor was explicitly released before its loss reason was queried, so directly
// send the pre-stored loss reason.
context.reply(releasedExecutorLossReasons.remove(eid).get)
} else {
logWarning(s"Tried to get the loss reason for non-existent executor $eid")
Contributor Author

Note: it would also be better to send a response here, otherwise the driver will keep waiting for the message until it times out. This can happen when, by the time a loss reason query is issued, the AM has already processed the completed containers (since they run asynchronously).

Contributor

Yes, it's probably good to call context.sendFailure here.

context.sendFailure(
new SparkException(s"Fail to find loss reason for non-existent executor $eid"))
}
}

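Pulling the last two hunks together with the review thread above, here is a consolidated, hedged sketch of how a loss-reason query is resolved under both orderings (simplified types throughout, and a made-up Reply callback standing in for the real RpcCallContext).

import scala.collection.mutable

class LossReasonProtocolSketch {
  // Stand-in for RpcCallContext: Right(message) ~ context.reply, Left(error) ~ context.sendFailure.
  type Reply = Either[Throwable, String] => Unit

  private val executorIdToContainer = mutable.HashMap[String, String]()
  private val pendingLossReasonRequests = mutable.HashMap[String, mutable.Buffer[Reply]]()
  private val releasedExecutorLossReasons = mutable.HashMap[String, String]()

  def enquireLossReason(eid: String, reply: Reply): Unit = synchronized {
    if (executorIdToContainer.contains(eid)) {
      // Query arrived first: park the request until the completed container is processed.
      pendingLossReasonRequests.getOrElseUpdate(eid, mutable.ArrayBuffer[Reply]()) += reply
    } else if (releasedExecutorLossReasons.contains(eid)) {
      // Completed container was processed first: answer from the stored reason.
      reply(Right(releasedExecutorLossReasons.remove(eid).get))
    } else {
      // Unknown executor: fail fast instead of leaving the driver to wait for an RPC timeout.
      reply(Left(new Exception(s"Fail to find loss reason for non-existent executor $eid")))
    }
  }

  // Simplified: keyed by executor id directly instead of resolving it from the container id.
  def onContainerCompleted(eid: String, exitReason: String): Unit = synchronized {
    executorIdToContainer.remove(eid)
    pendingLossReasonRequests.remove(eid) match {
      case Some(requests) => requests.foreach(_(Right(exitReason)))
      case None           => releasedExecutorLossReasons.put(eid, exitReason)
    }
  }
}

Either branch leaves at most one record per executor behind, so whichever side runs second always finds what it needs and cleans up after itself.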