diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 b/dev/deps/spark-deps-hadoop-2-hive-2.3
index 50480e4ab693..022523dc06f9 100644
--- a/dev/deps/spark-deps-hadoop-2-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-2-hive-2.3
@@ -259,6 +259,7 @@ tink/1.6.0//tink-1.6.0.jar
transaction-api/1.1//transaction-api-1.1.jar
univocity-parsers/2.9.1//univocity-parsers-2.9.1.jar
velocity/1.5//velocity-1.5.jar
+volcano-model-v1beta1/5.12.0//volcano-model-v1beta1-5.12.0.jar
xbean-asm9-shaded/4.20//xbean-asm9-shaded-4.20.jar
xercesImpl/2.12.0//xercesImpl-2.12.0.jar
xml-apis/1.4.01//xml-apis-1.4.01.jar
diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3
index 13b23c06cf64..8a8bbfeb64d9 100644
--- a/dev/deps/spark-deps-hadoop-3-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -245,6 +245,7 @@ tink/1.6.0//tink-1.6.0.jar
transaction-api/1.1//transaction-api-1.1.jar
univocity-parsers/2.9.1//univocity-parsers-2.9.1.jar
velocity/1.5//velocity-1.5.jar
+volcano-model-v1beta1/5.12.0//volcano-model-v1beta1-5.12.0.jar
wildfly-openssl/1.0.7.Final//wildfly-openssl-1.0.7.Final.jar
xbean-asm9-shaded/4.20//xbean-asm9-shaded-4.20.jar
xz/1.8//xz-1.8.jar
diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml
index 0cb5e115906a..e2332ff1502b 100644
--- a/resource-managers/kubernetes/core/pom.xml
+++ b/resource-managers/kubernetes/core/pom.xml
@@ -99,6 +99,11 @@
test
+
+ io.fabric8
+ volcano-model-v1beta1
+ ${kubernetes-client.version}
+
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala
new file mode 100644
index 000000000000..1c936848db67
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.volcano.scheduling.v1beta1.PodGroupBuilder
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverConf, KubernetesExecutorConf, SparkPod}
+
+private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureConfigStep
+ with KubernetesExecutorCustomFeatureConfigStep {
+
+ private var kubernetesConf: KubernetesConf = _
+
+ private val POD_GROUP_ANNOTATION = "scheduling.k8s.io/group-name"
+
+ private lazy val podGroupName = s"${kubernetesConf.appId}-podgroup"
+ private lazy val namespace = kubernetesConf.namespace
+
+ override def init(config: KubernetesDriverConf): Unit = {
+ kubernetesConf = config
+ }
+
+ override def init(config: KubernetesExecutorConf): Unit = {
+ kubernetesConf = config
+ }
+
+ override def getAdditionalPreKubernetesResources(): Seq[HasMetadata] = {
+ val podGroup = new PodGroupBuilder()
+ .editOrNewMetadata()
+ .withName(podGroupName)
+ .withNamespace(namespace)
+ .endMetadata()
+ .build()
+ Seq(podGroup)
+ }
+
+ override def configurePod(pod: SparkPod): SparkPod = {
+ val k8sPodBuilder = new PodBuilder(pod.pod)
+ .editMetadata()
+ .addToAnnotations(POD_GROUP_ANNOTATION, podGroupName)
+ .endMetadata()
+ val k8sPod = k8sPodBuilder.build()
+ SparkPod(k8sPod, pod.container)
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala
new file mode 100644
index 000000000000..cf337f99cab9
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.volcano.scheduling.v1beta1.PodGroup
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s._
+
+class VolcanoFeatureStepSuite extends SparkFunSuite {
+
+ test("SPARK-36061: Driver Pod with Volcano PodGroup") {
+ val sparkConf = new SparkConf()
+ val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf)
+ val step = new VolcanoFeatureStep()
+ step.init(kubernetesConf)
+ val configuredPod = step.configurePod(SparkPod.initialPod())
+
+ val annotations = configuredPod.pod.getMetadata.getAnnotations
+
+ assert(annotations.get("scheduling.k8s.io/group-name") === s"${kubernetesConf.appId}-podgroup")
+ val podGroup = step.getAdditionalPreKubernetesResources().head.asInstanceOf[PodGroup]
+ assert(podGroup.getMetadata.getName === s"${kubernetesConf.appId}-podgroup")
+ }
+
+ test("SPARK-36061: Executor Pod with Volcano PodGroup") {
+ val sparkConf = new SparkConf()
+ val kubernetesConf = KubernetesTestConf.createExecutorConf(sparkConf)
+ val step = new VolcanoFeatureStep()
+ step.init(kubernetesConf)
+ val configuredPod = step.configurePod(SparkPod.initialPod())
+ val annotations = configuredPod.pod.getMetadata.getAnnotations
+ assert(annotations.get("scheduling.k8s.io/group-name") === s"${kubernetesConf.appId}-podgroup")
+ }
+}
diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml
index a44cedb9e1e2..5775652f7dab 100644
--- a/resource-managers/kubernetes/integration-tests/pom.xml
+++ b/resource-managers/kubernetes/integration-tests/pom.xml
@@ -74,6 +74,17 @@
spark-tags_${scala.binary.version}
test-jar
+
+ org.apache.spark
+ spark-kubernetes_${scala.binary.version}
+ ${project.version}
+ test
+
+
+ io.fabric8
+ volcano-client
+ ${kubernetes-client.version}
+
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index c1237e3eb9df..69b736951301 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -181,7 +181,7 @@ class KubernetesSuite extends SparkFunSuite
}
}
- before {
+ protected def setUpTest(): Unit = {
appLocator = UUID.randomUUID().toString.replaceAll("-", "")
driverPodName = "spark-test-app-" + UUID.randomUUID().toString.replaceAll("-", "")
sparkAppConf = kubernetesTestComponents.newSparkAppConf()
@@ -195,6 +195,10 @@ class KubernetesSuite extends SparkFunSuite
}
}
+ before {
+ setUpTest()
+ }
+
after {
if (!kubernetesTestComponents.hasUserSpecifiedNamespace) {
kubernetesTestComponents.deleteNamespace()
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoSuite.scala
new file mode 100644
index 000000000000..ed7371718f9a
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoSuite.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import org.scalatest.Tag
+
+class VolcanoSuite extends KubernetesSuite with VolcanoTestsSuite {
+
+ override protected def setUpTest(): Unit = {
+ super.setUpTest()
+ sparkAppConf
+ .set("spark.kubernetes.driver.scheduler.name", "volcano")
+ .set("spark.kubernetes.executor.scheduler.name", "volcano")
+ }
+}
+
+private[spark] object VolcanoSuite {
+ val volcanoTag = Tag("volcano")
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
new file mode 100644
index 000000000000..377a1b816798
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.volcano.client.VolcanoClient
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s.features.VolcanoFeatureStep
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.k8sTestTag
+import org.apache.spark.deploy.k8s.integrationtest.VolcanoSuite.volcanoTag
+
+private[spark] trait VolcanoTestsSuite { k8sSuite: KubernetesSuite =>
+ import VolcanoTestsSuite._
+
+ protected def checkScheduler(pod: Pod): Unit = {
+ assert(pod.getSpec.getSchedulerName === "volcano")
+ }
+
+ protected def checkAnnotaion(pod: Pod): Unit = {
+ val appId = pod.getMetadata.getLabels.get("spark-app-selector")
+ val annotations = pod.getMetadata.getAnnotations
+ assert(annotations.get("scheduling.k8s.io/group-name") === s"$appId-podgroup")
+ }
+
+ protected def checkPodGroup(pod: Pod): Unit = {
+ val appId = pod.getMetadata.getLabels.get("spark-app-selector")
+ val podGroupName = s"$appId-podgroup"
+ val volcanoClient = kubernetesTestComponents.kubernetesClient.adapt(classOf[VolcanoClient])
+ val podGroup = volcanoClient.podGroups().withName(podGroupName).get()
+ assert(podGroup.getMetadata.getOwnerReferences.get(0).getName === pod.getMetadata.getName)
+ }
+
+ test("Run SparkPi with volcano scheduler", k8sTestTag, volcanoTag) {
+ sparkAppConf
+ .set("spark.kubernetes.driver.pod.featureSteps", VOLCANO_FEATURE_STEP)
+ .set("spark.kubernetes.executor.pod.featureSteps", VOLCANO_FEATURE_STEP)
+ runSparkPiAndVerifyCompletion(
+ driverPodChecker = (driverPod: Pod) => {
+ doBasicDriverPodCheck(driverPod)
+ checkScheduler(driverPod)
+ checkAnnotaion(driverPod)
+ checkPodGroup(driverPod)
+ },
+ executorPodChecker = (executorPod: Pod) => {
+ doBasicExecutorPodCheck(executorPod)
+ checkScheduler(executorPod)
+ checkAnnotaion(executorPod)
+ }
+ )
+ }
+}
+
+private[spark] object VolcanoTestsSuite extends SparkFunSuite {
+ val VOLCANO_FEATURE_STEP = classOf[VolcanoFeatureStep].getName
+}