Commit cc55de3

[SPARK-41410][K8S] Support PVC-oriented executor pod allocation
### What changes were proposed in this pull request?

This PR aims to support `PVC-oriented executor pod allocation`: the Spark driver creates a fixed number of PVCs (= `spark.executor.instances` or `spark.dynamicAllocation.maxExecutors`) and holds off new executor pod creation once the number of created PVCs reaches that limit.

### Why are the changes needed?

This allows Spark to hand over the existing PVCs from dead executors to new executors. Previously, Spark created new executors without waiting for the dead executors to release their PVCs.

### Does this PR introduce _any_ user-facing change?

No, this is a new feature which is disabled by default.

### How was this patch tested?

Pass the CIs with the newly added test case.

Closes #38943 from dongjoon-hyun/SPARK-41410.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
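For reference, a minimal sketch of how a job would opt in to this behavior. The `waitToReusePersistentVolumeClaims` key is the flag added by this commit; the first two keys are the prerequisite ownership/reuse configs named in its doc string (their exact names are assumptions inferred from the `KUBERNETES_DRIVER_OWN_PVC` and `KUBERNETES_DRIVER_REUSE_PVC` constants; verify against your Spark version's docs):

```scala
import org.apache.spark.SparkConf

// Sketch: opting in to PVC-oriented executor pod allocation.
// The first two key names are assumed prerequisite configs;
// the third is the new flag from this commit (defaults to false).
val conf = new SparkConf()
  .set("spark.kubernetes.driver.ownPersistentVolumeClaims", "true")    // assumed key
  .set("spark.kubernetes.driver.reusePersistentVolumeClaims", "true")  // assumed key
  .set("spark.kubernetes.driver.waitToReusePersistentVolumeClaims", "true")
  .set("spark.executor.instances", "4")  // the driver will create at most 4 PVCs
```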
1 parent ad503ca

3 files changed (+105 −4)
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala (12 additions, 2 deletions)

```diff
@@ -21,8 +21,7 @@ import java.util.concurrent.TimeUnit
 
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
-import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.internal.config.{ConfigBuilder, DYN_ALLOCATION_MAX_EXECUTORS, EXECUTOR_INSTANCES, PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
 
 private[spark] object Config extends Logging {
 
@@ -101,6 +100,17 @@ private[spark] object Config extends Logging {
     .booleanConf
     .createWithDefault(true)
 
+  val KUBERNETES_DRIVER_WAIT_TO_REUSE_PVC =
+    ConfigBuilder("spark.kubernetes.driver.waitToReusePersistentVolumeClaims")
+      .doc("If true, driver pod counts the number of created on-demand persistent volume claims " +
+        s"and wait if the number is greater than or equal to the maximum which is " +
+        s"${EXECUTOR_INSTANCES.key} or ${DYN_ALLOCATION_MAX_EXECUTORS.key}. " +
+        s"This config requires both ${KUBERNETES_DRIVER_OWN_PVC.key}=true and " +
+        s"${KUBERNETES_DRIVER_REUSE_PVC.key}=true.")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val KUBERNETES_NAMESPACE =
     ConfigBuilder("spark.kubernetes.namespace")
       .doc("The namespace that will be used for running the driver and executor pods.")
```
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala (19 additions, 1 deletion)

```diff
@@ -33,8 +33,9 @@ import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.KubernetesConf
 import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT
+import org.apache.spark.internal.config.{DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, DYN_ALLOCATION_MAX_EXECUTORS, EXECUTOR_INSTANCES}
 import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.scheduler.cluster.SchedulerBackendUtils.DEFAULT_NUMBER_EXECUTORS
 import org.apache.spark.util.{Clock, Utils}
 
 class ExecutorPodsAllocator(
@@ -47,6 +48,17 @@ class ExecutorPodsAllocator(
 
   private val EXECUTOR_ID_COUNTER = new AtomicInteger(0)
 
+  private val PVC_COUNTER = new AtomicInteger(0)
+
+  private val maxPVCs = if (Utils.isDynamicAllocationEnabled(conf)) {
+    conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
+  } else {
+    conf.getInt(EXECUTOR_INSTANCES.key, DEFAULT_NUMBER_EXECUTORS)
+  }
+
+  private val podAllocOnPVC = conf.get(KUBERNETES_DRIVER_OWN_PVC) &&
+    conf.get(KUBERNETES_DRIVER_REUSE_PVC) && conf.get(KUBERNETES_DRIVER_WAIT_TO_REUSE_PVC)
+
   // ResourceProfile id -> total expected executors per profile, currently we don't remove
   // any resource profiles - https://issues.apache.org/jira/browse/SPARK-30749
   private val totalExpectedExecutorsPerResourceProfileId = new ConcurrentHashMap[Int, Int]()
@@ -398,6 +410,10 @@
     // Check reusable PVCs for this executor allocation batch
     val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse)
     for ( _ <- 0 until numExecutorsToAllocate) {
+      if (reusablePVCs.isEmpty && podAllocOnPVC && maxPVCs <= PVC_COUNTER.get()) {
+        logInfo(s"Wait to reuse one of the existing ${PVC_COUNTER.get()} PVCs.")
+        return
+      }
       val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
       val executorConf = KubernetesConf.createExecutorConf(
         conf,
@@ -429,6 +445,7 @@
           logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " +
             s"StorageClass ${pvc.getSpec.getStorageClassName}")
           kubernetesClient.persistentVolumeClaims().inNamespace(namespace).resource(pvc).create()
+          PVC_COUNTER.incrementAndGet()
         }
         newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis())
         logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
@@ -438,6 +455,7 @@
           .inNamespace(namespace)
           .resource(createdExecutorPod)
           .delete()
+        PVC_COUNTER.decrementAndGet()
         throw e
       }
     }
```
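Distilled from the hunks above, the control flow is: count every driver-created PVC, and once the count reaches the cap with no released claim available, stop allocating and wait for a later snapshot cycle. Below is a minimal self-contained sketch of this backpressure pattern; all names are hypothetical, not Spark's API, and only the control flow mirrors the patch:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical sketch of the counter-gated allocation pattern in this commit.
class PvcGatedAllocator(maxPVCs: Int, gateEnabled: Boolean) {
  private val pvcCounter = new AtomicInteger(0)
  private val reusablePVCs = mutable.Buffer.empty[String]

  /** Returns the number of executors actually requested this cycle. */
  def allocate(numExecutorsToAllocate: Int): Int = {
    var requested = 0
    for (_ <- 0 until numExecutorsToAllocate) {
      // Same early exit as the patch: no reusable PVC and the cap is reached.
      if (reusablePVCs.isEmpty && gateEnabled && maxPVCs <= pvcCounter.get()) {
        println(s"Wait to reuse one of the existing ${pvcCounter.get()} PVCs.")
        return requested
      }
      if (reusablePVCs.nonEmpty) {
        reusablePVCs.remove(0)        // hand an existing PVC to the new executor
      } else {
        pvcCounter.incrementAndGet()  // account for a newly created PVC
      }
      requested += 1
    }
    requested
  }

  /** Called when a dead executor's PVC becomes reusable again. */
  def releasePVC(name: String): Unit = reusablePVCs += name
}
```

With `new PvcGatedAllocator(maxPVCs = 1, gateEnabled = true)`, calling `allocate(2)` requests one executor (creating one PVC) and then bails out until `releasePVC` makes a claim reusable, which is the behavior the new test below asserts.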

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala (74 additions, 1 deletion)

```diff
@@ -39,7 +39,7 @@ import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesExecutorSpec}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.Fabric8Aliases._
-import org.apache.spark.internal.config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT
+import org.apache.spark.internal.config.{DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, EXECUTOR_INSTANCES}
 import org.apache.spark.resource._
 import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
 import org.apache.spark.util.ManualClock
@@ -90,6 +90,9 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
   @Mock
   private var pvcWithNamespace: PVC_WITH_NAMESPACE = _
 
+  @Mock
+  private var pvcResource: io.fabric8.kubernetes.client.dsl.Resource[PersistentVolumeClaim] = _
+
   @Mock
   private var labeledPersistentVolumeClaims: LABELED_PERSISTENT_VOLUME_CLAIMS = _
 
@@ -139,6 +142,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     when(kubernetesClient.persistentVolumeClaims()).thenReturn(persistentVolumeClaims)
     when(persistentVolumeClaims.inNamespace("default")).thenReturn(pvcWithNamespace)
     when(pvcWithNamespace.withLabel(any(), any())).thenReturn(labeledPersistentVolumeClaims)
+    when(pvcWithNamespace.resource(any())).thenReturn(pvcResource)
     when(labeledPersistentVolumeClaims.list()).thenReturn(persistentVolumeClaimList)
     when(persistentVolumeClaimList.getItems).thenReturn(Seq.empty[PersistentVolumeClaim].asJava)
   }
@@ -809,6 +813,75 @@
     podsAllocatorUnderTest invokePrivate getReusablePVCs("appId", Seq("pvc-1"))
   }
 
+  test("SPARK-41410: Support waitToReusePersistentVolumeClaims") {
+    val prefix = "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1"
+    val confWithPVC = conf.clone
+      .set(KUBERNETES_DRIVER_OWN_PVC.key, "true")
+      .set(KUBERNETES_DRIVER_REUSE_PVC.key, "true")
+      .set(KUBERNETES_DRIVER_WAIT_TO_REUSE_PVC.key, "true")
+      .set(EXECUTOR_INSTANCES.key, "1")
+      .set(s"$prefix.mount.path", "/spark-local-dir")
+      .set(s"$prefix.mount.readOnly", "false")
+      .set(s"$prefix.option.claimName", "OnDemand")
+      .set(s"$prefix.option.sizeLimit", "200Gi")
+      .set(s"$prefix.option.storageClass", "gp3")
+
+    when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
+        meq(kubernetesClient), any(classOf[ResourceProfile])))
+      .thenAnswer((invocation: InvocationOnMock) => {
+        val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
+        KubernetesExecutorSpec(
+          executorPodWithIdAndVolume(k8sConf.executorId.toInt, k8sConf.resourceProfileId),
+          Seq(persistentVolumeClaim("pvc-0", "gp3", "200Gi")))
+      })
+
+    podsAllocatorUnderTest = new ExecutorPodsAllocator(
+      confWithPVC, secMgr, executorBuilder,
+      kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
+    podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
+
+    when(podsWithNamespace
+      .withField("status.phase", "Pending"))
+      .thenReturn(labeledPods)
+    when(labeledPods
+      .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+      .thenReturn(labeledPods)
+    when(labeledPods
+      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
+      .thenReturn(labeledPods)
+    when(labeledPods
+      .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
+      .thenReturn(labeledPods)
+
+    val startTime = Instant.now.toEpochMilli
+    waitForExecutorPodsClock.setTime(startTime)
+
+    val counter = PrivateMethod[AtomicInteger](Symbol("PVC_COUNTER"))()
+    assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 0)
+
+    // Target 1 executor, make sure it's requested, even with an empty initial snapshot.
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
+    verify(podsWithNamespace).resource(podWithAttachedContainerForIdAndVolume(1))
+    assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 1)
+
+    // Mark executor as running, verify that subsequent allocation cycle is a no-op.
+    snapshotsStore.updatePod(runningExecutor(1))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
+    verify(podResource, times(1)).create()
+    verify(podResource, never()).delete()
+    verify(pvcWithNamespace, times(1)).resource(any())
+    assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 1)
+
+    // Request a new executor, make sure that no new pod and pvc are created
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
+    verify(podResource, times(1)).create()
+    assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 1)
+  }
+
   private def executorPodAnswer(): Answer[KubernetesExecutorSpec] =
     (invocation: InvocationOnMock) => {
       val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
```
