Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@
package org.apache.spark.sql.connect

import java.util.UUID
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}

import scala.jdk.CollectionConverters._

import org.apache.spark.annotation.Evolving
import org.apache.spark.connect.proto.{Command, StreamingQueryManagerCommand, StreamingQueryManagerCommandResult}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.connect.common.InvalidPlanInput
import org.apache.spark.sql.streaming
import org.apache.spark.sql.streaming.{StreamingQueryException, StreamingQueryListener}

Expand All @@ -39,15 +37,6 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession)
extends streaming.StreamingQueryManager
with Logging {

// Mapping from id to StreamingQueryListener. There's another mapping from id to
// StreamingQueryListener on server side. This is used by removeListener() to find the id
// of previously added StreamingQueryListener and pass it to server side to find the
// corresponding listener on server side. We use id to StreamingQueryListener mapping
// here to make sure there's no hash collision as well as handling the case that adds and
// removes the same listener instance multiple times properly.
private lazy val listenerCache: ConcurrentMap[String, StreamingQueryListener] =
new ConcurrentHashMap()

private[spark] val streamingQueryListenerBus = new StreamingQueryListenerBus(sparkSession)

private[spark] def close(): Unit = {
Expand Down Expand Up @@ -128,17 +117,4 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession)

resp.getStreamingQueryManagerCommandResult
}

private def cacheListenerById(id: String, listener: StreamingQueryListener): Unit = {
listenerCache.putIfAbsent(id, listener)
}

private def getIdByListener(listener: StreamingQueryListener): String = {
listenerCache.forEach((k, v) => if (listener.equals(v)) return k)
throw InvalidPlanInput(s"No id with listener $listener is found.")
}

private def removeCachedListener(id: String): StreamingQueryListener = {
listenerCache.remove(id)
}
}