Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion resource-managers/kubernetes/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,16 @@
<artifactId>spark-tags_${scala.binary.version}</artifactId>
<type>test-jar</type>
</dependency>

<dependency>
Copy link
Member Author

@Yikun Yikun Nov 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Volcano support in k8s-cli will be released in kubernetes-client v5.11
fabric8io/kubernetes-client#3580

TODO: need to bump the kubernetes-client version to the latest once it is published.

spark/pom.xml

Line 207 in 7b50cf0

<kubernetes-client.version>5.10.1</kubernetes-client.version>

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<groupId>io.fabric8</groupId>
<artifactId>volcano-client</artifactId>
<version>${kubernetes-client.version}</version>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>volcano-model-v1beta1</artifactId>
<version>${kubernetes-client.version}</version>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit

private[spark] object Config extends Logging {

Expand Down Expand Up @@ -636,6 +637,24 @@ private[spark] object Config extends Logging {
val KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX = "spark.kubernetes.executor.secretKeyRef."
val KUBERNETES_EXECUTOR_VOLUMES_PREFIX = "spark.kubernetes.executor.volumes."

// Opt-in switch for PodGroup support. Disabled by default; read by PodGroupFeatureStep to
// decide whether pods get the 'scheduling.k8s.io/group-name' annotation.
val KUBERNETES_ENABLE_PODGROUP = ConfigBuilder("spark.kubernetes.enablePodGroup")
.doc("If true, PodGroup annotation('scheduling.k8s.io/group-name') would be specified to " +
"each driver/executor pod, all pods in a Job would be set in a same PodGroup, this info " +
"would be used by Kubernetes batch scheduler.")
.version("3.3.0")
.booleanConf
.createWithDefault(false)
// Memory part of the PodGroup minResource. Declared as a byte-size config whose unit is MiB,
// with a default written as the string "3g".
val KUBERNETES_PODGROUP_MIN_MEMORY = ConfigBuilder("spark.kubernetes.podgroup.min.memory")
.doc("Amount of memory to use for the PodGroup minResource, in MiB unless otherwise specified.")
.version("3.3.0")
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("3g")
// CPU part of the PodGroup minResource, expressed as a floating-point number (default 2.0).
val KUBERNETES_PODGROUP_MIN_CPU = ConfigBuilder("spark.kubernetes.podgroup.min.cpu")
.doc("Amount of cpu to use for the PodGroup minResource")
.version("3.3.0")
.doubleConf
.createWithDefault(2.0)

val KUBERNETES_VOLUMES_HOSTPATH_TYPE = "hostPath"
val KUBERNETES_VOLUMES_PVC_TYPE = "persistentVolumeClaim"
val KUBERNETES_VOLUMES_EMPTYDIR_TYPE = "emptyDir"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ private[spark] abstract class KubernetesConf(val sparkConf: SparkConf) {
def secretNamesToMountPaths: Map[String, String]
def volumes: Seq[KubernetesVolumeSpec]
def schedulerName: String
def appId: String

def appName: String = get("spark.app.name", "spark")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ import io.fabric8.kubernetes.api.model.HasMetadata

/**
 * Everything needed to submit the driver: the configured driver pod plus the resources and
 * properties contributed by the driver feature steps.
 *
 * @param pod the driver pod (and its container) after every feature step has configured it
 * @param driverPreKubernetesResources resources accumulated from each feature step's
 *   `getAdditionalPreKubernetesResources()`; the submission client creates them before the
 *   driver pod
 * @param driverKubernetesResources resources accumulated from each feature step's
 *   `getAdditionalKubernetesResources()`; the submission client creates them after the driver
 *   pod, with the pod set as their owner
 * @param systemProperties the Spark configuration entries plus any additional system
 *   properties contributed by the feature steps
 */
private[spark] case class KubernetesDriverSpec(
pod: SparkPod,
driverPreKubernetesResources: Seq[HasMetadata],
driverKubernetesResources: Seq[HasMetadata],
systemProperties: Map[String, String])
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.security.SecureRandom
import java.util.{Collections, UUID}

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, HasMetadata, OwnerReferenceBuilder, Pod, PodBuilder, Quantity}
import io.fabric8.kubernetes.client.KubernetesClient
Expand Down Expand Up @@ -381,4 +382,21 @@ object KubernetesUtils extends Logging {
}
}
}

