From 1d4167fce6d19a043aff96dcaf24a1c6c8ca99a9 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 23 Oct 2019 11:40:05 +0200 Subject: [PATCH 01/10] Stop existing running streams when a new stream is launched --- .../apache/spark/sql/internal/SQLConf.scala | 8 ++ .../sql/streaming/StreamingQueryManager.scala | 15 ++- .../StreamingQueryManagerSuite.scala | 121 +++++++++++++----- 3 files changed, 109 insertions(+), 35 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 75db52e334b8..5789997883a6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1087,6 +1087,14 @@ object SQLConf { .checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2") .createWithDefault(2) + val STOP_RUNNING_DUPLICATE_STREAM = buildConf("spark.sql.streaming.stopExistingDuplicateStream") + .doc("Running two streams using the same checkpoint location concurrently is not supported. " + + "In the case where multiple streams are started on different SparkSessions, access to the " + + "older stream's SparkSession may not be possible, and the stream may have turned into a " + + "zombie stream. 
When this flag is true, we will stop the old stream to start the new one.") + .booleanConf + .createWithDefault(true) + val UNSUPPORTED_OPERATION_CHECK_ENABLED = buildConf("spark.sql.streaming.unsupportedOperationCheck") .internal() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 9b43a83e7b94..c5e47d22af78 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -355,11 +355,22 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo // Make sure no other query with same id is active across all sessions val activeOption = Option(sparkSession.sharedState.activeStreamingQueries.putIfAbsent(query.id, this)) - if (activeOption.isDefined || activeQueries.values.exists(_.id == query.id)) { + + val streamAlreadyActive = + activeOption.isDefined || activeQueries.values.exists(_.id == query.id) + val turnOffOldStream = + sparkSession.sessionState.conf.getConf(SQLConf.STOP_RUNNING_DUPLICATE_STREAM) + if (streamAlreadyActive && turnOffOldStream) { + val queryManager = activeOption.getOrElse(this) + logInfo(s"Stopping existing streaming query [id=${query.id}], as a new run is being " + + "started.") + queryManager.get(query.id).stop() + } else if (streamAlreadyActive) { throw new IllegalStateException( s"Cannot start query with id ${query.id} as another query with same id is " + s"already active. Perhaps you are attempting to restart a query from checkpoint " + - s"that is already active.") + s"that is already active. 
You may stop the old query by setting the SQL " + + s"configuration: spark.conf.set(\"${SQLConf.STOP_RUNNING_DUPLICATE_STREAM}\", true).") } activeQueries.put(query.id, query) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala index 09580b94056b..a1fed0171263 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala @@ -32,6 +32,7 @@ import org.apache.spark.SparkException import org.apache.spark.sql.{Dataset, Encoders} import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.util.BlockingSource import org.apache.spark.util.Utils @@ -274,48 +275,102 @@ class StreamingQueryManagerSuite extends StreamTest { } testQuietly("can't start multiple instances of the same streaming query in the same session") { - withTempDir { dir => - val (ms1, ds1) = makeDataset - val (ms2, ds2) = makeDataset - val chkLocation = new File(dir, "_checkpoint").getCanonicalPath - val dataLocation = new File(dir, "data").getCanonicalPath - - val query1 = ds1.writeStream.format("parquet") - .option("checkpointLocation", chkLocation).start(dataLocation) - ms1.addData(1, 2, 3) - try { - val e = intercept[IllegalStateException] { - ds2.writeStream.format("parquet") - .option("checkpointLocation", chkLocation).start(dataLocation) + withSQLConf(SQLConf.STOP_RUNNING_DUPLICATE_STREAM.key -> "false") { + withTempDir { dir => + val (ms1, ds1) = makeDataset + val (ms2, ds2) = makeDataset + val chkLocation = new File(dir, "_checkpoint").getCanonicalPath + val dataLocation = new File(dir, "data").getCanonicalPath + + val query1 = ds1.writeStream.format("parquet") + 
.option("checkpointLocation", chkLocation).start(dataLocation) + ms1.addData(1, 2, 3) + try { + val e = intercept[IllegalStateException] { + ds2.writeStream.format("parquet") + .option("checkpointLocation", chkLocation).start(dataLocation) + } + assert(e.getMessage.contains("same id")) + } finally { + query1.stop() } - assert(e.getMessage.contains("same id")) - } finally { - query1.stop() + } + } + } + + testQuietly("new instance of the same streaming query stops old query in the same session") { + withSQLConf(SQLConf.STOP_RUNNING_DUPLICATE_STREAM.key -> "true") { + withTempDir { dir => + val (ms1, ds1) = makeDataset + val (ms2, ds2) = makeDataset + val chkLocation = new File(dir, "_checkpoint").getCanonicalPath + val dataLocation = new File(dir, "data").getCanonicalPath + + val query1 = ds1.writeStream.format("parquet") + .option("checkpointLocation", chkLocation).start(dataLocation) + ms1.addData(1, 2, 3) + val query2 = ds2.writeStream.format("parquet") + .option("checkpointLocation", chkLocation).start(dataLocation) + ms2.addData(1, 2, 3) + query2.processAllAvailable() + + assert(!query1.isActive, "First query should have stopped before starting the second query") } } } testQuietly( "can't start multiple instances of the same streaming query in the different sessions") { - withTempDir { dir => - val session2 = spark.cloneSession() - - val ms1 = MemoryStream(Encoders.INT, spark.sqlContext) - val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS() - val chkLocation = new File(dir, "_checkpoint").getCanonicalPath - val dataLocation = new File(dir, "data").getCanonicalPath + withSQLConf(SQLConf.STOP_RUNNING_DUPLICATE_STREAM.key -> "false") { + withTempDir { dir => + val session2 = spark.cloneSession() + + val ms1 = MemoryStream(Encoders.INT, spark.sqlContext) + val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS() + val chkLocation = new File(dir, "_checkpoint").getCanonicalPath + val dataLocation = new File(dir, "data").getCanonicalPath + + val 
query1 = ms1.toDS().writeStream.format("parquet") + .option("checkpointLocation", chkLocation).start(dataLocation) + ms1.addData(1, 2, 3) + try { + val e = intercept[IllegalStateException] { + ds2.writeStream.format("parquet") + .option("checkpointLocation", chkLocation).start(dataLocation) + } + assert(e.getMessage.contains("same id")) + } finally { + query1.stop() + } + } + } + } - val query1 = ms1.toDS().writeStream.format("parquet") - .option("checkpointLocation", chkLocation).start(dataLocation) - ms1.addData(1, 2, 3) - try { - val e = intercept[IllegalStateException] { - ds2.writeStream.format("parquet") - .option("checkpointLocation", chkLocation).start(dataLocation) + testQuietly( + "new instance of the same streaming query stops old query in a different session") { + withSQLConf(SQLConf.STOP_RUNNING_DUPLICATE_STREAM.key -> "true") { + withTempDir { dir => + val session2 = spark.cloneSession() + + val ms1 = MemoryStream(Encoders.INT, spark.sqlContext) + val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS() + val chkLocation = new File(dir, "_checkpoint").getCanonicalPath + val dataLocation = new File(dir, "data").getCanonicalPath + + val query1 = ms1.toDS().writeStream.format("parquet") + .option("checkpointLocation", chkLocation).start(dataLocation) + ms1.addData(1, 2, 3) + val query2 = ds2.writeStream.format("parquet") + .option("checkpointLocation", chkLocation).start(dataLocation) + try { + ms1.addData(1, 2, 3) + query2.processAllAvailable() + + assert(!query1.isActive, + "First query should have stopped before starting the second query") + } finally { + query2.stop() } - assert(e.getMessage.contains("same id")) - } finally { - query1.stop() } } } From e30ec9a0643086ebffaeb664e13f3ff6e0f08d7e Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 23 Oct 2019 12:04:08 +0200 Subject: [PATCH 02/10] Update StreamingQueryManager.scala --- .../spark/sql/streaming/StreamingQueryManager.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 
deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index c5e47d22af78..f805e29d853a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -367,10 +367,10 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo queryManager.get(query.id).stop() } else if (streamAlreadyActive) { throw new IllegalStateException( - s"Cannot start query with id ${query.id} as another query with same id is " + - s"already active. Perhaps you are attempting to restart a query from checkpoint " + - s"that is already active. You may stop the old query by setting the SQL " + - s"configuration: spark.conf.set(\"${SQLConf.STOP_RUNNING_DUPLICATE_STREAM}\", true).") + s"Cannot start query with id ${query.id} as another query with same id is " + + "already active. Perhaps you are attempting to restart a query from checkpoint " + + "that is already active. 
You may stop the old query by setting the SQL " + + s"""configuration: spark.conf.set("${SQLConf.STOP_RUNNING_DUPLICATE_STREAM}", true).""") } activeQueries.put(query.id, query) From 7b6b17cb88bee0c45b6b255e7745d8aceebdeadb Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 6 Nov 2019 17:16:00 -0800 Subject: [PATCH 03/10] save --- .../spark/sql/internal/SharedState.scala | 5 ++ .../sql/streaming/StreamingQueryManager.scala | 40 ++++++--- .../StreamingQueryManagerSuite.scala | 89 +++++++++++-------- 3 files changed, 87 insertions(+), 47 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index d097f9f18f89..eaf07bad505b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.internal import java.net.URL import java.util.{Locale, UUID} import java.util.concurrent.ConcurrentHashMap +import javax.annotation.concurrent.GuardedBy import scala.reflect.ClassTag import scala.util.control.NonFatal @@ -112,10 +113,14 @@ private[sql] class SharedState( */ val cacheManager: CacheManager = new CacheManager + /** A global lock for all streaming query lifecycle tracking and management. */ + private[sql] val activeQueriesLock = new Object + /** * A map of active streaming queries to the session specific StreamingQueryManager that manages * the lifecycle of that stream. 
*/ + @GuardedBy("activeQueriesLock") private[sql] val activeStreamingQueries = new ConcurrentHashMap[UUID, StreamingQueryManager]() /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index c5e47d22af78..3d5b3496c4db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -17,10 +17,10 @@ package org.apache.spark.sql.streaming -import java.util.UUID +import java.util.{ConcurrentModificationException, UUID} import java.util.concurrent.TimeUnit -import javax.annotation.concurrent.GuardedBy +import javax.annotation.concurrent.GuardedBy import scala.collection.JavaConverters._ import scala.collection.mutable @@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.STREAMING_QUERY_LISTENERS -import org.apache.spark.util.{Clock, SystemClock, Utils} +import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} /** * A class to manage all the [[StreamingQuery]] active in a `SparkSession`. 
@@ -53,7 +53,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo @GuardedBy("activeQueriesLock") private val activeQueries = new mutable.HashMap[UUID, StreamingQuery] - private val activeQueriesLock = new Object + private val activeQueriesLock = sparkSession.sharedState.activeQueriesLock private val awaitTerminationLock = new Object @GuardedBy("awaitTerminationLock") @@ -343,7 +343,9 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo trigger, triggerClock) - activeQueriesLock.synchronized { + // The following code block checks if a stream with the same name or id is running. Then it + // returns a function to stop an active stream outside of the lock to avoid a deadlock. + val stopActiveDuplicateQuery = activeQueriesLock.synchronized { // Make sure no other query with same name is active userSpecifiedName.foreach { name => if (activeQueries.values.exists(_.name == name)) { @@ -354,7 +356,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo // Make sure no other query with same id is active across all sessions val activeOption = - Option(sparkSession.sharedState.activeStreamingQueries.putIfAbsent(query.id, this)) + Option(sparkSession.sharedState.activeStreamingQueries.get(query.id)) val streamAlreadyActive = activeOption.isDefined || activeQueries.values.exists(_.id == query.id) @@ -362,19 +364,37 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo sparkSession.sessionState.conf.getConf(SQLConf.STOP_RUNNING_DUPLICATE_STREAM) if (streamAlreadyActive && turnOffOldStream) { val queryManager = activeOption.getOrElse(this) - logInfo(s"Stopping existing streaming query [id=${query.id}], as a new run is being " + - "started.") - queryManager.get(query.id).stop() + val oldQuery = queryManager.get(query.id) + logWarning(s"Stopping existing streaming query [id=${query.id}, runId=${oldQuery.runId}]," + + " as a new run is being started.") + () => 
oldQuery.stop() } else if (streamAlreadyActive) { throw new IllegalStateException( s"Cannot start query with id ${query.id} as another query with same id is " + s"already active. Perhaps you are attempting to restart a query from checkpoint " + s"that is already active. You may stop the old query by setting the SQL " + - s"configuration: spark.conf.set(\"${SQLConf.STOP_RUNNING_DUPLICATE_STREAM}\", true).") + s"""configuration: spark.conf.set("${SQLConf.STOP_RUNNING_DUPLICATE_STREAM.key}", """ + + "true) and retry.") + } else { + // nothing to stop so, no-op + () => () } + } + stopActiveDuplicateQuery() + + activeQueriesLock.synchronized { + // We still can have a race condition when two concurrent instances try to start the same + // stream, while a third one was already active. In this case, we throw a + // ConcurrentModificationException. + val oldActiveQuery = sparkSession.sharedState.activeStreamingQueries.put(query.id, this) + if (oldActiveQuery != null) { + throw new ConcurrentModificationException( + "Another instance of this query was just started by a concurrent session.") + } activeQueries.put(query.id, query) } + try { // When starting a query, it will call `StreamingQueryListener.onQueryStarted` synchronously. // As it's provided by the user and can run arbitrary codes, we must not hold any lock here. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala index a1fed0171263..c9c488d0a27c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala @@ -299,22 +299,32 @@ class StreamingQueryManagerSuite extends StreamTest { } testQuietly("new instance of the same streaming query stops old query in the same session") { - withSQLConf(SQLConf.STOP_RUNNING_DUPLICATE_STREAM.key -> "true") { - withTempDir { dir => - val (ms1, ds1) = makeDataset - val (ms2, ds2) = makeDataset - val chkLocation = new File(dir, "_checkpoint").getCanonicalPath - val dataLocation = new File(dir, "data").getCanonicalPath - - val query1 = ds1.writeStream.format("parquet") - .option("checkpointLocation", chkLocation).start(dataLocation) - ms1.addData(1, 2, 3) - val query2 = ds2.writeStream.format("parquet") - .option("checkpointLocation", chkLocation).start(dataLocation) - ms2.addData(1, 2, 3) - query2.processAllAvailable() - - assert(!query1.isActive, "First query should have stopped before starting the second query") + failAfter(90 seconds) { + withSQLConf(SQLConf.STOP_RUNNING_DUPLICATE_STREAM.key -> "true") { + withTempDir { dir => + val (ms1, ds1) = makeDataset + val (ms2, ds2) = makeDataset + val chkLocation = new File(dir, "_checkpoint").getCanonicalPath + val dataLocation = new File(dir, "data").getCanonicalPath + + val query1 = ds1.writeStream.format("parquet") + .option("checkpointLocation", chkLocation).start(dataLocation) + ms1.addData(1, 2, 3) + val query2 = ds2.writeStream.format("parquet") + .option("checkpointLocation", chkLocation).start(dataLocation) + try { + ms2.addData(1, 2, 3) + query2.processAllAvailable() + assert(spark.sharedState.activeStreamingQueries.get(query2.id) === + 
spark.sessionState.streamingQueryManager, + "The correct streaming query manager is not being tracked in global state") + + assert(!query1.isActive, + "First query should have stopped before starting the second query") + } finally { + query2.stop() + } + } } } } @@ -348,28 +358,33 @@ class StreamingQueryManagerSuite extends StreamTest { testQuietly( "new instance of the same streaming query stops old query in a different session") { - withSQLConf(SQLConf.STOP_RUNNING_DUPLICATE_STREAM.key -> "true") { - withTempDir { dir => - val session2 = spark.cloneSession() - - val ms1 = MemoryStream(Encoders.INT, spark.sqlContext) - val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS() - val chkLocation = new File(dir, "_checkpoint").getCanonicalPath - val dataLocation = new File(dir, "data").getCanonicalPath - - val query1 = ms1.toDS().writeStream.format("parquet") - .option("checkpointLocation", chkLocation).start(dataLocation) - ms1.addData(1, 2, 3) - val query2 = ds2.writeStream.format("parquet") - .option("checkpointLocation", chkLocation).start(dataLocation) - try { + failAfter(90 seconds) { + withSQLConf(SQLConf.STOP_RUNNING_DUPLICATE_STREAM.key -> "true") { + withTempDir { dir => + val session2 = spark.cloneSession() + + val ms1 = MemoryStream(Encoders.INT, spark.sqlContext) + val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS() + val chkLocation = new File(dir, "_checkpoint").getCanonicalPath + val dataLocation = new File(dir, "data").getCanonicalPath + + val query1 = ms1.toDS().writeStream.format("parquet") + .option("checkpointLocation", chkLocation).start(dataLocation) ms1.addData(1, 2, 3) - query2.processAllAvailable() - - assert(!query1.isActive, - "First query should have stopped before starting the second query") - } finally { - query2.stop() + val query2 = ds2.writeStream.format("parquet") + .option("checkpointLocation", chkLocation).start(dataLocation) + try { + ms1.addData(1, 2, 3) + query2.processAllAvailable() + 
assert(spark.sharedState.activeStreamingQueries.get(query2.id) === + session2.sessionState.streamingQueryManager, + "The correct streaming query manager is not being tracked in global state") + + assert(!query1.isActive, + "First query should have stopped before starting the second query") + } finally { + query2.stop() + } } } } From 30892ba8604f112f862befdd5aef49d0d7ff15df Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 6 Nov 2019 17:36:39 -0800 Subject: [PATCH 04/10] simplify --- .../apache/spark/sql/internal/SharedState.scala | 4 ++-- .../sql/streaming/StreamingQueryManager.scala | 15 ++++++--------- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index eaf07bad505b..cbe59a07eeca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.execution.CacheManager import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab} import org.apache.spark.sql.internal.StaticSQLConf._ -import org.apache.spark.sql.streaming.StreamingQueryManager +import org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.status.ElementTrackingStore import org.apache.spark.util.Utils @@ -121,7 +121,7 @@ private[sql] class SharedState( * the lifecycle of that stream. 
*/ @GuardedBy("activeQueriesLock") - private[sql] val activeStreamingQueries = new ConcurrentHashMap[UUID, StreamingQueryManager]() + private[sql] val activeStreamingQueries = new ConcurrentHashMap[UUID, StreamingQuery]() /** * A status store to query SQL status/metrics of this Spark application, based on SQL-specific diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 3d5b3496c4db..8a228165d105 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -355,20 +355,17 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo } // Make sure no other query with same id is active across all sessions - val activeOption = - Option(sparkSession.sharedState.activeStreamingQueries.get(query.id)) + val activeOption = Option(sparkSession.sharedState.activeStreamingQueries.get(query.id)) + .orElse(activeQueries.get(query.id)) - val streamAlreadyActive = - activeOption.isDefined || activeQueries.values.exists(_.id == query.id) val turnOffOldStream = sparkSession.sessionState.conf.getConf(SQLConf.STOP_RUNNING_DUPLICATE_STREAM) - if (streamAlreadyActive && turnOffOldStream) { - val queryManager = activeOption.getOrElse(this) - val oldQuery = queryManager.get(query.id) + if (activeOption.isDefined && turnOffOldStream) { + val oldQuery = activeOption.get logWarning(s"Stopping existing streaming query [id=${query.id}, runId=${oldQuery.runId}]," + " as a new run is being started.") () => oldQuery.stop() - } else if (streamAlreadyActive) { + } else if (activeOption.isDefined) { throw new IllegalStateException( s"Cannot start query with id ${query.id} as another query with same id is " + s"already active. 
Perhaps you are attempting to restart a query from checkpoint " + @@ -387,7 +384,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo // We still can have a race condition when two concurrent instances try to start the same // stream, while a third one was already active. In this case, we throw a // ConcurrentModificationException. - val oldActiveQuery = sparkSession.sharedState.activeStreamingQueries.put(query.id, this) + val oldActiveQuery = sparkSession.sharedState.activeStreamingQueries.put(query.id, query) if (oldActiveQuery != null) { throw new ConcurrentModificationException( "Another instance of this query was just started by a concurrent session.") From d8d4e8f0bc6d16a7aa9f72e00eec178043fbad60 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 6 Nov 2019 17:38:28 -0800 Subject: [PATCH 05/10] fix ss --- .../org/apache/spark/sql/streaming/StreamingQueryManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 8a228165d105..69bc26e85a17 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -19,8 +19,8 @@ package org.apache.spark.sql.streaming import java.util.{ConcurrentModificationException, UUID} import java.util.concurrent.TimeUnit - import javax.annotation.concurrent.GuardedBy + import scala.collection.JavaConverters._ import scala.collection.mutable From dd20574c275d47174bbaeb97f6fa39ad91d3b607 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Wed, 6 Nov 2019 17:41:09 -0800 Subject: [PATCH 06/10] fix issue --- .../spark/sql/streaming/StreamingQueryManager.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 69bc26e85a17..7431091b12af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -400,7 +400,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo query.streamingQuery.start() } catch { case e: Throwable => - unregisterTerminatedStream(query.id) + unregisterTerminatedStream(query) throw e } query @@ -408,7 +408,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo /** Notify (by the StreamingQuery) that the query has been terminated */ private[sql] def notifyQueryTermination(terminatedQuery: StreamingQuery): Unit = { - unregisterTerminatedStream(terminatedQuery.id) + unregisterTerminatedStream(terminatedQuery) awaitTerminationLock.synchronized { if (lastTerminatedQuery == null || terminatedQuery.exception.nonEmpty) { lastTerminatedQuery = terminatedQuery @@ -418,11 +418,11 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo stateStoreCoordinator.deactivateInstances(terminatedQuery.runId) } - private def unregisterTerminatedStream(terminatedQueryId: UUID): Unit = { + private def unregisterTerminatedStream(terminatedQuery: StreamingQuery): Unit = { activeQueriesLock.synchronized { // remove from shared state only if the streaming query manager also matches - sparkSession.sharedState.activeStreamingQueries.remove(terminatedQueryId, this) - activeQueries -= terminatedQueryId + sparkSession.sharedState.activeStreamingQueries.remove(terminatedQuery.id, terminatedQuery) + activeQueries -= terminatedQuery.id } } } From d999fb702fd491ba2f4c3e1fe855b399f037d966 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 7 Nov 2019 08:56:04 -0800 Subject: [PATCH 07/10] save --- 
.../spark/sql/internal/SharedState.scala | 3 ++- .../sql/streaming/StreamingQueryManager.scala | 19 +++++++++++-------- .../StreamingQueryManagerSuite.scala | 8 ++++---- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index cbe59a07eeca..b810bedac471 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -33,6 +33,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.execution.CacheManager +import org.apache.spark.sql.execution.streaming.StreamExecution import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab} import org.apache.spark.sql.internal.StaticSQLConf._ import org.apache.spark.sql.streaming.StreamingQuery @@ -121,7 +122,7 @@ private[sql] class SharedState( * the lifecycle of that stream. 
*/ @GuardedBy("activeQueriesLock") - private[sql] val activeStreamingQueries = new ConcurrentHashMap[UUID, StreamingQuery]() + private[sql] val activeStreamingQueries = new ConcurrentHashMap[UUID, StreamExecution]() /** * A status store to query SQL status/metrics of this Spark application, based on SQL-specific diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 7431091b12af..5f3ff1a78f6d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -344,8 +344,9 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo triggerClock) // The following code block checks if a stream with the same name or id is running. Then it - // returns a function to stop an active stream outside of the lock to avoid a deadlock. - val stopActiveDuplicateQuery = activeQueriesLock.synchronized { + // returns an Option of an already active stream to stop outside of the lock + // to avoid a deadlock. 
+ val activeDuplicateQuery = activeQueriesLock.synchronized { // Make sure no other query with same name is active userSpecifiedName.foreach { name => if (activeQueries.values.exists(_.name == name)) { @@ -364,7 +365,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo val oldQuery = activeOption.get logWarning(s"Stopping existing streaming query [id=${query.id}, runId=${oldQuery.runId}]," + " as a new run is being started.") - () => oldQuery.stop() + Some(oldQuery) } else if (activeOption.isDefined) { throw new IllegalStateException( s"Cannot start query with id ${query.id} as another query with same id is " + @@ -374,17 +375,18 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo "true) and retry.") } else { // nothing to stop so, no-op - () => () + None } } - stopActiveDuplicateQuery() + activeDuplicateQuery.foreach(_.stop()) activeQueriesLock.synchronized { // We still can have a race condition when two concurrent instances try to start the same // stream, while a third one was already active. In this case, we throw a // ConcurrentModificationException. 
- val oldActiveQuery = sparkSession.sharedState.activeStreamingQueries.put(query.id, query) + val oldActiveQuery = sparkSession.sharedState.activeStreamingQueries.put( + query.id, query.streamingQuery) // we need to put the StreamExecution, not the wrapper if (oldActiveQuery != null) { throw new ConcurrentModificationException( "Another instance of this query was just started by a concurrent session.") @@ -420,8 +422,9 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo private def unregisterTerminatedStream(terminatedQuery: StreamingQuery): Unit = { activeQueriesLock.synchronized { - // remove from shared state only if the streaming query manager also matches - sparkSession.sharedState.activeStreamingQueries.remove(terminatedQuery.id, terminatedQuery) + // remove from shared state only if the streaming execution also matches + sparkSession.sharedState.activeStreamingQueries.remove( + terminatedQuery.id, terminatedQuery) activeQueries -= terminatedQuery.id } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala index c9c488d0a27c..62b10cd59479 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala @@ -316,8 +316,8 @@ class StreamingQueryManagerSuite extends StreamTest { ms2.addData(1, 2, 3) query2.processAllAvailable() assert(spark.sharedState.activeStreamingQueries.get(query2.id) === - spark.sessionState.streamingQueryManager, - "The correct streaming query manager is not being tracked in global state") + query2.asInstanceOf[StreamingQueryWrapper].streamingQuery, + "The correct streaming query is not being tracked in global state") assert(!query1.isActive, "First query should have stopped before starting the second query") @@ -377,8 +377,8 @@ class 
StreamingQueryManagerSuite extends StreamTest { ms1.addData(1, 2, 3) query2.processAllAvailable() assert(spark.sharedState.activeStreamingQueries.get(query2.id) === - session2.sessionState.streamingQueryManager, - "The correct streaming query manager is not being tracked in global state") + query2.asInstanceOf[StreamingQueryWrapper].streamingQuery, + "The correct streaming execution is not being tracked in global state") assert(!query1.isActive, "First query should have stopped before starting the second query") From 2216fe2bb4a977fd5c38e48e79dd47e61b1aef0a Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 7 Nov 2019 11:59:23 -0800 Subject: [PATCH 08/10] fix existing test --- .../apache/spark/sql/streaming/StreamingQuerySuite.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 760731d26f05..7cda71dad28f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -123,9 +123,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi assert(q3.runId !== q4.runId) // Only one query with same id can be active - val q5 = startQuery(restart = false) - val e = intercept[IllegalStateException] { - startQuery(restart = true) + withSQLConf(SQLConf.STOP_RUNNING_DUPLICATE_STREAM.key -> "false") { + val q5 = startQuery(restart = false) + val e = intercept[IllegalStateException] { + startQuery(restart = true) + } } } } From 9fbf56ad141f0912cf8e26f41913a0f5424b36aa Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 12 Nov 2019 13:21:05 -0800 Subject: [PATCH 09/10] Address comments --- .../apache/spark/sql/internal/SQLConf.scala | 11 ++-- .../sql/streaming/StreamingQueryManager.scala | 56 ++++++++++--------- 
.../StreamingQueryManagerSuite.scala | 18 +++--- .../sql/streaming/StreamingQuerySuite.scala | 2 +- 4 files changed, 47 insertions(+), 40 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 135d29525716..c1f7cad782a6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1087,11 +1087,12 @@ object SQLConf { .checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2") .createWithDefault(2) - val STOP_RUNNING_DUPLICATE_STREAM = buildConf("spark.sql.streaming.stopExistingDuplicatedStream") - .doc("Running two streams using the same checkpoint location concurrently is not supported. " + - "In the case where multiple streams are started on different SparkSessions, access to the " + - "older stream's SparkSession may not be possible, and the stream may have turned into a " + - "zombie stream. When this flag is true, we will stop the old stream to start the new one.") + val STREAMING_STOP_ACTIVE_RUN_ON_RESTART = + buildConf("spark.sql.streaming.stopActiveRunOnRestart") + .doc("Running multiple runs of the same streaming query concurrently is not supported. 
" + + "If we find a concurrent active run for a streaming query (in the same or different " + + "SparkSessions on the same cluster) and this flag is true, we will stop the old streaming " + + "query run to start the new one.") .booleanConf .createWithDefault(true) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 5f3ff1a78f6d..3ee07ab436e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -51,9 +51,10 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env) private val listenerBus = new StreamingQueryListenerBus(sparkSession.sparkContext.listenerBus) - @GuardedBy("activeQueriesLock") + @GuardedBy("activeQueriesSharedLock") private val activeQueries = new mutable.HashMap[UUID, StreamingQuery] - private val activeQueriesLock = sparkSession.sharedState.activeQueriesLock + // A global lock to keep track of active streaming queries across Spark sessions + private val activeQueriesSharedLock = sparkSession.sharedState.activeQueriesLock private val awaitTerminationLock = new Object @GuardedBy("awaitTerminationLock") @@ -77,7 +78,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo * * @since 2.0.0 */ - def active: Array[StreamingQuery] = activeQueriesLock.synchronized { + def active: Array[StreamingQuery] = activeQueriesSharedLock.synchronized { activeQueries.values.toArray } @@ -86,7 +87,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo * * @since 2.1.0 */ - def get(id: UUID): StreamingQuery = activeQueriesLock.synchronized { + def get(id: UUID): StreamingQuery = activeQueriesSharedLock.synchronized { activeQueries.get(id).orNull } @@ -346,42 
+347,45 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo // The following code block checks if a stream with the same name or id is running. Then it // returns an Option of an already active stream to stop outside of the lock // to avoid a deadlock. - val activeDuplicateQuery = activeQueriesLock.synchronized { + val activeRunOpt = activeQueriesSharedLock.synchronized { // Make sure no other query with same name is active userSpecifiedName.foreach { name => if (activeQueries.values.exists(_.name == name)) { - throw new IllegalArgumentException( - s"Cannot start query with name $name as a query with that name is already active") + throw new IllegalArgumentException(s"Cannot start query with name $name as a query " + + s"with that name is already active in this SparkSession") } } // Make sure no other query with same id is active across all sessions val activeOption = Option(sparkSession.sharedState.activeStreamingQueries.get(query.id)) - .orElse(activeQueries.get(query.id)) - - val turnOffOldStream = - sparkSession.sessionState.conf.getConf(SQLConf.STOP_RUNNING_DUPLICATE_STREAM) - if (activeOption.isDefined && turnOffOldStream) { - val oldQuery = activeOption.get - logWarning(s"Stopping existing streaming query [id=${query.id}, runId=${oldQuery.runId}]," + - " as a new run is being started.") - Some(oldQuery) - } else if (activeOption.isDefined) { - throw new IllegalStateException( - s"Cannot start query with id ${query.id} as another query with same id is " + - s"already active. Perhaps you are attempting to restart a query from checkpoint " + - s"that is already active. You may stop the old query by setting the SQL " + - s"""configuration: spark.conf.set("${SQLConf.STOP_RUNNING_DUPLICATE_STREAM.key}", """ + - "true) and retry.") + .orElse(activeQueries.get(query.id)) // shouldn't be needed but paranoia ... 
+ + val shouldStopActiveRun = + sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART) + if (activeOption.isDefined) { + if (shouldStopActiveRun) { + val oldQuery = activeOption.get + logWarning(s"Stopping existing streaming query [id=${query.id}, " + + s"runId=${oldQuery.runId}], as a new run is being started.") + Some(oldQuery) + } else { + throw new IllegalStateException( + s"Cannot start query with id ${query.id} as another query with same id is " + + s"already active. Perhaps you are attempting to restart a query from checkpoint " + + s"that is already active. You may stop the old query by setting the SQL " + + "configuration: " + + s"""spark.conf.set("${SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key}", true) """ + + "and retry.") + } } else { // nothing to stop so, no-op None } } - activeDuplicateQuery.foreach(_.stop()) + activeRunOpt.foreach(_.stop()) - activeQueriesLock.synchronized { + activeQueriesSharedLock.synchronized { // We still can have a race condition when two concurrent instances try to start the same // stream, while a third one was already active. In this case, we throw a // ConcurrentModificationException. 
@@ -421,7 +425,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo } private def unregisterTerminatedStream(terminatedQuery: StreamingQuery): Unit = { - activeQueriesLock.synchronized { + activeQueriesSharedLock.synchronized { // remove from shared state only if the streaming execution also matches sparkSession.sharedState.activeStreamingQueries.remove( terminatedQuery.id, terminatedQuery) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala index 62b10cd59479..96f7efeef98e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala @@ -275,7 +275,7 @@ class StreamingQueryManagerSuite extends StreamTest { } testQuietly("can't start multiple instances of the same streaming query in the same session") { - withSQLConf(SQLConf.STOP_RUNNING_DUPLICATE_STREAM.key -> "false") { + withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "false") { withTempDir { dir => val (ms1, ds1) = makeDataset val (ms2, ds2) = makeDataset @@ -292,7 +292,7 @@ class StreamingQueryManagerSuite extends StreamTest { } assert(e.getMessage.contains("same id")) } finally { - query1.stop() + spark.streams.active.foreach(_.stop()) } } } @@ -300,7 +300,7 @@ class StreamingQueryManagerSuite extends StreamTest { testQuietly("new instance of the same streaming query stops old query in the same session") { failAfter(90 seconds) { - withSQLConf(SQLConf.STOP_RUNNING_DUPLICATE_STREAM.key -> "true") { + withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "true") { withTempDir { dir => val (ms1, ds1) = makeDataset val (ms2, ds2) = makeDataset @@ -322,7 +322,7 @@ class StreamingQueryManagerSuite extends StreamTest { assert(!query1.isActive, "First query should have stopped before starting 
the second query") } finally { - query2.stop() + spark.streams.active.foreach(_.stop()) } } } @@ -331,7 +331,7 @@ class StreamingQueryManagerSuite extends StreamTest { testQuietly( "can't start multiple instances of the same streaming query in the different sessions") { - withSQLConf(SQLConf.STOP_RUNNING_DUPLICATE_STREAM.key -> "false") { + withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "false") { withTempDir { dir => val session2 = spark.cloneSession() @@ -350,7 +350,8 @@ class StreamingQueryManagerSuite extends StreamTest { } assert(e.getMessage.contains("same id")) } finally { - query1.stop() + spark.streams.active.foreach(_.stop()) + session2.streams.active.foreach(_.stop()) } } } @@ -359,7 +360,7 @@ class StreamingQueryManagerSuite extends StreamTest { testQuietly( "new instance of the same streaming query stops old query in a different session") { failAfter(90 seconds) { - withSQLConf(SQLConf.STOP_RUNNING_DUPLICATE_STREAM.key -> "true") { + withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "true") { withTempDir { dir => val session2 = spark.cloneSession() @@ -383,7 +384,8 @@ class StreamingQueryManagerSuite extends StreamTest { assert(!query1.isActive, "First query should have stopped before starting the second query") } finally { - query2.stop() + spark.streams.active.foreach(_.stop()) + session2.streams.active.foreach(_.stop()) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 7cda71dad28f..4121f499bd69 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -123,7 +123,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi assert(q3.runId !== q4.runId) // Only one query with same id can be active - 
withSQLConf(SQLConf.STOP_RUNNING_DUPLICATE_STREAM.key -> "false") { + withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "false") { val q5 = startQuery(restart = false) val e = intercept[IllegalStateException] { startQuery(restart = true) From bff9162d134269eebb95e6597bba284d4a3b5944 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 12 Nov 2019 19:34:06 -0800 Subject: [PATCH 10/10] Update StreamingQueryManager.scala --- .../org/apache/spark/sql/streaming/StreamingQueryManager.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 3ee07ab436e5..e64f67cc755f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -383,11 +383,12 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo } } + // stop() will clear the queryId from activeStreamingQueries as well as activeQueries activeRunOpt.foreach(_.stop()) activeQueriesSharedLock.synchronized { // We still can have a race condition when two concurrent instances try to start the same - // stream, while a third one was already active. In this case, we throw a + // stream, while a third one was already active and stopped above. In this case, we throw a // ConcurrentModificationException. val oldActiveQuery = sparkSession.sharedState.activeStreamingQueries.put( query.id, query.streamingQuery) // we need to put the StreamExecution, not the wrapper