Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.spark.deploy.k8s.features

import java.util

import io.fabric8.kubernetes.api.model._
import io.fabric8.volcano.client.DefaultVolcanoClient
import io.fabric8.volcano.scheduling.v1beta1.{PodGroup, PodGroupSpec}
Expand Down Expand Up @@ -54,11 +56,33 @@ private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureCon
metadata.setName(podGroupName)
metadata.setNamespace(namespace)
pg.setMetadata(metadata)

var spec = pg.getSpec
if (spec == null) spec = new PodGroupSpec
val queue = kubernetesConf.getOption(POD_GROUP_SPEC_QUEUE)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

val pg = template.map(client.podGroups.load(_).get).getOrElse(new PodGroup())
val builder = new PodGroupBuilder(pg)
  .editOrNewMetadata()
    .withName(podGroupName)
    .withNamespace(namespace)
  .endMetadata()
  .editOrNewSpec()
  .endSpec()

if (xxx) {
  builder.editOrNewSpec().endSpec()
}

return Seq(builder.build())

If we decided to add parameters support, the builder style might more stable and easy to maintainance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can refactor this, if my opinion is accepted,

if (queue.isDefined) {
spec.setQueue(queue.get)
}
val minResourceCPU = kubernetesConf.getOption(POD_GROUP_SPEC_MIN_RESOURCE_CPU)
val minResourceMemory = kubernetesConf.getOption(POD_GROUP_SPEC_MIN_RESOURCE_MEMORY)
if (minResourceCPU.isDefined || minResourceMemory.isDefined) {
val minResources = new util.HashMap[String, Quantity]
if (minResourceCPU.isDefined) {
minResources.put("cpu", new Quantity(minResourceCPU.get))
}
if (minResourceMemory.isDefined) {
minResources.put("memory", new Quantity(minResourceMemory.get))
}
spec.setMinResources(minResources)
Copy link
Member

@Yikun Yikun Sep 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will force to overwrite existing MinResources and ignore existing template if one of configurations is set.

}
val minMember = kubernetesConf.getOption(POD_GROUP_SPEC_MIN_MEMBER)
if (minMember.isDefined) {
spec.setMinMember(minMember.get.toInt)
}
val priorityClassName = kubernetesConf.getOption(POD_GROUP_SPEC_PRIORITY_CLASS_NAME)
if (priorityClassName.isDefined) {
spec.setPriorityClassName(priorityClassName.get)
}
pg.setSpec(spec)

Seq(pg)
}

Expand All @@ -75,4 +99,12 @@ private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureCon
private[spark] object VolcanoFeatureStep {
val POD_GROUP_ANNOTATION = "scheduling.k8s.io/group-name"
val POD_GROUP_TEMPLATE_FILE_KEY = "spark.kubernetes.scheduler.volcano.podGroupTemplateFile"
val POD_GROUP_SPEC_QUEUE = "spark.kubernetes.scheduler.volcano.podGroup.spec.queue"
val POD_GROUP_SPEC_MIN_RESOURCE_CPU =
"spark.kubernetes.scheduler.volcano.podGroup.spec.minResources.cpu"
val POD_GROUP_SPEC_MIN_RESOURCE_MEMORY =
"spark.kubernetes.scheduler.volcano.podGroup.spec.minResources.memory"
val POD_GROUP_SPEC_MIN_MEMBER = "spark.kubernetes.scheduler.volcano.podGroup.spec.minMember"
val POD_GROUP_SPEC_PRIORITY_CLASS_NAME =
"spark.kubernetes.scheduler.volcano.podGroup.spec.PriorityClassName"
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,24 @@ class VolcanoFeatureStepSuite extends SparkFunSuite {
step.init(kubernetesConf)
assert(step.getAdditionalPreKubernetesResources() === Seq.empty)
}

test("SPARK-38455: Support driver podgroup parameter") {
val sparkConf = new SparkConf()
.set(VolcanoFeatureStep.POD_GROUP_SPEC_QUEUE, "driver-queue")
.set(VolcanoFeatureStep.POD_GROUP_SPEC_MIN_RESOURCE_CPU, "2")
.set(VolcanoFeatureStep.POD_GROUP_SPEC_MIN_RESOURCE_MEMORY, "2048Mi")
.set(VolcanoFeatureStep.POD_GROUP_SPEC_MIN_MEMBER, "2")
.set(VolcanoFeatureStep.POD_GROUP_SPEC_PRIORITY_CLASS_NAME, "driver-priority")
val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf)
val step = new VolcanoFeatureStep()
step.init(kubernetesConf)
step.configurePod(SparkPod.initialPod())
val podGroup = step.getAdditionalPreKubernetesResources().head.asInstanceOf[PodGroup]
assert(podGroup.getSpec.getQueue == "driver-queue")
assert(podGroup.getSpec.getMinMember == 2)
assert(podGroup.getSpec.getMinResources.get("cpu").getAmount == "2")
assert(podGroup.getSpec.getMinResources.get("memory").getAmount == "2048")
assert(podGroup.getSpec.getMinResources.get("memory").getFormat == "Mi")
assert(podGroup.getSpec.getPriorityClassName == "driver-priority")
}
}