Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
import org.apache.spark.scheduler.{ExecutorKilled, ExecutorExited, ExecutorLossReason}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
import org.apache.spark.util.ThreadUtils

Expand Down Expand Up @@ -124,6 +124,8 @@ private[yarn] class YarnAllocator(

private val labelExpression = sparkConf.getOption("spark.yarn.executor.nodeLabelExpression")

private val executorsKilledByDriver = Collections.newSetFromMap[String](
new ConcurrentHashMap[String, java.lang.Boolean])
// ContainerRequest constructor that can take a node label expression. We grab it through
// reflection because it's only available in later versions of YARN.
private val nodeLabelConstructor = labelExpression.flatMap { expr =>
Expand Down Expand Up @@ -203,6 +205,7 @@ private[yarn] class YarnAllocator(
val container = executorIdToContainer.get(executorId).get
internalReleaseContainer(container)
numExecutorsRunning -= 1
executorsKilledByDriver.add(executorId)
} else {
logWarning(s"Attempted to kill unknown executor $executorId!")
}
Expand Down Expand Up @@ -522,7 +525,9 @@ private[yarn] class YarnAllocator(
// This usually happens when explicitly killing a container: the result will be
// returned in one AM-RM communication, so the query RPC will arrive later than this
// completed container processing.
releasedExecutorLossReasons.put(eid, exitReason)
if (!executorsKilledByDriver.contains(eid)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a race here: if the executor goes down for a reason other than our kill after we have called kill, we won't get the right loss reason. That is, killExecutor just adds the executor to the list to be removed later, so if it happens to die for another reason we would miss that. I don't think this is a big deal, but we could put in a check here to compare the reason.

releasedExecutorLossReasons.put(eid, exitReason)
}
}
if (!alreadyReleased) {
// The executor could have gone away (like no route to host, node failure, etc)
Expand All @@ -540,18 +545,24 @@ private[yarn] class YarnAllocator(
*/
private[yarn] def enqueueGetLossReasonRequest(
eid: String,
context: RpcCallContext): Unit = synchronized {
if (executorIdToContainer.contains(eid)) {
pendingLossReasonRequests
.getOrElseUpdate(eid, new ArrayBuffer[RpcCallContext]) += context
} else if (releasedExecutorLossReasons.contains(eid)) {
// Executor is already released explicitly before getting the loss reason, so directly send
// the pre-stored lost reason
context.reply(releasedExecutorLossReasons.remove(eid).get)
} else {
logWarning(s"Tried to get the loss reason for non-existent executor $eid")
context.sendFailure(
new SparkException(s"Fail to find loss reason for non-existent executor $eid"))
context: RpcCallContext): Unit = {
if (executorsKilledByDriver.contains(eid)) {
context.reply(ExecutorKilled)
return
}
synchronized {
if (executorIdToContainer.contains(eid)) {
pendingLossReasonRequests
.getOrElseUpdate(eid, new ArrayBuffer[RpcCallContext]) += context
} else if (releasedExecutorLossReasons.contains(eid)) {
// Executor is already released explicitly before getting the loss reason, so directly send
// the pre-stored lost reason
context.reply(releasedExecutorLossReasons.remove(eid).get)
} else {
logWarning(s"Tried to get the loss reason for non-existent executor $eid")
context.sendFailure(
new SparkException(s"Fail to find loss reason for non-existent executor $eid"))
}
}
}

Expand Down