SQLConf.scala
@@ -1094,6 +1094,15 @@ object SQLConf {
      .checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
      .createWithDefault(2)

  val STREAMING_STOP_ACTIVE_RUN_ON_RESTART =
    buildConf("spark.sql.streaming.stopActiveRunOnRestart")
      .doc("Running multiple runs of the same streaming query concurrently is not supported. " +
        "If we find a concurrent active run for a streaming query (in the same or different " +
        "SparkSessions on the same cluster) and this flag is true, we will stop the old streaming " +
        "query run to start the new one.")
      .booleanConf
      .createWithDefault(true)
dongjoon-hyun (Member) commented on Oct 24, 2019:

Shall we have false by default to avoid the behavior change?
cc @gatorsmile

Author (Contributor) replied:

Great question. Here's my argument for why we should change it:

  1. This change is going into Spark 3.0, a release where we can actually break existing behavior (unless it is critical behavior that people depend on).
  2. The existing behavior was that any new start of a stream would fail because an existing stream was already running. This is a programming error on the user's part.
  3. However, there are legitimate cases where a user would like to restart a new instance of the stream (because they upgraded the code, for instance) but has no way of stopping the existing stream, because it has turned into a zombie.

I would argue that case 3 is more common than case 2, and given point 1, this is where we can change the behavior and mention it in the release notes.
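
To make scenario 3 concrete, here is a minimal sketch (hypothetical checkpoint path, illustrative rate/console sources; assumes a SparkSession named spark and the conf at its proposed default of true):

// Run 1 was started elsewhere (say, an old notebook) and can no longer be
// reached to call stop() on it.
val run1 = spark.readStream.format("rate").load()
  .writeStream.format("console")
  .option("checkpointLocation", "/tmp/chk") // hypothetical path
  .start()

// Restarting from the same checkpoint reuses the same query id. With
// spark.sql.streaming.stopActiveRunOnRestart=true the old run is stopped and
// this start() succeeds; with false it throws IllegalStateException as before.
val run2 = spark.readStream.format("rate").load()
  .writeStream.format("console")
  .option("checkpointLocation", "/tmp/chk")
  .start()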

Member replied:

+1 for the release notes.

Contributor commented:

nit: I think the docs can be better. Here are the confusing parts:

  • It seems that this will work only when the stream is restarted in a different session, but is it so?
  • The term "stream" is confusing here. Does it refer to a streaming query or a query run? We should try to be clear by saying "streaming query" instead of "stream" in the explanation, depending on what is consistent with other confs.

tdas (Contributor) commented on Nov 13, 2019:

+1 on the name now. I like it.


  val STREAMING_JOIN_STATE_FORMAT_VERSION =
    buildConf("spark.sql.streaming.join.stateFormatVersion")
      .internal()

SharedState.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.internal
import java.net.URL
import java.util.{Locale, UUID}
import java.util.concurrent.ConcurrentHashMap
import javax.annotation.concurrent.GuardedBy

import scala.reflect.ClassTag
import scala.util.control.NonFatal
@@ -32,9 +33,10 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.execution.CacheManager
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab}
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.sql.streaming.StreamingQueryManager
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.util.Utils

@@ -112,11 +114,15 @@ private[sql] class SharedState(
   */
  val cacheManager: CacheManager = new CacheManager

  /** A global lock for all streaming query lifecycle tracking and management. */
  private[sql] val activeQueriesLock = new Object

  /**
   * A map of active streaming queries to the session specific StreamingQueryManager that manages
   * the lifecycle of that stream.
   */
  @GuardedBy("activeQueriesLock")
  private[sql] val activeStreamingQueries = new ConcurrentHashMap[UUID, StreamExecution]()

  /**
   * A status store to query SQL status/metrics of this Spark application, based on SQL-specific

StreamingQueryManager.scala
@@ -17,7 +17,7 @@

package org.apache.spark.sql.streaming

import java.util.{ConcurrentModificationException, UUID}
import java.util.concurrent.TimeUnit
import javax.annotation.concurrent.GuardedBy

@@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.STREAMING_QUERY_LISTENERS
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}

/**
 * A class to manage all the [[StreamingQuery]] active in a `SparkSession`.

@@ -51,9 +51,10 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Logging {
    StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
  private val listenerBus = new StreamingQueryListenerBus(sparkSession.sparkContext.listenerBus)

  @GuardedBy("activeQueriesSharedLock")
  private val activeQueries = new mutable.HashMap[UUID, StreamingQuery]
  // A global lock to keep track of active streaming queries across Spark sessions
  private val activeQueriesSharedLock = sparkSession.sharedState.activeQueriesLock
  private val awaitTerminationLock = new Object

  @GuardedBy("awaitTerminationLock")

@@ -77,7 +78,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Logging {
   *
   * @since 2.0.0
   */
  def active: Array[StreamingQuery] = activeQueriesSharedLock.synchronized {
    activeQueries.values.toArray
  }

@@ -86,7 +87,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Logging {
   *
   * @since 2.1.0
   */
  def get(id: UUID): StreamingQuery = activeQueriesSharedLock.synchronized {
    activeQueries.get(id).orNull
  }

@@ -343,27 +344,61 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Logging {
      trigger,
      triggerClock)

    // The following code block checks if a stream with the same name or id is running. Then it
    // returns an Option of an already active stream to stop outside of the lock
    // to avoid a deadlock.
    val activeRunOpt = activeQueriesSharedLock.synchronized {
      // Make sure no other query with same name is active
      userSpecifiedName.foreach { name =>
        if (activeQueries.values.exists(_.name == name)) {
          throw new IllegalArgumentException(s"Cannot start query with name $name as a query " +
            s"with that name is already active in this SparkSession")
        }
      }
Contributor commented:

super nit: probably update this to "is already active in this SparkSession." to be clearer


      // Make sure no other query with same id is active across all sessions
      val activeOption = Option(sparkSession.sharedState.activeStreamingQueries.get(query.id))
        .orElse(activeQueries.get(query.id)) // shouldn't be needed but paranoia ...

      val shouldStopActiveRun =
        sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART)
      if (activeOption.isDefined) {
        if (shouldStopActiveRun) {
          val oldQuery = activeOption.get
          logWarning(s"Stopping existing streaming query [id=${query.id}, " +
            s"runId=${oldQuery.runId}], as a new run is being started.")
          Some(oldQuery)
        } else {
          throw new IllegalStateException(
            s"Cannot start query with id ${query.id} as another query with same id is " +
              s"already active. Perhaps you are attempting to restart a query from checkpoint " +
              s"that is already active. You may stop the old query by setting the SQL " +
              "configuration: " +
              s"""spark.conf.set("${SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key}", true) """ +
              "and retry.")
        }
      } else {
        // nothing to stop so, no-op
        None
      }
    }

    // stop() will clear the queryId from activeStreamingQueries as well as activeQueries
    activeRunOpt.foreach(_.stop())
tdas (Contributor) commented on Nov 13, 2019:

nit: Please document here that stop() will automatically clear the activeStreamingQueries. Without this implicit easy-to-miss information, it is hard to reason about this code.
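
For context, the locking discipline under discussion, reduced to a standalone sketch (illustrative names, not Spark's actual code):

import java.util.ConcurrentModificationException
import java.util.concurrent.ConcurrentHashMap

// Decide under the lock, run the blocking stop() outside it, then re-acquire
// the lock and double-check before registering the new run.
class ActiveRunRegistry[K, V <: AnyRef] {
  private val lock = new Object
  private val active = new ConcurrentHashMap[K, V]()

  def register(key: K, value: V, stop: V => Unit): Unit = {
    // 1. Under the lock, only look up. Never call stop() here: stop()
    //    re-enters this lock when it unregisters itself, which would deadlock.
    val oldRun = lock.synchronized(Option(active.get(key)))

    // 2. Outside the lock, stop the old run. As a side effect, stop() removes
    //    `key` from `active` -- the easy-to-miss fact this comment asks to
    //    have documented.
    oldRun.foreach(stop)

    // 3. Back under the lock: a concurrent register() may have won the race
    //    after our stop(), which putIfAbsent detects.
    lock.synchronized {
      if (active.putIfAbsent(key, value) != null) {
        throw new ConcurrentModificationException(
          s"Another run was registered concurrently for $key")
      }
    }
  }
}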


    activeQueriesSharedLock.synchronized {
      // We still can have a race condition when two concurrent instances try to start the same
      // stream, while a third one was already active and stopped above. In this case, we throw a
      // ConcurrentModificationException.

tdas (Contributor) commented on Nov 13, 2019:

nit: This comment is true only if the active run was stopped. So qualify the comment accordingly.

      val oldActiveQuery = sparkSession.sharedState.activeStreamingQueries.put(
        query.id, query.streamingQuery) // we need to put the StreamExecution, not the wrapper
      if (oldActiveQuery != null) {
        throw new ConcurrentModificationException(
          "Another instance of this query was just started by a concurrent session.")
Contributor commented:

This is not the correct error message when stopActiveRunOnRestart is false.
If the active run was stopped, then this error message is correct.
If the active run was not stopped, then this error will be thrown, and it should therefore simply say that there is an active run (with its run id).

In other words, this can stay the same message as it was in Spark 2.4, and may be improved by adding the run id.
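
One possible shape for that suggestion, sketched (not the committed code; assumes shouldStopActiveRun, computed in the earlier synchronized block, is still in scope here):

      // Sketch of the reviewer's suggestion, not the committed code.
      val message = if (shouldStopActiveRun) {
        // We stopped the old run ourselves, so a non-null entry here really
        // does mean a concurrent session raced us between the stop and the put.
        "Another instance of this query was just started by a concurrent session."
      } else {
        // Nothing was stopped, so this is simply the pre-existing active run,
        // as in Spark 2.4; the run id identifies it.
        s"Cannot start query with id ${query.id} as another query with the same " +
          s"id is already active (runId=${oldActiveQuery.runId})."
      }
      throw new ConcurrentModificationException(message)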

      }
      activeQueries.put(query.id, query)
    }

    try {
      // When starting a query, it will call `StreamingQueryListener.onQueryStarted` synchronously.
      // As it's provided by the user and can run arbitrary codes, we must not hold any lock here.

@@ -372,15 +407,15 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Logging {
      query.streamingQuery.start()
    } catch {
      case e: Throwable =>
        unregisterTerminatedStream(query)
        throw e
    }
    query
  }

  /** Notify (by the StreamingQuery) that the query has been terminated */
  private[sql] def notifyQueryTermination(terminatedQuery: StreamingQuery): Unit = {
    unregisterTerminatedStream(terminatedQuery)
    awaitTerminationLock.synchronized {
      if (lastTerminatedQuery == null || terminatedQuery.exception.nonEmpty) {
        lastTerminatedQuery = terminatedQuery

@@ -390,11 +425,12 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Logging {
    stateStoreCoordinator.deactivateInstances(terminatedQuery.runId)
  }

  private def unregisterTerminatedStream(terminatedQuery: StreamingQuery): Unit = {
    activeQueriesSharedLock.synchronized {
      // remove from shared state only if the streaming execution also matches
      sparkSession.sharedState.activeStreamingQueries.remove(
        terminatedQuery.id, terminatedQuery)
      activeQueries -= terminatedQuery.id
    }
  }
}
StreamingQueryManagerSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.SparkException
import org.apache.spark.sql.{Dataset, Encoders}
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.util.BlockingSource
import org.apache.spark.util.Utils

@@ -274,48 +275,119 @@ class StreamingQueryManagerSuite extends StreamTest {
}

testQuietly("can't start multiple instances of the same streaming query in the same session") {
withTempDir { dir =>
val (ms1, ds1) = makeDataset
val (ms2, ds2) = makeDataset
val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
val dataLocation = new File(dir, "data").getCanonicalPath

val query1 = ds1.writeStream.format("parquet")
.option("checkpointLocation", chkLocation).start(dataLocation)
ms1.addData(1, 2, 3)
try {
val e = intercept[IllegalStateException] {
ds2.writeStream.format("parquet")
withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "false") {
withTempDir { dir =>
val (ms1, ds1) = makeDataset
val (ms2, ds2) = makeDataset
val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
val dataLocation = new File(dir, "data").getCanonicalPath

val query1 = ds1.writeStream.format("parquet")
.option("checkpointLocation", chkLocation).start(dataLocation)
ms1.addData(1, 2, 3)
try {
val e = intercept[IllegalStateException] {
ds2.writeStream.format("parquet")
.option("checkpointLocation", chkLocation).start(dataLocation)
}
assert(e.getMessage.contains("same id"))
} finally {
spark.streams.active.foreach(_.stop())
}
}
}
}

testQuietly("new instance of the same streaming query stops old query in the same session") {
failAfter(90 seconds) {
withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "true") {
withTempDir { dir =>
val (ms1, ds1) = makeDataset
val (ms2, ds2) = makeDataset
val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
val dataLocation = new File(dir, "data").getCanonicalPath

val query1 = ds1.writeStream.format("parquet")
.option("checkpointLocation", chkLocation).start(dataLocation)
ms1.addData(1, 2, 3)
val query2 = ds2.writeStream.format("parquet")
.option("checkpointLocation", chkLocation).start(dataLocation)
try {
ms2.addData(1, 2, 3)
query2.processAllAvailable()
assert(spark.sharedState.activeStreamingQueries.get(query2.id) ===
query2.asInstanceOf[StreamingQueryWrapper].streamingQuery,
"The correct streaming query is not being tracked in global state")

assert(!query1.isActive,
"First query should have stopped before starting the second query")
} finally {
spark.streams.active.foreach(_.stop())
}
}
assert(e.getMessage.contains("same id"))
} finally {
query1.stop()
}
}
}

  testQuietly(
    "can't start multiple instances of the same streaming query in the different sessions") {
    withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "false") {
      withTempDir { dir =>
        val session2 = spark.cloneSession()

        val ms1 = MemoryStream(Encoders.INT, spark.sqlContext)
        val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS()
        val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
        val dataLocation = new File(dir, "data").getCanonicalPath

        val query1 = ms1.toDS().writeStream.format("parquet")
          .option("checkpointLocation", chkLocation).start(dataLocation)
        ms1.addData(1, 2, 3)
        try {
          val e = intercept[IllegalStateException] {
            ds2.writeStream.format("parquet")
              .option("checkpointLocation", chkLocation).start(dataLocation)
          }
          assert(e.getMessage.contains("same id"))
        } finally {
          spark.streams.active.foreach(_.stop())
          session2.streams.active.foreach(_.stop())
        }
      }
    }
  }

  testQuietly(
    "new instance of the same streaming query stops old query in a different session") {
    failAfter(90 seconds) {
      withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "true") {
        withTempDir { dir =>
          val session2 = spark.cloneSession()

          val ms1 = MemoryStream(Encoders.INT, spark.sqlContext)
          val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS()
          val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
          val dataLocation = new File(dir, "data").getCanonicalPath

          val query1 = ms1.toDS().writeStream.format("parquet")
            .option("checkpointLocation", chkLocation).start(dataLocation)
          ms1.addData(1, 2, 3)
          val query2 = ds2.writeStream.format("parquet")
            .option("checkpointLocation", chkLocation).start(dataLocation)
          try {
            ms1.addData(1, 2, 3)
            query2.processAllAvailable()
            assert(spark.sharedState.activeStreamingQueries.get(query2.id) ===
              query2.asInstanceOf[StreamingQueryWrapper].streamingQuery,
              "The correct streaming execution is not being tracked in global state")

            assert(!query1.isActive,
              "First query should have stopped before starting the second query")
          } finally {
            spark.streams.active.foreach(_.stop())
            session2.streams.active.foreach(_.stop())
          }
        }
      }
    }
  }

StreamingQuerySuite.scala
@@ -123,9 +123,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging
    assert(q3.runId !== q4.runId)

    // Only one query with same id can be active
    withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "false") {
      val q5 = startQuery(restart = false)
      val e = intercept[IllegalStateException] {
        startQuery(restart = true)
      }
    }
  }
}