Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions dev/deps/spark-deps-hadoop-2.7-hive-1.2
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ jsr305/3.0.0//jsr305-3.0.0.jar
jta/1.1//jta-1.1.jar
jul-to-slf4j/1.7.16//jul-to-slf4j-1.7.16.jar
kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar
kubernetes-client/4.6.4//kubernetes-client-4.6.4.jar
kubernetes-model-common/4.6.4//kubernetes-model-common-4.6.4.jar
kubernetes-model/4.6.4//kubernetes-model-4.6.4.jar
kubernetes-client/4.7.1//kubernetes-client-4.7.1.jar
kubernetes-model-common/4.7.1//kubernetes-model-common-4.7.1.jar
kubernetes-model/4.7.1//kubernetes-model-4.7.1.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
libfb303/0.9.3//libfb303-0.9.3.jar
libthrift/0.12.0//libthrift-0.12.0.jar
Expand Down
6 changes: 3 additions & 3 deletions dev/deps/spark-deps-hadoop-2.7-hive-2.3
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,9 @@ jsr305/3.0.0//jsr305-3.0.0.jar
jta/1.1//jta-1.1.jar
jul-to-slf4j/1.7.16//jul-to-slf4j-1.7.16.jar
kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar
kubernetes-client/4.6.4//kubernetes-client-4.6.4.jar
kubernetes-model-common/4.6.4//kubernetes-model-common-4.6.4.jar
kubernetes-model/4.6.4//kubernetes-model-4.6.4.jar
kubernetes-client/4.7.1//kubernetes-client-4.7.1.jar
kubernetes-model-common/4.7.1//kubernetes-model-common-4.7.1.jar
kubernetes-model/4.7.1//kubernetes-model-4.7.1.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
libfb303/0.9.3//libfb303-0.9.3.jar
libthrift/0.12.0//libthrift-0.12.0.jar
Expand Down
6 changes: 3 additions & 3 deletions dev/deps/spark-deps-hadoop-3.2-hive-2.3
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ kerby-pkix/1.0.1//kerby-pkix-1.0.1.jar
kerby-util/1.0.1//kerby-util-1.0.1.jar
kerby-xdr/1.0.1//kerby-xdr-1.0.1.jar
kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar
kubernetes-client/4.6.4//kubernetes-client-4.6.4.jar
kubernetes-model-common/4.6.4//kubernetes-model-common-4.6.4.jar
kubernetes-model/4.6.4//kubernetes-model-4.6.4.jar
kubernetes-client/4.7.1//kubernetes-client-4.7.1.jar
kubernetes-model-common/4.7.1//kubernetes-model-common-4.7.1.jar
kubernetes-model/4.7.1//kubernetes-model-4.7.1.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
libfb303/0.9.3//libfb303-0.9.3.jar
libthrift/0.12.0//libthrift-0.12.0.jar
Expand Down
2 changes: 1 addition & 1 deletion resource-managers/kubernetes/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<name>Spark Project Kubernetes</name>
<properties>
<sbt.project.name>kubernetes</sbt.project.name>
<kubernetes.client.version>4.6.4</kubernetes.client.version>
<kubernetes.client.version>4.7.1</kubernetes.client.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.util.UUID

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder, Quantity, QuantityBuilder}
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder, Quantity}
import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.commons.codec.binary.Hex
import org.apache.hadoop.fs.{FileSystem, Path}
Expand Down Expand Up @@ -234,9 +234,7 @@ private[spark] object KubernetesUtils extends Logging {
throw new SparkException(s"Resource: ${request.id.resourceName} was requested, " +
"but vendor was not specified.")
}
val quantity = new QuantityBuilder(false)
.withAmount(request.amount.toString)
.build()
val quantity = new Quantity(request.amount.toString)
(KubernetesConf.buildKubernetesResourceName(vendorDomain, request.id.resourceName), quantity)
}.toMap
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,10 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
.build()
}

