diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/StreamingQueryManager.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/StreamingQueryManager.scala index ac864a1292c81..da3669bc69fde 100644 --- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/StreamingQueryManager.scala +++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/StreamingQueryManager.scala @@ -18,14 +18,12 @@ package org.apache.spark.sql.connect import java.util.UUID -import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} import scala.jdk.CollectionConverters._ import org.apache.spark.annotation.Evolving import org.apache.spark.connect.proto.{Command, StreamingQueryManagerCommand, StreamingQueryManagerCommandResult} import org.apache.spark.internal.Logging -import org.apache.spark.sql.connect.common.InvalidPlanInput import org.apache.spark.sql.streaming import org.apache.spark.sql.streaming.{StreamingQueryException, StreamingQueryListener} @@ -39,15 +37,6 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends streaming.StreamingQueryManager with Logging { - // Mapping from id to StreamingQueryListener. There's another mapping from id to - // StreamingQueryListener on server side. This is used by removeListener() to find the id - // of previously added StreamingQueryListener and pass it to server side to find the - // corresponding listener on server side. We use id to StreamingQueryListener mapping - // here to make sure there's no hash collision as well as handling the case that adds and - // removes the same listener instance multiple times properly. - private lazy val listenerCache: ConcurrentMap[String, StreamingQueryListener] = - new ConcurrentHashMap() - private[spark] val streamingQueryListenerBus = new StreamingQueryListenerBus(sparkSession) private[spark] def close(): Unit = { @@ -128,17 +117,4 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) resp.getStreamingQueryManagerCommandResult } - - private def cacheListenerById(id: String, listener: StreamingQueryListener): Unit = { - listenerCache.putIfAbsent(id, listener) - } - - private def getIdByListener(listener: StreamingQueryListener): String = { - listenerCache.forEach((k, v) => if (listener.equals(v)) return k) - throw InvalidPlanInput(s"No id with listener $listener is found.") - } - - private def removeCachedListener(id: String): StreamingQueryListener = { - listenerCache.remove(id) - } }