[SPARK-23491][SS] Remove explicit job cancellation from ContinuousExecution reconfiguring #20622
Conversation
cc @mgaido91 - this should completely resolve the other symptom you posted in SPARK-23416
Thanks for pinging me @jose-torres! Unfortunately I don't know the structured streaming codebase well enough yet to give feedback. Thanks anyway for looking at it!
Test build #87486 has finished for PR 20622 at commit
The StreamingOuterJoinSuite failure is a known flakiness issue.
retest this please
@zsxwing pointed out that the original behavior was more subtly wrong than I expected. What we want to do is cancel the Spark job, and then cleanly restart it from the last checkpoint. But in fact, this was not working, since cancelling a Spark job throws an opaque SparkException which we didn't anticipate. The reason things seemed to work was that the interrupt() call would almost always (but was not guaranteed to) interrupt the job cancellation, thus preventing the SparkException. So I've updated the PR to anticipate that SparkException, and filed SPARK-23444 to ask for a better handle for job cancellations. Note that the continuous processing reconfiguration tests will always deterministically fail if they don't properly catch this exception. So the checking logic isn't really fragile despite being weird, and I think it's a significant upgrade over the flakiness.
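The problem described above - having to recognize a cancellation by inspecting an opaque exception - can be sketched as follows. This is a minimal Java illustration, not Spark's actual Scala code; the `isCancellation` helper and its message-matching heuristic are assumptions standing in for the pattern-match on the opaque SparkException (SPARK-23444 asks for a proper handle instead):

```java
public class CancellationCheck {
    // Hypothetical helper: a cancelled job surfaces as an exception whose
    // message mentions the cancellation; with no dedicated exception type,
    // callers can only recognize it by inspecting the message text.
    static boolean isCancellation(Throwable t) {
        String msg = t.getMessage();
        return msg != null && msg.contains("cancelled");
    }

    public static void main(String[] args) {
        Throwable fromCancel = new RuntimeException(
            "Job 3 cancelled as part of cancelled job group run-42");
        Throwable unrelated = new RuntimeException("executor lost");
        System.out.println(isCancellation(fromCancel)); // true
        System.out.println(isCancellation(unrelated));  // false
    }
}
```

Matching on message text is exactly the fragility being complained about here: any change to the message breaks the check silently, which is why a dedicated exception type or status handle would be a better API.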
Test build #87491 has finished for PR 20622 at commit
Test build #87495 has finished for PR 20622 at commit
Test build #87499 has finished for PR 20622 at commit
Can you add comments on when this exception can happen and what this code segment is doing?
done
Test build #87517 has finished for PR 20622 at commit
@jose-torres I had a long offline chat with @zsxwing; kudos to him for catching a corner case in the current solution. The following sequence of events may occur.
Fundamentally, it's not a great setup that one thread is starting the jobs and another thread is canceling them. Because of the async nature, we have no way of reasoning about which attempt wins, starting or cancelling. Rather, let's make sure that we start and cancel in the same thread (then we can do some reasoning). Here is an alternate solution.
Race conditions that end up not canceling the Spark job are less likely when a single thread (the query thread) is responsible for all Spark state management.
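The single-owner-thread idea above can be sketched in miniature. This is a hypothetical Java illustration (class and field names are made up, not Spark code): the query thread alone starts and tears down the work, while other threads merely flip a state flag, so start-vs-cancel ordering is well-defined:

```java
import java.util.concurrent.atomic.AtomicReference;

public class SingleOwnerSketch {
    enum State { ACTIVE, RECONFIGURING, TERMINATED }

    static final AtomicReference<State> state = new AtomicReference<>(State.ACTIVE);
    static volatile String lastAction = "";

    static void queryLoop() {
        while (state.get() == State.ACTIVE) {
            // ... run one epoch of the continuous job ...
        }
        // Tear-down runs on the same thread that started the work, so cancel
        // always happens after the loop has stopped launching new work.
        lastAction = "cleaned up after " + state.get();
    }

    public static void main(String[] args) throws InterruptedException {
        Thread query = new Thread(SingleOwnerSketch::queryLoop);
        query.start();
        state.set(State.RECONFIGURING); // another thread requests reconfiguration
        query.join();
        System.out.println(lastAction);
    }
}
```

Because only the query thread performs the start and the cleanup, there is no window in which a cancel can race ahead of (or lose to) a concurrent job start, which is the corner case being discussed.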
The alternate solution doesn't solve the problem SPARK-23441 sets out to address. Using a thread interrupt to kill the stream appears to be a fundamentally fragile solution, requiring us to maintain a whitelist of exceptions we think Spark execution might surface in response to an interrupt. If you don't think there's a simple way to remove this interrupt currently, I can close this PR and write a new one that simply addresses the test flakiness using the alternate solution.
I think the original issue is it interrupts the thread and also cancels the Spark job. Then |
The issue that caused the test flakiness is as you say. But I had understood that interrupting is itself problematic, requiring us to maintain an awkward whitelist of possible exceptions in
Unfortunately, we haven't figured out any good way to avoid that for MicroBatchExecution till now. The streaming thread is doing a whole lot of different things, and interrupting is the only reliable way to stop it immediately. And since different pieces of code can react to interrupts differently, it can ultimately manifest as a small set of interrupt-related exceptions. This whitelist of exceptions has been sufficient for a while now for MicroBatchExecution, so I don't think this will grow much more. I am convinced that ContinuousExecution has the same set of problems (the thread needs to be interrupted from whatever it is doing) and therefore needs to be solved in a similar way. The only difference is that besides stopping, there is an additional way to interrupt the currently active query (i.e. while reconfiguring). And we need to catch the same set of exceptions as stop, except the expected state will be RECONFIGURING instead of TERMINATED. So we can reuse the method
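The whitelist of interrupt-related exceptions mentioned above looks roughly like the following. This is a simplified Java sketch in the spirit of StreamExecution.isInterruptionException, not the actual Scala implementation (which also unwraps Spark-specific wrapper exceptions, omitted here):

```java
import java.io.InterruptedIOException;
import java.nio.channels.ClosedByInterruptException;

public class InterruptWhitelist {
    // Different pieces of code react to a thread interrupt differently, so a
    // whitelist of interrupt-shaped exception types is needed to recognize
    // "this failure was just the interrupt we sent".
    static boolean isInterruption(Throwable t) {
        if (t instanceof InterruptedException
                || t instanceof InterruptedIOException
                || t instanceof ClosedByInterruptException) {
            return true;
        }
        // Interrupts often surface re-wrapped, so walk the cause chain too.
        Throwable cause = t.getCause();
        return cause != null && isInterruption(cause);
    }

    public static void main(String[] args) {
        System.out.println(isInterruption(new InterruptedException()));
        System.out.println(isInterruption(
            new RuntimeException(new ClosedByInterruptException())));
        System.out.println(isInterruption(new IllegalStateException("boom")));
    }
}
```

The fragility discussed in this thread is that the set of types in this check is open-ended: any library on the thread's call stack may translate an interrupt into yet another exception type.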
The difference in ContinuousExecution is that the thread isn't doing any metadata work like looking for new batches - it's either running the Spark job or cleaning up after finishing it. In any case, this sounds like something we aren't plausibly going to solve here, so I'll update the PR to just resolve the flakiness in the way you suggest.
3b56232 to 0e5e52f
case t: Throwable
    if StreamExecution.isInterruptionException(t) && state.get() == RECONFIGURING =>
  // interrupted by reconfiguration - swallow exception so we can restart the query
} finally {
Add cancel job here.
Test build #87620 has finished for PR 20622 at commit
Test build #4128 has finished for PR 20622 at commit
Test build #4135 has finished for PR 20622 at commit
Test build #87623 has finished for PR 20622 at commit
Test build #4127 has finished for PR 20622 at commit
Test build #4130 has finished for PR 20622 at commit
Test build #4133 has finished for PR 20622 at commit
Test build #4131 has finished for PR 20622 at commit
Test build #4132 has finished for PR 20622 at commit
Test build #4134 has finished for PR 20622 at commit
Test build #4129 has finished for PR 20622 at commit
case t: Throwable
    if StreamExecution.isInterruptionException(t) && state.get() == RECONFIGURING =>
  stopSources()
  sparkSession.sparkContext.cancelJobGroup(runId.toString)
I think it's cleaner to put this in the finally block, since this is the invariant we want when this entire method terminates.
So we only swallow the exception when we are reconfiguring (btw, always add logging when swallowing exceptions to leave a trail for debugging), and stopSources() and cancelJobGroup() can be in the finally block, as we want that as an invariant no matter what happens in this runContinuous method.
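The swallow-only-while-reconfiguring plus cleanup-in-finally structure suggested here can be sketched as follows. This is a hypothetical Java illustration (names invented, not Spark code); the println calls stand in for real logging and for stopSources()/cancelJobGroup():

```java
public class FinallyInvariantSketch {
    // Simulates the query run being torn down by an interrupt mid-flight.
    static void simulateInterruptedRun() {
        throw new RuntimeException("query interrupted");
    }

    static void runContinuous(boolean reconfiguring) {
        try {
            simulateInterruptedRun();
        } catch (RuntimeException e) {
            if (reconfiguring) {
                // Swallow only in the reconfiguring state, and always log
                // when swallowing, to leave a trail for debugging.
                System.out.println(
                    "swallowing interrupt during reconfiguration: " + e.getMessage());
            } else {
                throw e; // unexpected failure: propagate as usual
            }
        } finally {
            // The invariant: cleanup runs whether we returned, swallowed, or
            // re-threw, mirroring stopSources() + cancelJobGroup() in finally.
            System.out.println("stopSources + cancelJobGroup");
        }
    }

    public static void main(String[] args) {
        runContinuous(true);
    }
}
```

Putting the cleanup in finally rather than in the catch arm means it also fires on a normal return and on any exception type not in the whitelist, which is exactly the "no matter what happens" property the reviewer asks for.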
LGTM, assuming tests pass.
Test build #87636 has finished for PR 20622 at commit
Test build #87637 has finished for PR 20622 at commit
What changes were proposed in this pull request?
Remove queryExecutionThread.interrupt() from ContinuousExecution. As detailed in the JIRA, interrupting the thread is only relevant in the microbatch case; for continuous processing the query execution can quickly clean itself up without it.
How was this patch tested?
Existing tests.
Author: Jose Torres <jose@databricks.com>
Closes #20622 from jose-torres/SPARK-23441.