diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 717dc2f409ca..ddb846916259 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -21,8 +21,7 @@ import java.util.concurrent.TimeUnit
 
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
-import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.internal.config.{ConfigBuilder, DYN_ALLOCATION_MAX_EXECUTORS, EXECUTOR_INSTANCES, PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
 
 private[spark] object Config extends Logging {
 
@@ -101,6 +100,17 @@ private[spark] object Config extends Logging {
     .booleanConf
     .createWithDefault(true)
 
+  val KUBERNETES_DRIVER_WAIT_TO_REUSE_PVC =
+    ConfigBuilder("spark.kubernetes.driver.waitToReusePersistentVolumeClaims")
+      .doc("If true, driver pod counts the number of created on-demand persistent volume claims " +
+        s"and wait if the number is greater than or equal to the maximum which is " +
+        s"${EXECUTOR_INSTANCES.key} or ${DYN_ALLOCATION_MAX_EXECUTORS.key}. " +
+        s"This config requires both ${KUBERNETES_DRIVER_OWN_PVC.key}=true and " +
+        s"${KUBERNETES_DRIVER_REUSE_PVC.key}=true.")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val KUBERNETES_NAMESPACE =
     ConfigBuilder("spark.kubernetes.namespace")
       .doc("The namespace that will be used for running the driver and executor pods.")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index d8ae910b1aec..4188a9038aa2 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -33,8 +33,9 @@ import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.KubernetesConf
 import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT
+import org.apache.spark.internal.config.{DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, DYN_ALLOCATION_MAX_EXECUTORS, EXECUTOR_INSTANCES}
 import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.scheduler.cluster.SchedulerBackendUtils.DEFAULT_NUMBER_EXECUTORS
 import org.apache.spark.util.{Clock, Utils}
 
 class ExecutorPodsAllocator(
@@ -47,6 +48,17 @@ class ExecutorPodsAllocator(
 
   private val EXECUTOR_ID_COUNTER = new AtomicInteger(0)
 
+  private val PVC_COUNTER = new AtomicInteger(0)
+
+  private val maxPVCs = if (Utils.isDynamicAllocationEnabled(conf)) {
+    conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
+  } else {
+    conf.getInt(EXECUTOR_INSTANCES.key, DEFAULT_NUMBER_EXECUTORS)
+  }
+
+  private val podAllocOnPVC = conf.get(KUBERNETES_DRIVER_OWN_PVC) &&
+    conf.get(KUBERNETES_DRIVER_REUSE_PVC) && conf.get(KUBERNETES_DRIVER_WAIT_TO_REUSE_PVC)
+
   // ResourceProfile id -> total expected executors per profile, currently we don't remove
   // any resource profiles - https://issues.apache.org/jira/browse/SPARK-30749
   private val totalExpectedExecutorsPerResourceProfileId = new ConcurrentHashMap[Int, Int]()
@@ -398,6 +410,10 @@ class ExecutorPodsAllocator(
       // Check reusable PVCs for this executor allocation batch
       val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse)
       for ( _ <- 0 until numExecutorsToAllocate) {
+        if (reusablePVCs.isEmpty && podAllocOnPVC && maxPVCs <= PVC_COUNTER.get()) {
+          logInfo(s"Wait to reuse one of the existing ${PVC_COUNTER.get()} PVCs.")
+          return
+        }
         val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
         val executorConf = KubernetesConf.createExecutorConf(
           conf,
@@ -429,6 +445,7 @@ class ExecutorPodsAllocator(
             logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " +
               s"StorageClass ${pvc.getSpec.getStorageClassName}")
             kubernetesClient.persistentVolumeClaims().inNamespace(namespace).resource(pvc).create()
+            PVC_COUNTER.incrementAndGet()
           }
         newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis())
         logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
@@ -438,6 +455,7 @@ class ExecutorPodsAllocator(
            .inNamespace(namespace)
            .resource(createdExecutorPod)
            .delete()
+          PVC_COUNTER.decrementAndGet()
          throw e
       }
     }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index c526bf0968e4..e4c3a853d18a 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -39,7 +39,7 @@ import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesExecutorSp
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.Fabric8Aliases._
-import org.apache.spark.internal.config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT
+import org.apache.spark.internal.config.{DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, EXECUTOR_INSTANCES}
 import org.apache.spark.resource._
 import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
 import org.apache.spark.util.ManualClock
@@ -90,6 +90,9 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
   @Mock
   private var pvcWithNamespace: PVC_WITH_NAMESPACE = _
 
+  @Mock
+  private var pvcResource: io.fabric8.kubernetes.client.dsl.Resource[PersistentVolumeClaim] = _
+
   @Mock
   private var labeledPersistentVolumeClaims: LABELED_PERSISTENT_VOLUME_CLAIMS = _
 
@@ -139,6 +142,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     when(kubernetesClient.persistentVolumeClaims()).thenReturn(persistentVolumeClaims)
     when(persistentVolumeClaims.inNamespace("default")).thenReturn(pvcWithNamespace)
     when(pvcWithNamespace.withLabel(any(), any())).thenReturn(labeledPersistentVolumeClaims)
+    when(pvcWithNamespace.resource(any())).thenReturn(pvcResource)
     when(labeledPersistentVolumeClaims.list()).thenReturn(persistentVolumeClaimList)
     when(persistentVolumeClaimList.getItems).thenReturn(Seq.empty[PersistentVolumeClaim].asJava)
   }
@@ -809,6 +813,75 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     podsAllocatorUnderTest invokePrivate getReusablePVCs("appId", Seq("pvc-1"))
   }
 
+  test("SPARK-41410: Support waitToReusePersistentVolumeClaims") {
+    val prefix = "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1"
+    val confWithPVC = conf.clone
+      .set(KUBERNETES_DRIVER_OWN_PVC.key, "true")
+      .set(KUBERNETES_DRIVER_REUSE_PVC.key, "true")
+      .set(KUBERNETES_DRIVER_WAIT_TO_REUSE_PVC.key, "true")
+      .set(EXECUTOR_INSTANCES.key, "1")
+      .set(s"$prefix.mount.path", "/spark-local-dir")
+      .set(s"$prefix.mount.readOnly", "false")
+      .set(s"$prefix.option.claimName", "OnDemand")
+      .set(s"$prefix.option.sizeLimit", "200Gi")
+      .set(s"$prefix.option.storageClass", "gp3")
+
+    when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
+      meq(kubernetesClient), any(classOf[ResourceProfile])))
+      .thenAnswer((invocation: InvocationOnMock) => {
+        val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
+        KubernetesExecutorSpec(
+          executorPodWithIdAndVolume(k8sConf.executorId.toInt, k8sConf.resourceProfileId),
+          Seq(persistentVolumeClaim("pvc-0", "gp3", "200Gi")))
+      })
+
+    podsAllocatorUnderTest = new ExecutorPodsAllocator(
+      confWithPVC, secMgr, executorBuilder,
+      kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
+    podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
+
+    when(podsWithNamespace
+      .withField("status.phase", "Pending"))
+      .thenReturn(labeledPods)
+    when(labeledPods
+      .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+      .thenReturn(labeledPods)
+    when(labeledPods
+      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
+      .thenReturn(labeledPods)
+    when(labeledPods
+      .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
+      .thenReturn(labeledPods)
+
+    val startTime = Instant.now.toEpochMilli
+    waitForExecutorPodsClock.setTime(startTime)
+
+    val counter = PrivateMethod[AtomicInteger](Symbol("PVC_COUNTER"))()
+    assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 0)
+
+    // Target 1 executor, make sure it's requested, even with an empty initial snapshot.
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
+    verify(podsWithNamespace).resource(podWithAttachedContainerForIdAndVolume(1))
+    assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 1)
+
+    // Mark executor as running, verify that subsequent allocation cycle is a no-op.
+    snapshotsStore.updatePod(runningExecutor(1))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
+    verify(podResource, times(1)).create()
+    verify(podResource, never()).delete()
+    verify(pvcWithNamespace, times(1)).resource(any())
+    assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 1)
+
+    // Request a new executor, make sure that no new pod and pvc are created
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
+    verify(podResource, times(1)).create()
+    assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 1)
+  }
+
   private def executorPodAnswer(): Answer[KubernetesExecutorSpec] =
     (invocation: InvocationOnMock) => {
       val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)