Client.scala
@@ -17,13 +17,15 @@
package org.apache.spark.deploy.k8s.submit

import java.io.StringWriter
import java.net.HttpURLConnection.HTTP_GONE
import java.util.{Collections, UUID}
import java.util.Properties

import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch}
import scala.collection.mutable
import scala.util.control.NonFatal
import util.control.Breaks._

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkApplication
@@ -133,29 +135,33 @@ private[spark] class Client(
.endVolume()
.endSpec()
.build()
Utils.tryWithResource(
kubernetesClient
.pods()
.withName(resolvedDriverPod.getMetadata.getName)
.watch(watcher)) { _ =>
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
try {
val otherKubernetesResources =
resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
} catch {
case NonFatal(e) =>
kubernetesClient.pods().delete(createdDriverPod)
throw e
}

if (waitForAppCompletion) {
logInfo(s"Waiting for application $appName to finish...")
watcher.awaitCompletion()
logInfo(s"Application $appName finished.")
} else {
logInfo(s"Deployed Spark application $appName into Kubernetes.")
val driverPodName = resolvedDriverPod.getMetadata.getName
var watch: Watch = null
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
try {
val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
} catch {
case NonFatal(e) =>
kubernetesClient.pods().delete(createdDriverPod)
throw e
}
val sId = Seq(kubernetesConf.namespace(), driverPodName).mkString(":")
breakable {
while (true) {
try {
watch = kubernetesClient
.pods()
.withName(driverPodName)
.watch(watcher)
watcher.watchOrStop(sId)
break
} catch {
case e: KubernetesClientException if e.getCode == HTTP_GONE =>
logInfo("Resource version changed rerunning the watcher")
}
}
}
}
LoggingPodStatusWatcher.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.deploy.k8s.submit

import java.net.HttpURLConnection.HTTP_GONE
import java.util.concurrent.{CountDownLatch, TimeUnit}

import scala.collection.JavaConverters._
@@ -28,8 +29,10 @@ import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.util.ThreadUtils


private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
def awaitCompletion(): Unit
def watchOrStop(sId: String): Boolean
}

/**
@@ -45,6 +48,8 @@ private[k8s] class LoggingPodStatusWatcherImpl(
maybeLoggingInterval: Option[Long])
extends LoggingPodStatusWatcher with Logging {

private var resourceTooOldReceived: Boolean = false
private var podCompleted = false
private val podCompletedFuture = new CountDownLatch(1)
// start timer for periodic logging
private val scheduler =
@@ -79,7 +84,12 @@

override def onClose(e: KubernetesClientException): Unit = {
logDebug(s"Stopping watching application $appId with last-observed phase $phase")
closeWatch()
if (e != null && e.getCode==HTTP_GONE) {
resourceTooOldReceived = true
logDebug(s"Got HTTP Gone code, resource version changed in k8s api: $e")
} else {
closeWatch()
}
}

private def logShortStatus() = {
@@ -97,6 +107,7 @@
private def closeWatch(): Unit = {
podCompletedFuture.countDown()
scheduler.shutdown()
podCompleted = true
}

private def formatPodState(pod: Pod): String = {
@@ -177,4 +188,27 @@
private def formatTime(time: String): String = {
if (time != null || time != "") time else "N/A"
}

override def watchOrStop(sId: String): Boolean = if (hasCompleted()) {

shockdm commented on Sep 2, 2020:
@jkleckner

This line is different in the master branch. It checks whether the client was set to wait for the completion of the driver, not simply checking hasCompleted(), which should normally return false, causing spark-submit to terminate immediately.

For the 2.4.7 port you may want to add an additional parameter to LoggingPodStatusWatcherImpl, specifically waitForCompletion: Boolean, that can be checked in this if statement. In current master they use KubernetesDriverConf instead of passing parameters separately:

private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)

However, in 2.4.6 KubernetesDriverConf is not yet introduced; instead the appId and maybeLoggingInterval are passed individually. A simple solution is to add waitForCompletion as a parameter and use it in the above if statement.

This has worked well for me.
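
For illustration, a minimal sketch of that suggestion against the 2.4.x signature shown in this diff (the three-argument constructor and the gating on waitForCompletion are an assumption taken from the comment above, not code from this PR; the waiting body simply mirrors the hasCompleted() branch in the surrounding diff):

private[k8s] class LoggingPodStatusWatcherImpl(
    appId: String,
    maybeLoggingInterval: Option[Long],
    waitForCompletion: Boolean)   // assumed extra flag; master derives this from KubernetesDriverConf
  extends LoggingPodStatusWatcher with Logging {

  // ...fields, scheduler and Watcher callbacks exactly as in the diff...

  override def watchOrStop(sId: String): Boolean =
    if (waitForCompletion) {
      // Block until the driver pod completes or the watch reports HTTP_GONE,
      // same logic as the hasCompleted() branch, only gated on the flag.
      logInfo(s"Waiting for application $appId with submission ID $sId to finish...")
      synchronized {
        while (!podCompleted && !resourceTooOldReceived) {
          wait(maybeLoggingInterval.get)
          logInfo(s"Application status for $appId (phase: $phase)")
        }
      }
      podCompleted
    } else {
      logInfo(s"Deployed Spark application $appId with submission ID $sId into Kubernetes")
      // Nothing to wait for: report success immediately.
      true
    }
}

The flag itself would be plumbed in from Client, which in 2.4.x already has a waitForAppCompletion flag (visible in the Client.scala hunk above).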

jkleckner (author):
@shockdm Yes, you are right. And it is great that you have it working!
We're happy to make the change but would you mind sharing the code so as to be consistent with what you already have?

jkleckner (author):
@shockdm ping

shockdm:
@jkleckner sorry, spent some time doing test runs to make sure it fixed our use case. I'll share the final code shortly.

jkleckner (author):
@shockdm ping

shockdm:
@jkleckner Thank you for the reminder, and sorry for the delay! Check out shockdm#1

jkleckner (author):
Your patches look like an improvement, thank you @shockdm.
There are enough differences that perhaps you might be willing to submit them?

shockdm:
With the original comment by @dongjoon-hyun:

Thank you for making a backporting PR, @jkleckner.
Could you make a backporting PR to branch-3.0 first?
To prevent any regression from 2.4 to 3.0, we have a policy for backporting sequence. We cannot backport this to branch-2.4 directly.

What would be the correct approach to submit this backport atm?

jkleckner (author):
@dongjoon-hyun Is the merging or the submitting of a PR to branch-3.0 a pre-requisite to submitting this PR or an improved PR?

Note that the implementations inherently diverge due to the divergence of the base branches.

jkleckner (author):
@shockdm The 3.0 patch #29533 has now been merged. Even though we never got an answer whether that was a prerequisite, that blockage is now gone.

Would you be willing to rebase and clean up your patch for submission?

logInfo(s"Waiting for application ${appId} with submission ID $sId to finish...")
val interval = maybeLoggingInterval
synchronized {
while (!podCompleted && !resourceTooOldReceived) {
wait(interval.get)
logInfo(s"Application status for $appId (phase: $phase)")
}
}

if(podCompleted) {
logInfo(
pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
.getOrElse("No containers were found in the driver pod."))
logInfo(s"Application ${appId} with submission ID $sId finished")
}
podCompleted
} else {
logInfo(s"Deployed Spark application ${appId} with submission ID $sId into Kubernetes")
// Always act like the application has completed since we don't want to wait for app completion
true
}
}