diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index f9a77e71ad61..823f3e7c9b9a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -104,18 +104,18 @@ private[spark] object Config extends Logging { .stringConf .createOptional - val KUBERNETES_DRIVER_POD_NAME = - ConfigBuilder("spark.kubernetes.driver.pod.name") - .doc("Name of the driver pod.") + val KUBERNETES_DRIVER_POD_NAME_PREFIX = + ConfigBuilder("spark.kubernetes.driver.pod.namePrefix") + .doc("Prefix to use in front of the driver pod names.") .stringConf .createOptional - val KUBERNETES_EXECUTOR_POD_NAME_PREFIX = - ConfigBuilder("spark.kubernetes.executor.podNamePrefix") - .doc("Prefix to use in front of the executor pod names.") - .internal() - .stringConf - .createWithDefault("spark") + val KUBERNETES_DRIVER_JOB_BACKOFFLIMIT = + ConfigBuilder("spark.kubernetes.driver.job.backofflimit") + .doc("Driver job backofflimit.") + .intConf + .checkValue(value => value > 0, "Backofflimit must be a positive number") + .createWithDefault(6) val KUBERNETES_PYSPARK_PY_FILES = ConfigBuilder("spark.kubernetes.python.pyFiles") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index 51d205fdb68d..6cda0b7b67f5 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -217,7 +217,7 @@ private[spark] object KubernetesConf { KubernetesConf( sparkConf.clone(), KubernetesExecutorSpecificConf(executorId, driverPod), - sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX), + appId, appId, executorLabels, executorAnnotations, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala index 0c5ae022f407..ca1f30eb6634 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala @@ -19,13 +19,13 @@ package org.apache.spark.deploy.k8s import io.fabric8.kubernetes.api.model.HasMetadata private[spark] case class KubernetesDriverSpec( - pod: SparkPod, + job: SparkJob, driverKubernetesResources: Seq[HasMetadata], systemProperties: Map[String, String]) private[spark] object KubernetesDriverSpec { def initialSpec(initialProps: Map[String, String]): KubernetesDriverSpec = KubernetesDriverSpec( - SparkPod.initialPod(), + SparkJob.initialJob(), Seq.empty, initialProps) } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkJob.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkJob.scala new file mode 100644 index 000000000000..dee43ca9d55f --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkJob.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Job, JobBuilder} + +private[spark] case class SparkJob(job: Job, container: Container) + +private[spark] object SparkJob { + def initialJob(): SparkJob = { + SparkJob( + new JobBuilder() + .withNewMetadata() + .endMetadata() + .withNewSpec() + .endSpec() + .build(), + new ContainerBuilder().build()) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 7e67b51de6e0..ec663caad3f5 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -32,8 +32,8 @@ private[spark] class BasicDriverFeatureStep( conf: KubernetesConf[KubernetesDriverSpecificConf]) extends KubernetesFeatureConfigStep { - private val driverPodName = conf - .get(KUBERNETES_DRIVER_POD_NAME) + private val driverPodNamePrefix = conf + .get(KUBERNETES_DRIVER_POD_NAME_PREFIX) .getOrElse(s"${conf.appResourceNamePrefix}-driver") private val driverContainerImage = conf @@ -93,7 +93,7 @@ private[spark] class BasicDriverFeatureStep( val driverPod = new PodBuilder(pod.pod) .editOrNewMetadata() - .withName(driverPodName) + .withName(driverPodNamePrefix) .addToLabels(conf.roleLabels.asJava) .addToAnnotations(conf.roleAnnotations.asJava) .endMetadata() @@ -109,9 +109,8 @@ private[spark] class BasicDriverFeatureStep( override def getAdditionalPodSystemProperties(): Map[String, String] = { val additionalProps = mutable.Map( - KUBERNETES_DRIVER_POD_NAME.key -> driverPodName, + KUBERNETES_DRIVER_POD_NAME_PREFIX.key -> driverPodNamePrefix, "spark.app.id" -> conf.appId, - KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> conf.appResourceNamePrefix, KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true") val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath( @@ -127,5 +126,4 @@ private[spark] class BasicDriverFeatureStep( additionalProps.toMap } - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index abaeff0313a7..05bcb6ca8db9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -42,7 
+42,7 @@ private[spark] class BasicExecutorFeatureStep( .sparkConf .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) - private val executorPodNamePrefix = kubernetesConf.appResourceNamePrefix + private val executorPodNamePrefix = kubernetesConf.appId private val driverUrl = RpcEndpointAddress( kubernetesConf.get("spark.driver.host"), @@ -176,8 +176,4 @@ private[spark] class BasicExecutorFeatureStep( SparkPod(executorPod, containerWithLimitCores) } - - override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStep.scala index 03ff7d48420f..d319b84c53fc 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStep.scala @@ -50,8 +50,4 @@ private[spark] class EnvSecretsFeatureStep( .build() SparkPod(pod.pod, containerWithEnvVars) } - - override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala index 4c1be3bb1329..58cdaa3cadd6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala @@ -61,11 +61,11 @@ private[spark] trait KubernetesFeatureConfigStep { /** * Return any system properties that should be set on the JVM in accordance to this feature. */ - def getAdditionalPodSystemProperties(): Map[String, String] + def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty /** * Return any additional Kubernetes resources that should be added to support this feature. Only * applicable when creating the driver in cluster mode. 
*/ - def getAdditionalKubernetesResources(): Seq[HasMetadata] + def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala index 70b307303d14..4e7d2f54a0ea 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala @@ -70,8 +70,4 @@ private[spark] class LocalDirsFeatureStep( .build() SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts) } - - override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStep.scala index 97fa9499b2ed..4cecfe455498 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStep.scala @@ -54,9 +54,5 @@ private[spark] class MountSecretsFeatureStep( SparkPod(podWithVolumes, containerWithMounts) } - override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty - private def secretVolumeName(secretName: String): String = s"$secretName-volume" } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala index f52ec9fdc677..815c7bae0c93 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala @@ -38,7 +38,4 @@ private[spark] class JavaDriverFeatureStep( .build() SparkPod(pod.pod, withDriverArgs) } - override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala index c20bcac1f898..8d307c319a4f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala @@ -67,7 +67,4 @@ private[spark] class PythonDriverFeatureStep( SparkPod(pod.pod, withPythonPrimaryContainer) } - override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty } diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index eaff47205dbb..2b4e422a0a33 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -82,11 +82,11 @@ private[spark] object ClientArguments { } /** - * Submits a Spark application to run on Kubernetes by creating the driver pod and starting a + * Submits a Spark application to run on Kubernetes by creating the driver job and starting a * watcher that monitors and logs the application status. Waits for the application to terminate if * spark.kubernetes.submission.waitAppCompletion is true. * - * @param builder Responsible for building the base driver pod based on a composition of + * @param builder Responsible for building the base driver job based on a composition of * implemented features. * @param kubernetesConf application configuration * @param kubernetesClient the client to talk to the Kubernetes API server @@ -96,13 +96,13 @@ private[spark] object ClientArguments { * @param watcher a watcher that monitors and logs the application status */ private[spark] class Client( - builder: KubernetesDriverBuilder, - kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf], - kubernetesClient: KubernetesClient, - waitForAppCompletion: Boolean, - appName: String, - watcher: LoggingPodStatusWatcher, - kubernetesResourceNamePrefix: String) extends Logging { + builder: KubernetesDriverBuilder, + kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf], + kubernetesClient: KubernetesClient, + waitForAppCompletion: Boolean, + appName: String, + watcher: LoggingJobStatusWatcher, + kubernetesResourceNamePrefix: String) extends Logging { def run(): Unit = { val resolvedDriverSpec = builder.buildFromFeatures(kubernetesConf) @@ -110,7 +110,7 @@ private[spark] class Client( val configMap = buildConfigMap(configMapName, resolvedDriverSpec.systemProperties) // The include of the ENV_VAR for "SPARK_CONF_DIR" is to allow for the // Spark command builder to pickup on the Java Options present in the ConfigMap - val resolvedDriverContainer = new ContainerBuilder(resolvedDriverSpec.pod.container) + val resolvedDriverContainer = new ContainerBuilder(resolvedDriverSpec.job.container) .addNewEnv() .withName(ENV_SPARK_CONF_DIR) .withValue(SPARK_CONF_DIR_INTERNAL) @@ -120,31 +120,41 @@ private[spark] class Client( .withMountPath(SPARK_CONF_DIR_INTERNAL) .endVolumeMount() .build() - val resolvedDriverPod = new PodBuilder(resolvedDriverSpec.pod.pod) - .editSpec() - .addToContainers(resolvedDriverContainer) - .addNewVolume() - .withName(SPARK_CONF_VOLUME) - .withNewConfigMap() - .withName(configMapName) - .endConfigMap() - .endVolume() - .endSpec() + val resolvedDriverJob = new JobBuilder(resolvedDriverSpec.job.job) + .editOrNewSpec() + .editOrNewTemplate() + .editOrNewSpec() + .addToContainers(resolvedDriverContainer) + .addNewVolume() + .withName(SPARK_CONF_VOLUME) + .withNewConfigMap() + .withName(configMapName) + .endConfigMap() + .endVolume() + .withRestartPolicy("OnFailure") + .endSpec() + .endTemplate() + .endSpec() .build() + // If the fabric8 kubernetes client will support kubernetes 1.8 this + // should be removed and fixed with a more proper way + // 
(https://github.com/fabric8io/kubernetes-client/issues/1020) + resolvedDriverJob.getSpec.setAdditionalProperty("backoffLimit", + kubernetesConf.get(KUBERNETES_DRIVER_JOB_BACKOFFLIMIT)) Utils.tryWithResource( - kubernetesClient - .pods() - .withName(resolvedDriverPod.getMetadata.getName) + kubernetesClient.extensions() + .jobs() + .withName(resolvedDriverJob.getMetadata.getName) .watch(watcher)) { _ => - val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) + val createdDriverJob = kubernetesClient.extensions().jobs().create(resolvedDriverJob) try { val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap) - addDriverOwnerReference(createdDriverPod, otherKubernetesResources) + addDriverOwnerReference(createdDriverJob, otherKubernetesResources) kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace() } catch { case NonFatal(e) => - kubernetesClient.pods().delete(createdDriverPod) + kubernetesClient.extensions().jobs().delete(createdDriverJob) throw e } @@ -158,19 +168,19 @@ private[spark] class Client( } } - // Add a OwnerReference to the given resources making the driver pod an owner of them so when - // the driver pod is deleted, the resources are garbage collected. - private def addDriverOwnerReference(driverPod: Pod, resources: Seq[HasMetadata]): Unit = { - val driverPodOwnerReference = new OwnerReferenceBuilder() - .withName(driverPod.getMetadata.getName) - .withApiVersion(driverPod.getApiVersion) - .withUid(driverPod.getMetadata.getUid) - .withKind(driverPod.getKind) + // Add a OwnerReference to the given resources making the driver job an owner of them so when + // the driver job is deleted, the resources are garbage collected. + private def addDriverOwnerReference(driverJob: Job, resources: Seq[HasMetadata]): Unit = { + val driverJobOwnerReference = new OwnerReferenceBuilder() + .withName(driverJob.getMetadata.getName) + .withApiVersion(driverJob.getApiVersion) + .withUid(driverJob.getMetadata.getUid) + .withKind(driverJob.getKind) .withController(true) .build() resources.foreach { resource => val originalMetadata = resource.getMetadata - originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference)) + originalMetadata.setOwnerReferences(Collections.singletonList(driverJobOwnerReference)) } } @@ -229,9 +239,6 @@ private[spark] class KubernetesClientApplication extends SparkApplication { // The master URL has been checked for validity already in SparkSubmit. // We just need to get rid of the "k8s://" prefix here. 
val master = sparkConf.get("spark.master").substring("k8s://".length) - val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None - - val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval) Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient( master, @@ -246,7 +253,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication { kubernetesClient, waitForAppCompletion, appName, - watcher, + new LoggingJobStatusWatcherImpl(kubernetesAppId, kubernetesClient), kubernetesResourceNamePrefix) client.run() } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala index 7208e3d37759..62f681fb2794 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala @@ -16,7 +16,9 @@ */ package org.apache.spark.deploy.k8s.submit -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf} +import io.fabric8.kubernetes.api.model.JobBuilder + +import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.features._ import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep} @@ -77,16 +79,36 @@ private[spark] class KubernetesDriverBuilder( val allFeatures = (baseFeatures :+ bindingsStep) ++ secretFeature ++ envSecretFeature ++ volumesFeature - var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap) - for (feature <- allFeatures) { - val configuredPod = feature.configurePod(spec.pod) - val addedSystemProperties = feature.getAdditionalPodSystemProperties() - val addedResources = feature.getAdditionalKubernetesResources() - spec = KubernetesDriverSpec( - configuredPod, - spec.driverKubernetesResources ++ addedResources, - spec.systemProperties ++ addedSystemProperties) + val spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap) + val (configuredSparkPod, configuredKubernetesResources, configuredSystemProperties) = + allFeatures.foldLeft( + (SparkPod.initialPod(), spec.driverKubernetesResources, spec.systemProperties)) { + (init, feature) => + init match { + case (sparkPod, driverKubernetesResource, systemProperties) => + (feature.configurePod(sparkPod), + feature.getAdditionalKubernetesResources() ++ driverKubernetesResource, + feature.getAdditionalPodSystemProperties() ++ systemProperties) + } } - spec + new KubernetesDriverSpec( + createJobFromPod(spec.job, configuredSparkPod), + configuredKubernetesResources, + configuredSystemProperties + ) + } + + def createJobFromPod(sparkJob: SparkJob, sparkPod: SparkPod): SparkJob = { + val job = new JobBuilder(sparkJob.job) + .editOrNewMetadata() + .withName(sparkPod.pod.getMetadata.getName) + .endMetadata() + .editOrNewSpec() + .editOrNewTemplate() + .withMetadata(sparkPod.pod.getMetadata) + .withSpec(sparkPod.pod.getSpec) + .endTemplate() + .endSpec().build() + new SparkJob(job, sparkPod.container) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingJobStatusWatcher.scala similarity index 69% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingJobStatusWatcher.scala index 173ac541626a..48de5bfce741 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingJobStatusWatcher.scala @@ -16,55 +16,43 @@ */ package org.apache.spark.deploy.k8s.submit -import java.util.concurrent.{CountDownLatch, TimeUnit} +import java.util.concurrent.CountDownLatch import scala.collection.JavaConverters._ -import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time} -import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} +import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Job, Pod, Time} +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action import org.apache.spark.SparkException import org.apache.spark.internal.Logging -import org.apache.spark.util.ThreadUtils -private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] { +private[k8s] trait LoggingJobStatusWatcher extends Watcher[Job] { def awaitCompletion(): Unit } -/** - * A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on - * every state change and also at an interval for liveness. - * - * @param appId application ID. - * @param maybeLoggingInterval ms between each state request. If provided, must be a positive - * number. - */ -private[k8s] class LoggingPodStatusWatcherImpl( - appId: String, - maybeLoggingInterval: Option[Long]) - extends LoggingPodStatusWatcher with Logging { - - private val podCompletedFuture = new CountDownLatch(1) - // start timer for periodic logging - private val scheduler = - ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher") - private val logRunnable: Runnable = new Runnable { - override def run() = logShortStatus() - } + /** + * A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on + * every state change and also at an interval for liveness. + * + * @param appId application ID. + * @param kubernetesClient kubernetes client. 
+ */ +private[k8s] class LoggingJobStatusWatcherImpl(appId: String, kubernetesClient: KubernetesClient) + extends LoggingJobStatusWatcher with Logging { - private var pod = Option.empty[Pod] + private val jobCompletedFuture = new CountDownLatch(1) - private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown") + private var job : Option[Job] = None - def start(): Unit = { - maybeLoggingInterval.foreach { interval => - scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS) - } + private def phase: String = job match { + case Some(j) if j.getStatus.getConditions.isEmpty => "Running" + case Some(j) => j.getStatus.getConditions.get(0).getType + case _ => "unknown" } - override def eventReceived(action: Action, pod: Pod): Unit = { - this.pod = Option(pod) + override def eventReceived(action: Action, job: Job): Unit = { + this.job = Option(job) action match { case Action.DELETED | Action.ERROR => closeWatch() @@ -82,24 +70,21 @@ private[k8s] class LoggingPodStatusWatcherImpl( closeWatch() } - private def logShortStatus() = { - logInfo(s"Application status for $appId (phase: $phase)") - } - private def logLongStatus() = { - logInfo("State changed, new state: " + pod.map(formatPodState).getOrElse("unknown")) + logInfo("State changed, new state: " + job.map(formatPodState).getOrElse("unknown")) } private def hasCompleted(): Boolean = { - phase == "Succeeded" || phase == "Failed" + phase == "Complete" || phase == "Failed" } private def closeWatch(): Unit = { - podCompletedFuture.countDown() - scheduler.shutdown() + jobCompletedFuture.countDown() } - private def formatPodState(pod: Pod): String = { + private def formatPodState(job: Job): String = { + // Get driver pod from job + val pod: Pod = findDriverPod(job) val details = Seq[(String, String)]( // pod metadata ("pod name", pod.getMetadata.getName), @@ -134,11 +119,19 @@ private[k8s] class LoggingPodStatusWatcherImpl( }.mkString("") } + private def findDriverPod(job: Job): Pod = { + val pods = kubernetesClient.pods() + .withLabel("job-name", job.getMetadata.getName).list() + pods.getItems.asScala.find(p => + p.getMetadata.getName.startsWith(job.getMetadata.getName)).get + } + override def awaitCompletion(): Unit = { - podCompletedFuture.await() - logInfo(pod.map { p => - s"Container final statuses:\n\n${containersDescription(p)}" - }.getOrElse("No containers were found in the driver pod.")) + jobCompletedFuture.await() + job.foreach{ j => + val pod: Pod = findDriverPod(j) + logInfo(s"Container final statuses:\n\n${containersDescription(pod)}" + )} } private def containersDescription(p: Pod): String = { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 5a143ad3600f..0347d11ba18d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -20,6 +20,7 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} import io.fabric8.kubernetes.api.model.PodBuilder import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.spark.{SparkConf, SparkException} @@ -46,13 +47,15 @@ private[spark] class ExecutorPodsAllocator( private val 
podCreationTimeout = math.max(podAllocationDelay * 5, 60000) - private val kubernetesDriverPodName = conf - .get(KUBERNETES_DRIVER_POD_NAME) + private val kubernetesDriverPodNamePrefix = conf + .get(KUBERNETES_DRIVER_POD_NAME_PREFIX) .getOrElse(throw new SparkException("Must specify the driver pod name")) private val driverPod = kubernetesClient.pods() - .withName(kubernetesDriverPodName) - .get() + .withLabel("job-name", kubernetesDriverPodNamePrefix).list() + .getItems.asScala.find(p => + p.getMetadata.getName.startsWith(kubernetesDriverPodNamePrefix) && + p.getStatus.getPhase == "Running").get // Executor IDs that have been requested from Kubernetes but have not been detected in any // snapshot yet. Mapped to the timestamp when they were created. diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index 165f46a07df2..9a56be02ecf8 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -59,7 +59,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { test("Check the pod respects all configurations from the user.") { val sparkConf = new SparkConf() - .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod") + .set(KUBERNETES_DRIVER_POD_NAME_PREFIX, "spark-driver-pod") .set("spark.driver.cores", "2") .set(KUBERNETES_DRIVER_LIMIT_CORES, "4") .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "256M") @@ -118,9 +118,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { assert(driverPodMetadata.getAnnotations.asScala === DRIVER_ANNOTATIONS) assert(configuredPod.pod.getSpec.getRestartPolicy === "Never") val expectedSparkConf = Map( - KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod", + KUBERNETES_DRIVER_POD_NAME_PREFIX.key -> "spark-driver-pod", "spark.app.id" -> APP_ID, - KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX, "spark.kubernetes.submitInDriver" -> "true") assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf) } @@ -175,7 +174,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { val allJars = Seq("local:///opt/spark/jar1.jar", "hdfs:///opt/spark/jar2.jar") val allFiles = Seq("https://localhost:9000/file1.txt", "local:///opt/spark/file2.txt") val sparkConf = new SparkConf() - .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod") + .set(KUBERNETES_DRIVER_POD_NAME_PREFIX, "spark-driver-pod") .setJars(allJars) .set("spark.files", allFiles.mkString(",")) .set(CONTAINER_IMAGE, "spark-driver:latest") @@ -195,9 +194,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { val step = new BasicDriverFeatureStep(kubernetesConf) val additionalProperties = step.getAdditionalPodSystemProperties() val expectedSparkConf = Map( - KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod", + KUBERNETES_DRIVER_POD_NAME_PREFIX.key -> "spark-driver-pod", "spark.app.id" -> APP_ID, - KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX, "spark.kubernetes.submitInDriver" -> "true", "spark.jars" -> "/opt/spark/jar1.jar,hdfs:///opt/spark/jar2.jar", "spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt") diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index a44fa1f2ffc6..43483413a980 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -42,7 +42,6 @@ class BasicExecutorFeatureStepSuite private val DRIVER_POD_NAME = "driver-pod" private val DRIVER_POD_UID = "driver-uid" - private val RESOURCE_NAME_PREFIX = "base" private val EXECUTOR_IMAGE = "executor-image" private val LABELS = Map("label1key" -> "label1value") private val ANNOTATIONS = Map("annotation1key" -> "annotation1value") @@ -68,8 +67,7 @@ class BasicExecutorFeatureStepSuite before { MockitoAnnotations.initMocks(this) baseConf = new SparkConf() - .set(KUBERNETES_DRIVER_POD_NAME, DRIVER_POD_NAME) - .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, RESOURCE_NAME_PREFIX) + .set(KUBERNETES_DRIVER_POD_NAME_PREFIX, DRIVER_POD_NAME) .set(CONTAINER_IMAGE, EXECUTOR_IMAGE) .set(KUBERNETES_DRIVER_SUBMIT_CHECK, true) .set("spark.driver.host", DRIVER_HOSTNAME) @@ -82,7 +80,7 @@ class BasicExecutorFeatureStepSuite KubernetesConf( baseConf, KubernetesExecutorSpecificConf("1", DRIVER_POD), - RESOURCE_NAME_PREFIX, + APP_ID, APP_ID, LABELS, ANNOTATIONS, @@ -94,7 +92,7 @@ class BasicExecutorFeatureStepSuite val executor = step.configurePod(SparkPod.initialPod()) // The executor pod name and default labels. - assert(executor.pod.getMetadata.getName === s"$RESOURCE_NAME_PREFIX-exec-1") + assert(executor.pod.getMetadata.getName === s"$APP_ID-exec-1") assert(executor.pod.getMetadata.getLabels.asScala === LABELS) assert(executor.pod.getSpec.getImagePullSecrets.asScala === TEST_IMAGE_PULL_SECRET_OBJECTS) @@ -114,26 +112,6 @@ class BasicExecutorFeatureStepSuite checkOwnerReferences(executor.pod, DRIVER_POD_UID) } - test("executor pod hostnames get truncated to 63 characters") { - val conf = baseConf.clone() - val longPodNamePrefix = "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple" - - val step = new BasicExecutorFeatureStep( - KubernetesConf( - conf, - KubernetesExecutorSpecificConf("1", DRIVER_POD), - longPodNamePrefix, - APP_ID, - LABELS, - ANNOTATIONS, - Map.empty, - Map.empty, - Map.empty, - Nil, - Seq.empty[String])) - assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63) - } - test("classpath and extra java options get translated into environment variables") { val conf = baseConf.clone() conf.set(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS, "foo=bar") @@ -143,7 +121,7 @@ class BasicExecutorFeatureStepSuite KubernetesConf( conf, KubernetesExecutorSpecificConf("1", DRIVER_POD), - RESOURCE_NAME_PREFIX, + APP_ID, APP_ID, LABELS, ANNOTATIONS, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala index 4d8e79189ff3..76336d3f8a18 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala @@ -18,24 +18,23 @@ package org.apache.spark.deploy.k8s.submit import 
io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.client.{KubernetesClient, Watch} -import io.fabric8.kubernetes.client.dsl.{MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource} +import io.fabric8.kubernetes.client.dsl.{ExtensionsAPIGroupDSL, MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, ScalableResource} import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations} import org.mockito.Mockito.{doReturn, verify, when} import org.scalatest.BeforeAndAfter import org.scalatest.mockito.MockitoSugar._ import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, SparkJob} import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.Fabric8Aliases._ class ClientSuite extends SparkFunSuite with BeforeAndAfter { - private val DRIVER_POD_UID = "pod-id" - private val DRIVER_POD_API_VERSION = "v1" - private val DRIVER_POD_KIND = "pod" + private val DRIVER_JOB_UID = "job-id" + private val DRIVER_JOB_API_VERSION = "v1" + private val DRIVER_JOB_KIND = "job" private val KUBERNETES_RESOURCE_PREFIX = "resource-example" - private val POD_NAME = "driver" + private val JOB_NAME = "driver" private val CONTAINER_NAME = "container" private val APP_ID = "app-id" private val APP_NAME = "app" @@ -44,21 +43,25 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { private val RESOLVED_JAVA_OPTIONS = Map( "conf1key" -> "conf1value", "conf2key" -> "conf2value") - private val BUILT_DRIVER_POD = - new PodBuilder() + private val BUILT_DRIVER_JOB = + new JobBuilder() .withNewMetadata() - .withName(POD_NAME) - .endMetadata() + .withName(JOB_NAME) + .endMetadata() .withNewSpec() - .withHostname("localhost") - .endSpec() + .withNewTemplate() + .withNewSpec() + .withHostname("localhost") + .endSpec() + .endTemplate() + .endSpec() .build() private val BUILT_DRIVER_CONTAINER = new ContainerBuilder().withName(CONTAINER_NAME).build() private val ADDITIONAL_RESOURCES = Seq( new SecretBuilder().withNewMetadata().withName("secret").endMetadata().build()) private val BUILT_KUBERNETES_SPEC = KubernetesDriverSpec( - SparkPod(BUILT_DRIVER_POD, BUILT_DRIVER_CONTAINER), + SparkJob(BUILT_DRIVER_JOB, BUILT_DRIVER_CONTAINER), ADDITIONAL_RESOURCES, RESOLVED_JAVA_OPTIONS) @@ -66,66 +69,87 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { .addNewEnv() .withName(ENV_SPARK_CONF_DIR) .withValue(SPARK_CONF_DIR_INTERNAL) - .endEnv() + .endEnv() .addNewVolumeMount() .withName(SPARK_CONF_VOLUME) .withMountPath(SPARK_CONF_DIR_INTERNAL) - .endVolumeMount() + .endVolumeMount() .build() - private val FULL_EXPECTED_POD = new PodBuilder(BUILT_DRIVER_POD) + private val FULL_EXPECTED_JOB = new JobBuilder(BUILT_DRIVER_JOB) .editSpec() - .addToContainers(FULL_EXPECTED_CONTAINER) - .addNewVolume() - .withName(SPARK_CONF_VOLUME) - .withNewConfigMap().withName(s"$KUBERNETES_RESOURCE_PREFIX-driver-conf-map").endConfigMap() - .endVolume() - .endSpec() + .editTemplate() + .editSpec() + .addToContainers(FULL_EXPECTED_CONTAINER) + .addNewVolume() + .withName(SPARK_CONF_VOLUME) + .withNewConfigMap() + .withName(s"$KUBERNETES_RESOURCE_PREFIX-driver-conf-map") + .endConfigMap() + .endVolume() + .withRestartPolicy("OnFailure") + .endSpec() + .endTemplate() + .endSpec() .build() - private val POD_WITH_OWNER_REFERENCE = new 
PodBuilder(FULL_EXPECTED_POD) + // BackoffLimit needs to be set after creation + FULL_EXPECTED_JOB.getSpec.setAdditionalProperty("backoffLimit", 6) + + private val JOB_WITH_OWNER_REFERENCE = new JobBuilder(FULL_EXPECTED_JOB) .editMetadata() - .withUid(DRIVER_POD_UID) - .endMetadata() - .withApiVersion(DRIVER_POD_API_VERSION) - .withKind(DRIVER_POD_KIND) + .withUid(DRIVER_JOB_UID) + .withName(JOB_NAME) + .endMetadata() + .withApiVersion(DRIVER_JOB_API_VERSION) + .withKind(DRIVER_JOB_KIND) .build() private val ADDITIONAL_RESOURCES_WITH_OWNER_REFERENCES = ADDITIONAL_RESOURCES.map { secret => new SecretBuilder(secret) .editMetadata() .addNewOwnerReference() - .withName(POD_NAME) - .withApiVersion(DRIVER_POD_API_VERSION) - .withKind(DRIVER_POD_KIND) + .withName(JOB_NAME) + .withApiVersion(DRIVER_JOB_API_VERSION) + .withKind(DRIVER_JOB_KIND) .withController(true) - .withUid(DRIVER_POD_UID) - .endOwnerReference() - .endMetadata() + .withUid(DRIVER_JOB_UID) + .endOwnerReference() + .endMetadata() .build() } + private type ResourceList = NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[ + HasMetadata, Boolean] + private type Jobs = MixedOperation[Job, JobList, DoneableJob, ScalableResource[Job, DoneableJob]] + @Mock private var kubernetesClient: KubernetesClient = _ @Mock - private var podOperations: PODS = _ + private var extension: ExtensionsAPIGroupDSL = _ @Mock - private var namedPods: PodResource[Pod, DoneablePod] = _ + private var jobOperations: Jobs = _ @Mock - private var loggingPodStatusWatcher: LoggingPodStatusWatcher = _ + private var namedJobs: ScalableResource[Job, DoneableJob] = _ + + @Mock + private var loggingJobStatusWatcher: LoggingJobStatusWatcher = _ @Mock private var driverBuilder: KubernetesDriverBuilder = _ @Mock - private var resourceList: RESOURCE_LIST = _ + private var resourceList: ResourceList = _ + + @Mock + private var jobMetadata: ObjectMeta = _ private var kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf] = _ private var sparkConf: SparkConf = _ - private var createdPodArgumentCaptor: ArgumentCaptor[Pod] = _ + private var createdJobArgumentCaptor: ArgumentCaptor[Job] = _ private var createdResourcesArgumentCaptor: ArgumentCaptor[HasMetadata] = _ before { @@ -144,29 +168,30 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { Nil, Seq.empty[String]) when(driverBuilder.buildFromFeatures(kubernetesConf)).thenReturn(BUILT_KUBERNETES_SPEC) - when(kubernetesClient.pods()).thenReturn(podOperations) - when(podOperations.withName(POD_NAME)).thenReturn(namedPods) + when(kubernetesClient.extensions()).thenReturn(extension) + when(kubernetesClient.extensions().jobs()).thenReturn(jobOperations) + when(jobOperations.withName(JOB_NAME)).thenReturn(namedJobs) - createdPodArgumentCaptor = ArgumentCaptor.forClass(classOf[Pod]) + createdJobArgumentCaptor = ArgumentCaptor.forClass(classOf[Job]) createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata]) - when(podOperations.create(FULL_EXPECTED_POD)).thenReturn(POD_WITH_OWNER_REFERENCE) - when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch]) + when(jobOperations.create(FULL_EXPECTED_JOB)).thenReturn(JOB_WITH_OWNER_REFERENCE) + when(namedJobs.watch(loggingJobStatusWatcher)).thenReturn(mock[Watch]) doReturn(resourceList) .when(kubernetesClient) .resourceList(createdResourcesArgumentCaptor.capture()) } - test("The client should configure the pod using the builder.") { + test("The client should configure the job using the builder.") { val submissionClient = new Client( 
driverBuilder, kubernetesConf, kubernetesClient, false, "spark", - loggingPodStatusWatcher, + loggingJobStatusWatcher, KUBERNETES_RESOURCE_PREFIX) submissionClient.run() - verify(podOperations).create(FULL_EXPECTED_POD) + verify(jobOperations).create(FULL_EXPECTED_JOB) } test("The client should create Kubernetes resources") { @@ -176,7 +201,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { kubernetesClient, false, "spark", - loggingPodStatusWatcher, + loggingJobStatusWatcher, KUBERNETES_RESOURCE_PREFIX) submissionClient.run() val otherCreatedResources = createdResourcesArgumentCaptor.getAllValues @@ -202,9 +227,9 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { kubernetesClient, true, "spark", - loggingPodStatusWatcher, + loggingJobStatusWatcher, KUBERNETES_RESOURCE_PREFIX) submissionClient.run() - verify(loggingPodStatusWatcher).awaitCompletion() + verify(loggingJobStatusWatcher).awaitCompletion() } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala index 046e578b9462..55430731a08c 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala @@ -216,7 +216,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { : Unit = { assert(resolvedSpec.systemProperties.size === stepTypes.size) stepTypes.foreach { stepType => - assert(resolvedSpec.pod.pod.getMetadata.getLabels.get(stepType) === stepType) + assert(resolvedSpec.job.job.getSpec.getTemplate + .getMetadata.getLabels.get(stepType) === stepType) assert(resolvedSpec.driverKubernetesResources.containsSlice( KubernetesFeaturesTestUtils.getSecretsForStepType(stepType))) assert(resolvedSpec.systemProperties(stepType) === stepType) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index 0c19f5946b75..cab623a2aeeb 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.scheduler.cluster.k8s -import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder} +import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder, PodList} import io.fabric8.kubernetes.client.KubernetesClient import io.fabric8.kubernetes.client.dsl.PodResource import org.mockito.{ArgumentMatcher, Matchers, Mock, MockitoAnnotations} @@ -38,16 +38,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { private val driverPodName = "driver" - private val driverPod = new PodBuilder() - .withNewMetadata() - .withName(driverPodName) - .addToLabels(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID) - .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE) - .withUid("driver-pod-uid") - .endMetadata() - .build() - - private val conf = new SparkConf().set(KUBERNETES_DRIVER_POD_NAME, driverPodName) + private val conf = new SparkConf().set(KUBERNETES_DRIVER_POD_NAME_PREFIX, 
driverPodName) private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY) @@ -64,21 +55,43 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { @Mock private var labeledPods: LABELED_PODS = _ + @Mock + private var podList: PodList = _ + @Mock private var driverPodOperations: PodResource[Pod, DoneablePod] = _ @Mock private var executorBuilder: KubernetesExecutorBuilder = _ + private val podListElements = new java.util.ArrayList[Pod]() + private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _ private var podsAllocatorUnderTest: ExecutorPodsAllocator = _ + private val driverPod = new PodBuilder() + .withNewMetadata() + .withName(driverPodName) + .addToLabels(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID) + .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE) + .addToLabels("job-name", driverPodName) + .withUid("driver-pod-uid") + .endMetadata() + .withNewStatus() + .withPhase("Running") + .endStatus() + .build() + + podListElements.add(driverPod) + + before { MockitoAnnotations.initMocks(this) when(kubernetesClient.pods()).thenReturn(podOperations) - when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations) - when(driverPodOperations.get).thenReturn(driverPod) + when(podOperations.withLabel("job-name", driverPodName)).thenReturn(labeledPods) + when(labeledPods.list()).thenReturn(podList) + when(podList.getItems).thenReturn(podListElements) when(executorBuilder.buildFromFeatures(kubernetesConfWithCorrectFields())) .thenAnswer(executorPodAnswer()) snapshotsStore = new DeterministicExecutorPodsSnapshotsStore() diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 774c3936b877..cb60bb83741d 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -46,7 +46,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite private var pyImage: String = _ private var containerLocalSparkDistroExamplesJar: String = _ private var appLocator: String = _ - private var driverPodName: String = _ + private var driverPodNamePrefix: String = _ override def beforeAll(): Unit = { // The scalatest-maven-plugin gives system properties that are referenced but not set null @@ -84,10 +84,10 @@ private[spark] class KubernetesSuite extends SparkFunSuite before { appLocator = UUID.randomUUID().toString.replaceAll("-", "") - driverPodName = "spark-test-app-" + UUID.randomUUID().toString.replaceAll("-", "") + driverPodNamePrefix = "spark-test-app-" + UUID.randomUUID().toString.replaceAll("-", "") sparkAppConf = kubernetesTestComponents.newSparkAppConf() .set("spark.kubernetes.container.image", image) - .set("spark.kubernetes.driver.pod.name", driverPodName) + .set("spark.kubernetes.driver.pod.namePrefix", driverPodNamePrefix) .set("spark.kubernetes.driver.label.spark-app-locator", appLocator) .set("spark.kubernetes.executor.label.spark-app-locator", appLocator) if (!kubernetesTestComponents.hasUserSpecifiedNamespace) { @@ -347,13 +347,13 @@ private[spark] class KubernetesSuite extends SparkFunSuite } private def 
doBasicDriverPodCheck(driverPod: Pod): Unit = { - assert(driverPod.getMetadata.getName === driverPodName) + assert(driverPod.getMetadata.getName === driverPodNamePrefix) assert(driverPod.getSpec.getContainers.get(0).getImage === image) assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver") } private def doBasicDriverPyPodCheck(driverPod: Pod): Unit = { - assert(driverPod.getMetadata.getName === driverPodName) + assert(driverPod.getMetadata.getName === driverPodNamePrefix) assert(driverPod.getSpec.getContainers.get(0).getImage === pyImage) assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver") } @@ -387,11 +387,11 @@ private[spark] class KubernetesSuite extends SparkFunSuite } private def deleteDriverPod(): Unit = { - kubernetesTestComponents.kubernetesClient.pods().withName(driverPodName).delete() + kubernetesTestComponents.kubernetesClient.pods().withName(driverPodNamePrefix).delete() Eventually.eventually(TIMEOUT, INTERVAL) { assert(kubernetesTestComponents.kubernetesClient .pods() - .withName(driverPodName) + .withName(driverPodNamePrefix) .get() == null) } }
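
Note: a minimal sketch of how the two user-facing settings introduced above might be supplied at submission time. The master URL, container image, and prefix value are placeholders; only the config keys "spark.kubernetes.driver.pod.namePrefix" and "spark.kubernetes.driver.job.backofflimit" come from Config.scala in this patch.

    import org.apache.spark.SparkConf

    // Hypothetical cluster-mode configuration exercising the new keys.
    val conf = new SparkConf()
      .setMaster("k8s://https://kubernetes.example.com:6443")   // placeholder API server
      .set("spark.kubernetes.container.image", "spark:latest")  // placeholder image
      .set("spark.kubernetes.driver.pod.namePrefix", "my-app")  // name prefix for the driver Job and its pod
      .set("spark.kubernetes.driver.job.backofflimit", "3")     // retries before the driver Job is considered failed

With settings along these lines, the submission client builds a driver Job from the composed driver pod spec, sets restartPolicy to "OnFailure" on the pod template, and writes backoffLimit via setAdditionalProperty as a temporary workaround until the fabric8 client exposes the Kubernetes 1.8 Job spec directly (see the fabric8 issue linked in KubernetesClientApplication.scala above).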