/**
 * Create the given Kubernetes resources, replacing any that already exist.
 *
 * When an owner pod is supplied, an owner reference to it is added to every resource before
 * they are submitted, and the pod is deleted again if the submission fails.
 *
 * @param client the Kubernetes client used to submit the resources
 * @param resources the resources to create or replace
 * @param ownerPod the pod that should own the resources; `null` (the default) means the
 *   resources have no owner and nothing is cleaned up on failure
 * @throws Throwable the original non-fatal failure is rethrown after cleanup; a failure of
 *   the cleanup itself is attached to it as a suppressed exception
 */
@Since("3.3.0")
def createOrReplaceResource(
    client: KubernetesClient,
    resources: Seq[HasMetadata],
    ownerPod: Pod = null): Unit = {
  // `ownerPod` is nullable by contract, so lift it once and never dereference null below.
  val owner = Option(ownerPod)
  try {
    owner.foreach(addOwnerReference(_, resources))
    client.resourceList(resources: _*).createOrReplace()
  } catch {
    case NonFatal(e) =>
      owner.foreach { pod =>
        try {
          client.pods().delete(pod)
        } catch {
          // Best-effort cleanup: the caller must still see the original failure, so a
          // failing delete is recorded on it instead of replacing it.
          case NonFatal(cleanupError) => e.addSuppressed(cleanupError)
        }
      }
      throw e
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,15 @@ trait KubernetesFeatureConfigStep {

/**
 * Return any additional Kubernetes resources that should be added to support this feature. Only
 * applicable when creating the driver in cluster mode. These resources are set up (or
 * refreshed) after the driver pod has been created.
 *
 * The default implementation contributes no resources.
 */
def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty

Copy link
Member Author

@Yikun Yikun Nov 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was submitted as a separate PR in #34599 (1st and 2nd commits).

For a clearer view, you can look at only the 3rd commit; that is, this PR itself only adds the pod feature step.

/**
 * Return any additional Kubernetes resources that should be added to support this feature. Only
 * applicable when creating the driver in cluster mode. These resources are set up (or
 * refreshed) before the driver pod is created.
 *
 * The default implementation contributes no resources.
 */
def getAdditionalPreKubernetesResources(): Seq[HasMetadata] = Seq.empty
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.k8s.features

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model._
import io.fabric8.volcano.scheduling.v1beta1.PodGroupBuilder

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._

/**
 * Feature step that groups the pods of one Spark application into a single PodGroup.
 *
 * When `spark.kubernetes.enablePodGroup` is set, every configured pod is annotated with the
 * group name (`<appId>-podgroup`). If, in addition, the configured scheduler name is
 * "volcano", a Volcano PodGroup resource carrying the minimum cpu/memory is contributed both
 * as a pre-creation resource and as a regular additional resource.
 */
private[spark] class PodGroupFeatureStep(kubernetesConf: KubernetesConf)
  extends KubernetesFeatureConfigStep {

  val conf: SparkConf = kubernetesConf.sparkConf
  val POD_GROUP_ANNOTATION = "scheduling.k8s.io/group-name"
  val podGroupName = s"${kubernetesConf.appId}-podgroup"
  val enablePodGroup: Boolean = conf.get(KUBERNETES_ENABLE_PODGROUP)

  /**
   * Add the group-name annotation to the pod, or hand the pod back untouched when PodGroup
   * support is disabled.
   */
  override def configurePod(pod: SparkPod): SparkPod = {
    if (!enablePodGroup) {
      pod
    } else {
      val annotatedPod = new PodBuilder(pod.pod)
        .editMetadata()
          .addToAnnotations(POD_GROUP_ANNOTATION, podGroupName)
          .endMetadata()
        .build()
      SparkPod(annotatedPod, pod.container)
    }
  }

  /** Build the "cpu"/"memory" minimum-resource map from the PodGroup configuration. */
  private def getPodGroupMinResources(): java.util.HashMap[String, Quantity] = {
    val minCpu = kubernetesConf.get(KUBERNETES_PODGROUP_MIN_CPU)
    val minMemoryMiB = kubernetesConf.get(KUBERNETES_PODGROUP_MIN_MEMORY)

    // The memory setting is read in MiB, hence the explicit "Mi" suffix on the amount.
    val quantities = Map(
      "cpu" -> new QuantityBuilder(false).withAmount(minCpu.toString).build(),
      "memory" -> new QuantityBuilder(false).withAmount(s"${minMemoryMiB}Mi").build())
    new java.util.HashMap[String, Quantity](quantities.asJava)
  }

  /** Build the Volcano PodGroup for this application in the configured namespace. */
  private def getVolcanoResources(): Seq[HasMetadata] = {
    val namespace = kubernetesConf.get(KUBERNETES_NAMESPACE)
    val podGroup = new PodGroupBuilder()
      .editOrNewMetadata()
        .withName(podGroupName)
        .withNamespace(namespace)
        .endMetadata()
      .editOrNewSpec()
        .withMinResources(getPodGroupMinResources())
        .endSpec()
      .build()

    Seq(podGroup)
  }

  /**
   * Resources backing the PodGroup: a Volcano PodGroup when the feature is enabled and the
   * scheduler is "volcano", nothing otherwise.
   */
  private def getPodGroupResources(): Seq[HasMetadata] = {
    val useVolcano = enablePodGroup && kubernetesConf.schedulerName == "volcano"
    if (useVolcano) getVolcanoResources() else Seq.empty
  }

  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = getPodGroupResources()

  override def getAdditionalPreKubernetesResources(): Seq[HasMetadata] = getPodGroupResources()
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.deploy.SparkApplication
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
import org.apache.spark.deploy.k8s.KubernetesUtils.createOrReplaceResource
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -133,6 +133,10 @@ private[spark] class Client(
.build()
val driverPodName = resolvedDriverPod.getMetadata.getName

// setup resources before pod creation
val preKubernetesResources = resolvedDriverSpec.driverPreKubernetesResources
createOrReplaceResource(kubernetesClient, preKubernetesResources)

var watch: Watch = null
var createdDriverPod: Pod = null
try {
Expand All @@ -142,15 +146,10 @@ private[spark] class Client(
logError("Please check \"kubectl auth can-i create pod\" first. It should be yes.")
throw e
}
try {
val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
addOwnerReference(createdDriverPod, otherKubernetesResources)
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
} catch {
case NonFatal(e) =>
kubernetesClient.pods().delete(createdDriverPod)
throw e
}

// setup resources after pod creation, and refresh all resources owner references
val driverKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
createOrReplaceResource(kubernetesClient, driverKubernetesResources, createdDriverPod)

if (conf.get(WAIT_FOR_APP_COMPLETION)) {
val sId = Seq(conf.namespace, driverPodName).mkString(":")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,23 @@ private[spark] class KubernetesDriverBuilder {
new HadoopConfDriverFeatureStep(conf),
new KerberosConfDriverFeatureStep(conf),
new PodTemplateConfigMapStep(conf),
new LocalDirsFeatureStep(conf)) ++ userFeatures
new LocalDirsFeatureStep(conf),
new PodGroupFeatureStep(conf)) ++ userFeatures

val spec = KubernetesDriverSpec(
initialPod,
driverPreKubernetesResources = Seq.empty,
driverKubernetesResources = Seq.empty,
conf.sparkConf.getAll.toMap)

features.foldLeft(spec) { case (spec, feature) =>
val configuredPod = feature.configurePod(spec.pod)
val addedSystemProperties = feature.getAdditionalPodSystemProperties()
val addedPreResources = feature.getAdditionalPreKubernetesResources()
val addedResources = feature.getAdditionalKubernetesResources()
KubernetesDriverSpec(
configuredPod,
spec.driverPreKubernetesResources ++ addedPreResources,
spec.driverKubernetesResources ++ addedResources,
spec.systemProperties ++ addedSystemProperties)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ private[spark] class KubernetesExecutorBuilder {
new MountSecretsFeatureStep(conf),
new EnvSecretsFeatureStep(conf),
new MountVolumesFeatureStep(conf),
new LocalDirsFeatureStep(conf)) ++ userFeatures
new LocalDirsFeatureStep(conf),
new PodGroupFeatureStep(conf)) ++ userFeatures

val spec = KubernetesExecutorSpec(
initialPod,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {
}
val pod = step.configurePod(SparkPod.initialPod())
val props = step.getAdditionalPodSystemProperties()
KubernetesDriverSpec(pod, Nil, props)
KubernetesDriverSpec(pod, Nil, Nil, props)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.k8s.features

import io.fabric8.kubernetes.api.model.Quantity
import io.fabric8.volcano.scheduling.v1beta1.PodGroup

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._

/**
 * Tests for [[PodGroupFeatureStep]]: the group-name annotation on configured pods and the
 * Volcano PodGroup contributed through both the pre-creation and the regular resource hooks.
 */
class PodGroupFeatureStepSuite extends SparkFunSuite {
  test("Do nothing when KUBERNETES_ENABLE_PODGROUP is false") {
    val conf = KubernetesTestConf.createDriverConf()
    val step = new PodGroupFeatureStep(conf)

    val initialPod = SparkPod.initialPod()
    val configuredPod = step.configurePod(initialPod)
    assert(configuredPod === initialPod)

    // Neither resource hook may contribute anything while the feature is disabled.
    assert(step.getAdditionalPreKubernetesResources().isEmpty)
    assert(step.getAdditionalKubernetesResources().isEmpty)
    assert(step.getAdditionalPodSystemProperties().isEmpty)
  }

  test("Driver Pod with Volcano PodGroup") {
    val sparkConf = new SparkConf()
      .set(KUBERNETES_DRIVER_SCHEDULER_NAME, "volcano")
      .set(KUBERNETES_ENABLE_PODGROUP, true)
    val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf)
    val step = new PodGroupFeatureStep(kubernetesConf)
    val configuredPod = step.configurePod(SparkPod.initialPod())

    val annotation = configuredPod.pod.getMetadata.getAnnotations
    assert(annotation.get("scheduling.k8s.io/group-name") === step.podGroupName)

    // The PodGroup has to be offered both before and after driver pod creation.
    Seq(
      step.getAdditionalPreKubernetesResources(),
      step.getAdditionalKubernetesResources()).foreach {
      case Seq(podGroup) =>
        assert(podGroup.getMetadata.getName === step.podGroupName)
      case other =>
        fail(s"Expected exactly one PodGroup resource but got: $other")
    }
  }

  test("Executor Pod with Volcano PodGroup") {
    val sparkConf = new SparkConf()
      .set(KUBERNETES_EXECUTOR_SCHEDULER_NAME, "volcano")
      .set(KUBERNETES_ENABLE_PODGROUP, true)
    val kubernetesConf = KubernetesTestConf.createExecutorConf(sparkConf)
    val step = new PodGroupFeatureStep(kubernetesConf)
    val configuredPod = step.configurePod(SparkPod.initialPod())

    val annotation = configuredPod.pod.getMetadata.getAnnotations
    assert(annotation.get("scheduling.k8s.io/group-name") === step.podGroupName)

    step.getAdditionalKubernetesResources() match {
      case Seq(podGroup: PodGroup) =>
        val minResources = podGroup.getSpec.getMinResources
        assert(podGroup.getMetadata.getName === s"${kubernetesConf.appId}-podgroup")
        assert(minResources.get("cpu") === new Quantity("2.0"))
        // The default of "3g" is read in MiB and emitted with an explicit "Mi" suffix, so the
        // expected quantity is 3072Mi rather than a bare 3072 (which would mean bytes).
        assert(minResources.get("memory") === new Quantity("3072Mi"))
      case other =>
        fail(s"Expected exactly one PodGroup resource but got: $other")
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {

// Driver spec fixture: the built driver pod/container, no pre-creation resources (Nil),
// ADDITIONAL_RESOURCES as the regular driver resources, and RESOLVED_JAVA_OPTIONS as the
// system properties.
private val BUILT_KUBERNETES_SPEC = KubernetesDriverSpec(
SparkPod(BUILT_DRIVER_POD, BUILT_DRIVER_CONTAINER),
Nil,
ADDITIONAL_RESOURCES,
RESOLVED_JAVA_OPTIONS)

Expand Down