diff --git a/dev/deps/spark-deps-hadoop-2.7-hive-1.2 b/dev/deps/spark-deps-hadoop-2.7-hive-1.2 index 62d5772ffc94a..534ac39e0c46e 100644 --- a/dev/deps/spark-deps-hadoop-2.7-hive-1.2 +++ b/dev/deps/spark-deps-hadoop-2.7-hive-1.2 @@ -137,9 +137,9 @@ jsr305/3.0.0//jsr305-3.0.0.jar jta/1.1//jta-1.1.jar jul-to-slf4j/1.7.16//jul-to-slf4j-1.7.16.jar kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar -kubernetes-client/4.6.4//kubernetes-client-4.6.4.jar -kubernetes-model-common/4.6.4//kubernetes-model-common-4.6.4.jar -kubernetes-model/4.6.4//kubernetes-model-4.6.4.jar +kubernetes-client/4.7.1//kubernetes-client-4.7.1.jar +kubernetes-model-common/4.7.1//kubernetes-model-common-4.7.1.jar +kubernetes-model/4.7.1//kubernetes-model-4.7.1.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar libfb303/0.9.3//libfb303-0.9.3.jar libthrift/0.12.0//libthrift-0.12.0.jar diff --git a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 b/dev/deps/spark-deps-hadoop-2.7-hive-2.3 index 1b57250c1fb54..42bdf112efccb 100644 --- a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-2.7-hive-2.3 @@ -153,9 +153,9 @@ jsr305/3.0.0//jsr305-3.0.0.jar jta/1.1//jta-1.1.jar jul-to-slf4j/1.7.16//jul-to-slf4j-1.7.16.jar kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar -kubernetes-client/4.6.4//kubernetes-client-4.6.4.jar -kubernetes-model-common/4.6.4//kubernetes-model-common-4.6.4.jar -kubernetes-model/4.6.4//kubernetes-model-4.6.4.jar +kubernetes-client/4.7.1//kubernetes-client-4.7.1.jar +kubernetes-model-common/4.7.1//kubernetes-model-common-4.7.1.jar +kubernetes-model/4.7.1//kubernetes-model-4.7.1.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar libfb303/0.9.3//libfb303-0.9.3.jar libthrift/0.12.0//libthrift-0.12.0.jar diff --git a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 b/dev/deps/spark-deps-hadoop-3.2-hive-2.3 index ffd2364a51317..6006fa4b43f42 100644 --- a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3.2-hive-2.3 @@ -165,9 +165,9 @@ kerby-pkix/1.0.1//kerby-pkix-1.0.1.jar kerby-util/1.0.1//kerby-util-1.0.1.jar kerby-xdr/1.0.1//kerby-xdr-1.0.1.jar kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar -kubernetes-client/4.6.4//kubernetes-client-4.6.4.jar -kubernetes-model-common/4.6.4//kubernetes-model-common-4.6.4.jar -kubernetes-model/4.6.4//kubernetes-model-4.6.4.jar +kubernetes-client/4.7.1//kubernetes-client-4.7.1.jar +kubernetes-model-common/4.7.1//kubernetes-model-common-4.7.1.jar +kubernetes-model/4.7.1//kubernetes-model-4.7.1.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar libfb303/0.9.3//libfb303-0.9.3.jar libthrift/0.12.0//libthrift-0.12.0.jar diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml index 8c46738b259bc..f8c6b38225559 100644 --- a/resource-managers/kubernetes/core/pom.xml +++ b/resource-managers/kubernetes/core/pom.xml @@ -29,7 +29,7 @@ Spark Project Kubernetes kubernetes - 4.6.4 + 4.7.1 diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index e234b1780a7d0..c49f4a15de974 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -23,7 +23,7 @@ import java.util.UUID import scala.collection.JavaConverters._ -import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder, Quantity, QuantityBuilder} +import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder, Quantity} import io.fabric8.kubernetes.client.KubernetesClient import org.apache.commons.codec.binary.Hex import org.apache.hadoop.fs.{FileSystem, Path} @@ -234,9 +234,7 @@ private[spark] object KubernetesUtils extends Logging { throw new SparkException(s"Resource: ${request.id.resourceName} was requested, " + "but vendor was not specified.") } - val quantity = new QuantityBuilder(false) - .withAmount(request.amount.toString) - .build() + val quantity = new Quantity(request.amount.toString) (KubernetesConf.buildKubernetesResourceName(vendorDomain, request.id.resourceName), quantity) }.toMap } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index e2d70d7692ef3..eec275e6e6f23 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -80,14 +80,10 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) .build() } - val driverCpuQuantity = new QuantityBuilder(false) - .withAmount(driverCoresRequest) - .build() - val driverMemoryQuantity = new QuantityBuilder(false) - .withAmount(s"${driverMemoryWithOverheadMiB}Mi") - .build() + val driverCpuQuantity = new Quantity(driverCoresRequest) + val driverMemoryQuantity = new Quantity(s"${driverMemoryWithOverheadMiB}Mi") val maybeCpuLimitQuantity = driverLimitCores.map { limitCores => - ("cpu", new QuantityBuilder(false).withAmount(limitCores).build()) + ("cpu", new Quantity(limitCores)) } val driverResourceQuantities = diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index d88bd5858bc94..6a26df2997fd2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -88,12 +88,8 @@ private[spark] class BasicExecutorFeatureStep( // Replace dangerous characters in the remaining string with a safe alternative. .replaceAll("[^\\w-]+", "_") - val executorMemoryQuantity = new QuantityBuilder(false) - .withAmount(s"${executorMemoryTotal}Mi") - .build() - val executorCpuQuantity = new QuantityBuilder(false) - .withAmount(executorCoresRequest) - .build() + val executorMemoryQuantity = new Quantity(s"${executorMemoryTotal}Mi") + val executorCpuQuantity = new Quantity(executorCoresRequest) val executorResourceQuantities = KubernetesUtils.buildResourcesQuantities(SPARK_EXECUTOR_PREFIX, @@ -183,9 +179,7 @@ private[spark] class BasicExecutorFeatureStep( .addToArgs("executor") .build() val containerWithLimitCores = executorLimitCores.map { limitCores => - val executorCpuLimitQuantity = new QuantityBuilder(false) - .withAmount(limitCores) - .build() + val executorCpuLimitQuantity = new Quantity(limitCores) new ContainerBuilder(executorContainer) .editResources() .addToLimits("cpu", executorCpuLimitQuantity) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala index 8548e7057cdf0..4599df99b3c61 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala @@ -65,7 +65,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf) new VolumeBuilder() .withEmptyDir( new EmptyDirVolumeSource(medium.getOrElse(""), - new Quantity(sizeLimit.orNull))) + sizeLimit.map(new Quantity(_)).orNull)) } val volume = volumeBuilder.withName(spec.volumeName).build() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala index 707c823d69cf0..26bd317de8ec6 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala @@ -101,8 +101,7 @@ abstract class PodBuilderSuite extends SparkFunSuite { assert(container.getArgs.contains("arg")) assert(container.getCommand.equals(List("command").asJava)) assert(container.getEnv.asScala.exists(_.getName == "env-key")) - assert(container.getResources.getLimits.get("gpu") === - new QuantityBuilder().withAmount("1").build()) + assert(container.getResources.getLimits.get("gpu") === new Quantity("1")) assert(container.getSecurityContext.getRunAsNonRoot) assert(container.getStdin) assert(container.getTerminationMessagePath === "termination-message-path") @@ -156,7 +155,7 @@ abstract class PodBuilderSuite extends SparkFunSuite { .withImagePullPolicy("Always") .withName("executor-container") .withNewResources() - .withLimits(Map("gpu" -> new QuantityBuilder().withAmount("1").build()).asJava) + .withLimits(Map("gpu" -> new Quantity("1")).asJava) .endResources() .withNewSecurityContext() .withRunAsNonRoot(true) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index ce66afd9448a9..ef69600ea88ab 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.features import scala.collection.JavaConverters._ -import io.fabric8.kubernetes.api.model.{ContainerPort, ContainerPortBuilder, LocalObjectReferenceBuilder} +import io.fabric8.kubernetes.api.model.{ContainerPort, ContainerPortBuilder, LocalObjectReferenceBuilder, Quantity} import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod} @@ -105,13 +105,13 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { val resourceRequirements = configuredPod.container.getResources val requests = resourceRequirements.getRequests.asScala - assert(requests("cpu").getAmount === "2") - assert(requests("memory").getAmount === "456Mi") + assert(amountAndFormat(requests("cpu")) === "2") + assert(amountAndFormat(requests("memory")) === "456Mi") val limits = resourceRequirements.getLimits.asScala - assert(limits("memory").getAmount === "456Mi") - assert(limits("cpu").getAmount === "4") + assert(amountAndFormat(limits("memory")) === "456Mi") + assert(amountAndFormat(limits("cpu")) === "4") resources.foreach { case (k8sName, testRInfo) => - assert(limits(k8sName).getAmount === testRInfo.count) + assert(amountAndFormat(limits(k8sName)) === testRInfo.count) } val driverPodMetadata = configuredPod.pod.getMetadata @@ -141,7 +141,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { .configurePod(basePod) .container.getResources .getRequests.asScala - assert(requests1("cpu").getAmount === "1") + assert(amountAndFormat(requests1("cpu")) === "1") // if spark.driver.cores is set it should be used sparkConf.set(DRIVER_CORES, 10) @@ -149,7 +149,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { .configurePod(basePod) .container.getResources .getRequests.asScala - assert(requests2("cpu").getAmount === "10") + assert(amountAndFormat(requests2("cpu")) === "10") // spark.kubernetes.driver.request.cores should be preferred over spark.driver.cores Seq("0.1", "100m").foreach { value => @@ -158,7 +158,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { .configurePod(basePod) .container.getResources .getRequests.asScala - assert(requests3("cpu").getAmount === value) + assert(amountAndFormat(requests3("cpu")) === value) } } @@ -204,7 +204,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { mainAppResource = resource) val step = new BasicDriverFeatureStep(conf) val pod = step.configurePod(SparkPod.initialPod()) - val mem = pod.container.getResources.getRequests.get("memory").getAmount() + val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory")) val expected = (driverMem + driverMem * expectedFactor).toInt assert(mem === s"${expected}Mi") @@ -219,4 +219,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { .withContainerPort(portNumber) .withProtocol("TCP") .build() + + private def amountAndFormat(quantity: Quantity): String = quantity.getAmount + quantity.getFormat } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index f375b1fe6a5cd..da50372d04c73 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -128,10 +128,11 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { val executor = step.configurePod(SparkPod.initialPod()) assert(executor.container.getResources.getLimits.size() === 3) - assert(executor.container.getResources - .getLimits.get("memory").getAmount === "1408Mi") + assert(amountAndFormat(executor.container.getResources + .getLimits.get("memory")) === "1408Mi") gpuResources.foreach { case (k8sName, testRInfo) => - assert(executor.container.getResources.getLimits.get(k8sName).getAmount === testRInfo.count) + assert(amountAndFormat( + executor.container.getResources.getLimits.get(k8sName)) === testRInfo.count) } } @@ -151,8 +152,8 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { assert(executor.container.getImage === EXECUTOR_IMAGE) assert(executor.container.getVolumeMounts.isEmpty) assert(executor.container.getResources.getLimits.size() === 1) - assert(executor.container.getResources - .getLimits.get("memory").getAmount === "1408Mi") + assert(amountAndFormat(executor.container.getResources + .getLimits.get("memory")) === "1408Mi") // The pod has no node selector, volumes. assert(executor.pod.getSpec.getNodeSelector.isEmpty) @@ -201,7 +202,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf)) val executor = step.configurePod(SparkPod.initialPod()) // This is checking that basic executor + executorMemory = 1408 + 42 = 1450 - assert(executor.container.getResources.getRequests.get("memory").getAmount === "1450Mi") + assert(amountAndFormat(executor.container.getResources.getRequests.get("memory")) === "1450Mi") } test("auth secret propagation") { @@ -273,4 +274,6 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { val expectedEnvs = defaultEnvs ++ additionalEnvVars ++ extraJavaOptsEnvs assert(containerEnvs === expectedEnvs) } + + private def amountAndFormat(quantity: Quantity): String = quantity.getAmount + quantity.getFormat } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala index 8c430eeb3fa71..3888062785324 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala @@ -79,7 +79,8 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { assert(configuredPod.pod.getSpec.getVolumes.size() === 1) val emptyDir = configuredPod.pod.getSpec.getVolumes.get(0).getEmptyDir assert(emptyDir.getMedium === "Memory") - assert(emptyDir.getSizeLimit.getAmount === "6G") + assert(emptyDir.getSizeLimit.getAmount === "6") + assert(emptyDir.getSizeLimit.getFormat === "G") assert(configuredPod.container.getVolumeMounts.size() === 1) assert(configuredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp") assert(configuredPod.container.getVolumeMounts.get(0).getName === "testVolume") @@ -101,7 +102,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { assert(configuredPod.pod.getSpec.getVolumes.size() === 1) val emptyDir = configuredPod.pod.getSpec.getVolumes.get(0).getEmptyDir assert(emptyDir.getMedium === "") - assert(emptyDir.getSizeLimit.getAmount === null) + assert(emptyDir.getSizeLimit === null) assert(configuredPod.container.getVolumeMounts.size() === 1) assert(configuredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp") assert(configuredPod.container.getVolumeMounts.get(0).getName === "testVolume") diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala index 7181774b9f17e..289fb9641295d 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala @@ -54,12 +54,8 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite => ).toArray val resources = Map( - "cpu" -> new QuantityBuilder() - .withAmount("1") - .build(), - "memory" -> new QuantityBuilder() - .withAmount("512M") - .build() + "cpu" -> new Quantity("1"), + "memory" -> new Quantity("512M") ).asJava new ContainerBuilder() diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala index f0218217e6afb..4b4dff93f5742 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala @@ -45,7 +45,7 @@ private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite => .withName("test-local-pv") .endMetadata() .withNewSpec() - .withCapacity(Map("storage" -> new QuantityBuilder().withAmount("1Gi").build()).asJava) + .withCapacity(Map("storage" -> new Quantity("1Gi")).asJava) .withAccessModes("ReadWriteOnce") .withPersistentVolumeReclaimPolicy("Retain") .withStorageClassName("test-local-storage") @@ -71,8 +71,7 @@ private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite => .withAccessModes("ReadWriteOnce") .withStorageClassName("test-local-storage") .withResources(new ResourceRequirementsBuilder() - .withRequests(Map("storage" -> new QuantityBuilder() - .withAmount("1Gi").build()).asJava).build()) + .withRequests(Map("storage" -> new Quantity("1Gi")).asJava).build()) .endSpec() kubernetesTestComponents