[SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s #30283
Conversation
@shockdm Please check this over. @dongjoon-hyun Maybe we can get this into 2.4? My force-push prevented the old PR from being reopened, it seems.

ok to test

Test build #130737 has finished for PR 30283 at commit

Kubernetes integration test starting

Kubernetes integration test status success

An image of just the Spark build was created with this job [1]. We are using this image starting today in our dev cluster. [1] https://gitlab.com/jkleckner/spark/-/pipelines/212962140

This version of Spark has been running fine for the last 3 days in our cluster.
private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
  def awaitCompletion(): Unit
  def reset(): Unit
  def watchOrStop(sId: String): Boolean
Could you preserve the order from the original patches (master/branch-3.0)?
def watchOrStop(submissionId: String): Boolean
def reset(): Unit
The function declaration order is different. The parameter name is not the same here (sId != submissionId).
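Put together, the branch-2.4 trait would then read roughly as below (a sketch based on the diff above; keeping awaitCompletion first is an assumption, since it is the 2.4-only method):

private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
  def awaitCompletion(): Unit
  def watchOrStop(submissionId: String): Boolean
  def reset(): Unit
}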
Done.
...ernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
override def onClose(e: KubernetesClientException): Unit = {
  logDebug(s"Stopping watching application $appId with last-observed phase $phase")
  closeWatch()
  logInfo(s"Stopping watching application $appId with last-observed phase $phase")
This is a regression because this was previously logDebug. (Of course, this is different from the master branch, too.)
Done
| logInfo(s"Stopping watching application $appId with last-observed phase $phase") | ||
| if (e != null && e.getCode==HTTP_GONE) { | ||
| resourceTooOldReceived = true | ||
| logInfo(s"Got HTTP Gone code, resource version changed in k8s api: $e") |
ditto.
Done.
resourceTooOldReceived = true
logInfo(s"Got HTTP Gone code, resource version changed in k8s api: $e")
} else {
  logInfo(s"Got proper termination code, closing watcher.")
Please remove this.
Done.
| s"Container final statuses:\n\n${containersDescription(p)}" | ||
| }.getOrElse("No containers were found in the driver pod.")) | ||
| } | ||
|
|
This removal looks like part of an independent PR rather than part of SPARK-24266. Could you tell us why this is required and where this came from?
@shockdm Could you chime in on this one?
@dongjoon-hyun It does look like it originates from SPARK-28947 02c5b4f which eliminated the future and was a rename.
Since this is a private trait, the logic should be completely self-contained and safe to remove.
@jkleckner thank you for following up, that is correct. Sorry for the late response :(
}

override def watchOrStop(sId: String): Boolean = if (waitForCompletion) {
  logInfo(s"Patched Sept 8th: Waiting for application" +
Patched Sept 8th:?
Done.
synchronized {
  while (!podCompleted && !resourceTooOldReceived) {
    wait(interval.get)
    logDebug(s"Application status for $appId (phase: $phase)")
This should be logInfo.
Done.
createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata])
when(podOperations.create(FULL_EXPECTED_POD)).thenReturn(POD_WITH_OWNER_REFERENCE)
when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch])
when(loggingPodStatusWatcher.watchOrStop("default" + ":" + POD_NAME)).thenReturn(true)
Maybe, kubernetesConf.namespace() instead of "default"?
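For illustration, the suggested mock would then read roughly as follows (a sketch; that the test suite's driver conf fixture is named kubernetesConf is an assumption taken from the suggestion above):

when(loggingPodStatusWatcher.watchOrStop(kubernetesConf.namespace() + ":" + POD_NAME))
  .thenReturn(true)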
Done.
Hi, @jkleckner .
This will be an official Apache Spark patch. Could you clean up something like logInfo(s"Patched Sept 8th: and make it consistent across Apache Spark branches?
404bfa4 to 5960d24
… changed from k8s

This is a backport of apache#29533 from master. It includes shockdm/pull/1, which has been squashed, and the import review comment. It has also been rebased to branch-2.4. Address review comments.
5960d24 to 86f8ee8
@dongjoon-hyun Sorry for doing a force-push. I got used to GitLab, which keeps track of history better.

Test build #130823 has finished for PR 30283 at commit

Test build #130825 has finished for PR 30283 at commit

Retest this please

Test build #130946 has finished for PR 30283 at commit

Retest this please.
val podWithName = kubernetesClient
  .pods()
  .withName(driverPodName)
Could you add the following comments like the other branches?
// Reset resource to old before we start the watch, this is important for race conditions
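For context, the retry loop on master looks roughly as follows (an approximation, not the exact branch-2.4 code; kubernetesClient, driverPodName, watcher, watch, and sId come from the surrounding Client code, and breakable/break are from scala.util.control.Breaks):

import scala.util.control.Breaks._

// Re-open the watch until the driver pod completes (or we choose not to wait).
breakable {
  while (true) {
    val podWithName = kubernetesClient
      .pods()
      .withName(driverPodName)
    // Reset resource to old before we start the watch, this is important for race conditions
    watcher.reset()
    watch = podWithName.watch(watcher)

    // Send the latest pod state we know to the watcher to make sure we didn't miss anything
    watcher.eventReceived(Action.MODIFIED, podWithName.get())

    // watchOrStop returns true once the pod completed (or we are not waiting);
    // a false return means the watch closed with HTTP 410 Gone, so loop and re-watch.
    if (watcher.watchOrStop(sId)) {
      watch.close()
      break
    }
  }
}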
  .withName(driverPodName)

watcher.reset()

Let's remove this empty line like the other branch.
  podCompleted
} else {
  logInfo(s"Deployed Spark application ${appId} with submission ID $sId into Kubernetes")
  logInfo(s"It seems we end up here, because we never want to wait for completion...")
Please remove this.
Done
...ernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
  KUBERNETES_RESOURCE_PREFIX)
submissionClient.run()
verify(loggingPodStatusWatcher).awaitCompletion()
verify(loggingPodStatusWatcher).watchOrStop("default:driver")
default -> kubernetesConf.namespace()?
done
dongjoon-hyun
left a comment
Thank you for updating, @jkleckner . I left some comments to match this to the other branches.
Test build #131105 has finished for PR 30283 at commit
}

override def watchOrStop(sId: String): Boolean = if (waitForCompletion) {
  logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...")
Oh, it's a compilation error.
[error] /home/jenkins/workspace/SparkPullRequestBuilder@2/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala:195: object appName is not a member of package conf
[error] logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...")
[error] ^
[error] one error found
Can you confirm that this is the way you want it?
override def onClose(e: KubernetesClientException): Unit = {
  logDebug(s"Stopping watching application $appId with last-observed phase $phase")
  closeWatch()
  if (e != null && e.getCode==HTTP_GONE) {
e.getCode == HTTP_GONE?
Missed this one somehow. Updated now.
dongjoon-hyun
left a comment
For LoggingPodStatusWatcherImpl, I missed that it doesn't have conf, which is different from the other branches' class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf). Sorry about that. Could you fix the compilation error?
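One way to resolve this on branch-2.4, where the impl has appId but no KubernetesDriverConf, would be to log the appId instead of conf.appName; a minimal sketch, reusing waitForCompletion, interval, podCompleted, and resourceTooOldReceived from the hunks above (the actual fix may have been worded differently):

override def watchOrStop(sId: String): Boolean = if (waitForCompletion) {
  // Log the appId instead of conf.appName, since the 2.4 impl has no KubernetesDriverConf.
  logInfo(s"Waiting for application $appId with submission ID $sId to finish...")
  synchronized {
    while (!podCompleted && !resourceTooOldReceived) {
      wait(interval.get)
      logInfo(s"Application status for $appId (phase: $phase)")
    }
  }
  // false here means the watch ended because of HTTP 410 Gone and should be re-opened.
  podCompleted
} else {
  logInfo(s"Deployed Spark application $appId with submission ID $sId into Kubernetes")
  // Not waiting for completion, so tell the caller to stop watching.
  true
}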
@dongjoon-hyun I made the changes. No force-push this time. Sorry to take so long but I did get very very busy.

Test build #131385 has finished for PR 30283 at commit
b4cc870 to 1c64c6c
Test build #131388 has finished for PR 30283 at commit

Kubernetes integration test starting

Kubernetes integration test status success

Kubernetes integration test starting

Kubernetes integration test starting

Kubernetes integration test status failure

Kubernetes integration test status success
val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)
val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId,
  loggingInterval,
  waitForAppCompletion)
Please revert this change. This is inconsistent with Apache Spark 3.1 and 3.0.
appId: String,
maybeLoggingInterval: Option[Long])
maybeLoggingInterval: Option[Long],
waitForCompletion: Boolean)
Please revert this change. This is inconsistent with Apache Spark 3.1 and 3.0.
… changed from k8s

### What changes were proposed in this pull request?
This patch processes the HTTP Gone event and restarts the pod watcher.

### Why are the changes needed?
This is a backport of PR #28423 to branch-2.4. The reasons are explained in SPARK-24266: Spark jobs using the k8s resource scheduler may hang.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Manually.

Closes #30283 from jkleckner/shockdm-2.4.6-spark-submit-fix.

Lead-authored-by: Jim Kleckner <jim@cloudphysics.com>
Co-authored-by: Dmitriy Drinfeld <dmitriy.drinfeld@ibm.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
dongjoon-hyun
left a comment
Thank you, @jkleckner and all.
+1, LGTM. Merged to branch-2.4 for Apache Spark 2.4.8.
Thank you, @dongjoon-hyun!
What changes were proposed in this pull request?
This patch processes the HTTP Gone event and restarts the pod watcher.
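In outline, the watcher's onClose distinguishes a close caused by HTTP 410 Gone (a stale resourceVersion, after which the watch must be re-established) from a normal termination. A rough sketch based on the review hunks above, not the exact merged code:

import java.net.HttpURLConnection.HTTP_GONE

override def onClose(e: KubernetesClientException): Unit = {
  logDebug(s"Stopping watching application $appId with last-observed phase $phase")
  if (e != null && e.getCode == HTTP_GONE) {
    // The API server dropped our resourceVersion (HTTP 410 Gone); flag it so that
    // watchOrStop returns false and the submission client resets and re-opens the watch.
    resourceTooOldReceived = true
    logDebug(s"Got HTTP Gone code, resource version changed in k8s api: $e")
  } else {
    // Normal termination: mark the pod as completed and release any waiters.
    closeWatch()
  }
}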
Why are the changes needed?
This is a backport of PR #28423 to branch-2.4.
The reasons are explained in SPARK-24266: Spark jobs using the Kubernetes resource scheduler may hang.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Manually.