diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index edeaa380194a..074cf21f9db3 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -17,13 +17,15 @@
 package org.apache.spark.deploy.k8s.submit
 
 import java.io.StringWriter
+import java.net.HttpURLConnection.HTTP_GONE
 import java.util.{Collections, UUID}
 import java.util.Properties
 
 import io.fabric8.kubernetes.api.model._
-import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch}
 import scala.collection.mutable
 import scala.util.control.NonFatal
+import scala.util.control.Breaks._
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkApplication
@@ -133,29 +135,38 @@
           .endVolume()
         .endSpec()
       .build()
-    Utils.tryWithResource(
-      kubernetesClient
-        .pods()
-        .withName(resolvedDriverPod.getMetadata.getName)
-        .watch(watcher)) { _ =>
-      val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
-      try {
-        val otherKubernetesResources =
-          resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
-        addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
-        kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
-      } catch {
-        case NonFatal(e) =>
-          kubernetesClient.pods().delete(createdDriverPod)
-          throw e
-      }
-      if (waitForAppCompletion) {
-        logInfo(s"Waiting for application $appName to finish...")
-        watcher.awaitCompletion()
-        logInfo(s"Application $appName finished.")
-      } else {
-        logInfo(s"Deployed Spark application $appName into Kubernetes.")
-      }
-    }
+    val driverPodName = resolvedDriverPod.getMetadata.getName
+    var watch: Watch = null
+    val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
+    try {
+      val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
+      addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
+      kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
+    } catch {
+      case NonFatal(e) =>
+        kubernetesClient.pods().delete(createdDriverPod)
+        throw e
+    }
+    val sId = Seq(kubernetesConf.namespace(), driverPodName).mkString(":")
+    breakable {
+      while (true) {
+        try {
+          // Clear any stale resource-too-old state before (re-)opening the watch.
+          watcher.reset()
+          watch = kubernetesClient
+            .pods()
+            .withName(driverPodName)
+            .watch(watcher)
+          // watchOrStop() blocks until the application finishes or the watch is
+          // invalidated; it returns false when the watch must be re-established.
+          if (watcher.watchOrStop(sId)) {
+            watch.close()
+            break
+          }
+        } catch {
+          case e: KubernetesClientException if e.getCode == HTTP_GONE =>
+            logInfo("Resource version changed; rerunning the watcher")
+        }
+      }
+    }
   }
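For context on the change above: a Kubernetes watch resumes from a stored `resourceVersion`, and when the API server has already compacted that version it answers HTTP 410 (Gone), which the fabric8 client surfaces either as a `KubernetesClientException` thrown at the watch call or through `Watcher.onClose`. Rather than treating 410 as completion, the submission client now rebuilds the watch. A minimal sketch of that retry policy in isolation; the `attempt` closure is an illustrative stand-in for "open the watch and block", not part of the patch:

```scala
import java.net.HttpURLConnection.HTTP_GONE

import scala.annotation.tailrec

import io.fabric8.kubernetes.client.KubernetesClientException

object WatchRetry {
  // `attempt` opens a watch and blocks until the watched resource either
  // completes (true) or the watch stops and must be rebuilt (false).
  @tailrec
  def watchUntilComplete(attempt: () => Boolean): Unit = {
    val completed =
      try attempt()
      catch {
        // HTTP 410 (Gone): the stored resourceVersion was compacted away, so
        // the watch has to be re-established rather than treated as a failure.
        case e: KubernetesClientException if e.getCode == HTTP_GONE => false
      }
    if (!completed) watchUntilComplete(attempt)
  }
}
```

Any other close reason still ends the loop, which matches the patch: only a 410 is retried, everything else is treated as the watch finishing for good.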
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
index 4a7d3d42d23d..460adc155d62 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.deploy.k8s.submit
 
+import java.net.HttpURLConnection.HTTP_GONE
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import scala.collection.JavaConverters._
@@ -28,8 +29,10 @@
 import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.ThreadUtils
 
 private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
   def awaitCompletion(): Unit
+  def watchOrStop(sId: String): Boolean
+  def reset(): Unit
 }
 
@@ -45,6 +48,8 @@
     maybeLoggingInterval: Option[Long])
   extends LoggingPodStatusWatcher with Logging {
 
+  private var resourceTooOldReceived: Boolean = false
+  private var podCompleted = false
   private val podCompletedFuture = new CountDownLatch(1)
   // start timer for periodic logging
   private val scheduler =
@@ -79,7 +84,18 @@
 
   override def onClose(e: KubernetesClientException): Unit = {
     logDebug(s"Stopping watching application $appId with last-observed phase $phase")
-    closeWatch()
+    if (e != null && e.getCode == HTTP_GONE) {
+      // An HTTP 410 (Gone) means the watched resourceVersion is too old;
+      // signal the submitter to re-establish the watch instead of stopping.
+      resourceTooOldReceived = true
+      logDebug(s"Got HTTP Gone code, resource version changed in k8s api: $e")
+    } else {
+      closeWatch()
+    }
+  }
+
+  override def reset(): Unit = {
+    resourceTooOldReceived = false
   }
 
   private def logShortStatus() = {
@@ -97,6 +113,9 @@
 
-  private def closeWatch(): Unit = {
+  private def closeWatch(): Unit = synchronized {
     podCompletedFuture.countDown()
     scheduler.shutdown()
+    podCompleted = true
+    // Wake up any thread blocked in watchOrStop().
+    notifyAll()
   }
 
@@ -177,4 +196,28 @@
   private def formatTime(time: String): String = {
     if (time != null || time != "") time else "N/A"
   }
+
+  // The launcher only configures a logging interval when it was asked to wait
+  // for app completion, so its presence doubles as the wait flag.
+  override def watchOrStop(sId: String): Boolean = if (maybeLoggingInterval.isDefined) {
+    logInfo(s"Waiting for application $appId with submission ID $sId to finish...")
+    val interval = maybeLoggingInterval.get
+    synchronized {
+      while (!podCompleted && !resourceTooOldReceived) {
+        wait(interval)
+        logInfo(s"Application status for $appId (phase: $phase)")
+      }
+    }
+    if (podCompleted) {
+      logInfo(
+        pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
+          .getOrElse("No containers were found in the driver pod."))
+      logInfo(s"Application $appId with submission ID $sId finished")
+    }
+    podCompleted
+  } else {
+    logInfo(s"Deployed Spark application $appId with submission ID $sId into Kubernetes")
+    // Always act like the application has completed since we don't want to wait for app completion
+    true
+  }
 }
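The watcher side is plain Java monitor signaling: `watchOrStop` parks the launcher thread in a timed `wait`, while the watch callbacks flip `podCompleted` or `resourceTooOldReceived` and `closeWatch()` wakes the waiter via `notifyAll`. A stripped-down sketch of that pattern under illustrative names (`CompletionMonitor` and its methods are not part of the patch):

```scala
// Stripped-down sketch of the monitor pattern used by the watcher: one thread
// blocks in a timed wait() while callback threads set a flag and notifyAll()
// under the same lock.
class CompletionMonitor(reportIntervalMs: Long) {
  private var completed = false
  private var invalidated = false

  // Called from the watch callback thread when the pod reaches a terminal phase.
  def signalCompleted(): Unit = synchronized {
    completed = true
    notifyAll()
  }

  // Called from onClose when the server answers HTTP 410 (Gone).
  def signalInvalidated(): Unit = synchronized {
    invalidated = true
    notifyAll()
  }

  // Clears the 410 flag before the watch is re-established.
  def reset(): Unit = synchronized { invalidated = false }

  // Returns true if the pod completed, false if the watch must be rebuilt.
  def awaitOrInvalidate(): Boolean = synchronized {
    while (!completed && !invalidated) {
      wait(reportIntervalMs)
      if (!completed && !invalidated) println("still running...")
    }
    completed
  }
}
```

The timed wait serves double duty here: it bounds how long a missed notification can delay the launcher, and it provides the cadence for the periodic "Application status" log line.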