Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dev/deps/spark-deps-hadoop-2-hive-2.3
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ tink/1.6.0//tink-1.6.0.jar
transaction-api/1.1//transaction-api-1.1.jar
univocity-parsers/2.9.1//univocity-parsers-2.9.1.jar
velocity/1.5//velocity-1.5.jar
volcano-model-v1beta1/5.12.0//volcano-model-v1beta1-5.12.0.jar
xbean-asm9-shaded/4.20//xbean-asm9-shaded-4.20.jar
xercesImpl/2.12.0//xercesImpl-2.12.0.jar
xml-apis/1.4.01//xml-apis-1.4.01.jar
Expand Down
1 change: 1 addition & 0 deletions dev/deps/spark-deps-hadoop-3-hive-2.3
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ tink/1.6.0//tink-1.6.0.jar
transaction-api/1.1//transaction-api-1.1.jar
univocity-parsers/2.9.1//univocity-parsers-2.9.1.jar
velocity/1.5//velocity-1.5.jar
volcano-model-v1beta1/5.12.0//volcano-model-v1beta1-5.12.0.jar
wildfly-openssl/1.0.7.Final//wildfly-openssl-1.0.7.Final.jar
xbean-asm9-shaded/4.20//xbean-asm9-shaded-4.20.jar
xz/1.8//xz-1.8.jar
Expand Down
5 changes: 5 additions & 0 deletions resource-managers/kubernetes/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>volcano-model-v1beta1</artifactId>
<version>${kubernetes-client.version}</version>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features

import io.fabric8.kubernetes.api.model._
import io.fabric8.volcano.scheduling.v1beta1.PodGroupBuilder

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverConf, KubernetesExecutorConf, SparkPod}

private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureConfigStep
with KubernetesExecutorCustomFeatureConfigStep {

private var kubernetesConf: KubernetesConf = _

private val POD_GROUP_ANNOTATION = "scheduling.k8s.io/group-name"

private lazy val podGroupName = s"${kubernetesConf.appId}-podgroup"
private lazy val namespace = kubernetesConf.namespace

override def init(config: KubernetesDriverConf): Unit = {
kubernetesConf = config
}

override def init(config: KubernetesExecutorConf): Unit = {
kubernetesConf = config
}

override def getAdditionalPreKubernetesResources(): Seq[HasMetadata] = {
val podGroup = new PodGroupBuilder()
.editOrNewMetadata()
.withName(podGroupName)
.withNamespace(namespace)
.endMetadata()
.build()
Seq(podGroup)
}

override def configurePod(pod: SparkPod): SparkPod = {
val k8sPodBuilder = new PodBuilder(pod.pod)
.editMetadata()
.addToAnnotations(POD_GROUP_ANNOTATION, podGroupName)
.endMetadata()
val k8sPod = k8sPodBuilder.build()
SparkPod(k8sPod, pod.container)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features

import io.fabric8.volcano.scheduling.v1beta1.PodGroup

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s._

class VolcanoFeatureStepSuite extends SparkFunSuite {

test("SPARK-36061: Driver Pod with Volcano PodGroup") {
val sparkConf = new SparkConf()
val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf)
val step = new VolcanoFeatureStep()
step.init(kubernetesConf)
val configuredPod = step.configurePod(SparkPod.initialPod())

val annotations = configuredPod.pod.getMetadata.getAnnotations

assert(annotations.get("scheduling.k8s.io/group-name") === s"${kubernetesConf.appId}-podgroup")
val podGroup = step.getAdditionalPreKubernetesResources().head.asInstanceOf[PodGroup]
assert(podGroup.getMetadata.getName === s"${kubernetesConf.appId}-podgroup")
}

test("SPARK-36061: Executor Pod with Volcano PodGroup") {
val sparkConf = new SparkConf()
val kubernetesConf = KubernetesTestConf.createExecutorConf(sparkConf)
val step = new VolcanoFeatureStep()
step.init(kubernetesConf)
val configuredPod = step.configurePod(SparkPod.initialPod())
val annotations = configuredPod.pod.getMetadata.getAnnotations
assert(annotations.get("scheduling.k8s.io/group-name") === s"${kubernetesConf.appId}-podgroup")
}
}
11 changes: 11 additions & 0 deletions resource-managers/kubernetes/integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,17 @@
<artifactId>spark-tags_${scala.binary.version}</artifactId>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-kubernetes_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>volcano-client</artifactId>
<version>${kubernetes-client.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class KubernetesSuite extends SparkFunSuite
}
}

before {
protected def setUpTest(): Unit = {
appLocator = UUID.randomUUID().toString.replaceAll("-", "")
driverPodName = "spark-test-app-" + UUID.randomUUID().toString.replaceAll("-", "")
sparkAppConf = kubernetesTestComponents.newSparkAppConf()
Expand All @@ -195,6 +195,10 @@ class KubernetesSuite extends SparkFunSuite
}
}

before {
setUpTest()
}

after {
if (!kubernetesTestComponents.hasUserSpecifiedNamespace) {
kubernetesTestComponents.deleteNamespace()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.integrationtest

import org.scalatest.Tag

class VolcanoSuite extends KubernetesSuite with VolcanoTestsSuite {

override protected def setUpTest(): Unit = {
super.setUpTest()
sparkAppConf
.set("spark.kubernetes.driver.scheduler.name", "volcano")
.set("spark.kubernetes.executor.scheduler.name", "volcano")
}
}

private[spark] object VolcanoSuite {
val volcanoTag = Tag("volcano")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.integrationtest

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.volcano.client.VolcanoClient

import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.features.VolcanoFeatureStep
import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.k8sTestTag
import org.apache.spark.deploy.k8s.integrationtest.VolcanoSuite.volcanoTag

private[spark] trait VolcanoTestsSuite { k8sSuite: KubernetesSuite =>
import VolcanoTestsSuite._

protected def checkScheduler(pod: Pod): Unit = {
assert(pod.getSpec.getSchedulerName === "volcano")
}

protected def checkAnnotaion(pod: Pod): Unit = {
val appId = pod.getMetadata.getLabels.get("spark-app-selector")
val annotations = pod.getMetadata.getAnnotations
assert(annotations.get("scheduling.k8s.io/group-name") === s"$appId-podgroup")
}

protected def checkPodGroup(pod: Pod): Unit = {
val appId = pod.getMetadata.getLabels.get("spark-app-selector")
val podGroupName = s"$appId-podgroup"
val volcanoClient = kubernetesTestComponents.kubernetesClient.adapt(classOf[VolcanoClient])
val podGroup = volcanoClient.podGroups().withName(podGroupName).get()
assert(podGroup.getMetadata.getOwnerReferences.get(0).getName === pod.getMetadata.getName)
}

test("Run SparkPi with volcano scheduler", k8sTestTag, volcanoTag) {
sparkAppConf
.set("spark.kubernetes.driver.pod.featureSteps", VOLCANO_FEATURE_STEP)
.set("spark.kubernetes.executor.pod.featureSteps", VOLCANO_FEATURE_STEP)
runSparkPiAndVerifyCompletion(
driverPodChecker = (driverPod: Pod) => {
doBasicDriverPodCheck(driverPod)
checkScheduler(driverPod)
checkAnnotaion(driverPod)
checkPodGroup(driverPod)
},
executorPodChecker = (executorPod: Pod) => {
doBasicExecutorPodCheck(executorPod)
checkScheduler(executorPod)
checkAnnotaion(executorPod)
}
)
}
}

private[spark] object VolcanoTestsSuite extends SparkFunSuite {
val VOLCANO_FEATURE_STEP = classOf[VolcanoFeatureStep].getName
}