[SPARK-29568][SS] Stop existing running streams when a new stream is launched #26225
Conversation
Test build #112529 has finished for PR 26225 at commit
val queryManager = activeOption.getOrElse(this)
logInfo(s"Stopping existing streaming query [id=${query.id}], as a new run is being " +
  "started.")
queryManager.get(query.id).stop()
If the existing stream is a "zombie", can it happen that it does not respond to stop() and then this will block forever?
Great question. I can add some safeguards against this, but in most cases a stream is a "zombie" because we have lost all references to it, not because it is uninterruptible.
I think this should be fine. If stop returns, the query should already be stopped, because stop waits until the streaming thread dies.
Ah, this has a deadlock. We are waiting for the query to stop while holding a lock that the query itself needs in order to remove itself from the active queries.
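To make the hazard concrete, here is a minimal Python sketch of the pattern (illustrative only; names such as `registry_lock` are invented and the real code is Scala): `stop()` waits for the run thread to die, but that thread needs the same lock to deregister itself, so stopping while holding the lock hangs. A join timeout lets the sketch observe the deadlock instead of hanging forever.

```python
import threading

registry_lock = threading.Lock()   # stands in for the active-queries lock
active = {"q1": "running"}

def run_loop(stop_event):
    # The query's streaming thread: on stop, it must take the registry
    # lock to deregister itself before the thread can exit.
    stop_event.wait()
    with registry_lock:
        active.pop("q1", None)

stop_event = threading.Event()
t = threading.Thread(target=run_loop, args=(stop_event,), daemon=True)
t.start()

# BROKEN ordering: stop (and wait for) the query while holding the lock
# that its run thread needs in order to exit.
with registry_lock:
    stop_event.set()
    t.join(timeout=0.5)        # stop() waits for the run thread to die...
    deadlocked = t.is_alive()  # ...but it cannot die while we hold the lock

print("deadlock:", deadlocked)  # deadlock: True

# Once the lock is released, the run thread can finish deregistering.
t.join(timeout=5)
print("recovered:", not t.is_alive() and "q1" not in active)
```

The fix adopted later in this PR is to select the duplicate query under the lock but call stop() only after releasing it.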
Test build #112530 has finished for PR 26225 at commit

retest this please

Test build #112590 has finished for PR 26225 at commit
sparkSession.sessionState.conf.getConf(SQLConf.STOP_RUNNING_DUPLICATE_STREAM)
if (streamAlreadyActive && turnOffOldStream) {
  val queryManager = activeOption.getOrElse(this)
  logInfo(s"Stopping existing streaming query [id=${query.id}], as a new run is being " +
nit: would a warning be better here?
I agree, make this a warning, and add the previous runId and new runId to make it easier to debug.
Retest this please.
"older stream's SparkSession may not be possible, and the stream may have turned into a " +
"zombie stream. When this flag is true, we will stop the old stream to start the new one.")
.booleanConf
.createWithDefault(true)
Shall we have false by default to avoid the behavior change?
cc @gatorsmile
Great question. Here's my argument for why we should change it:
1. This change is going into Spark 3.0, a release where we can actually break existing behavior (unless it is critical behavior that people depend on).
2. The existing behavior was that any new start of a stream would fail, because an existing stream was already running. This is a programming error on the user's part.
3. However, there are legitimate cases where a user would like to restart a new instance of the stream (because they upgraded the code, for instance), but they have no way of stopping the existing stream, because it has turned into a zombie.
I would argue that 3 is more common than 2, and given 1, this is where we can change behavior and mention it in the release notes.
+1 for the release notes.
nit: I think the docs can be better. Here are the confusing parts:
- It seems that this will work only when the stream is restarted in a different session, but is it s
- The term "stream" is confusing here. Does it refer to a streaming query or a query run? We should try to be clear by saying starting a "streaming query" instead of a "stream" in the explanation, depending on what is consistent with other confs.
+1 on the name now. I like it.
.checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
.createWithDefault(2)

val STOP_RUNNING_DUPLICATE_STREAM = buildConf("spark.sql.streaming.stopExistingDuplicateStream")
stopExistingDuplicateStream -> stopExistingDuplicatedStream?
Test build #112621 has finished for PR 26225 at commit

retest this please

Test build #112847 has finished for PR 26225 at commit

Seems like it affects testing time considerably.

retest this please

Test build #112874 has finished for PR 26225 at commit

retest this please
+1 on this

Test build #112900 has finished for PR 26225 at commit
zsxwing left a comment
One minor comment about the error message. Otherwise looks good to me.
"Cannot start query with id ${query.id} as another query with same id is " +
"already active. Perhaps you are attempting to restart a query from checkpoint " +
"that is already active. You may stop the old query by setting the SQL " +
s"""configuration: spark.conf.set("${SQLConf.STOP_RUNNING_DUPLICATE_STREAM}", true).""")
SQLConf.STOP_RUNNING_DUPLICATE_STREAM => SQLConf.STOP_RUNNING_DUPLICATE_STREAM.key
And
You may stop the old query by setting the SQL ... and retry.
There was a deadlock causing tests to fail. cc @zsxwing @tdas @dongjoon-hyun, I addressed your comments. Can you PTAL?
Test build #113349 has finished for PR 26225 at commit

Test build #113351 has finished for PR 26225 at commit

Test build #113394 has finished for PR 26225 at commit

Test build #113404 has finished for PR 26225 at commit

retest this please

Test build #113468 has finished for PR 26225 at commit

Thank you for updating, @brkyvz !
.checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
.createWithDefault(2)

val STOP_RUNNING_DUPLICATE_STREAM = buildConf("spark.sql.streaming.stopExistingDuplicatedStream")
STOP_RUNNING_DUPLICATE_STREAM -> STOP_RUNNING_DUPLICATED_STREAM?
// The following code block checks if a stream with the same name or id is running. Then it
// returns an Option of an already active stream to stop outside of the lock
// to avoid a deadlock.
val activeDuplicateQuery = activeQueriesLock.synchronized {
activeDuplicateQuery -> activeDuplicatedQuery?
There's a deadlock if you stop it there. It's mentioned in the comment above.
On Sat, Nov 9, 2019, 5:43 PM, Dongjoon Hyun commented on this pull request, in sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala:
> @@ -353,17 +356,44 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
}
// Make sure no other query with same id is active across all sessions
- val activeOption =
- Option(sparkSession.sharedState.activeStreamingQueries.putIfAbsent(query.id, this))
- if (activeOption.isDefined || activeQueries.values.exists(_.id == query.id)) {
+ val activeOption = Option(sparkSession.sharedState.activeStreamingQueries.get(query.id))
+ .orElse(activeQueries.get(query.id))
+
+ val turnOffOldStream =
+ sparkSession.sessionState.conf.getConf(SQLConf.STOP_RUNNING_DUPLICATE_STREAM)
+ if (activeOption.isDefined && turnOffOldStream) {
+ val oldQuery = activeOption.get
+ logWarning(s"Stopping existing streaming query [id=${query.id}, runId=${oldQuery.runId}]," +
+ " as a new run is being started.")
+ Some(oldQuery)
Then, we don't need the val activeDuplicateQuery = declaration.
Oops. I overlooked the above comment. Thanks, @brkyvz!
tdas left a comment
Almost LGTM. I am mostly grumbling about the names.
throw new IllegalArgumentException(
  s"Cannot start query with name $name as a query with that name is already active")
}
}
super nit: probably update this to "is already active in this SparkSession." to be clearer
// The following code block checks if a stream with the same name or id is running. Then it
// returns an Option of an already active stream to stop outside of the lock
// to avoid a deadlock.
val activeDuplicateQuery = activeQueriesLock.synchronized {
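The comment above describes the shape of the fix. Below is a hedged Python sketch of that pattern (not the actual Scala implementation; `FakeQuery` and all names are invented): the duplicate is merely looked up under the lock, and the potentially blocking `stop()` is issued only after the lock is released, so the dying run thread can re-acquire the lock to deregister itself.

```python
import threading

registry_lock = threading.Lock()
active_queries = {}

class FakeQuery:
    """Stand-in for a streaming query whose run thread deregisters on stop."""
    def __init__(self, qid):
        self.qid = qid
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        self._stop_event.wait()
        with registry_lock:                  # needs the lock to deregister
            active_queries.pop(self.qid, None)

    def stop(self):
        self._stop_event.set()
        self._thread.join()                  # wait until the run thread dies

def start_query(qid):
    # 1) Under the lock: only *find* a duplicate; never stop it here.
    with registry_lock:
        duplicate = active_queries.get(qid)
    # 2) Outside the lock: stopping blocks until the run thread dies, and
    #    that thread needs registry_lock to remove itself, so we must not
    #    hold the lock while waiting.
    if duplicate is not None:
        duplicate.stop()
    with registry_lock:
        query = FakeQuery(qid)
        active_queries[qid] = query
    return query

q1 = start_query("q")   # first run
q2 = start_query("q")   # restart: stops q1 first, without deadlocking
print(q1._thread.is_alive(), active_queries["q"] is q2)
```

This mirrors why `activeDuplicateQuery` is returned from the synchronized block rather than stopped inside it.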
maybe rename this to activeQuerySharedLock to indicate this is shared across sessions.
or directly use sharedState.activeQueryLock
val activeOption = Option(sparkSession.sharedState.activeStreamingQueries.get(query.id))
  .orElse(activeQueries.get(query.id))

val turnOffOldStream =
TuneOff?? make it same as the conf.
"older stream's SparkSession may not be possible, and the stream may have turned into a " +
"zombie stream. When this flag is true, we will stop the old stream to start the new one.")
.booleanConf
.createWithDefault(true)
nit: I think the docs can be better. Here are the confusing parts:
- It seems that this will work only when the stream is restarted in a different session, but is it s
- The term "stream" is confusing here. Does it refer to a streaming query or a query run? We should try to be clear by saying starting a "streaming query" instead of a "stream" in the explanation, depending on what is consistent with other confs.
.checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
.createWithDefault(2)

val STOP_RUNNING_DUPLICATE_STREAM = buildConf("spark.sql.streaming.stopExistingDuplicatedStream")
make the conf CAPS name consistent with the actual string.
Option(sparkSession.sharedState.activeStreamingQueries.putIfAbsent(query.id, this))
if (activeOption.isDefined || activeQueries.values.exists(_.id == query.id)) {
val activeOption = Option(sparkSession.sharedState.activeStreamingQueries.get(query.id))
  .orElse(activeQueries.get(query.id))
why do we need to check both? can it be in the shared state and but not in the local one?
paranoia
}
assert(e.getMessage.contains("same id"))
} finally {
  query1.stop()
stop all active streams
assert(!query1.isActive,
  "First query should have stopped before starting the second query")
} finally {
  query2.stop()
stop all active streams
assert(!query1.isActive,
  "First query should have stopped before starting the second query")
} finally {
  query2.stop()
stop all active streams
Test build #113650 has finished for PR 26225 at commit
query.id, query.streamingQuery) // we need to put the StreamExecution, not the wrapper
if (oldActiveQuery != null) {
  throw new ConcurrentModificationException(
    "Another instance of this query was just started by a concurrent session.")
This is not the correct error message when stopActiveRunOnRestart is false.
If the active run was stopped, then this error message is correct.
If the active run was not stopped, then this error will be thrown, and it should simply say that there is an active run (run id ...).
In other words, this can stay the same message as in Spark 2.4; it may be improved by adding the run id.
activeRunOpt.foreach(_.stop())

activeQueriesSharedLock.synchronized {
  // We still can have a race condition when two concurrent instances try to start the same
nit: This comment is true only if the active run was stopped. So qualify the comment accordingly.
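The race discussed here can be sketched in Python as well (hedged illustration; the real code does a put into a ConcurrentHashMap under a shared lock and throws a ConcurrentModificationException): the loser of the race finds an existing registration under the lock and rejects the start.

```python
import threading

shared_lock = threading.Lock()   # stands in for activeQueriesSharedLock
active = {}                      # stands in for activeStreamingQueries

def register(qid, run_id):
    """Final registration step of starting a query: under the shared lock,
    an already-present entry means a concurrent session won the race."""
    with shared_lock:
        old = active.get(qid)
        if old is not None:
            # Mirrors the ConcurrentModificationException in the PR
            raise RuntimeError(
                f"Another instance of query {qid} was just started by a "
                f"concurrent session (active run: {old}).")
        active[qid] = run_id

register("q", "run-A")           # first restart wins the race
try:
    register("q", "run-B")       # second restart loses and is rejected
    raced = False
except RuntimeError:
    raced = True

print(raced, active["q"])        # the winner's registration is kept
```

As noted, this conflict can only be hit after an active run was stopped (or none existed), since otherwise the start fails earlier.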
}
}

activeRunOpt.foreach(_.stop())
nit: Please document here that stop() will automatically clear the activeStreamingQueries. Without this implicit easy-to-miss information, it is hard to reason about this code.
The implementation looks good, but please fix the error message before merging.
If stopActiveRunOnRestart is false, this piece of code is not even executed. Another error is thrown earlier.
On Tue, Nov 12, 2019, 6:04 PM, Tathagata Das commented on this pull request, in sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala:
>
+ activeRunOpt.foreach(_.stop())
+
+ activeQueriesSharedLock.synchronized {
+ // We still can have a race condition when two concurrent instances try to start the same
+ // stream, while a third one was already active. In this case, we throw a
+ // ConcurrentModificationException.
+ val oldActiveQuery = sparkSession.sharedState.activeStreamingQueries.put(
+ query.id, query.streamingQuery) // we need to put the StreamExecution, not the wrapper
+ if (oldActiveQuery != null) {
+ throw new ConcurrentModificationException(
+ "Another instance of this query was just started by a concurrent session.")
This is not the correct error message when stopActiveRunOnRestart is
false.
If the active run was stopped, then this error message is correct.
If the active run was not stopped, then this error will be thrown and
therefore should simply say that there is an active run (run id ...).
In other words, this can stay as the same message as it was in Spark
2.4,.... may be improved by adding the run id.
Aah. I was confused. LGTM then.
Update the description with the new conf name.
Test build #113657 has finished for PR 26225 at commit

Test build #113667 has finished for PR 26225 at commit

Thanks! Merging to master
What changes were proposed in this pull request?
This PR adds a SQL conf: spark.sql.streaming.stopActiveRunOnRestart. When this conf is true (which it is by default), an already running stream will be stopped if a new copy gets launched on the same checkpoint location.
Why are the changes needed?
In multi-tenant environments where you have multiple SparkSessions, you can accidentally start multiple copies of the same stream (i.e. streams using the same checkpoint location). This will cause all new instantiations of the new stream to fail. However, sometimes you may want to turn off the old stream, as the old stream may have turned into a zombie (you no longer have access to the query handle or SparkSession).
It would be nice to have a SQL flag that allows the stopping of the old stream for such zombie cases.
Does this PR introduce any user-facing change?
Yes. Now by default, if you launch a new copy of an already running stream on a multi-tenant cluster, the existing stream will be stopped.
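For users who prefer the Spark 2.4 behavior (fail instead of stopping the old run), the flag can be turned off; for example (a sketch; double-check the key against the configuration docs of your Spark version):

```
# spark-defaults.conf (Spark 3.0+): fail on a duplicate start instead of
# stopping the already running stream
spark.sql.streaming.stopActiveRunOnRestart  false
```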
How was this patch tested?
Unit tests in StreamingQueryManagerSuite