From 52a895f2e11e3410d963be4eb7c6a378277e9ff9 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Wed, 3 Dec 2025 17:02:13 +0800 Subject: [PATCH 1/3] init --- .../sql/connect/StreamingQueryManager.scala | 22 ------------------- 1 file changed, 22 deletions(-) diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/StreamingQueryManager.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/StreamingQueryManager.scala index ac864a1292c81..1ea4b8e563f2b 100644 --- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/StreamingQueryManager.scala +++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/StreamingQueryManager.scala @@ -39,15 +39,6 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends streaming.StreamingQueryManager with Logging { - // Mapping from id to StreamingQueryListener. There's another mapping from id to - // StreamingQueryListener on server side. This is used by removeListener() to find the id - // of previously added StreamingQueryListener and pass it to server side to find the - // corresponding listener on server side. We use id to StreamingQueryListener mapping - // here to make sure there's no hash collision as well as handling the case that adds and - // removes the same listener instance multiple times properly. - private lazy val listenerCache: ConcurrentMap[String, StreamingQueryListener] = - new ConcurrentHashMap() - private[spark] val streamingQueryListenerBus = new StreamingQueryListenerBus(sparkSession) private[spark] def close(): Unit = { @@ -128,17 +119,4 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) resp.getStreamingQueryManagerCommandResult } - - private def cacheListenerById(id: String, listener: StreamingQueryListener): Unit = { - listenerCache.putIfAbsent(id, listener) - } - - private def getIdByListener(listener: StreamingQueryListener): String = { - listenerCache.forEach((k, v) => if (listener.equals(v)) return k) - throw InvalidPlanInput(s"No id with listener $listener is found.") - } - - private def removeCachedListener(id: String): StreamingQueryListener = { - listenerCache.remove(id) - } } From ec36df331bd5d935f044c9f64aba178130c9b67a Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Wed, 3 Dec 2025 17:35:08 +0800 Subject: [PATCH 2/3] import --- .../org/apache/spark/sql/connect/StreamingQueryManager.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/StreamingQueryManager.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/StreamingQueryManager.scala index 1ea4b8e563f2b..6802f53365d6f 100644 --- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/StreamingQueryManager.scala +++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/StreamingQueryManager.scala @@ -18,14 +18,10 @@ package org.apache.spark.sql.connect import java.util.UUID -import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} - -import scala.jdk.CollectionConverters._ import org.apache.spark.annotation.Evolving import org.apache.spark.connect.proto.{Command, StreamingQueryManagerCommand, StreamingQueryManagerCommandResult} import org.apache.spark.internal.Logging -import org.apache.spark.sql.connect.common.InvalidPlanInput import org.apache.spark.sql.streaming import org.apache.spark.sql.streaming.{StreamingQueryException, StreamingQueryListener} From 5bd04846225b8799d68724e2c51e681264f3425d Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Thu, 4 Dec 2025 14:53:00 +0800 Subject: [PATCH 3/3] fix --- .../org/apache/spark/sql/connect/StreamingQueryManager.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/StreamingQueryManager.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/StreamingQueryManager.scala index 6802f53365d6f..da3669bc69fde 100644 --- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/StreamingQueryManager.scala +++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/StreamingQueryManager.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.connect import java.util.UUID +import scala.jdk.CollectionConverters._ + import org.apache.spark.annotation.Evolving import org.apache.spark.connect.proto.{Command, StreamingQueryManagerCommand, StreamingQueryManagerCommandResult} import org.apache.spark.internal.Logging