
Conversation


@jkleckner jkleckner commented Aug 20, 2020

What changes were proposed in this pull request?

This patch processes the HTTP Gone event and restarts the pod watcher.
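
For context, the core of the fix is to treat the Kubernetes client's HTTP 410 (Gone) close event as a signal to re-establish the watch rather than letting spark-submit hang. A minimal, self-contained sketch of that pattern (illustrative only, not the patch itself; resubscribe stands in for whatever re-creates the pod watch):

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}

// Illustrative sketch of the restart-on-Gone pattern behind SPARK-24266.
class RestartingPodWatcher(resubscribe: () => Unit) extends Watcher[Pod] {

  override def eventReceived(action: Watcher.Action, pod: Pod): Unit = {
    // Normal pod status handling (logging, completion tracking) goes here.
  }

  override def onClose(cause: KubernetesClientException): Unit = {
    // HTTP 410 Gone means the watched resourceVersion is stale; the watch
    // must be re-established instead of being treated as app completion.
    if (cause != null && cause.getCode == 410) {
      resubscribe()
    }
  }
}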

Why are the changes needed?

This is a backport of PR #28423 to branch-2.4.

The reasons are explained in SPARK-24266: Spark jobs using the k8s resource scheduler may hang.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

This was rolled into an image that is deployed into our test cluster.

We have not seen the hangs since enabling this patch.

Restart the watcher when we receive a version changed from k8s

Backport of SPARK-24266 to branch-2.4.

In collaboration with Mike Royle.
@AmplabJenkins

Can one of the admins verify this patch?

@jkleckner jkleckner changed the base branch from master to branch-2.4 August 20, 2020 19:52
@jkleckner (Author)

I mistakenly forgot to retarget this at branch-2.4; now fixed.

@jkleckner (Author)

@liyinan926 FYI

@jkleckner (Author)

Please retest this.

@jkleckner (Author)

FWIW, we have yet to see a hang for our Hourly job.

@dongjoon-hyun (Member)

Thank you for making a backporting PR, @jkleckner.
Could you make a backporting PR to branch-3.0 first?
To prevent any regression from 2.4 to 3.0, we have a policy on the backporting sequence. We cannot backport this to branch-2.4 directly.

BTW, cc @holdenk since she is the committer of the original PR.

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-24266][k8s] Back port spark 28423 to 2.4 to restart watcher [SPARK-24266][k8s][2.4] Back port spark 28423 to 2.4 to restart watcher Aug 24, 2020
@jkleckner (Author) commented Aug 24, 2020

@dongjoon-hyun Ok, thanks. I'll close the request.

@jkleckner jkleckner closed this Aug 24, 2020
@jkleckner (Author)

@dongjoon-hyun After talking with Mike, we aren't sure how or whether to proceed since this is an attempt to backport #28423 which is already merged to master. What would be the steps?

@dongjoon-hyun (Member) commented Aug 24, 2020

You can make another backporting PR against branch-3.0 while keeping this PR open independently.

After that PR is merged, we can revisit this one.

@jkleckner (Author)

My blind spot while reading was that I thought it had already been merged into 3.0; I didn't realize it was only on master, sorry.

@jkleckner (Author)

Ok, I created PR #29533 for branch 3.0.

@SQUIDwarrior

We have also run into problems due to this issue. Since it is not clear from the thread: is this fix going to be part of the next 2.4.x release?

@dongjoon-hyun (Member)

You are on the wrong thread. Currently, we are working on #29533. Please comment there to support @jkleckner. After that PR, we may restart the discussion for 2.4.x.

@jkleckner (Author) commented Aug 28, 2020

@SQUIDwarrior FYI, I have an image with the patch here [1] if you want to try it out (and of course you can build it for yourself). It was built from this tag [2].

In fact, it would be good to get independent confirmation that this patch works in your environment.

[1] registry.gitlab.com/jkleckner/spark/spark:v2.4.7-cphy1
[2] https://gitlab.com/jkleckner/spark/-/tags/v2.4.7-cphy1
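
In case it helps, a direct spark-submit invocation against that image would look roughly like the following (the API server address, namespace, and the examples jar path inside the image are placeholders/guesses for your environment):

spark-submit \
  --master k8s://https://<api-server-host>:<port> \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.kubernetes.namespace=<namespace> \
  --conf spark.kubernetes.container.image=registry.gitlab.com/jkleckner/spark/spark:v2.4.7-cphy1 \
  local:///opt/spark/examples/jars/spark-examples_2.11-2.4.7.jar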

@jkleckner jkleckner changed the title [SPARK-24266][k8s][2.4] Back port spark 28423 to 2.4 to restart watcher [SPARK-24266][K8S][2.4] Restart the watcher when we receive a version changed from k8s Aug 31, 2020
@shockdm commented Sep 1, 2020

@jkleckner Hi Jim, I tried your fix and ran into a strange issue that makes spark-submit quit immediately while the driver proceeds:

java.net.SocketException: Socket closed
	at java.net.SocketInputStream.socketRead0(Native Method)
	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
	at java.net.SocketInputStream.read(SocketInputStream.java:171)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
	at sun.security.ssl.InputRecord.read(InputRecord.java:503)
	at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:990)
	at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:948)
	at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
	at okio.Okio$2.read(Okio.java:140)
	at okio.AsyncTimeout$2.read(AsyncTimeout.java:237)
	at okio.RealBufferedSource.request(RealBufferedSource.java:68)
	at okio.RealBufferedSource.require(RealBufferedSource.java:61)
	at okio.RealBufferedSource.readByte(RealBufferedSource.java:74)
	at okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:117)
	at okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:101)
	at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
	at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
	at okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
	at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

I am applying the patch on top of 2.4.6.

Edit:

It seems that the current code in the PR never waits for application completion, specifically:

} else {
    logInfo(s"Deployed Spark application ${appId} with submission ID $sId into Kubernetes")
    // Always act like the application has completed since we don't want to wait for app completion
    true
  }

which is the else branch of

if (hasCompleted()) {

So if the application hasn't completed, this results in immediate termination, since the current code just quits and does not wait. Not sure if this is the intention... But in the current master, instead of simply checking hasCompleted, the check is

if (conf.get(WAIT_FOR_APP_COMPLETION)) {

Not sure if the change is intentional or not; we likely want to check that flag, unless it's not present in the 2.4.6 stream. In any case, the error I posted above is caused by not waiting for the app to complete.

@jkleckner (Author)

@shockdm This is an attempt to backport the master branch fix, which has that behavior [1].
Also, this has worked fine in our environment for a while now.
As mentioned above [2], we are using a build further along than v2.4.6 [3], since we wanted to bring in as many fixes as possible.

We launch using the spark-on-k8s-operator [4] mechanism with this version [5] and have not tried any other launch method.
So it is quite possible that there are issues with launching directly. How do you launch?

[1] https://github.com/apache/spark/blob/master/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala#L114
[2] #29496 (comment)
[3] https://gitlab.com/jkleckner/spark/-/tags/v2.4.7-cphy1
[4] https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
[5] https://gitlab.com/jkleckner/spark-on-k8s-operator/-/commit/f30dd1535220813315eea282a3d868aeb14cacb3

@shockdm commented Sep 1, 2020

@jkleckner We are launching directly via a spark-submit invocation with the scheduler URL set to the Kubernetes API. That must be the difference. We are not using https://github.com/GoogleCloudPlatform/spark-on-k8s-operator.

@jkleckner (Author) commented Sep 1, 2020

@shockdm Post back on the thread here if you determine that this is the cause. It would very likely apply to the master branch as well.

If you have an example you would like me to run with something like spark-pi with specific arguments, I would be happy to run that in my environment.

Review thread on this hunk of the diff:

if (time != null || time != "") time else "N/A"
}

override def watchOrStop(sId: String): Boolean = if (hasCompleted()) {
@shockdm commented Sep 2, 2020

@jkleckner

This line is different in the master branch. There, the check is whether the client was configured to wait for driver completion, not simply hasCompleted(), which normally returns false at this point and thus causes spark-submit to terminate immediately.

For the 2.4.7 port, you may want to add an additional parameter to LoggingPodStatusWatcherImpl, specifically waitForCompletion: Boolean, that can be checked in this if statement. In the current master they use KubernetesDriverConf instead of passing parameters separately:

private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf)

However, in 2.4.6 KubernetesDriverConf is not yet introduced; instead, the appId and maybeLoggingInterval are passed individually. A simple solution is to add waitForCompletion as a parameter and use it in the above if statement.

This has worked well for me.
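
Roughly, a self-contained sketch of that shape (names like podCompletedLatch and onDriverPodTerminated are illustrative, not the exact identifiers from the patch):

import java.util.concurrent.CountDownLatch

// Sketch only: thread a waitForCompletion flag into the watcher instead of
// unconditionally pretending the application has completed.
class LoggingPodStatusWatcherSketch(appId: String, waitForCompletion: Boolean) {

  private val podCompletedLatch = new CountDownLatch(1)

  // Invoked by the watch callback when the driver pod reaches a terminal phase.
  def onDriverPodTerminated(): Unit = podCompletedLatch.countDown()

  // Returns true when spark-submit may stop watching and exit.
  def watchOrStop(sId: String): Boolean =
    if (waitForCompletion) {
      // Block until the driver pod completes; the HTTP 410 restart logic
      // elsewhere keeps the watch (and this latch) alive across reconnects.
      podCompletedLatch.await()
      true
    } else {
      println(s"Deployed Spark application $appId with submission ID $sId into Kubernetes")
      // The caller asked not to wait, so report completion immediately.
      true
    }
}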

@jkleckner (Author)

@shockdm Yes, you are right. And it is great that you have it working!
We're happy to make the change but would you mind sharing the code so as to be consistent with what you already have?

@jkleckner (Author)

@shockdm ping

@shockdm

@jkleckner sorry, spent some time doing test runs to make sure it fixed our use case. I'll share the final code shortly.

@jkleckner (Author)

@shockdm ping

@shockdm

@jkleckner Thank you for the reminder, and sorry for the delay! Check out shockdm#1

@jkleckner (Author)

Your patches look like an improvement, thank you @shockdm.
There are enough differences that perhaps you would be willing to submit them yourself?

@shockdm

Given the original comment by @dongjoon-hyun:

Thank you for making a backporting PR, @jkleckner .
Could you make a backporting PR to branch-3.0 first?
To prevent any regression from 2.4 to 3.0, we have a policy for backporting sequence. We cannot backport this to branch-2.4 directly.

What would be the correct approach to submitting this backport at the moment?

@jkleckner (Author)

@dongjoon-hyun Is merging, or merely submitting, a PR to branch-3.0 a prerequisite for submitting this PR or an improved one?

Note that the implementations inherently diverge due to the divergence of the base branches.

@jkleckner (Author)

@shockdm The 3.0 patch #29533 has now been merged. Even though we never got an answer as to whether that was a prerequisite, that blockage is now gone.

Would you be willing to rebase and clean up your patch for submission?