val driverCpuQuantity = new QuantityBuilder(false)
.withAmount(driverCoresRequest)
.build()
val driverMemoryQuantity = new QuantityBuilder(false)
.withAmount(s"${driverMemoryWithOverheadMiB}Mi")
.build()
val driverCpuQuantity = new Quantity(driverCoresRequest)
val driverMemoryQuantity = new Quantity(s"${driverMemoryWithOverheadMiB}Mi")
val maybeCpuLimitQuantity = driverLimitCores.map { limitCores =>
("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
("cpu", new Quantity(limitCores))
}

val driverResourceQuantities =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,8 @@ private[spark] class BasicExecutorFeatureStep(
// Replace dangerous characters in the remaining string with a safe alternative.
.replaceAll("[^\\w-]+", "_")

val executorMemoryQuantity = new QuantityBuilder(false)
.withAmount(s"${executorMemoryTotal}Mi")
.build()
val executorCpuQuantity = new QuantityBuilder(false)
.withAmount(executorCoresRequest)
.build()
val executorMemoryQuantity = new Quantity(s"${executorMemoryTotal}Mi")
val executorCpuQuantity = new Quantity(executorCoresRequest)

val executorResourceQuantities =
KubernetesUtils.buildResourcesQuantities(SPARK_EXECUTOR_PREFIX,
Expand Down Expand Up @@ -183,9 +179,7 @@ private[spark] class BasicExecutorFeatureStep(
.addToArgs("executor")
.build()
val containerWithLimitCores = executorLimitCores.map { limitCores =>
val executorCpuLimitQuantity = new QuantityBuilder(false)
.withAmount(limitCores)
.build()
val executorCpuLimitQuantity = new Quantity(limitCores)
new ContainerBuilder(executorContainer)
.editResources()
.addToLimits("cpu", executorCpuLimitQuantity)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
new VolumeBuilder()
.withEmptyDir(
new EmptyDirVolumeSource(medium.getOrElse(""),
new Quantity(sizeLimit.orNull)))
sizeLimit.map(new Quantity(_)).orNull))
}

val volume = volumeBuilder.withName(spec.volumeName).build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ abstract class PodBuilderSuite extends SparkFunSuite {
assert(container.getArgs.contains("arg"))
assert(container.getCommand.equals(List("command").asJava))
assert(container.getEnv.asScala.exists(_.getName == "env-key"))
assert(container.getResources.getLimits.get("gpu") ===
new QuantityBuilder().withAmount("1").build())
assert(container.getResources.getLimits.get("gpu") === new Quantity("1"))
assert(container.getSecurityContext.getRunAsNonRoot)
assert(container.getStdin)
assert(container.getTerminationMessagePath === "termination-message-path")
Expand Down Expand Up @@ -156,7 +155,7 @@ abstract class PodBuilderSuite extends SparkFunSuite {
.withImagePullPolicy("Always")
.withName("executor-container")
.withNewResources()
.withLimits(Map("gpu" -> new QuantityBuilder().withAmount("1").build()).asJava)
.withLimits(Map("gpu" -> new Quantity("1")).asJava)
.endResources()
.withNewSecurityContext()
.withRunAsNonRoot(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.features

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerPort, ContainerPortBuilder, LocalObjectReferenceBuilder}
import io.fabric8.kubernetes.api.model.{ContainerPort, ContainerPortBuilder, LocalObjectReferenceBuilder, Quantity}

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod}
Expand Down Expand Up @@ -105,13 +105,13 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {

val resourceRequirements = configuredPod.container.getResources
val requests = resourceRequirements.getRequests.asScala
assert(requests("cpu").getAmount === "2")
assert(requests("memory").getAmount === "456Mi")
assert(amountAndFormat(requests("cpu")) === "2")
assert(amountAndFormat(requests("memory")) === "456Mi")
val limits = resourceRequirements.getLimits.asScala
assert(limits("memory").getAmount === "456Mi")
assert(limits("cpu").getAmount === "4")
assert(amountAndFormat(limits("memory")) === "456Mi")
assert(amountAndFormat(limits("cpu")) === "4")
resources.foreach { case (k8sName, testRInfo) =>
assert(limits(k8sName).getAmount === testRInfo.count)
assert(amountAndFormat(limits(k8sName)) === testRInfo.count)
}

val driverPodMetadata = configuredPod.pod.getMetadata
Expand Down Expand Up @@ -141,15 +141,15 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
.configurePod(basePod)
.container.getResources
.getRequests.asScala
assert(requests1("cpu").getAmount === "1")
assert(amountAndFormat(requests1("cpu")) === "1")

// if spark.driver.cores is set it should be used
sparkConf.set(DRIVER_CORES, 10)
val requests2 = new BasicDriverFeatureStep(KubernetesTestConf.createDriverConf(sparkConf))
.configurePod(basePod)
.container.getResources
.getRequests.asScala
assert(requests2("cpu").getAmount === "10")
assert(amountAndFormat(requests2("cpu")) === "10")

// spark.kubernetes.driver.request.cores should be preferred over spark.driver.cores
Seq("0.1", "100m").foreach { value =>
Expand All @@ -158,7 +158,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
.configurePod(basePod)
.container.getResources
.getRequests.asScala
assert(requests3("cpu").getAmount === value)
assert(amountAndFormat(requests3("cpu")) === value)
}
}

Expand Down Expand Up @@ -204,7 +204,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
mainAppResource = resource)
val step = new BasicDriverFeatureStep(conf)
val pod = step.configurePod(SparkPod.initialPod())
val mem = pod.container.getResources.getRequests.get("memory").getAmount()
val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory"))
val expected = (driverMem + driverMem * expectedFactor).toInt
assert(mem === s"${expected}Mi")

Expand All @@ -219,4 +219,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
.withContainerPort(portNumber)
.withProtocol("TCP")
.build()

// Renders a Quantity as its raw amount immediately followed by its unit suffix
// (e.g. amount "456" + format "Mi" -> "456Mi"), matching the pre-4.7.x
// single-string getAmount representation the assertions were written against.
private def amountAndFormat(quantity: Quantity): String =
  s"${quantity.getAmount}${quantity.getFormat}"
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,11 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
val executor = step.configurePod(SparkPod.initialPod())

assert(executor.container.getResources.getLimits.size() === 3)
assert(executor.container.getResources
.getLimits.get("memory").getAmount === "1408Mi")
assert(amountAndFormat(executor.container.getResources
.getLimits.get("memory")) === "1408Mi")
gpuResources.foreach { case (k8sName, testRInfo) =>
assert(executor.container.getResources.getLimits.get(k8sName).getAmount === testRInfo.count)
assert(amountAndFormat(
executor.container.getResources.getLimits.get(k8sName)) === testRInfo.count)
}
}

Expand All @@ -151,8 +152,8 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
assert(executor.container.getImage === EXECUTOR_IMAGE)
assert(executor.container.getVolumeMounts.isEmpty)
assert(executor.container.getResources.getLimits.size() === 1)
assert(executor.container.getResources
.getLimits.get("memory").getAmount === "1408Mi")
assert(amountAndFormat(executor.container.getResources
.getLimits.get("memory")) === "1408Mi")

// The pod has no node selector, volumes.
assert(executor.pod.getSpec.getNodeSelector.isEmpty)
Expand Down Expand Up @@ -201,7 +202,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
val executor = step.configurePod(SparkPod.initialPod())
// This is checking that basic executor + executorMemory = 1408 + 42 = 1450
assert(executor.container.getResources.getRequests.get("memory").getAmount === "1450Mi")
assert(amountAndFormat(executor.container.getResources.getRequests.get("memory")) === "1450Mi")
}

