diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala index ab84393dd5d23..adc75b2214e93 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.deploy.k8s.features +import java.util + import io.fabric8.kubernetes.api.model._ import io.fabric8.volcano.client.DefaultVolcanoClient import io.fabric8.volcano.scheduling.v1beta1.{PodGroup, PodGroupSpec} @@ -54,11 +56,33 @@ private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureCon metadata.setName(podGroupName) metadata.setNamespace(namespace) pg.setMetadata(metadata) - var spec = pg.getSpec if (spec == null) spec = new PodGroupSpec + val queue = kubernetesConf.getOption(POD_GROUP_SPEC_QUEUE) + if (queue.isDefined) { + spec.setQueue(queue.get) + } + val minResourceCPU = kubernetesConf.getOption(POD_GROUP_SPEC_MIN_RESOURCE_CPU) + val minResourceMemory = kubernetesConf.getOption(POD_GROUP_SPEC_MIN_RESOURCE_MEMORY) + if (minResourceCPU.isDefined || minResourceMemory.isDefined) { + val minResources = new util.HashMap[String, Quantity] + if (minResourceCPU.isDefined) { + minResources.put("cpu", new Quantity(minResourceCPU.get)) + } + if (minResourceMemory.isDefined) { + minResources.put("memory", new Quantity(minResourceMemory.get)) + } + spec.setMinResources(minResources) + } + val minMember = kubernetesConf.getOption(POD_GROUP_SPEC_MIN_MEMBER) + if (minMember.isDefined) { + spec.setMinMember(minMember.get.toInt) + } + val priorityClassName = kubernetesConf.getOption(POD_GROUP_SPEC_PRIORITY_CLASS_NAME) + if (priorityClassName.isDefined) { + spec.setPriorityClassName(priorityClassName.get) + } pg.setSpec(spec) - Seq(pg) } @@ -75,4 +99,12 @@ private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureCon private[spark] object VolcanoFeatureStep { val POD_GROUP_ANNOTATION = "scheduling.k8s.io/group-name" val POD_GROUP_TEMPLATE_FILE_KEY = "spark.kubernetes.scheduler.volcano.podGroupTemplateFile" + val POD_GROUP_SPEC_QUEUE = "spark.kubernetes.scheduler.volcano.podGroup.spec.queue" + val POD_GROUP_SPEC_MIN_RESOURCE_CPU = + "spark.kubernetes.scheduler.volcano.podGroup.spec.minResources.cpu" + val POD_GROUP_SPEC_MIN_RESOURCE_MEMORY = + "spark.kubernetes.scheduler.volcano.podGroup.spec.minResources.memory" + val POD_GROUP_SPEC_MIN_MEMBER = "spark.kubernetes.scheduler.volcano.podGroup.spec.minMember" + val POD_GROUP_SPEC_PRIORITY_CLASS_NAME = + "spark.kubernetes.scheduler.volcano.podGroup.spec.PriorityClassName" } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala index dab414e0e19e7..dedc141d98cbb 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala @@ -73,4 +73,24 @@ class VolcanoFeatureStepSuite extends SparkFunSuite { step.init(kubernetesConf) assert(step.getAdditionalPreKubernetesResources() === Seq.empty) } + + test("SPARK-38455: Support driver podgroup parameter") { + val sparkConf = new SparkConf() + .set(VolcanoFeatureStep.POD_GROUP_SPEC_QUEUE, "driver-queue") + .set(VolcanoFeatureStep.POD_GROUP_SPEC_MIN_RESOURCE_CPU, "2") + .set(VolcanoFeatureStep.POD_GROUP_SPEC_MIN_RESOURCE_MEMORY, "2048Mi") + .set(VolcanoFeatureStep.POD_GROUP_SPEC_MIN_MEMBER, "2") + .set(VolcanoFeatureStep.POD_GROUP_SPEC_PRIORITY_CLASS_NAME, "driver-priority") + val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf) + val step = new VolcanoFeatureStep() + step.init(kubernetesConf) + step.configurePod(SparkPod.initialPod()) + val podGroup = step.getAdditionalPreKubernetesResources().head.asInstanceOf[PodGroup] + assert(podGroup.getSpec.getQueue == "driver-queue") + assert(podGroup.getSpec.getMinMember == 2) + assert(podGroup.getSpec.getMinResources.get("cpu").getAmount == "2") + assert(podGroup.getSpec.getMinResources.get("memory").getAmount == "2048") + assert(podGroup.getSpec.getMinResources.get("memory").getFormat == "Mi") + assert(podGroup.getSpec.getPriorityClassName == "driver-priority") + } }