Skip to content

Commit d99ff20

Browse files
stijndehaesdongjoon-hyun
authored andcommitted
[SPARK-24266][K8S][3.0] Restart the watcher when we receive a version changed from k8s
### What changes were proposed in this pull request? This is a straight application of #28423 onto branch-3.0 Restart the watcher when it failed with a HTTP_GONE code from the kubernetes api. Which means a resource version has changed. For more relevant information see here: fabric8io/kubernetes-client#1075 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? This was tested in #28423 by running spark-submit to a k8s cluster. Closes #29533 from jkleckner/backport-SPARK-24266-to-branch-3.0. Authored-by: Stijn De Haes <stijndehaes@gmail.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
1 parent 71ef48e commit d99ff20

File tree

3 files changed

+61
-27
lines changed

3 files changed

+61
-27
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ import java.util.{Collections, UUID}
2121
import java.util.Properties
2222

2323
import io.fabric8.kubernetes.api.model._
24-
import io.fabric8.kubernetes.client.KubernetesClient
24+
import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
25+
import io.fabric8.kubernetes.client.Watcher.Action
2526
import scala.collection.mutable
27+
import scala.util.control.Breaks._
2628
import scala.util.control.NonFatal
2729

2830
import org.apache.spark.SparkConf
@@ -122,25 +124,37 @@ private[spark] class Client(
122124
.endSpec()
123125
.build()
124126
val driverPodName = resolvedDriverPod.getMetadata.getName
125-
Utils.tryWithResource(
126-
kubernetesClient
127-
.pods()
128-
.withName(driverPodName)
129-
.watch(watcher)) { _ =>
130-
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
131-
try {
132-
val otherKubernetesResources =
133-
resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
134-
addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
135-
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
136-
} catch {
137-
case NonFatal(e) =>
138-
kubernetesClient.pods().delete(createdDriverPod)
139-
throw e
140-
}
141127

142-
val sId = Seq(conf.namespace, driverPodName).mkString(":")
143-
watcher.watchOrStop(sId)
128+
var watch: Watch = null
129+
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
130+
try {
131+
val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
132+
addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
133+
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
134+
} catch {
135+
case NonFatal(e) =>
136+
kubernetesClient.pods().delete(createdDriverPod)
137+
throw e
138+
}
139+
val sId = Seq(conf.namespace, driverPodName).mkString(":")
140+
breakable {
141+
while (true) {
142+
val podWithName = kubernetesClient
143+
.pods()
144+
.withName(driverPodName)
145+
// Reset resource to old before we start the watch, this is important for race conditions
146+
watcher.reset()
147+
watch = podWithName.watch(watcher)
148+
149+
// Send the latest pod state we know to the watcher to make sure we didn't miss anything
150+
watcher.eventReceived(Action.MODIFIED, podWithName.get())
151+
152+
// Break the while loop if the pod is completed or we don't want to wait
153+
if(watcher.watchOrStop(sId)) {
154+
watch.close()
155+
break
156+
}
157+
}
144158
}
145159
}
146160

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,16 @@ package org.apache.spark.deploy.k8s.submit
1919
import io.fabric8.kubernetes.api.model.Pod
2020
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
2121
import io.fabric8.kubernetes.client.Watcher.Action
22+
import java.net.HttpURLConnection.HTTP_GONE
2223

2324
import org.apache.spark.deploy.k8s.Config._
2425
import org.apache.spark.deploy.k8s.KubernetesDriverConf
2526
import org.apache.spark.deploy.k8s.KubernetesUtils._
2627
import org.apache.spark.internal.Logging
2728

2829
private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
29-
def watchOrStop(submissionId: String): Unit
30+
def watchOrStop(submissionId: String): Boolean
31+
def reset(): Unit
3032
}
3133

3234
/**
@@ -42,10 +44,16 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
4244

4345
private var podCompleted = false
4446

47+
private var resourceTooOldReceived = false
48+
4549
private var pod = Option.empty[Pod]
4650

4751
private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown")
4852

53+
override def reset(): Unit = {
54+
resourceTooOldReceived = false
55+
}
56+
4957
override def eventReceived(action: Action, pod: Pod): Unit = {
5058
this.pod = Option(pod)
5159
action match {
@@ -62,7 +70,12 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
6270

6371
override def onClose(e: KubernetesClientException): Unit = {
6472
logDebug(s"Stopping watching application $appId with last-observed phase $phase")
65-
closeWatch()
73+
if(e != null && e.getCode == HTTP_GONE) {
74+
resourceTooOldReceived = true
75+
logDebug(s"Got HTTP Gone code, resource version changed in k8s api: $e")
76+
} else {
77+
closeWatch()
78+
}
6679
}
6780

6881
private def logLongStatus(): Unit = {
@@ -78,20 +91,26 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)
7891
this.notifyAll()
7992
}
8093

81-
override def watchOrStop(sId: String): Unit = if (conf.get(WAIT_FOR_APP_COMPLETION)) {
94+
override def watchOrStop(sId: String): Boolean = if (conf.get(WAIT_FOR_APP_COMPLETION)) {
8295
logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...")
8396
val interval = conf.get(REPORT_INTERVAL)
8497
synchronized {
85-
while (!podCompleted) {
98+
while (!podCompleted && !resourceTooOldReceived) {
8699
wait(interval)
87100
logInfo(s"Application status for $appId (phase: $phase)")
88101
}
89102
}
90-
logInfo(
91-
pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
92-
.getOrElse("No containers were found in the driver pod."))
93-
logInfo(s"Application ${conf.appName} with submission ID $sId finished")
103+
104+
if(podCompleted) {
105+
logInfo(
106+
pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" }
107+
.getOrElse("No containers were found in the driver pod."))
108+
logInfo(s"Application ${conf.appName} with submission ID $sId finished")
109+
}
110+
podCompleted
94111
} else {
95112
logInfo(s"Deployed Spark application ${conf.appName} with submission ID $sId into Kubernetes")
113+
// Always act like the application has completed since we don't want to wait for app completion
114+
true
96115
}
97116
}

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
136136
createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata])
137137
when(podOperations.create(FULL_EXPECTED_POD)).thenReturn(POD_WITH_OWNER_REFERENCE)
138138
when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch])
139+
when(loggingPodStatusWatcher.watchOrStop(kconf.namespace + ":" + POD_NAME)).thenReturn(true)
139140
doReturn(resourceList)
140141
.when(kubernetesClient)
141142
.resourceList(createdResourcesArgumentCaptor.capture())

0 commit comments

Comments
 (0)