test("auth secret propagation") {
Expand Down Expand Up @@ -273,4 +274,6 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
val expectedEnvs = defaultEnvs ++ additionalEnvVars ++ extraJavaOptsEnvs
assert(containerEnvs === expectedEnvs)
}

// Concatenates a Quantity's numeric amount with its unit format suffix
// (e.g. "1408" + "Mi" -> "1408Mi") so assertions can compare against the
// legacy combined-string form used before kubernetes-client 4.7.x split them.
private def amountAndFormat(quantity: Quantity): String =
  s"${quantity.getAmount}${quantity.getFormat}"
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
val emptyDir = configuredPod.pod.getSpec.getVolumes.get(0).getEmptyDir
assert(emptyDir.getMedium === "Memory")
assert(emptyDir.getSizeLimit.getAmount === "6G")
assert(emptyDir.getSizeLimit.getAmount === "6")
assert(emptyDir.getSizeLimit.getFormat === "G")
assert(configuredPod.container.getVolumeMounts.size() === 1)
assert(configuredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp")
assert(configuredPod.container.getVolumeMounts.get(0).getName === "testVolume")
Expand All @@ -101,7 +102,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
val emptyDir = configuredPod.pod.getSpec.getVolumes.get(0).getEmptyDir
assert(emptyDir.getMedium === "")
assert(emptyDir.getSizeLimit.getAmount === null)
assert(emptyDir.getSizeLimit === null)
assert(configuredPod.container.getVolumeMounts.size() === 1)
assert(configuredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp")
assert(configuredPod.container.getVolumeMounts.get(0).getName === "testVolume")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,8 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
).toArray

val resources = Map(
"cpu" -> new QuantityBuilder()
.withAmount("1")
.build(),
"memory" -> new QuantityBuilder()
.withAmount("512M")
.build()
"cpu" -> new Quantity("1"),
"memory" -> new Quantity("512M")
).asJava

new ContainerBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite =>
.withName("test-local-pv")
.endMetadata()
.withNewSpec()
.withCapacity(Map("storage" -> new QuantityBuilder().withAmount("1Gi").build()).asJava)
.withCapacity(Map("storage" -> new Quantity("1Gi")).asJava)
.withAccessModes("ReadWriteOnce")
.withPersistentVolumeReclaimPolicy("Retain")
.withStorageClassName("test-local-storage")
Expand All @@ -71,8 +71,7 @@ private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite =>
.withAccessModes("ReadWriteOnce")
.withStorageClassName("test-local-storage")
.withResources(new ResourceRequirementsBuilder()
.withRequests(Map("storage" -> new QuantityBuilder()
.withAmount("1Gi").build()).asJava).build())
.withRequests(Map("storage" -> new Quantity("1Gi")).asJava).build())
.endSpec()

kubernetesTestComponents
Expand Down