diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 4e044aa4788d..923e40fe1725 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -36,7 +36,7 @@ import org.apache.log4j.{Level, Logger}
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
-import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
+import org.apache.spark.scheduler.{ExecutorExited, ExecutorKilled, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
 import org.apache.spark.util.ThreadUtils
 
@@ -124,6 +124,8 @@ private[yarn] class YarnAllocator(
 
   private val labelExpression = sparkConf.getOption("spark.yarn.executor.nodeLabelExpression")
 
+  private val executorsKilledByDriver = Collections.newSetFromMap[String](
+    new ConcurrentHashMap[String, java.lang.Boolean])
   // ContainerRequest constructor that can take a node label expression. We grab it through
   // reflection because it's only available in later versions of YARN.
   private val nodeLabelConstructor = labelExpression.flatMap { expr =>
@@ -203,6 +205,7 @@ private[yarn] class YarnAllocator(
       val container = executorIdToContainer.get(executorId).get
       internalReleaseContainer(container)
       numExecutorsRunning -= 1
+      executorsKilledByDriver.add(executorId)
     } else {
       logWarning(s"Attempted to kill unknown executor $executorId!")
     }
@@ -522,7 +525,9 @@ private[yarn] class YarnAllocator(
           // This is usually happened when explicitly killing a container, the result will be
           // returned in one AM-RM communication. So query RPC will be later than this completed
           // container process.
-          releasedExecutorLossReasons.put(eid, exitReason)
+          if (!executorsKilledByDriver.contains(eid)) {
+            releasedExecutorLossReasons.put(eid, exitReason)
+          }
         }
         if (!alreadyReleased) {
           // The executor could have gone away (like no route to host, node failure, etc)
@@ -540,18 +545,24 @@ private[yarn] class YarnAllocator(
    */
   private[yarn] def enqueueGetLossReasonRequest(
       eid: String,
-      context: RpcCallContext): Unit = synchronized {
-    if (executorIdToContainer.contains(eid)) {
-      pendingLossReasonRequests
-        .getOrElseUpdate(eid, new ArrayBuffer[RpcCallContext]) += context
-    } else if (releasedExecutorLossReasons.contains(eid)) {
-      // Executor is already released explicitly before getting the loss reason, so directly send
-      // the pre-stored lost reason
-      context.reply(releasedExecutorLossReasons.remove(eid).get)
-    } else {
-      logWarning(s"Tried to get the loss reason for non-existent executor $eid")
-      context.sendFailure(
-        new SparkException(s"Fail to find loss reason for non-existent executor $eid"))
+      context: RpcCallContext): Unit = {
+    if (executorsKilledByDriver.contains(eid)) {
+      context.reply(ExecutorKilled)
+      return
+    }
+    synchronized {
+      if (executorIdToContainer.contains(eid)) {
+        pendingLossReasonRequests
+          .getOrElseUpdate(eid, new ArrayBuffer[RpcCallContext]) += context
+      } else if (releasedExecutorLossReasons.contains(eid)) {
+        // Executor is already released explicitly before getting the loss reason, so directly send
+        // the pre-stored loss reason
+        context.reply(releasedExecutorLossReasons.remove(eid).get)
+      } else {
+        logWarning(s"Tried to get the loss reason for non-existent executor $eid")
+        context.sendFailure(
+          new SparkException(s"Fail to find loss reason for non-existent executor $eid"))
+      }
     }
   }
 
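The behavioral core of this patch is small enough to model in isolation. Below is a minimal, self-contained Scala sketch of the pattern being introduced: a concurrent set records executors the driver killed on purpose, and loss-reason queries consult it first, so a deliberate kill is reported as a kill rather than as a failure. Allocator, LossReason, Killed, and Exited are hypothetical stand-ins invented for this sketch (not Spark's real YarnAllocator or ExecutorLossReason hierarchy); only the Collections.newSetFromMap construction mirrors the patch itself.

import java.util.Collections
import java.util.concurrent.ConcurrentHashMap

sealed trait LossReason
case object Killed extends LossReason                // stand-in for ExecutorKilled
case class Exited(code: Int) extends LossReason      // stand-in for ExecutorExited

class Allocator {
  // Same construction as in the patch: a thread-safe Set[String] backed by
  // a ConcurrentHashMap, writable from the kill path without extra locking.
  private val killedByDriver =
    Collections.newSetFromMap[String](new ConcurrentHashMap[String, java.lang.Boolean])

  // Driver-initiated kill path (killExecutor in the patch): remember the id.
  def killExecutor(eid: String): Unit = killedByDriver.add(eid)

  // Loss-reason query: deliberate kills are answered before any other
  // bookkeeping, mirroring the early return in enqueueGetLossReasonRequest.
  def lossReason(eid: String, exitCode: Int): LossReason =
    if (killedByDriver.contains(eid)) Killed else Exited(exitCode)
}

object Demo extends App {
  val alloc = new Allocator
  alloc.killExecutor("1")
  println(alloc.lossReason("1", exitCode = 1)) // Killed: not counted as a failure
  println(alloc.lossReason("2", exitCode = 1)) // Exited(1): a genuine failure
}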