Skip to content

Commit aac8d0a

Browse files
committed
[SPARK-41388][K8S] getReusablePVCs should ignore recently created PVCs in the previous batch
What changes were proposed in this pull request? This PR aims to prevent `getReusablePVCs` from choosing recently created PVCs in the very previous batch by excluding newly created PVCs whose creation time is within `spark.kubernetes.allocation.batch.delay`. Why are the changes needed? When the K8s control plane is slow, so that `spark.kubernetes.allocation.batch.delay` is relatively too short, or when `spark.kubernetes.executor.enablePollingWithResourceVersion=true` is used, `onNewSnapshots` may not return the full list of executor pods created by the previous batch. This sometimes makes the Spark driver think the PVCs in the previous batch are reusable for the next batch. Does this PR introduce any user-facing change? No. How was this patch tested? Pass the CIs with the newly created test case. Closes #38912 from dongjoon-hyun/SPARK-41388. Authored-by: Dongjoon Hyun <dhyun@apple.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit e234cd8) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit 651f5da) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent 44b6db8 commit aac8d0a

File tree

2 files changed

+26
-3
lines changed

2 files changed

+26
-3
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,11 @@ private[spark] class ExecutorPodsAllocator(
364364
.getItems
365365
.asScala
366366

367-
val reusablePVCs = createdPVCs.filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName))
367+
val now = Instant.now().toEpochMilli
368+
val reusablePVCs = createdPVCs
369+
.filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName))
370+
.filter(pvc => now - Instant.parse(pvc.getMetadata.getCreationTimestamp).toEpochMilli
371+
> podAllocationDelay)
368372
logInfo(s"Found ${reusablePVCs.size} reusable PVCs from ${createdPVCs.size} PVCs")
369373
reusablePVCs
370374
} else {

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717
package org.apache.spark.scheduler.cluster.k8s
1818

1919
import java.time.Instant
20+
import java.time.temporal.ChronoUnit.MILLIS
2021
import java.util.concurrent.atomic.AtomicInteger
2122

2223
import scala.collection.JavaConverters._
24+
import scala.collection.mutable
2325

2426
import io.fabric8.kubernetes.api.model._
2527
import io.fabric8.kubernetes.client.KubernetesClient
@@ -678,8 +680,10 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
678680
when(kubernetesClient.persistentVolumeClaims()).thenReturn(persistentVolumeClaims)
679681
when(persistentVolumeClaims.withLabel(any(), any())).thenReturn(labeledPersistentVolumeClaims)
680682
when(labeledPersistentVolumeClaims.list()).thenReturn(persistentVolumeClaimList)
681-
when(persistentVolumeClaimList.getItems)
682-
.thenReturn(Seq(persistentVolumeClaim("pvc-0", "gp2", "200Gi")).asJava)
683+
val pvc = persistentVolumeClaim("pvc-0", "gp2", "200Gi")
684+
pvc.getMetadata
685+
.setCreationTimestamp(Instant.now().minus(podAllocationDelay + 1, MILLIS).toString)
686+
when(persistentVolumeClaimList.getItems).thenReturn(Seq(pvc).asJava)
683687
when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
684688
meq(kubernetesClient), any(classOf[ResourceProfile])))
685689
.thenAnswer((invocation: InvocationOnMock) => {
@@ -742,6 +746,21 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
742746
" namespace default"))
743747
}
744748

749+
test("SPARK-41388: getReusablePVCs should ignore recently created PVCs in the previous batch") {
  // Handle to the private method under test; it is invoked reflectively below.
  val getReusablePVCs =
    PrivateMethod[mutable.Buffer[PersistentVolumeClaim]](Symbol("getReusablePVCs"))

  val pvc1 = persistentVolumeClaim("pvc-0", "gp2", "200Gi")
  val pvc2 = persistentVolumeClaim("pvc-1", "gp2", "200Gi")

  // pvc-0 is older than the allocation delay, while pvc-1 was created "just now".
  val now = Instant.now()
  pvc1.getMetadata.setCreationTimestamp(now.minus(2 * podAllocationDelay, MILLIS).toString)
  pvc2.getMetadata.setCreationTimestamp(now.toString)

  when(persistentVolumeClaimList.getItems).thenReturn(Seq(pvc1, pvc2).asJava)

  // No PVC is reported as in use, so only the creation-timestamp check can exclude pvc-1.
  val reusablePVCs =
    podsAllocatorUnderTest invokePrivate getReusablePVCs("appId", Seq.empty[String])

  // NOTE(review): whether this allocator's configuration enables PVC reuse is not visible
  // here, so the assertion is one that also holds for an empty result - confirm and tighten
  // it (e.g. expect exactly pvc-0) once the configuration is known.
  assert(!reusablePVCs.exists(_.getMetadata.getName == "pvc-1"))
}
763+
745764
private def executorPodAnswer(): Answer[KubernetesExecutorSpec] =
746765
(invocation: InvocationOnMock) => {
747766
val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)

0 commit comments

Comments
 (0)