diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml
index 0cb5e115906a..7311c9ea446a 100644
--- a/resource-managers/kubernetes/core/pom.xml
+++ b/resource-managers/kubernetes/core/pom.xml
@@ -51,7 +51,16 @@
       <artifactId>spark-tags_${scala.binary.version}</artifactId>
       <type>test-jar</type>
     </dependency>
-
+    <dependency>
+      <groupId>io.fabric8</groupId>
+      <artifactId>volcano-client</artifactId>
+      <version>${kubernetes-client.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.fabric8</groupId>
+      <artifactId>volcano-model-v1beta1</artifactId>
+      <version>${kubernetes-client.version}</version>
+    </dependency>
     <dependency>
       <groupId>io.fabric8</groupId>
       <artifactId>kubernetes-client</artifactId>
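
For anyone consuming these bindings from an sbt build, the equivalent declaration is sketched below; `kubernetesClientVersion` is a stand-in for the Maven `${kubernetes-client.version}` property, and the concrete value is an assumption, not taken from this patch.

```scala
// Sketch only: sbt equivalent of the two Maven dependencies added above.
// The version is assumed; align it with the Spark build's kubernetes-client.version.
val kubernetesClientVersion = "5.12.1"

libraryDependencies ++= Seq(
  "io.fabric8" % "volcano-client"        % kubernetesClientVersion,
  "io.fabric8" % "volcano-model-v1beta1" % kubernetesClientVersion
)
```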
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 2458e2d66530..d8abe3672e0b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -22,6 +22,7 @@ import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
private[spark] object Config extends Logging {
@@ -636,6 +637,24 @@ private[spark] object Config extends Logging {
val KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX = "spark.kubernetes.executor.secretKeyRef."
val KUBERNETES_EXECUTOR_VOLUMES_PREFIX = "spark.kubernetes.executor.volumes."
+ val KUBERNETES_ENABLE_PODGROUP = ConfigBuilder("spark.kubernetes.enablePodGroup")
+ .doc("If true, the PodGroup annotation ('scheduling.k8s.io/group-name') is added to each " +
+ "driver/executor pod, and all pods of a job are placed in the same PodGroup. This " +
+ "information is consumed by a Kubernetes batch scheduler such as Volcano.")
+ .version("3.3.0")
+ .booleanConf
+ .createWithDefault(false)
+ val KUBERNETES_PODGROUP_MIN_MEMORY = ConfigBuilder("spark.kubernetes.podgroup.min.memory")
+ .doc("Amount of memory to use for the PodGroup minResource, in MiB unless otherwise specified.")
+ .version("3.3.0")
+ .bytesConf(ByteUnit.MiB)
+ .createWithDefaultString("3g")
+ val KUBERNETES_PODGROUP_MIN_CPU = ConfigBuilder("spark.kubernetes.podgroup.min.cpu")
+ .doc("Amount of cpu to use for the PodGroup minResource")
+ .version("3.3.0")
+ .doubleConf
+ .createWithDefault(2.0)
+
val KUBERNETES_VOLUMES_HOSTPATH_TYPE = "hostPath"
val KUBERNETES_VOLUMES_PVC_TYPE = "persistentVolumeClaim"
val KUBERNETES_VOLUMES_EMPTYDIR_TYPE = "emptyDir"
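
A minimal sketch of setting the three new configs programmatically; the values below are illustrative, and the same keys can be passed as spark-submit `--conf` flags:

```scala
import org.apache.spark.SparkConf

// Illustrative values; the defaults are false / 3g / 2.0.
val conf = new SparkConf()
  .set("spark.kubernetes.enablePodGroup", "true")
  .set("spark.kubernetes.podgroup.min.memory", "4g") // bytesConf in MiB, so "4g" -> 4096
  .set("spark.kubernetes.podgroup.min.cpu", "4.0")
```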
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 81304024e1ff..46086fac0202 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -43,6 +43,7 @@ private[spark] abstract class KubernetesConf(val sparkConf: SparkConf) {
def secretNamesToMountPaths: Map[String, String]
def volumes: Seq[KubernetesVolumeSpec]
def schedulerName: String
+ def appId: String
def appName: String = get("spark.app.name", "spark")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala
index fce8c6a4bf49..a603cb08ba9a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala
@@ -20,5 +20,6 @@ import io.fabric8.kubernetes.api.model.HasMetadata
private[spark] case class KubernetesDriverSpec(
pod: SparkPod,
+ driverPreKubernetesResources: Seq[HasMetadata],
driverKubernetesResources: Seq[HasMetadata],
systemProperties: Map[String, String])
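
Since the new field is positional, every construction site gains one argument; a minimal sketch of the updated shape:

```scala
import io.fabric8.kubernetes.api.model.HasMetadata
import org.apache.spark.deploy.k8s.{KubernetesDriverSpec, SparkPod}

// Sketch: pre-creation resources are materialized before the driver pod,
// the remaining resources after it (owner-referenced to the pod).
val spec = KubernetesDriverSpec(
  pod = SparkPod.initialPod(),
  driverPreKubernetesResources = Seq.empty[HasMetadata],
  driverKubernetesResources = Seq.empty[HasMetadata],
  systemProperties = Map.empty[String, String])
```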
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 0c8d9646a2b4..7a21c603c56a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -22,6 +22,7 @@ import java.security.SecureRandom
import java.util.{Collections, UUID}
import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, HasMetadata, OwnerReferenceBuilder, Pod, PodBuilder, Quantity}
import io.fabric8.kubernetes.client.KubernetesClient
@@ -381,4 +382,21 @@ object KubernetesUtils extends Logging {
}
}
}
+
+ @Since("3.3.0")
+ def createOrReplaceResource(
+ client: KubernetesClient,
+ resources: Seq[HasMetadata],
+ ownerPod: Pod = null): Unit = {
+ try {
+ addOwnerReference(ownerPod, resources)
+ client.resourceList(resources: _*).createOrReplace()
+ } catch {
+ case NonFatal(e) =>
+ if (ownerPod != null) {
+ client.pods().delete(ownerPod)
+ }
+ throw e
+ }
+ }
}
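
How the submission client is expected to call the new helper, sketched with the two resource lists assumed to come from the resolved driver spec:

```scala
// Without an owner pod: create resources that must exist before the driver pod
// (addOwnerReference is a no-op when the pod is null).
createOrReplaceResource(kubernetesClient, preResources)

// With an owner pod: owner references are attached so Kubernetes garbage-collects
// the resources with the pod, and the pod is deleted if resource creation fails.
createOrReplaceResource(kubernetesClient, postResources, createdDriverPod)
```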
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala
index 3fec92644b95..02fd3e11d921 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala
@@ -70,7 +70,15 @@ trait KubernetesFeatureConfigStep {
/**
* Return any additional Kubernetes resources that should be added to support this feature. Only
- * applicable when creating the driver in cluster mode.
+ * applicable when creating the driver in cluster mode. These resources are created or
+ * refreshed after the driver pod has been created.
*/
def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
+
+ /**
+ * Return any additional Kubernetes resources that should be added to support this feature. Only
+ * applicable when creating the driver in cluster mode. These resources are created or
+ * refreshed before the driver pod is created.
+ */
+ def getAdditionalPreKubernetesResources(): Seq[HasMetadata] = Seq.empty
}
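
A sketch of a user-supplied feature step exercising the new pre-creation hook; the class name and ConfigMap contents are hypothetical:

```scala
import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, HasMetadata}
import org.apache.spark.deploy.k8s.SparkPod
import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep

// Hypothetical step: publish a ConfigMap before the driver pod exists.
class PreCreatedConfigMapStep extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod = pod

  override def getAdditionalPreKubernetesResources(): Seq[HasMetadata] = Seq(
    new ConfigMapBuilder()
      .withNewMetadata().withName("pre-created-example").endMetadata()
      .addToData("key", "value")
      .build())
}
```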
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodGroupFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodGroupFeatureStep.scala
new file mode 100644
index 000000000000..4def8d9bfb5a
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodGroupFeatureStep.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s.features
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.volcano.scheduling.v1beta1.PodGroupBuilder
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+
+private[spark] class PodGroupFeatureStep(kubernetesConf: KubernetesConf)
+ extends KubernetesFeatureConfigStep {
+
+ val conf: SparkConf = kubernetesConf.sparkConf
+ val POD_GROUP_ANNOTATION = "scheduling.k8s.io/group-name"
+ val podGroupName = s"${kubernetesConf.appId}-podgroup"
+ val enablePodGroup: Boolean = conf.get(KUBERNETES_ENABLE_PODGROUP)
+
+ override def configurePod(pod: SparkPod): SparkPod = {
+ if (enablePodGroup) {
+ val k8sPodBuilder = new PodBuilder(pod.pod)
+ .editMetadata()
+ .addToAnnotations(POD_GROUP_ANNOTATION, podGroupName)
+ .endMetadata()
+ val k8sPod = k8sPodBuilder.build()
+ SparkPod(k8sPod, pod.container)
+ } else {
+ pod
+ }
+ }
+
+ private def getPodGroupMinResources(): java.util.HashMap[String, Quantity] = {
+ val cpu = kubernetesConf.get(KUBERNETES_PODGROUP_MIN_CPU)
+ val memory = kubernetesConf.get(KUBERNETES_PODGROUP_MIN_MEMORY)
+
+ val cpuQ = new QuantityBuilder(false)
+ .withAmount(s"${cpu}")
+ .build()
+ val memoryQ = new QuantityBuilder(false)
+ .withAmount(s"${memory}Mi")
+ .build()
+ new java.util.HashMap(Map("cpu" -> cpuQ, "memory" -> memoryQ).asJava)
+ }
+
+ private def getVolcanoResources(): Seq[HasMetadata] = {
+ val podGroup = new PodGroupBuilder()
+ .editOrNewMetadata()
+ .withName(podGroupName)
+ .withNamespace(kubernetesConf.get(KUBERNETES_NAMESPACE))
+ .endMetadata()
+ .editOrNewSpec()
+ .withMinResources(getPodGroupMinResources())
+ .endSpec()
+ .build()
+
+ Seq(podGroup)
+ }
+
+ private def getPodGroupResources(): Seq[HasMetadata] = {
+ if (enablePodGroup) {
+ kubernetesConf.schedulerName match {
+ case "volcano" =>
+ getVolcanoResources()
+ case _ =>
+ Seq.empty
+ }
+ } else {
+ Seq.empty
+ }
+ }
+
+ override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+ getPodGroupResources()
+ }
+
+ override def getAdditionalPreKubernetesResources(): Seq[HasMetadata] = {
+ getPodGroupResources()
+ }
+}
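
Exercising the step under the defaults (mirroring the test suite further down) should yield a PodGroup roughly like the manifest in the trailing comment; the YAML rendering is an approximation, not output captured from this patch:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.KubernetesTestConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.features.PodGroupFeatureStep

val kubernetesConf = KubernetesTestConf.createDriverConf(
  new SparkConf()
    .set(KUBERNETES_DRIVER_SCHEDULER_NAME, "volcano")
    .set(KUBERNETES_ENABLE_PODGROUP, true))
val Seq(podGroup) = new PodGroupFeatureStep(kubernetesConf)
  .getAdditionalPreKubernetesResources()

// `podGroup` serializes to approximately:
//   apiVersion: scheduling.volcano.sh/v1beta1
//   kind: PodGroup
//   metadata:
//     name: <appId>-podgroup
//     namespace: <spark.kubernetes.namespace>
//   spec:
//     minResources:
//       cpu: "2.0"
//       memory: "3072Mi"
```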
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 3c3c4258ad9c..06be6bb10300 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -30,7 +30,7 @@ import org.apache.spark.deploy.SparkApplication
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
+import org.apache.spark.deploy.k8s.KubernetesUtils.createOrReplaceResource
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
@@ -133,6 +133,10 @@ private[spark] class Client(
.build()
val driverPodName = resolvedDriverPod.getMetadata.getName
+ // Set up resources that must exist before the driver pod is created
+ val preKubernetesResources = resolvedDriverSpec.driverPreKubernetesResources
+ createOrReplaceResource(kubernetesClient, preKubernetesResources)
+
var watch: Watch = null
var createdDriverPod: Pod = null
try {
@@ -142,15 +146,10 @@ private[spark] class Client(
logError("Please check \"kubectl auth can-i create pod\" first. It should be yes.")
throw e
}
- try {
- val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
- addOwnerReference(createdDriverPod, otherKubernetesResources)
- kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
- } catch {
- case NonFatal(e) =>
- kubernetesClient.pods().delete(createdDriverPod)
- throw e
- }
+
+ // Set up resources after pod creation, and refresh owner references on all of them
+ val driverKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
+ createOrReplaceResource(kubernetesClient, driverKubernetesResources, createdDriverPod)
if (conf.get(WAIT_FOR_APP_COMPLETION)) {
val sId = Seq(conf.namespace, driverPodName).mkString(":")
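
Condensed, the submission sequence in `run()` is now the three steps below (the names are the values already built in this method):

```scala
// 1. Pre-creation resources (e.g. the Volcano PodGroup): must exist before the
//    pod so the batch scheduler can resolve 'scheduling.k8s.io/group-name'.
createOrReplaceResource(kubernetesClient, preKubernetesResources)

// 2. The driver pod itself.
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)

// 3. Post-creation resources, owner-referenced to the pod so they are
//    garbage-collected with it (and the pod is deleted if this step fails).
createOrReplaceResource(kubernetesClient, driverKubernetesResources, createdDriverPod)
```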
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index b8b93bb22b13..9cfbdea86f0b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -53,19 +53,23 @@ private[spark] class KubernetesDriverBuilder {
new HadoopConfDriverFeatureStep(conf),
new KerberosConfDriverFeatureStep(conf),
new PodTemplateConfigMapStep(conf),
- new LocalDirsFeatureStep(conf)) ++ userFeatures
+ new LocalDirsFeatureStep(conf),
+ new PodGroupFeatureStep(conf)) ++ userFeatures
val spec = KubernetesDriverSpec(
initialPod,
+ driverPreKubernetesResources = Seq.empty,
driverKubernetesResources = Seq.empty,
conf.sparkConf.getAll.toMap)
features.foldLeft(spec) { case (spec, feature) =>
val configuredPod = feature.configurePod(spec.pod)
val addedSystemProperties = feature.getAdditionalPodSystemProperties()
+ val addedPreResources = feature.getAdditionalPreKubernetesResources()
val addedResources = feature.getAdditionalKubernetesResources()
KubernetesDriverSpec(
configuredPod,
+ spec.driverPreKubernetesResources ++ addedPreResources,
spec.driverKubernetesResources ++ addedResources,
spec.systemProperties ++ addedSystemProperties)
}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index 1a62d08a7b41..c6b6f82f34e9 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -52,7 +52,8 @@ private[spark] class KubernetesExecutorBuilder {
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new MountVolumesFeatureStep(conf),
- new LocalDirsFeatureStep(conf)) ++ userFeatures
+ new LocalDirsFeatureStep(conf),
+ new PodGroupFeatureStep(conf)) ++ userFeatures
val spec = KubernetesExecutorSpec(
initialPod,
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala
index ebbb42f225c5..c92bf803ec55 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala
@@ -175,7 +175,7 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {
}
val pod = step.configurePod(SparkPod.initialPod())
val props = step.getAdditionalPodSystemProperties()
- KubernetesDriverSpec(pod, Nil, props)
+ KubernetesDriverSpec(pod, Nil, Nil, props)
}
}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodGroupFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodGroupFeatureStepSuite.scala
new file mode 100644
index 000000000000..9a971cf5c27d
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodGroupFeatureStepSuite.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.Quantity
+import io.fabric8.volcano.scheduling.v1beta1.PodGroup
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s._
+import org.apache.spark.deploy.k8s.Config._
+
+class PodGroupFeatureStepSuite extends SparkFunSuite {
+ test("Do nothing when KUBERNETES_ENABLE_PODGROUP is false") {
+ val conf = KubernetesTestConf.createDriverConf()
+ val step = new PodGroupFeatureStep(conf)
+
+ val initialPod = SparkPod.initialPod()
+ val configuredPod = step.configurePod(initialPod)
+ assert(configuredPod === initialPod)
+
+ assert(step.getAdditionalKubernetesResources().isEmpty)
+ assert(step.getAdditionalPodSystemProperties().isEmpty)
+ }
+
+ test("Driver Pod with Volcano PodGroup") {
+ val sparkConf = new SparkConf()
+ .set(KUBERNETES_DRIVER_SCHEDULER_NAME, "volcano")
+ .set(KUBERNETES_ENABLE_PODGROUP, true)
+ val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf)
+ val step = new PodGroupFeatureStep(kubernetesConf)
+ val configuredPod = step.configurePod(SparkPod.initialPod())
+
+ val annotation = configuredPod.pod.getMetadata.getAnnotations
+
+ assert(annotation.get("scheduling.k8s.io/group-name") === step.podGroupName)
+ val podGroup = step.getAdditionalKubernetesResources().head
+ assert(podGroup.getMetadata.getName === step.podGroupName)
+ }
+
+ test("Executor Pod with Volcano PodGroup") {
+ val sparkConf = new SparkConf()
+ .set(KUBERNETES_EXECUTOR_SCHEDULER_NAME, "volcano")
+ .set(KUBERNETES_ENABLE_PODGROUP, true)
+ val kubernetesConf = KubernetesTestConf.createExecutorConf(sparkConf)
+ val step = new PodGroupFeatureStep(kubernetesConf)
+ val configuredPod = step.configurePod(SparkPod.initialPod())
+
+ val annotation = configuredPod.pod.getMetadata.getAnnotations
+
+ assert(annotation.get("scheduling.k8s.io/group-name") === step.podGroupName)
+ val podGroup = step.getAdditionalKubernetesResources().head.asInstanceOf[PodGroup]
+ assert(podGroup.getMetadata.getName === s"${kubernetesConf.appId}-podgroup")
+ assert(podGroup.getSpec.getMinResources.get("cpu") === new Quantity("2.0"))
+ assert(podGroup.getSpec.getMinResources.get("memory") === new Quantity("3072"))
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index a5b85938af3b..59a8c90c10c3 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -64,6 +64,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
private val BUILT_KUBERNETES_SPEC = KubernetesDriverSpec(
SparkPod(BUILT_DRIVER_POD, BUILT_DRIVER_CONTAINER),
+ Nil,
ADDITIONAL_RESOURCES,
RESOLVED_JAVA_OPTIONS)