resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -21,8 +21,7 @@ import java.util.concurrent.TimeUnit

import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.internal.config.{ConfigBuilder, DYN_ALLOCATION_MAX_EXECUTORS, EXECUTOR_INSTANCES, PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}

private[spark] object Config extends Logging {

@@ -101,6 +100,17 @@ private[spark] object Config extends Logging {
.booleanConf
.createWithDefault(true)

val KUBERNETES_DRIVER_WAIT_TO_REUSE_PVC =
ConfigBuilder("spark.kubernetes.driver.waitToReusePersistentVolumeClaims")
.doc("If true, driver pod counts the number of created on-demand persistent volume claims " +
s"and wait if the number is greater than or equal to the maximum which is " +
s"${EXECUTOR_INSTANCES.key} or ${DYN_ALLOCATION_MAX_EXECUTORS.key}. " +
s"This config requires both ${KUBERNETES_DRIVER_OWN_PVC.key}=true and " +
s"${KUBERNETES_DRIVER_REUSE_PVC.key}=true.")
Comment on lines +104 to +109
Member:
Why not mention PVC-oriented executor pod allocation in the config description? I think it would make it clearer what this feature is.

Member Author:
Yes, initially I tried to use that as the config name, but PVC-oriented executor pod allocation is achieved by three configurations:

  • spark.kubernetes.driver.waitToReusePersistentVolumeClaims
  • spark.kubernetes.driver.ownPersistentVolumeClaims
  • spark.kubernetes.driver.reusePersistentVolumeClaims

I'll add a K8s document section with that name.
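
As an illustration, here is a minimal sketch of enabling all three together (the claim settings mirror the on-demand PVC configuration used in the test suite below; the mount path and size limit are placeholder values):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .set("spark.kubernetes.driver.ownPersistentVolumeClaims", "true")
    .set("spark.kubernetes.driver.reusePersistentVolumeClaims", "true")
    .set("spark.kubernetes.driver.waitToReusePersistentVolumeClaims", "true")
    // Create an on-demand PVC per executor and mount it as a local directory.
    .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.option.claimName", "OnDemand")
    .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.option.sizeLimit", "200Gi")
    .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path", "/spark-local-dir")
    .set("spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly", "false")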

.version("3.4.0")
.booleanConf
.createWithDefault(false)

val KUBERNETES_NAMESPACE =
ConfigBuilder("spark.kubernetes.namespace")
.doc("The namespace that will be used for running the driver and executor pods.")
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -33,8 +33,9 @@ import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.KubernetesConf
import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT
import org.apache.spark.internal.config.{DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, DYN_ALLOCATION_MAX_EXECUTORS, EXECUTOR_INSTANCES}
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.scheduler.cluster.SchedulerBackendUtils.DEFAULT_NUMBER_EXECUTORS
import org.apache.spark.util.{Clock, Utils}

class ExecutorPodsAllocator(
@@ -47,6 +48,17 @@

private val EXECUTOR_ID_COUNTER = new AtomicInteger(0)

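// Number of on-demand PVCs created by this driver; incremented after each successful PVC create and decremented when a failed pod creation is rolled back.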
private val PVC_COUNTER = new AtomicInteger(0)

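// Upper bound on on-demand PVCs: the dynamic-allocation executor maximum if dynamic allocation is enabled, otherwise the static executor instance count.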
private val maxPVCs = if (Utils.isDynamicAllocationEnabled(conf)) {
conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
} else {
conf.getInt(EXECUTOR_INSTANCES.key, DEFAULT_NUMBER_EXECUTORS)
}

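// PVC-oriented pod allocation applies only when owning, reusing, and waiting to reuse PVCs are all enabled.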
private val podAllocOnPVC = conf.get(KUBERNETES_DRIVER_OWN_PVC) &&
conf.get(KUBERNETES_DRIVER_REUSE_PVC) && conf.get(KUBERNETES_DRIVER_WAIT_TO_REUSE_PVC)

// ResourceProfile id -> total expected executors per profile, currently we don't remove
// any resource profiles - https://issues.apache.org/jira/browse/SPARK-30749
private val totalExpectedExecutorsPerResourceProfileId = new ConcurrentHashMap[Int, Int]()
@@ -398,6 +410,10 @@
// Check reusable PVCs for this executor allocation batch
val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse)
for ( _ <- 0 until numExecutorsToAllocate) {
if (reusablePVCs.isEmpty && podAllocOnPVC && maxPVCs <= PVC_COUNTER.get()) {
logInfo(s"Wait to reuse one of the existing ${PVC_COUNTER.get()} PVCs.")
return
}
val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
val executorConf = KubernetesConf.createExecutorConf(
conf,
@@ -429,6 +445,7 @@
logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " +
s"StorageClass ${pvc.getSpec.getStorageClassName}")
kubernetesClient.persistentVolumeClaims().inNamespace(namespace).resource(pvc).create()
PVC_COUNTER.incrementAndGet()
}
newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis())
logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
@@ -438,6 +455,7 @@
.inNamespace(namespace)
.resource(createdExecutorPod)
.delete()
PVC_COUNTER.decrementAndGet()
throw e
}
}
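
Taken together, the allocator changes implement a bounded create-or-wait loop: the driver creates executor pods (and their on-demand PVCs) until the PVC cap is reached, then returns early and waits for released PVCs to become reusable. A simplified, self-contained sketch of that control flow (hypothetical class and method names, not Spark's actual API):

  import java.util.concurrent.atomic.AtomicInteger

  // Simplified model of the PVC-capped allocation decision.
  class PvcCappedAllocator(maxPVCs: Int, podAllocOnPVC: Boolean) {
    private val pvcCounter = new AtomicInteger(0)

    // May a new executor pod (and its on-demand PVC) be created right now?
    def mayAllocate(reusablePVCs: Int): Boolean =
      // Wait only when the feature is enabled, no PVC is currently reusable,
      // and the driver has already created the maximum number of PVCs.
      !(podAllocOnPVC && reusablePVCs == 0 && pvcCounter.get() >= maxPVCs)

    def onPvcCreated(): Unit = pvcCounter.incrementAndGet()      // after a successful PVC create
    def onPodCreateFailed(): Unit = pvcCounter.decrementAndGet() // roll back after a failed pod create
  }
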
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -39,7 +39,7 @@ import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesExecutorSpec}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.internal.config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT
import org.apache.spark.internal.config.{DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, EXECUTOR_INSTANCES}
import org.apache.spark.resource._
import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
import org.apache.spark.util.ManualClock
@@ -90,6 +90,9 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
@Mock
private var pvcWithNamespace: PVC_WITH_NAMESPACE = _

@Mock
private var pvcResource: io.fabric8.kubernetes.client.dsl.Resource[PersistentVolumeClaim] = _

@Mock
private var labeledPersistentVolumeClaims: LABELED_PERSISTENT_VOLUME_CLAIMS = _

@@ -139,6 +142,7 @@
when(kubernetesClient.persistentVolumeClaims()).thenReturn(persistentVolumeClaims)
when(persistentVolumeClaims.inNamespace("default")).thenReturn(pvcWithNamespace)
when(pvcWithNamespace.withLabel(any(), any())).thenReturn(labeledPersistentVolumeClaims)
when(pvcWithNamespace.resource(any())).thenReturn(pvcResource)
when(labeledPersistentVolumeClaims.list()).thenReturn(persistentVolumeClaimList)
when(persistentVolumeClaimList.getItems).thenReturn(Seq.empty[PersistentVolumeClaim].asJava)
}
@@ -809,6 +813,75 @@
podsAllocatorUnderTest invokePrivate getReusablePVCs("appId", Seq("pvc-1"))
}

test("SPARK-41410: Support waitToReusePersistentVolumeClaims") {
val prefix = "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1"
val confWithPVC = conf.clone
.set(KUBERNETES_DRIVER_OWN_PVC.key, "true")
.set(KUBERNETES_DRIVER_REUSE_PVC.key, "true")
.set(KUBERNETES_DRIVER_WAIT_TO_REUSE_PVC.key, "true")
.set(EXECUTOR_INSTANCES.key, "1")
.set(s"$prefix.mount.path", "/spark-local-dir")
.set(s"$prefix.mount.readOnly", "false")
.set(s"$prefix.option.claimName", "OnDemand")
.set(s"$prefix.option.sizeLimit", "200Gi")
.set(s"$prefix.option.storageClass", "gp3")

when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
meq(kubernetesClient), any(classOf[ResourceProfile])))
.thenAnswer((invocation: InvocationOnMock) => {
val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
KubernetesExecutorSpec(
executorPodWithIdAndVolume(k8sConf.executorId.toInt, k8sConf.resourceProfileId),
Seq(persistentVolumeClaim("pvc-0", "gp3", "200Gi")))
})

podsAllocatorUnderTest = new ExecutorPodsAllocator(
confWithPVC, secMgr, executorBuilder,
kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)

when(podsWithNamespace
.withField("status.phase", "Pending"))
.thenReturn(labeledPods)
when(labeledPods
.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
.thenReturn(labeledPods)
when(labeledPods
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
.thenReturn(labeledPods)
when(labeledPods
.withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
.thenReturn(labeledPods)

val startTime = Instant.now.toEpochMilli
waitForExecutorPodsClock.setTime(startTime)

val counter = PrivateMethod[AtomicInteger](Symbol("PVC_COUNTER"))()
assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 0)

// Target 1 executor, make sure it's requested, even with an empty initial snapshot.
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
verify(podsWithNamespace).resource(podWithAttachedContainerForIdAndVolume(1))
assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 1)

// Mark executor as running, verify that subsequent allocation cycle is a no-op.
snapshotsStore.updatePod(runningExecutor(1))
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
verify(podResource, times(1)).create()
verify(podResource, never()).delete()
verify(pvcWithNamespace, times(1)).resource(any())
assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 1)

// Request a new executor; make sure that no new pod or PVC is created
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2))
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
verify(podResource, times(1)).create()
assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 1)
}

private def executorPodAnswer(): Answer[KubernetesExecutorSpec] =
(invocation: InvocationOnMock) => {
val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)