Skip to content

Commit a44c29e

Browse files
committed
Fixed unit tests and made maximum executor lost reason checks configurable
1 parent f8e3249 commit a44c29e

File tree

4 files changed

+17
-7
lines changed

4 files changed

+17
-7
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,5 +111,14 @@ private[spark] object Config extends Logging {
111111
.stringConf
112112
.createOptional
113113

114+
val KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS =
115+
ConfigBuilder("spark.kubernetes.executor.lostCheck.maxAttempts")
116+
.doc("Maximum number of attempts allowed for checking the reason of an executor loss " +
117+
"before it is assumed that the executor failed.")
118+
.intConf
119+
.checkValue(value => value > 0, "Maximum attempts of checks of executor lost reason " +
120+
"must be a positive integer")
121+
.createWithDefault(5)
122+
114123
val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
115124
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
9090

9191
private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
9292

93+
private val executorLostReasonCheckMaxAttempts = conf.get(
94+
KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS)
95+
9396
private val allocatorRunnable = new Runnable {
9497

9598
// Maintains a map of executor id to count of checks performed to learn the loss reason
@@ -174,7 +177,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
174177

175178
def removeExecutorOrIncrementLossReasonCheckCount(executorId: String): Unit = {
176179
val reasonCheckCount = executorReasonCheckAttemptCounts.getOrElse(executorId, 0)
177-
if (reasonCheckCount >= MAX_EXECUTOR_LOST_REASON_CHECKS) {
180+
if (reasonCheckCount >= executorLostReasonCheckMaxAttempts) {
178181
removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons."))
179182
deleteExecutorFromClusterAndDataStructures(executorId)
180183
} else {
@@ -427,7 +430,4 @@ private[spark] class KubernetesClusterSchedulerBackend(
427430

428431
private object KubernetesClusterSchedulerBackend {
429432
private val UNKNOWN_EXIT_CODE = -1
430-
// Number of times we are allowed check for the loss reason for an executor before we give up
431-
// and assume the executor failed for good, and attribute it to a framework fault.
432-
val MAX_EXECUTOR_LOST_REASON_CHECKS = 10
433433
}

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
123123
ENV_EXECUTOR_CORES -> "1",
124124
ENV_EXECUTOR_MEMORY -> "1g",
125125
ENV_APPLICATION_ID -> "dummy",
126-
ENV_EXECUTOR_POD_IP -> null,
127-
ENV_EXECUTOR_PORT -> "10000") ++ additionalEnvVars
126+
ENV_EXECUTOR_POD_IP -> null) ++ additionalEnvVars
128127

129128
assert(executor.getSpec.getContainers.size() === 1)
130129
assert(executor.getSpec.getContainers.get(0).getEnv.size() === defaultEnvs.size)

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,8 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
330330
sparkConf
331331
.set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
332332
.set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
333+
val executorLostReasonCheckMaxAttempts = sparkConf.get(
334+
KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS)
333335

334336
val scheduler = newSchedulerBackend()
335337
scheduler.start()
@@ -346,7 +348,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
346348
.apply(registerFirstExecutorMessage)
347349

348350
driverEndpoint.getValue.onDisconnected(executorEndpointRef.address)
349-
1 to KubernetesClusterSchedulerBackend.MAX_EXECUTOR_LOST_REASON_CHECKS foreach { _ =>
351+
1 to executorLostReasonCheckMaxAttempts foreach { _ =>
350352
allocatorRunnable.getValue.run()
351353
verify(podOperations, never()).delete(FIRST_EXECUTOR_POD)
352354
}

0 commit comments

Comments
 (0)