diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala new file mode 100644 index 0000000000000..8cef421becd11 --- /dev/null +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.util.UUID + +import org.json4s.{JObject, JString} +import org.json4s.JsonAST.JValue +import org.json4s.JsonDSL.{jobject2assoc, pair2Assoc} +import org.json4s.jackson.JsonMethods.{compact, render} + +import org.apache.spark.annotation.Evolving +import org.apache.spark.scheduler.SparkListenerEvent + +/** + * Interface for listening to events related to [[StreamingQuery StreamingQueries]]. + * @note + * The methods are not thread-safe as they may be called from different threads. + * + * @since 3.5.0 + */ +@Evolving +abstract class StreamingQueryListener extends Serializable { + + import StreamingQueryListener._ + + /** + * Called when a query is started. + * @note + * This is called synchronously with + * [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]], that is, + * `onQueryStart` will be called on all listeners before `DataStreamWriter.start()` returns + * the corresponding [[StreamingQuery]]. Please don't block this method as it will block your + * query. + * @since 3.5.0 + */ + def onQueryStarted(event: QueryStartedEvent): Unit + + /** + * Called when there is some status update (ingestion rate updated, etc.) + * + * @note + * This method is asynchronous. The status in [[StreamingQuery]] will always be latest no + * matter when this method is called. Therefore, the status of [[StreamingQuery]] may be + * changed before/when you process the event. E.g., you may find [[StreamingQuery]] is + * terminated when you are processing `QueryProgressEvent`. + * @since 3.5.0 + */ + def onQueryProgress(event: QueryProgressEvent): Unit + + /** + * Called when the query is idle and waiting for new data to process. + * @since 3.5.0 + */ + def onQueryIdle(event: QueryIdleEvent): Unit = {} + + /** + * Called when a query is stopped, with or without error. + * @since 3.5.0 + */ + def onQueryTerminated(event: QueryTerminatedEvent): Unit +} + +/** + * Py4J allows a pure interface so this proxy is required. 
+ */ +private[spark] trait PythonStreamingQueryListener { + import StreamingQueryListener._ + + def onQueryStarted(event: QueryStartedEvent): Unit + + def onQueryProgress(event: QueryProgressEvent): Unit + + def onQueryIdle(event: QueryIdleEvent): Unit + + def onQueryTerminated(event: QueryTerminatedEvent): Unit +} + +private[spark] class PythonStreamingQueryListenerWrapper(listener: PythonStreamingQueryListener) + extends StreamingQueryListener { + import StreamingQueryListener._ + + def onQueryStarted(event: QueryStartedEvent): Unit = listener.onQueryStarted(event) + + def onQueryProgress(event: QueryProgressEvent): Unit = listener.onQueryProgress(event) + + override def onQueryIdle(event: QueryIdleEvent): Unit = listener.onQueryIdle(event) + + def onQueryTerminated(event: QueryTerminatedEvent): Unit = listener.onQueryTerminated(event) +} + +/** + * Companion object of [[StreamingQueryListener]] that defines the listener events. + * @since 3.5.0 + */ +@Evolving +object StreamingQueryListener extends Serializable { + + /** + * Base type of [[StreamingQueryListener]] events + * @since 3.5.0 + */ + @Evolving + trait Event extends SparkListenerEvent + + /** + * Event representing the start of a query + * @param id + * A unique query id that persists across restarts. See `StreamingQuery.id()`. + * @param runId + * A query id that is unique for every start/restart. See `StreamingQuery.runId()`. + * @param name + * User-specified name of the query, null if not specified. + * @param timestamp + * The timestamp to start a query. + * @since 3.5.0 + */ + @Evolving + class QueryStartedEvent private[sql] ( + val id: UUID, + val runId: UUID, + val name: String, + val timestamp: String) + extends Event + with Serializable { + + def json: String = compact(render(jsonValue)) + + private def jsonValue: JValue = { + ("id" -> JString(id.toString)) ~ + ("runId" -> JString(runId.toString)) ~ + ("name" -> JString(name)) ~ + ("timestamp" -> JString(timestamp)) + } + } + + /** + * Event representing any progress updates in a query. + * @param progress + * The query progress updates. + * @since 3.5.0 + */ + @Evolving + class QueryProgressEvent private[sql] (val progress: StreamingQueryProgress) + extends Event + with Serializable { + + def json: String = compact(render(jsonValue)) + + private def jsonValue: JValue = JObject("progress" -> progress.jsonValue) + } + + /** + * Event representing that query is idle and waiting for new data to process. + * + * @param id + * A unique query id that persists across restarts. See `StreamingQuery.id()`. + * @param runId + * A query id that is unique for every start/restart. See `StreamingQuery.runId()`. + * @param timestamp + * The timestamp when the latest no-batch trigger happened. + * @since 3.5.0 + */ + @Evolving + class QueryIdleEvent private[sql] (val id: UUID, val runId: UUID, val timestamp: String) + extends Event + with Serializable { + + def json: String = compact(render(jsonValue)) + + private def jsonValue: JValue = { + ("id" -> JString(id.toString)) ~ + ("runId" -> JString(runId.toString)) ~ + ("timestamp" -> JString(timestamp)) + } + } + + /** + * Event representing that termination of a query. + * + * @param id + * A unique query id that persists across restarts. See `StreamingQuery.id()`. + * @param runId + * A query id that is unique for every start/restart. See `StreamingQuery.runId()`. + * @param exception + * The exception message of the query if the query was terminated with an exception. + * Otherwise, it will be `None`. 
+ * @param errorClassOnException + * The error class from the exception if the query was terminated with an exception which is a + * part of the error class framework. If the query was terminated without an exception, or the + * exception is not a part of the error class framework, it will be `None`. + * @since 3.5.0 + */ + @Evolving + class QueryTerminatedEvent private[sql] ( + val id: UUID, + val runId: UUID, + val exception: Option[String], + val errorClassOnException: Option[String]) + extends Event + with Serializable { + // compatibility with versions prior to 3.5.0 + def this(id: UUID, runId: UUID, exception: Option[String]) = { + this(id, runId, exception, None) + } + + def json: String = compact(render(jsonValue)) + + private def jsonValue: JValue = { + ("id" -> JString(id.toString)) ~ + ("runId" -> JString(runId.toString)) ~ + ("exception" -> JString(exception.orNull)) ~ + ("errorClassOnException" -> JString(errorClassOnException.orNull)) + } + } +} diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 13bbf4706390e..f22adc1a96624 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -18,15 +18,20 @@ package org.apache.spark.sql.streaming import java.util.UUID +import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} import scala.collection.JavaConverters._ +import com.google.protobuf.ByteString + import org.apache.spark.annotation.Evolving import org.apache.spark.connect.proto.Command import org.apache.spark.connect.proto.StreamingQueryManagerCommand import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connect.common.{InvalidPlanInput, StreamingListenerPacket} +import org.apache.spark.util.Utils /** * A class to manage all the [[StreamingQuery]] active in a `SparkSession`. @@ -36,6 +41,15 @@ import org.apache.spark.sql.SparkSession @Evolving class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Logging { + // Mapping from id to StreamingQueryListener. There's another mapping from id to + // StreamingQueryListener on the server side. removeListener() uses this map to find the id + // of a previously added StreamingQueryListener and passes it to the server to locate the + // corresponding server-side listener. Keying the map by id avoids hash collisions and + // correctly handles adding and removing the same listener instance multiple times. + private lazy val listenerCache: ConcurrentMap[String, StreamingQueryListener] = + new ConcurrentHashMap() + /** * Returns a list of active queries associated with this SQLContext * @@ -126,6 +140,56 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo executeManagerCmd(_.setResetTerminated(true)) } + /** + * Register a [[StreamingQueryListener]] to receive up-calls for life cycle events of + * [[StreamingQuery]]. + * + * @since 3.5.0 + */ + def addListener(listener: StreamingQueryListener): Unit = { + // TODO: [SPARK-44400] Improve the Listener to provide users a way to access the Spark session + // and perform arbitrary actions inside the Listener.
Right now users can use + // `val spark = SparkSession.builder.getOrCreate()` to create a Spark session inside the + // Listener, but this is a legacy session instead of a connect remote session. + val id = UUID.randomUUID.toString + cacheListenerById(id, listener) + executeManagerCmd( + _.getAddListenerBuilder + .setListenerPayload(ByteString.copyFrom(Utils + .serialize(StreamingListenerPacket(id, listener))))) + } + + /** + * Deregister a [[StreamingQueryListener]]. + * + * @since 3.5.0 + */ + def removeListener(listener: StreamingQueryListener): Unit = { + val id = getIdByListener(listener) + executeManagerCmd( + _.getRemoveListenerBuilder + .setListenerPayload(ByteString.copyFrom(Utils + .serialize(StreamingListenerPacket(id, listener))))) + removeCachedListener(id) + } + + /** + * List all [[StreamingQueryListener]]s attached to this [[StreamingQueryManager]]. + * + * @since 3.5.0 + */ + def listListeners(): Array[StreamingQueryListener] = { + executeManagerCmd(_.setListListeners(true)).getListListeners.getListenersList.asScala.map { + listener => + Utils + .deserialize[StreamingListenerPacket]( + listener.getListenerPayload.toByteArray, + Utils.getContextOrSparkClassLoader) + .listener + .asInstanceOf[StreamingQueryListener] + }.toArray + } + private def executeManagerCmd( setCmdFn: StreamingQueryManagerCommand.Builder => Unit // Sets the command field, like stop(). ): StreamingQueryManagerCommandResult = { @@ -145,4 +209,17 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo resp.getStreamingQueryManagerCommandResult } + + private def cacheListenerById(id: String, listener: StreamingQueryListener): Unit = { + listenerCache.putIfAbsent(id, listener) + } + + private def getIdByListener(listener: StreamingQueryListener): String = { + listenerCache.forEach((k, v) => if (listener.equals(v)) return k) + throw InvalidPlanInput(s"No id with listener $listener is found.") + } + + private def removeCachedListener(id: String): StreamingQueryListener = { + listenerCache.remove(id) + } } diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala new file mode 100644 index 0000000000000..113522ea53639 --- /dev/null +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming.ui + +import java.util.UUID +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.mutable + +import com.fasterxml.jackson.annotation.JsonIgnore + +import org.apache.spark.SparkConf +import org.apache.spark.sql.internal.StaticSQLConf +import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress} +import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper._ +import org.apache.spark.sql.streaming.ui.UIUtils.parseProgressTimestamp +import org.apache.spark.status.{ElementTrackingStore, KVUtils} +import org.apache.spark.status.KVUtils.KVIndexParam +import org.apache.spark.util.kvstore.KVIndex + +/** + * A customized StreamingQueryListener used in structured streaming UI, which contains all UI data + * for both active and inactive query. + */ +private[sql] class StreamingQueryStatusListener( + @transient val conf: SparkConf, + @transient val store: ElementTrackingStore) + extends StreamingQueryListener { + + private val streamingProgressRetention = + conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES) + private val inactiveQueryStatusRetention = conf.get(StaticSQLConf.STREAMING_UI_RETAINED_QUERIES) + + store.addTrigger(classOf[StreamingQueryData], inactiveQueryStatusRetention) { count => + cleanupInactiveQueries(count) + } + + // Events from the same query run will never be processed concurrently, so it's safe to + // access `progressIds` without any protection. + private val queryToProgress = new ConcurrentHashMap[UUID, mutable.Queue[String]]() + + private def cleanupInactiveQueries(count: Long): Unit = { + val view = store.view(classOf[StreamingQueryData]).index("active").first(false).last(false) + val inactiveQueries = KVUtils.viewToSeq(view) + val numInactiveQueries = inactiveQueries.size + if (numInactiveQueries <= inactiveQueryStatusRetention) { + return + } + val toDelete = inactiveQueries + .sortBy(_.endTimestamp.get) + .take(numInactiveQueries - inactiveQueryStatusRetention) + val runIds = toDelete.map { e => + store.delete(e.getClass, e.runId) + e.runId + } + // Delete wrappers in one pass, as deleting them for each summary is slow + store.removeAllByIndexValues(classOf[StreamingQueryProgressWrapper], "runId", runIds) + } + + override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = { + val startTimestamp = parseProgressTimestamp(event.timestamp) + store.write( + new StreamingQueryData( + event.name, + event.id, + event.runId.toString, + isActive = true, + None, + startTimestamp), + checkTriggers = true) + } + + override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = { + val runId = event.progress.runId + val batchId = event.progress.batchId + val timestamp = event.progress.timestamp + if (!queryToProgress.containsKey(runId)) { + queryToProgress.put(runId, mutable.Queue.empty[String]) + } + val progressIds = queryToProgress.get(runId) + progressIds.enqueue(getUniqueId(runId, batchId, timestamp)) + store.write(new StreamingQueryProgressWrapper(event.progress)) + while (progressIds.length > streamingProgressRetention) { + val uniqueId = progressIds.dequeue + store.delete(classOf[StreamingQueryProgressWrapper], uniqueId) + } + } + + override def onQueryIdle(event: StreamingQueryListener.QueryIdleEvent): Unit = {} + + override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = { + val querySummary = store.read(classOf[StreamingQueryData], event.runId.toString) + val curTime = 
System.currentTimeMillis() + store.write( + new StreamingQueryData( + querySummary.name, + querySummary.id, + querySummary.runId, + isActive = false, + event.exception, + querySummary.startTimestamp, + Some(curTime)), + checkTriggers = true) + queryToProgress.remove(event.runId) + } +} + +private[spark] class StreamingQueryData( + val name: String, + val id: UUID, + @KVIndexParam val runId: String, + @KVIndexParam("active") val isActive: Boolean, + val exception: Option[String], + @KVIndexParam("startTimestamp") val startTimestamp: Long, + val endTimestamp: Option[Long] = None) + +/** + * This class contains all message related to UI display, each instance corresponds to a single + * [[org.apache.spark.sql.streaming.StreamingQuery]]. + */ +private[sql] case class StreamingQueryUIData( + summary: StreamingQueryData, + recentProgress: Array[StreamingQueryProgress]) { + + def lastProgress: StreamingQueryProgress = { + if (recentProgress.nonEmpty) { + recentProgress.last + } else { + null + } + } +} + +private[spark] class StreamingQueryProgressWrapper(val progress: StreamingQueryProgress) { + @JsonIgnore @KVIndex + private val uniqueId: String = getUniqueId(progress.runId, progress.batchId, progress.timestamp) + + @JsonIgnore @KVIndex("runId") + private def runIdIndex: String = progress.runId.toString +} + +private[sql] object StreamingQueryProgressWrapper { + + /** + * Adding `timestamp` into unique id to support reporting `empty` query progress in which no + * data comes but with the same batchId. + */ + def getUniqueId(runId: UUID, batchId: Long, timestamp: String): String = { + s"${runId}_${batchId}_$timestamp" + } +} diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala new file mode 100644 index 0000000000000..88a110fa9a329 --- /dev/null +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming.ui + +import java.text.SimpleDateFormat + +import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone + +private[ui] object UIUtils { + + /** + * Check whether `number` is valid, if not return 0.0d + */ + def withNumberInvalid(number: => Double): Double = { + if (number.isNaN || number.isInfinite) { + 0.0d + } else { + number + } + } + + /** + * Execute a block of code when there is already one completed batch in streaming query, + * otherwise return `default` value. 
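+ *
+ * For example (an illustrative, hypothetical call; `uiData` stands for some
+ * [[StreamingQueryUIData]] instance):
+ * {{{
+ *   // Falls back to -1L until the query has reported at least one progress update.
+ *   val lastBatchId = withNoProgress(uiData, uiData.lastProgress.batchId, -1L)
+ * }}}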
+ */ + def withNoProgress[T](query: StreamingQueryUIData, body: => T, default: T): T = { + if (query.lastProgress != null) { + body + } else { + default + } + } + + def getQueryName(uiData: StreamingQueryUIData): String = { + if (uiData.summary.name == null || uiData.summary.name.isEmpty) { + "" + } else { + uiData.summary.name + } + } + + def getQueryStatus(uiData: StreamingQueryUIData): String = { + if (uiData.summary.isActive) { + "RUNNING" + } else { + uiData.summary.exception.map(_ => "FAILED").getOrElse("FINISHED") + } + } + + private val progressTimestampFormat = new ThreadLocal[SimpleDateFormat]() { + override def initialValue(): SimpleDateFormat = { + val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 + format.setTimeZone(getTimeZone("UTC")) + format + } + } + + def parseProgressTimestamp(timestamp: String): Long = { + progressTimestampFormat.get.parse(timestamp).getTime + } +} diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala index dded96a0b13c9..921381caf531e 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala @@ -140,7 +140,6 @@ object CheckConnectJvmClientCompatibility { ProblemFilters.exclude[Problem]("org.apache.spark.sql.internal.*"), ProblemFilters.exclude[Problem]("org.apache.spark.sql.jdbc.*"), ProblemFilters.exclude[Problem]("org.apache.spark.sql.sources.*"), - ProblemFilters.exclude[Problem]("org.apache.spark.sql.streaming.ui.*"), ProblemFilters.exclude[Problem]("org.apache.spark.sql.test.*"), ProblemFilters.exclude[Problem]("org.apache.spark.sql.util.*"), @@ -270,14 +269,6 @@ object CheckConnectJvmClientCompatibility { ProblemFilters.exclude[DirectMissingMethodProblem]( "org.apache.spark.sql.streaming.StreamingQueryException.time"), - // StreamingQueryManager - ProblemFilters.exclude[DirectMissingMethodProblem]( - "org.apache.spark.sql.streaming.StreamingQueryManager.addListener"), - ProblemFilters.exclude[DirectMissingMethodProblem]( - "org.apache.spark.sql.streaming.StreamingQueryManager.removeListener"), - ProblemFilters.exclude[DirectMissingMethodProblem]( - "org.apache.spark.sql.streaming.StreamingQueryManager.listListeners"), - // Classes missing from streaming API ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.ForeachWriter"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.GroupState"), @@ -289,10 +280,24 @@ object CheckConnectJvmClientCompatibility { "org.apache.spark.sql.streaming.PythonStreamingQueryListener"), ProblemFilters.exclude[MissingClassProblem]( "org.apache.spark.sql.streaming.PythonStreamingQueryListenerWrapper"), + + // Streaming UI + ProblemFilters.exclude[MissingClassProblem]( + "org.apache.spark.sql.streaming.ui.StreamingQueryDataSource"), + ProblemFilters.exclude[MissingClassProblem]( + "org.apache.spark.sql.streaming.ui.StreamingQueryPagedTable"), + ProblemFilters.exclude[MissingClassProblem]( + "org.apache.spark.sql.streaming.ui.StructuredStreamingRow"), + ProblemFilters.exclude[MissingClassProblem]( + "org.apache.spark.sql.streaming.ui.StructuredStreamingRow$"), + ProblemFilters.exclude[MissingClassProblem]( + 
"org.apache.spark.sql.streaming.ui.StreamingQueryPage"), + ProblemFilters.exclude[MissingClassProblem]( + "org.apache.spark.sql.streaming.ui.StreamingQueryStatisticsPage"), ProblemFilters.exclude[MissingClassProblem]( - "org.apache.spark.sql.streaming.StreamingQueryListener"), + "org.apache.spark.sql.streaming.ui.StreamingQueryTab"), ProblemFilters.exclude[MissingClassProblem]( - "org.apache.spark.sql.streaming.StreamingQueryListener$*"), + "org.apache.spark.sql.streaming.ui.StreamingQueryTab$"), // SQLImplicits ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits.rddToDatasetHolder"), diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 438e6e0c2fe51..b1fe94fbfbf20 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -21,6 +21,7 @@ import java.io.{File, FileWriter} import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ +import scala.collection.mutable import org.scalatest.concurrent.Eventually.eventually import org.scalatest.concurrent.Futures.timeout @@ -28,12 +29,13 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession, SQLHelper} -import org.apache.spark.sql.connect.client.util.RemoteSparkSession +import org.apache.spark.sql.connect.client.util.QueryTest import org.apache.spark.sql.functions.col import org.apache.spark.sql.functions.window +import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryIdleEvent, QueryStartedEvent, QueryTerminatedEvent} import org.apache.spark.util.Utils -class StreamingQuerySuite extends RemoteSparkSession with SQLHelper with Logging { +class StreamingQuerySuite extends QueryTest with SQLHelper with Logging { test("Streaming API with windowed aggregate query") { // This verifies standard streaming API by starting a streaming query with windowed count. @@ -268,6 +270,59 @@ class StreamingQuerySuite extends RemoteSparkSession with SQLHelper with Logging assert(!q1.isActive) } + test("streaming query listener") { + // There's a default StreamingQueryStatusListener. + assert(spark.streams.listListeners().length == 1) + + val listener = new EventCollector + spark.streams.addListener(listener) + + val q = spark.readStream + .format("rate") + .load() + .writeStream + .format("console") + .start() + + try { + q.processAllAvailable() + eventually(timeout(30.seconds)) { + assert(q.isActive) + checkAnswer(spark.table("my_listener_table").toDF(), Seq(Row(1, 2), Row(4, 5))) + } + } finally { + q.stop() + spark.sql("DROP TABLE IF EXISTS my_listener_table") + } + + // List listeners after adding a new listener, length should be 2. 
+ val listeners = spark.streams.listListeners() + assert(listeners.length == 2) + + val currListener = listeners(1).asInstanceOf[EventCollector] + assert(q.id.equals(currListener.startEvent.id)) + assert(q.runId.equals(currListener.terminationEvent.runId)) + assert(q.lastProgress.numInputRows == currListener.progressEvents.last.numInputRows) + + // Add listener1 as another instance of EventCollector and validate + val listener1 = new EventCollector + spark.streams.addListener(listener1) + assert(spark.streams.listListeners().length == 3) + spark.streams.removeListener(listener1) + assert(spark.streams.listListeners().length == 2) + + // Add the same listener again and validate, this aims to verify the listener cache + // is correctly stored and cleaned. + spark.streams.addListener(listener) + assert(spark.streams.listListeners().length == 3) + spark.streams.removeListener(listener) + assert(spark.streams.listListeners().length == 2) + + // Remove the listener, length should be 1. + spark.streams.removeListener(listener) + assert(spark.streams.listListeners().length == 1) + } + test("foreachBatch") { // Starts a streaming query with a foreachBatch function, which writes batchId and row count // to a temp view. The test verifies that the view is populated with data. @@ -330,6 +385,37 @@ case class TestClass(value: Int) { override def toString: String = value.toString } +class EventCollector extends StreamingQueryListener { + @volatile var startEvent: QueryStartedEvent = null + @volatile var terminationEvent: QueryTerminatedEvent = null + @volatile var idleEvent: QueryIdleEvent = null + + private val _progressEvents = new mutable.Queue[StreamingQueryProgress] + + def progressEvents: Seq[StreamingQueryProgress] = _progressEvents.synchronized { + _progressEvents.clone().toSeq + } + + override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = { + startEvent = event + val spark = SparkSession.builder().getOrCreate() + val df = spark.createDataFrame(Seq((1, 2), (4, 5))) + df.write.saveAsTable("my_listener_table") + } + + override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = { + _progressEvents += event.progress + } + + override def onQueryIdle(event: StreamingQueryListener.QueryIdleEvent): Unit = { + idleEvent = event + } + + override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = { + terminationEvent = event + } +} + class ForeachBatchFn(val viewName: String) extends ((DataFrame, Long) => Unit) with Serializable { override def apply(df: DataFrame, batchId: Long): Unit = { val count = df.count() diff --git a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto index a5924747af9a2..6689662fcf8ec 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto @@ -349,12 +349,22 @@ message StreamingQueryManagerCommand { AwaitAnyTerminationCommand await_any_termination = 3; // resetTerminated() API. bool reset_terminated = 4; + // addListener API. + StreamingQueryListenerCommand add_listener = 5; + // removeListener API. + StreamingQueryListenerCommand remove_listener = 6; + // listListeners() API, returns a list of streaming query listeners. + bool list_listeners = 7; } message AwaitAnyTerminationCommand { // (Optional) The waiting time in milliseconds to wait for any query to terminate. 
optional int64 timeout_ms = 1; } + + message StreamingQueryListenerCommand { + bytes listener_payload = 1; + } } // Response for commands on the streaming query manager. @@ -364,6 +374,9 @@ message StreamingQueryManagerCommandResult { StreamingQueryInstance query = 2; AwaitAnyTerminationResult await_any_termination = 3; bool reset_terminated = 4; + bool add_listener = 5; + bool remove_listener = 6; + ListStreamingQueryListenerResult list_listeners = 7; } message ActiveResult { @@ -380,6 +393,14 @@ message StreamingQueryManagerCommandResult { message AwaitAnyTerminationResult { bool terminated = 1; } + + message StreamingQueryListenerInstance { + bytes listener_payload = 1; + } + + message ListStreamingQueryListenerResult { + repeated StreamingQueryListenerInstance listeners = 1; + } } // Command to get the output of 'SparkContext.resources' diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/StreamingListenerPacket.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/StreamingListenerPacket.scala new file mode 100644 index 0000000000000..ec3a42ae9933c --- /dev/null +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/StreamingListenerPacket.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.common + +/** + * A wrapper class around the StreamingQueryListener and an id associated with it. + * + * This class is shared between the client and the server to allow for serialization and + * deserialization of the JVM object. We'll need to cache a mapping between the id and listener + * object on server side in order to identify the correct server side listener object. + * + * @param id + * The id for the StreamingQueryListener. + * @param listener + * The StreamingQueryListener instance. 
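+ *
+ * A rough sketch of the round trip, mirroring `StreamingQueryManager.addListener` on the
+ * client and the `ADD_LISTENER` handler in `SparkConnectPlanner` on the server (`payload`
+ * and `packet` are placeholder names):
+ * {{{
+ *   // Client: serialize the packet into the command's listener_payload bytes.
+ *   val payload = ByteString.copyFrom(Utils.serialize(StreamingListenerPacket(id, listener)))
+ *
+ *   // Server: deserialize the packet and cache the listener under the same id.
+ *   val packet = Utils.deserialize[StreamingListenerPacket](
+ *     payload.toByteArray, Utils.getContextOrSparkClassLoader)
+ *   sessionHolder.cacheListenerById(
+ *     packet.id, packet.listener.asInstanceOf[StreamingQueryListener])
+ * }}}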
+ */ +case class StreamingListenerPacket(id: String, listener: AnyRef) extends Serializable diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 6b1f84ada510d..48d5e7509c3be 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -36,7 +36,7 @@ import org.apache.spark.connect.proto.Parse.ParseFormat import org.apache.spark.connect.proto.StreamingForeachFunction import org.apache.spark.connect.proto.StreamingQueryManagerCommand import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult -import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult.StreamingQueryInstance +import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult.{StreamingQueryInstance, StreamingQueryListenerInstance} import org.apache.spark.connect.proto.WriteStreamOperationStart.TriggerCase import org.apache.spark.internal.Logging import org.apache.spark.ml.{functions => MLFunctions} @@ -54,7 +54,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendColumns, CoGroup, Coll import org.apache.spark.sql.catalyst.streaming.InternalOutputModes import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils} -import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, ForeachWriterPacket, InvalidPlanInput, LiteralValueProtoConverter, StorageLevelProtoConverter, UdfPacket} +import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, ForeachWriterPacket, InvalidPlanInput, LiteralValueProtoConverter, StorageLevelProtoConverter, StreamingListenerPacket, UdfPacket} import org.apache.spark.sql.connect.config.Connect.CONNECT_GRPC_ARROW_MAX_BATCH_SIZE import org.apache.spark.sql.connect.plugin.SparkConnectPluginRegistry import org.apache.spark.sql.connect.service.SessionHolder @@ -73,7 +73,7 @@ import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper import org.apache.spark.sql.expressions.ReduceAggregator import org.apache.spark.sql.internal.{CatalogImpl, TypedAggUtils} import org.apache.spark.sql.protobuf.{CatalystDataToProtobuf, ProtobufDataToCatalyst} -import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, StreamingQuery, StreamingQueryProgress, Trigger} +import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, StreamingQuery, StreamingQueryListener, StreamingQueryProgress, Trigger} import org.apache.spark.sql.types._ import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.storage.CacheId @@ -2896,6 +2896,15 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging { builder.build() } + private def buildStreamingQueryListenerInstance( + listener: StreamingQueryListener): StreamingQueryListenerInstance = { + StreamingQueryListenerInstance + .newBuilder() + .setListenerPayload(ByteString + .copyFrom(Utils.serialize(StreamingListenerPacket("", listener)))) + .build() + } + def handleStreamingQueryManagerCommand( command: StreamingQueryManagerCommand, sessionId: String, @@ -2930,6 +2939,37 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging { session.streams.resetTerminated() respBuilder.setResetTerminated(true) + case 
StreamingQueryManagerCommand.CommandCase.ADD_LISTENER => + val listenerPacket = Utils + .deserialize[StreamingListenerPacket]( + command.getAddListener.getListenerPayload.toByteArray, + Utils.getContextOrSparkClassLoader) + val listener: StreamingQueryListener = listenerPacket.listener + .asInstanceOf[StreamingQueryListener] + val id: String = listenerPacket.id + sessionHolder.cacheListenerById(id, listener) + session.streams.addListener(listener) + respBuilder.setAddListener(true) + + case StreamingQueryManagerCommand.CommandCase.REMOVE_LISTENER => + val listenerId = Utils + .deserialize[StreamingListenerPacket]( + command.getRemoveListener.getListenerPayload.toByteArray, + Utils.getContextOrSparkClassLoader) + .id + val listener: StreamingQueryListener = sessionHolder.getListenerOrThrow(listenerId) + session.streams.removeListener(listener) + sessionHolder.removeCachedListener(listenerId) + respBuilder.setRemoveListener(true) + + case StreamingQueryManagerCommand.CommandCase.LIST_LISTENERS => + val listeners = session.streams.listListeners() + respBuilder.getListListenersBuilder.addAllListeners( + listeners + .map(listener => buildStreamingQueryListenerInstance(listener)) + .toIterable + .asJava) + + case StreamingQueryManagerCommand.CommandCase.COMMAND_NOT_SET => throw new IllegalArgumentException("Missing command in StreamingQueryManagerCommand") } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala index 332e960c25b75..a24a9eb2feced 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.DataFrame import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager import org.apache.spark.sql.connect.common.InvalidPlanInput +import org.apache.spark.sql.streaming.StreamingQueryListener import org.apache.spark.util.Utils /** @@ -47,6 +48,11 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio // foreachBatch() in Streaming. Lazy since most sessions don't need it. private lazy val dataFrameCache: ConcurrentMap[String, DataFrame] = new ConcurrentHashMap() + // Mapping from id to StreamingQueryListener. Used for methods like removeListener() in + // StreamingQueryManager. + private lazy val listenerCache: ConcurrentMap[String, StreamingQueryListener] = + new ConcurrentHashMap() + private[connect] def createExecutePlanHolder( request: proto.ExecutePlanRequest): ExecutePlanHolder = { @@ -152,6 +158,33 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio private[connect] def removeCachedDataFrame(dfId: String): DataFrame = { dataFrameCache.remove(dfId) } + + /** + * Caches the given StreamingQueryListener under the given ID. + */ + private[connect] def cacheListenerById(id: String, listener: StreamingQueryListener): Unit = { + if (listenerCache.putIfAbsent(id, listener) != null) { + throw SparkException.internalError(s"A listener is already associated with id $id") + } + } + + /** + * Returns the [[StreamingQueryListener]] cached for listener ID `id`. If it is not found, throws + * [[InvalidPlanInput]].
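+ *
+ * For example, the `REMOVE_LISTENER` handler in `SparkConnectPlanner` uses it roughly as:
+ * {{{
+ *   val listener = sessionHolder.getListenerOrThrow(listenerId)
+ *   session.streams.removeListener(listener)
+ *   sessionHolder.removeCachedListener(listenerId)
+ * }}}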
+ */ + private[connect] def getListenerOrThrow(id: String): StreamingQueryListener = { + Option(listenerCache.get(id)) + .getOrElse { + throw InvalidPlanInput(s"No listener with id $id is found in the session $sessionId") + } + } + + /** + * Removes corresponding StreamingQueryListener by ID. + */ + private[connect] def removeCachedListener(id: String): StreamingQueryListener = { + listenerCache.remove(id) + } } object SessionHolder { diff --git a/python/pyspark/sql/connect/proto/commands_pb2.py b/python/pyspark/sql/connect/proto/commands_pb2.py index 6d3cc22dcfa5d..3947d172ed40f 100644 --- a/python/pyspark/sql/connect/proto/commands_pb2.py +++ b/python/pyspark/sql/connect/proto/commands_pb2.py @@ -35,7 +35,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto"\x86\x07\n\x07\x43ommand\x12]\n\x11register_function\x18\x01 \x01(\x0b\x32..spark.connect.CommonInlineUserDefinedFunctionH\x00R\x10registerFunction\x12H\n\x0fwrite_operation\x18\x02 \x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperation\x12_\n\x15\x63reate_dataframe_view\x18\x03 \x01(\x0b\x32).spark.connect.CreateDataFrameViewCommandH\x00R\x13\x63reateDataframeView\x12O\n\x12write_operation_v2\x18\x04 \x01(\x0b\x32\x1f.spark.connect.WriteOperationV2H\x00R\x10writeOperationV2\x12<\n\x0bsql_command\x18\x05 \x01(\x0b\x32\x19.spark.connect.SqlCommandH\x00R\nsqlCommand\x12k\n\x1cwrite_stream_operation_start\x18\x06 \x01(\x0b\x32(.spark.connect.WriteStreamOperationStartH\x00R\x19writeStreamOperationStart\x12^\n\x17streaming_query_command\x18\x07 \x01(\x0b\x32$.spark.connect.StreamingQueryCommandH\x00R\x15streamingQueryCommand\x12X\n\x15get_resources_command\x18\x08 \x01(\x0b\x32".spark.connect.GetResourcesCommandH\x00R\x13getResourcesCommand\x12t\n\x1fstreaming_query_manager_command\x18\t \x01(\x0b\x32+.spark.connect.StreamingQueryManagerCommandH\x00R\x1cstreamingQueryManagerCommand\x12\x35\n\textension\x18\xe7\x07 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textensionB\x0e\n\x0c\x63ommand_type"\xf1\x01\n\nSqlCommand\x12\x10\n\x03sql\x18\x01 \x01(\tR\x03sql\x12\x37\n\x04\x61rgs\x18\x02 \x03(\x0b\x32#.spark.connect.SqlCommand.ArgsEntryR\x04\x61rgs\x12<\n\x08pos_args\x18\x03 \x03(\x0b\x32!.spark.connect.Expression.LiteralR\x07posArgs\x1aZ\n\tArgsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x37\n\x05value\x18\x02 \x01(\x0b\x32!.spark.connect.Expression.LiteralR\x05value:\x02\x38\x01"\x96\x01\n\x1a\x43reateDataFrameViewCommand\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x12\n\x04name\x18\x02 \x01(\tR\x04name\x12\x1b\n\tis_global\x18\x03 \x01(\x08R\x08isGlobal\x12\x18\n\x07replace\x18\x04 \x01(\x08R\x07replace"\x9b\x08\n\x0eWriteOperation\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x1b\n\x06source\x18\x02 \x01(\tH\x01R\x06source\x88\x01\x01\x12\x14\n\x04path\x18\x03 \x01(\tH\x00R\x04path\x12?\n\x05table\x18\x04 \x01(\x0b\x32\'.spark.connect.WriteOperation.SaveTableH\x00R\x05table\x12:\n\x04mode\x18\x05 \x01(\x0e\x32&.spark.connect.WriteOperation.SaveModeR\x04mode\x12*\n\x11sort_column_names\x18\x06 \x03(\tR\x0fsortColumnNames\x12\x31\n\x14partitioning_columns\x18\x07 \x03(\tR\x13partitioningColumns\x12\x43\n\tbucket_by\x18\x08 \x01(\x0b\x32&.spark.connect.WriteOperation.BucketByR\x08\x62ucketBy\x12\x44\n\x07options\x18\t 
\x03(\x0b\x32*.spark.connect.WriteOperation.OptionsEntryR\x07options\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1a\x82\x02\n\tSaveTable\x12\x1d\n\ntable_name\x18\x01 \x01(\tR\ttableName\x12X\n\x0bsave_method\x18\x02 \x01(\x0e\x32\x37.spark.connect.WriteOperation.SaveTable.TableSaveMethodR\nsaveMethod"|\n\x0fTableSaveMethod\x12!\n\x1dTABLE_SAVE_METHOD_UNSPECIFIED\x10\x00\x12#\n\x1fTABLE_SAVE_METHOD_SAVE_AS_TABLE\x10\x01\x12!\n\x1dTABLE_SAVE_METHOD_INSERT_INTO\x10\x02\x1a[\n\x08\x42ucketBy\x12.\n\x13\x62ucket_column_names\x18\x01 \x03(\tR\x11\x62ucketColumnNames\x12\x1f\n\x0bnum_buckets\x18\x02 \x01(\x05R\nnumBuckets"\x89\x01\n\x08SaveMode\x12\x19\n\x15SAVE_MODE_UNSPECIFIED\x10\x00\x12\x14\n\x10SAVE_MODE_APPEND\x10\x01\x12\x17\n\x13SAVE_MODE_OVERWRITE\x10\x02\x12\x1d\n\x19SAVE_MODE_ERROR_IF_EXISTS\x10\x03\x12\x14\n\x10SAVE_MODE_IGNORE\x10\x04\x42\x0b\n\tsave_typeB\t\n\x07_source"\xad\x06\n\x10WriteOperationV2\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x1d\n\ntable_name\x18\x02 \x01(\tR\ttableName\x12\x1f\n\x08provider\x18\x03 \x01(\tH\x00R\x08provider\x88\x01\x01\x12L\n\x14partitioning_columns\x18\x04 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x13partitioningColumns\x12\x46\n\x07options\x18\x05 \x03(\x0b\x32,.spark.connect.WriteOperationV2.OptionsEntryR\x07options\x12_\n\x10table_properties\x18\x06 \x03(\x0b\x32\x34.spark.connect.WriteOperationV2.TablePropertiesEntryR\x0ftableProperties\x12\x38\n\x04mode\x18\x07 \x01(\x0e\x32$.spark.connect.WriteOperationV2.ModeR\x04mode\x12J\n\x13overwrite_condition\x18\x08 \x01(\x0b\x32\x19.spark.connect.ExpressionR\x12overwriteCondition\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01"\x9f\x01\n\x04Mode\x12\x14\n\x10MODE_UNSPECIFIED\x10\x00\x12\x0f\n\x0bMODE_CREATE\x10\x01\x12\x12\n\x0eMODE_OVERWRITE\x10\x02\x12\x1d\n\x19MODE_OVERWRITE_PARTITIONS\x10\x03\x12\x0f\n\x0bMODE_APPEND\x10\x04\x12\x10\n\x0cMODE_REPLACE\x10\x05\x12\x1a\n\x16MODE_CREATE_OR_REPLACE\x10\x06\x42\x0b\n\t_provider"\xa0\x06\n\x19WriteStreamOperationStart\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x16\n\x06\x66ormat\x18\x02 \x01(\tR\x06\x66ormat\x12O\n\x07options\x18\x03 \x03(\x0b\x32\x35.spark.connect.WriteStreamOperationStart.OptionsEntryR\x07options\x12:\n\x19partitioning_column_names\x18\x04 \x03(\tR\x17partitioningColumnNames\x12:\n\x18processing_time_interval\x18\x05 \x01(\tH\x00R\x16processingTimeInterval\x12%\n\ravailable_now\x18\x06 \x01(\x08H\x00R\x0c\x61vailableNow\x12\x14\n\x04once\x18\x07 \x01(\x08H\x00R\x04once\x12\x46\n\x1e\x63ontinuous_checkpoint_interval\x18\x08 \x01(\tH\x00R\x1c\x63ontinuousCheckpointInterval\x12\x1f\n\x0boutput_mode\x18\t \x01(\tR\noutputMode\x12\x1d\n\nquery_name\x18\n \x01(\tR\tqueryName\x12\x14\n\x04path\x18\x0b \x01(\tH\x01R\x04path\x12\x1f\n\ntable_name\x18\x0c \x01(\tH\x01R\ttableName\x12N\n\x0e\x66oreach_writer\x18\r \x01(\x0b\x32\'.spark.connect.StreamingForeachFunctionR\rforeachWriter\x12L\n\rforeach_batch\x18\x0e \x01(\x0b\x32\'.spark.connect.StreamingForeachFunctionR\x0c\x66oreachBatch\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 
\x01(\tR\x05value:\x02\x38\x01\x42\t\n\x07triggerB\x12\n\x10sink_destination"\xb3\x01\n\x18StreamingForeachFunction\x12\x43\n\x0fpython_function\x18\x01 \x01(\x0b\x32\x18.spark.connect.PythonUDFH\x00R\x0epythonFunction\x12\x46\n\x0escala_function\x18\x02 \x01(\x0b\x32\x1d.spark.connect.ScalarScalaUDFH\x00R\rscalaFunctionB\n\n\x08\x66unction"y\n\x1fWriteStreamOperationStartResult\x12\x42\n\x08query_id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x07queryId\x12\x12\n\x04name\x18\x02 \x01(\tR\x04name"A\n\x18StreamingQueryInstanceId\x12\x0e\n\x02id\x18\x01 \x01(\tR\x02id\x12\x15\n\x06run_id\x18\x02 \x01(\tR\x05runId"\xf8\x04\n\x15StreamingQueryCommand\x12\x42\n\x08query_id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x07queryId\x12\x18\n\x06status\x18\x02 \x01(\x08H\x00R\x06status\x12%\n\rlast_progress\x18\x03 \x01(\x08H\x00R\x0clastProgress\x12)\n\x0frecent_progress\x18\x04 \x01(\x08H\x00R\x0erecentProgress\x12\x14\n\x04stop\x18\x05 \x01(\x08H\x00R\x04stop\x12\x34\n\x15process_all_available\x18\x06 \x01(\x08H\x00R\x13processAllAvailable\x12O\n\x07\x65xplain\x18\x07 \x01(\x0b\x32\x33.spark.connect.StreamingQueryCommand.ExplainCommandH\x00R\x07\x65xplain\x12\x1e\n\texception\x18\x08 \x01(\x08H\x00R\texception\x12k\n\x11\x61wait_termination\x18\t \x01(\x0b\x32<.spark.connect.StreamingQueryCommand.AwaitTerminationCommandH\x00R\x10\x61waitTermination\x1a,\n\x0e\x45xplainCommand\x12\x1a\n\x08\x65xtended\x18\x01 \x01(\x08R\x08\x65xtended\x1aL\n\x17\x41waitTerminationCommand\x12"\n\ntimeout_ms\x18\x02 \x01(\x03H\x00R\ttimeoutMs\x88\x01\x01\x42\r\n\x0b_timeout_msB\t\n\x07\x63ommand"\xf5\x08\n\x1bStreamingQueryCommandResult\x12\x42\n\x08query_id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x07queryId\x12Q\n\x06status\x18\x02 \x01(\x0b\x32\x37.spark.connect.StreamingQueryCommandResult.StatusResultH\x00R\x06status\x12j\n\x0frecent_progress\x18\x03 \x01(\x0b\x32?.spark.connect.StreamingQueryCommandResult.RecentProgressResultH\x00R\x0erecentProgress\x12T\n\x07\x65xplain\x18\x04 \x01(\x0b\x32\x38.spark.connect.StreamingQueryCommandResult.ExplainResultH\x00R\x07\x65xplain\x12Z\n\texception\x18\x05 \x01(\x0b\x32:.spark.connect.StreamingQueryCommandResult.ExceptionResultH\x00R\texception\x12p\n\x11\x61wait_termination\x18\x06 \x01(\x0b\x32\x41.spark.connect.StreamingQueryCommandResult.AwaitTerminationResultH\x00R\x10\x61waitTermination\x1a\xaa\x01\n\x0cStatusResult\x12%\n\x0estatus_message\x18\x01 \x01(\tR\rstatusMessage\x12*\n\x11is_data_available\x18\x02 \x01(\x08R\x0fisDataAvailable\x12*\n\x11is_trigger_active\x18\x03 \x01(\x08R\x0fisTriggerActive\x12\x1b\n\tis_active\x18\x04 \x01(\x08R\x08isActive\x1aH\n\x14RecentProgressResult\x12\x30\n\x14recent_progress_json\x18\x05 \x03(\tR\x12recentProgressJson\x1a\'\n\rExplainResult\x12\x16\n\x06result\x18\x01 \x01(\tR\x06result\x1a\xc5\x01\n\x0f\x45xceptionResult\x12\x30\n\x11\x65xception_message\x18\x01 \x01(\tH\x00R\x10\x65xceptionMessage\x88\x01\x01\x12$\n\x0b\x65rror_class\x18\x02 \x01(\tH\x01R\nerrorClass\x88\x01\x01\x12$\n\x0bstack_trace\x18\x03 \x01(\tH\x02R\nstackTrace\x88\x01\x01\x42\x14\n\x12_exception_messageB\x0e\n\x0c_error_classB\x0e\n\x0c_stack_trace\x1a\x38\n\x16\x41waitTerminationResult\x12\x1e\n\nterminated\x18\x01 \x01(\x08R\nterminatedB\r\n\x0bresult_type"\xde\x02\n\x1cStreamingQueryManagerCommand\x12\x18\n\x06\x61\x63tive\x18\x01 \x01(\x08H\x00R\x06\x61\x63tive\x12\x1d\n\tget_query\x18\x02 \x01(\tH\x00R\x08getQuery\x12|\n\x15\x61wait_any_termination\x18\x03 
\x01(\x0b\x32\x46.spark.connect.StreamingQueryManagerCommand.AwaitAnyTerminationCommandH\x00R\x13\x61waitAnyTermination\x12+\n\x10reset_terminated\x18\x04 \x01(\x08H\x00R\x0fresetTerminated\x1aO\n\x1a\x41waitAnyTerminationCommand\x12"\n\ntimeout_ms\x18\x01 \x01(\x03H\x00R\ttimeoutMs\x88\x01\x01\x42\r\n\x0b_timeout_msB\t\n\x07\x63ommand"\xd3\x05\n"StreamingQueryManagerCommandResult\x12X\n\x06\x61\x63tive\x18\x01 \x01(\x0b\x32>.spark.connect.StreamingQueryManagerCommandResult.ActiveResultH\x00R\x06\x61\x63tive\x12`\n\x05query\x18\x02 \x01(\x0b\x32H.spark.connect.StreamingQueryManagerCommandResult.StreamingQueryInstanceH\x00R\x05query\x12\x81\x01\n\x15\x61wait_any_termination\x18\x03 \x01(\x0b\x32K.spark.connect.StreamingQueryManagerCommandResult.AwaitAnyTerminationResultH\x00R\x13\x61waitAnyTermination\x12+\n\x10reset_terminated\x18\x04 \x01(\x08H\x00R\x0fresetTerminated\x1a\x7f\n\x0c\x41\x63tiveResult\x12o\n\x0e\x61\x63tive_queries\x18\x01 \x03(\x0b\x32H.spark.connect.StreamingQueryManagerCommandResult.StreamingQueryInstanceR\ractiveQueries\x1as\n\x16StreamingQueryInstance\x12\x37\n\x02id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x02id\x12\x17\n\x04name\x18\x02 \x01(\tH\x00R\x04name\x88\x01\x01\x42\x07\n\x05_name\x1a;\n\x19\x41waitAnyTerminationResult\x12\x1e\n\nterminated\x18\x01 \x01(\x08R\nterminatedB\r\n\x0bresult_type"\x15\n\x13GetResourcesCommand"\xd4\x01\n\x19GetResourcesCommandResult\x12U\n\tresources\x18\x01 \x03(\x0b\x32\x37.spark.connect.GetResourcesCommandResult.ResourcesEntryR\tresources\x1a`\n\x0eResourcesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x38\n\x05value\x18\x02 \x01(\x0b\x32".spark.connect.ResourceInformationR\x05value:\x02\x38\x01\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' + b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto"\x86\x07\n\x07\x43ommand\x12]\n\x11register_function\x18\x01 \x01(\x0b\x32..spark.connect.CommonInlineUserDefinedFunctionH\x00R\x10registerFunction\x12H\n\x0fwrite_operation\x18\x02 \x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperation\x12_\n\x15\x63reate_dataframe_view\x18\x03 \x01(\x0b\x32).spark.connect.CreateDataFrameViewCommandH\x00R\x13\x63reateDataframeView\x12O\n\x12write_operation_v2\x18\x04 \x01(\x0b\x32\x1f.spark.connect.WriteOperationV2H\x00R\x10writeOperationV2\x12<\n\x0bsql_command\x18\x05 \x01(\x0b\x32\x19.spark.connect.SqlCommandH\x00R\nsqlCommand\x12k\n\x1cwrite_stream_operation_start\x18\x06 \x01(\x0b\x32(.spark.connect.WriteStreamOperationStartH\x00R\x19writeStreamOperationStart\x12^\n\x17streaming_query_command\x18\x07 \x01(\x0b\x32$.spark.connect.StreamingQueryCommandH\x00R\x15streamingQueryCommand\x12X\n\x15get_resources_command\x18\x08 \x01(\x0b\x32".spark.connect.GetResourcesCommandH\x00R\x13getResourcesCommand\x12t\n\x1fstreaming_query_manager_command\x18\t \x01(\x0b\x32+.spark.connect.StreamingQueryManagerCommandH\x00R\x1cstreamingQueryManagerCommand\x12\x35\n\textension\x18\xe7\x07 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textensionB\x0e\n\x0c\x63ommand_type"\xf1\x01\n\nSqlCommand\x12\x10\n\x03sql\x18\x01 \x01(\tR\x03sql\x12\x37\n\x04\x61rgs\x18\x02 \x03(\x0b\x32#.spark.connect.SqlCommand.ArgsEntryR\x04\x61rgs\x12<\n\x08pos_args\x18\x03 \x03(\x0b\x32!.spark.connect.Expression.LiteralR\x07posArgs\x1aZ\n\tArgsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x37\n\x05value\x18\x02 
\x01(\x0b\x32!.spark.connect.Expression.LiteralR\x05value:\x02\x38\x01"\x96\x01\n\x1a\x43reateDataFrameViewCommand\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x12\n\x04name\x18\x02 \x01(\tR\x04name\x12\x1b\n\tis_global\x18\x03 \x01(\x08R\x08isGlobal\x12\x18\n\x07replace\x18\x04 \x01(\x08R\x07replace"\x9b\x08\n\x0eWriteOperation\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x1b\n\x06source\x18\x02 \x01(\tH\x01R\x06source\x88\x01\x01\x12\x14\n\x04path\x18\x03 \x01(\tH\x00R\x04path\x12?\n\x05table\x18\x04 \x01(\x0b\x32\'.spark.connect.WriteOperation.SaveTableH\x00R\x05table\x12:\n\x04mode\x18\x05 \x01(\x0e\x32&.spark.connect.WriteOperation.SaveModeR\x04mode\x12*\n\x11sort_column_names\x18\x06 \x03(\tR\x0fsortColumnNames\x12\x31\n\x14partitioning_columns\x18\x07 \x03(\tR\x13partitioningColumns\x12\x43\n\tbucket_by\x18\x08 \x01(\x0b\x32&.spark.connect.WriteOperation.BucketByR\x08\x62ucketBy\x12\x44\n\x07options\x18\t \x03(\x0b\x32*.spark.connect.WriteOperation.OptionsEntryR\x07options\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1a\x82\x02\n\tSaveTable\x12\x1d\n\ntable_name\x18\x01 \x01(\tR\ttableName\x12X\n\x0bsave_method\x18\x02 \x01(\x0e\x32\x37.spark.connect.WriteOperation.SaveTable.TableSaveMethodR\nsaveMethod"|\n\x0fTableSaveMethod\x12!\n\x1dTABLE_SAVE_METHOD_UNSPECIFIED\x10\x00\x12#\n\x1fTABLE_SAVE_METHOD_SAVE_AS_TABLE\x10\x01\x12!\n\x1dTABLE_SAVE_METHOD_INSERT_INTO\x10\x02\x1a[\n\x08\x42ucketBy\x12.\n\x13\x62ucket_column_names\x18\x01 \x03(\tR\x11\x62ucketColumnNames\x12\x1f\n\x0bnum_buckets\x18\x02 \x01(\x05R\nnumBuckets"\x89\x01\n\x08SaveMode\x12\x19\n\x15SAVE_MODE_UNSPECIFIED\x10\x00\x12\x14\n\x10SAVE_MODE_APPEND\x10\x01\x12\x17\n\x13SAVE_MODE_OVERWRITE\x10\x02\x12\x1d\n\x19SAVE_MODE_ERROR_IF_EXISTS\x10\x03\x12\x14\n\x10SAVE_MODE_IGNORE\x10\x04\x42\x0b\n\tsave_typeB\t\n\x07_source"\xad\x06\n\x10WriteOperationV2\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x1d\n\ntable_name\x18\x02 \x01(\tR\ttableName\x12\x1f\n\x08provider\x18\x03 \x01(\tH\x00R\x08provider\x88\x01\x01\x12L\n\x14partitioning_columns\x18\x04 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x13partitioningColumns\x12\x46\n\x07options\x18\x05 \x03(\x0b\x32,.spark.connect.WriteOperationV2.OptionsEntryR\x07options\x12_\n\x10table_properties\x18\x06 \x03(\x0b\x32\x34.spark.connect.WriteOperationV2.TablePropertiesEntryR\x0ftableProperties\x12\x38\n\x04mode\x18\x07 \x01(\x0e\x32$.spark.connect.WriteOperationV2.ModeR\x04mode\x12J\n\x13overwrite_condition\x18\x08 \x01(\x0b\x32\x19.spark.connect.ExpressionR\x12overwriteCondition\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01"\x9f\x01\n\x04Mode\x12\x14\n\x10MODE_UNSPECIFIED\x10\x00\x12\x0f\n\x0bMODE_CREATE\x10\x01\x12\x12\n\x0eMODE_OVERWRITE\x10\x02\x12\x1d\n\x19MODE_OVERWRITE_PARTITIONS\x10\x03\x12\x0f\n\x0bMODE_APPEND\x10\x04\x12\x10\n\x0cMODE_REPLACE\x10\x05\x12\x1a\n\x16MODE_CREATE_OR_REPLACE\x10\x06\x42\x0b\n\t_provider"\xa0\x06\n\x19WriteStreamOperationStart\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x16\n\x06\x66ormat\x18\x02 \x01(\tR\x06\x66ormat\x12O\n\x07options\x18\x03 
\x03(\x0b\x32\x35.spark.connect.WriteStreamOperationStart.OptionsEntryR\x07options\x12:\n\x19partitioning_column_names\x18\x04 \x03(\tR\x17partitioningColumnNames\x12:\n\x18processing_time_interval\x18\x05 \x01(\tH\x00R\x16processingTimeInterval\x12%\n\ravailable_now\x18\x06 \x01(\x08H\x00R\x0c\x61vailableNow\x12\x14\n\x04once\x18\x07 \x01(\x08H\x00R\x04once\x12\x46\n\x1e\x63ontinuous_checkpoint_interval\x18\x08 \x01(\tH\x00R\x1c\x63ontinuousCheckpointInterval\x12\x1f\n\x0boutput_mode\x18\t \x01(\tR\noutputMode\x12\x1d\n\nquery_name\x18\n \x01(\tR\tqueryName\x12\x14\n\x04path\x18\x0b \x01(\tH\x01R\x04path\x12\x1f\n\ntable_name\x18\x0c \x01(\tH\x01R\ttableName\x12N\n\x0e\x66oreach_writer\x18\r \x01(\x0b\x32\'.spark.connect.StreamingForeachFunctionR\rforeachWriter\x12L\n\rforeach_batch\x18\x0e \x01(\x0b\x32\'.spark.connect.StreamingForeachFunctionR\x0c\x66oreachBatch\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\t\n\x07triggerB\x12\n\x10sink_destination"\xb3\x01\n\x18StreamingForeachFunction\x12\x43\n\x0fpython_function\x18\x01 \x01(\x0b\x32\x18.spark.connect.PythonUDFH\x00R\x0epythonFunction\x12\x46\n\x0escala_function\x18\x02 \x01(\x0b\x32\x1d.spark.connect.ScalarScalaUDFH\x00R\rscalaFunctionB\n\n\x08\x66unction"y\n\x1fWriteStreamOperationStartResult\x12\x42\n\x08query_id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x07queryId\x12\x12\n\x04name\x18\x02 \x01(\tR\x04name"A\n\x18StreamingQueryInstanceId\x12\x0e\n\x02id\x18\x01 \x01(\tR\x02id\x12\x15\n\x06run_id\x18\x02 \x01(\tR\x05runId"\xf8\x04\n\x15StreamingQueryCommand\x12\x42\n\x08query_id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x07queryId\x12\x18\n\x06status\x18\x02 \x01(\x08H\x00R\x06status\x12%\n\rlast_progress\x18\x03 \x01(\x08H\x00R\x0clastProgress\x12)\n\x0frecent_progress\x18\x04 \x01(\x08H\x00R\x0erecentProgress\x12\x14\n\x04stop\x18\x05 \x01(\x08H\x00R\x04stop\x12\x34\n\x15process_all_available\x18\x06 \x01(\x08H\x00R\x13processAllAvailable\x12O\n\x07\x65xplain\x18\x07 \x01(\x0b\x32\x33.spark.connect.StreamingQueryCommand.ExplainCommandH\x00R\x07\x65xplain\x12\x1e\n\texception\x18\x08 \x01(\x08H\x00R\texception\x12k\n\x11\x61wait_termination\x18\t \x01(\x0b\x32<.spark.connect.StreamingQueryCommand.AwaitTerminationCommandH\x00R\x10\x61waitTermination\x1a,\n\x0e\x45xplainCommand\x12\x1a\n\x08\x65xtended\x18\x01 \x01(\x08R\x08\x65xtended\x1aL\n\x17\x41waitTerminationCommand\x12"\n\ntimeout_ms\x18\x02 \x01(\x03H\x00R\ttimeoutMs\x88\x01\x01\x42\r\n\x0b_timeout_msB\t\n\x07\x63ommand"\xf5\x08\n\x1bStreamingQueryCommandResult\x12\x42\n\x08query_id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x07queryId\x12Q\n\x06status\x18\x02 \x01(\x0b\x32\x37.spark.connect.StreamingQueryCommandResult.StatusResultH\x00R\x06status\x12j\n\x0frecent_progress\x18\x03 \x01(\x0b\x32?.spark.connect.StreamingQueryCommandResult.RecentProgressResultH\x00R\x0erecentProgress\x12T\n\x07\x65xplain\x18\x04 \x01(\x0b\x32\x38.spark.connect.StreamingQueryCommandResult.ExplainResultH\x00R\x07\x65xplain\x12Z\n\texception\x18\x05 \x01(\x0b\x32:.spark.connect.StreamingQueryCommandResult.ExceptionResultH\x00R\texception\x12p\n\x11\x61wait_termination\x18\x06 \x01(\x0b\x32\x41.spark.connect.StreamingQueryCommandResult.AwaitTerminationResultH\x00R\x10\x61waitTermination\x1a\xaa\x01\n\x0cStatusResult\x12%\n\x0estatus_message\x18\x01 \x01(\tR\rstatusMessage\x12*\n\x11is_data_available\x18\x02 
\x01(\x08R\x0fisDataAvailable\x12*\n\x11is_trigger_active\x18\x03 \x01(\x08R\x0fisTriggerActive\x12\x1b\n\tis_active\x18\x04 \x01(\x08R\x08isActive\x1aH\n\x14RecentProgressResult\x12\x30\n\x14recent_progress_json\x18\x05 \x03(\tR\x12recentProgressJson\x1a\'\n\rExplainResult\x12\x16\n\x06result\x18\x01 \x01(\tR\x06result\x1a\xc5\x01\n\x0f\x45xceptionResult\x12\x30\n\x11\x65xception_message\x18\x01 \x01(\tH\x00R\x10\x65xceptionMessage\x88\x01\x01\x12$\n\x0b\x65rror_class\x18\x02 \x01(\tH\x01R\nerrorClass\x88\x01\x01\x12$\n\x0bstack_trace\x18\x03 \x01(\tH\x02R\nstackTrace\x88\x01\x01\x42\x14\n\x12_exception_messageB\x0e\n\x0c_error_classB\x0e\n\x0c_stack_trace\x1a\x38\n\x16\x41waitTerminationResult\x12\x1e\n\nterminated\x18\x01 \x01(\x08R\nterminatedB\r\n\x0bresult_type"\xb9\x05\n\x1cStreamingQueryManagerCommand\x12\x18\n\x06\x61\x63tive\x18\x01 \x01(\x08H\x00R\x06\x61\x63tive\x12\x1d\n\tget_query\x18\x02 \x01(\tH\x00R\x08getQuery\x12|\n\x15\x61wait_any_termination\x18\x03 \x01(\x0b\x32\x46.spark.connect.StreamingQueryManagerCommand.AwaitAnyTerminationCommandH\x00R\x13\x61waitAnyTermination\x12+\n\x10reset_terminated\x18\x04 \x01(\x08H\x00R\x0fresetTerminated\x12n\n\x0c\x61\x64\x64_listener\x18\x05 \x01(\x0b\x32I.spark.connect.StreamingQueryManagerCommand.StreamingQueryListenerCommandH\x00R\x0b\x61\x64\x64Listener\x12t\n\x0fremove_listener\x18\x06 \x01(\x0b\x32I.spark.connect.StreamingQueryManagerCommand.StreamingQueryListenerCommandH\x00R\x0eremoveListener\x12\'\n\x0elist_listeners\x18\x07 \x01(\x08H\x00R\rlistListeners\x1aO\n\x1a\x41waitAnyTerminationCommand\x12"\n\ntimeout_ms\x18\x01 \x01(\x03H\x00R\ttimeoutMs\x88\x01\x01\x42\r\n\x0b_timeout_ms\x1aJ\n\x1dStreamingQueryListenerCommand\x12)\n\x10listener_payload\x18\x01 \x01(\x0cR\x0flistenerPayloadB\t\n\x07\x63ommand"\x82\t\n"StreamingQueryManagerCommandResult\x12X\n\x06\x61\x63tive\x18\x01 \x01(\x0b\x32>.spark.connect.StreamingQueryManagerCommandResult.ActiveResultH\x00R\x06\x61\x63tive\x12`\n\x05query\x18\x02 \x01(\x0b\x32H.spark.connect.StreamingQueryManagerCommandResult.StreamingQueryInstanceH\x00R\x05query\x12\x81\x01\n\x15\x61wait_any_termination\x18\x03 \x01(\x0b\x32K.spark.connect.StreamingQueryManagerCommandResult.AwaitAnyTerminationResultH\x00R\x13\x61waitAnyTermination\x12+\n\x10reset_terminated\x18\x04 \x01(\x08H\x00R\x0fresetTerminated\x12#\n\x0c\x61\x64\x64_listener\x18\x05 \x01(\x08H\x00R\x0b\x61\x64\x64Listener\x12)\n\x0fremove_listener\x18\x06 \x01(\x08H\x00R\x0eremoveListener\x12{\n\x0elist_listeners\x18\x07 \x01(\x0b\x32R.spark.connect.StreamingQueryManagerCommandResult.ListStreamingQueryListenerResultH\x00R\rlistListeners\x1a\x7f\n\x0c\x41\x63tiveResult\x12o\n\x0e\x61\x63tive_queries\x18\x01 \x03(\x0b\x32H.spark.connect.StreamingQueryManagerCommandResult.StreamingQueryInstanceR\ractiveQueries\x1as\n\x16StreamingQueryInstance\x12\x37\n\x02id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x02id\x12\x17\n\x04name\x18\x02 \x01(\tH\x00R\x04name\x88\x01\x01\x42\x07\n\x05_name\x1a;\n\x19\x41waitAnyTerminationResult\x12\x1e\n\nterminated\x18\x01 \x01(\x08R\nterminated\x1aK\n\x1eStreamingQueryListenerInstance\x12)\n\x10listener_payload\x18\x01 \x01(\x0cR\x0flistenerPayload\x1a\x92\x01\n ListStreamingQueryListenerResult\x12n\n\tlisteners\x18\x01 \x03(\x0b\x32P.spark.connect.StreamingQueryManagerCommandResult.StreamingQueryListenerInstanceR\tlistenersB\r\n\x0bresult_type"\x15\n\x13GetResourcesCommand"\xd4\x01\n\x19GetResourcesCommandResult\x12U\n\tresources\x18\x01 
\x03(\x0b\x32\x37.spark.connect.GetResourcesCommandResult.ResourcesEntryR\tresources\x1a`\n\x0eResourcesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x38\n\x05value\x18\x02 \x01(\x0b\x32".spark.connect.ResourceInformationR\x05value:\x02\x38\x01\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' ) _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) @@ -115,21 +115,27 @@ _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_start = 6219 _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_end = 6275 _STREAMINGQUERYMANAGERCOMMAND._serialized_start = 6293 - _STREAMINGQUERYMANAGERCOMMAND._serialized_end = 6643 - _STREAMINGQUERYMANAGERCOMMAND_AWAITANYTERMINATIONCOMMAND._serialized_start = 6553 - _STREAMINGQUERYMANAGERCOMMAND_AWAITANYTERMINATIONCOMMAND._serialized_end = 6632 - _STREAMINGQUERYMANAGERCOMMANDRESULT._serialized_start = 6646 - _STREAMINGQUERYMANAGERCOMMANDRESULT._serialized_end = 7369 - _STREAMINGQUERYMANAGERCOMMANDRESULT_ACTIVERESULT._serialized_start = 7049 - _STREAMINGQUERYMANAGERCOMMANDRESULT_ACTIVERESULT._serialized_end = 7176 - _STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYINSTANCE._serialized_start = 7178 - _STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYINSTANCE._serialized_end = 7293 - _STREAMINGQUERYMANAGERCOMMANDRESULT_AWAITANYTERMINATIONRESULT._serialized_start = 7295 - _STREAMINGQUERYMANAGERCOMMANDRESULT_AWAITANYTERMINATIONRESULT._serialized_end = 7354 - _GETRESOURCESCOMMAND._serialized_start = 7371 - _GETRESOURCESCOMMAND._serialized_end = 7392 - _GETRESOURCESCOMMANDRESULT._serialized_start = 7395 - _GETRESOURCESCOMMANDRESULT._serialized_end = 7607 - _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_start = 7511 - _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_end = 7607 + _STREAMINGQUERYMANAGERCOMMAND._serialized_end = 6990 + _STREAMINGQUERYMANAGERCOMMAND_AWAITANYTERMINATIONCOMMAND._serialized_start = 6824 + _STREAMINGQUERYMANAGERCOMMAND_AWAITANYTERMINATIONCOMMAND._serialized_end = 6903 + _STREAMINGQUERYMANAGERCOMMAND_STREAMINGQUERYLISTENERCOMMAND._serialized_start = 6905 + _STREAMINGQUERYMANAGERCOMMAND_STREAMINGQUERYLISTENERCOMMAND._serialized_end = 6979 + _STREAMINGQUERYMANAGERCOMMANDRESULT._serialized_start = 6993 + _STREAMINGQUERYMANAGERCOMMANDRESULT._serialized_end = 8147 + _STREAMINGQUERYMANAGERCOMMANDRESULT_ACTIVERESULT._serialized_start = 7601 + _STREAMINGQUERYMANAGERCOMMANDRESULT_ACTIVERESULT._serialized_end = 7728 + _STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYINSTANCE._serialized_start = 7730 + _STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYINSTANCE._serialized_end = 7845 + _STREAMINGQUERYMANAGERCOMMANDRESULT_AWAITANYTERMINATIONRESULT._serialized_start = 7847 + _STREAMINGQUERYMANAGERCOMMANDRESULT_AWAITANYTERMINATIONRESULT._serialized_end = 7906 + _STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYLISTENERINSTANCE._serialized_start = 7908 + _STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYLISTENERINSTANCE._serialized_end = 7983 + _STREAMINGQUERYMANAGERCOMMANDRESULT_LISTSTREAMINGQUERYLISTENERRESULT._serialized_start = 7986 + _STREAMINGQUERYMANAGERCOMMANDRESULT_LISTSTREAMINGQUERYLISTENERRESULT._serialized_end = 8132 + _GETRESOURCESCOMMAND._serialized_start = 8149 + _GETRESOURCESCOMMAND._serialized_end = 8170 + _GETRESOURCESCOMMANDRESULT._serialized_start = 8173 + _GETRESOURCESCOMMANDRESULT._serialized_end = 8385 + _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_start = 8289 + _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_end = 8385 # 
@@protoc_insertion_point(module_scope) diff --git a/python/pyspark/sql/connect/proto/commands_pb2.pyi b/python/pyspark/sql/connect/proto/commands_pb2.pyi index 2590139d3017d..fe472b3140dfe 100644 --- a/python/pyspark/sql/connect/proto/commands_pb2.pyi +++ b/python/pyspark/sql/connect/proto/commands_pb2.pyi @@ -1356,10 +1356,27 @@ class StreamingQueryManagerCommand(google.protobuf.message.Message): self, oneof_group: typing_extensions.Literal["_timeout_ms", b"_timeout_ms"] ) -> typing_extensions.Literal["timeout_ms"] | None: ... + class StreamingQueryListenerCommand(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + LISTENER_PAYLOAD_FIELD_NUMBER: builtins.int + listener_payload: builtins.bytes + def __init__( + self, + *, + listener_payload: builtins.bytes = ..., + ) -> None: ... + def ClearField( + self, field_name: typing_extensions.Literal["listener_payload", b"listener_payload"] + ) -> None: ... + ACTIVE_FIELD_NUMBER: builtins.int GET_QUERY_FIELD_NUMBER: builtins.int AWAIT_ANY_TERMINATION_FIELD_NUMBER: builtins.int RESET_TERMINATED_FIELD_NUMBER: builtins.int + ADD_LISTENER_FIELD_NUMBER: builtins.int + REMOVE_LISTENER_FIELD_NUMBER: builtins.int + LIST_LISTENERS_FIELD_NUMBER: builtins.int active: builtins.bool """active() API, returns a list of active queries.""" get_query: builtins.str @@ -1371,6 +1388,16 @@ class StreamingQueryManagerCommand(google.protobuf.message.Message): """awaitAnyTermination() API, wait until any query terminates or timeout.""" reset_terminated: builtins.bool """resetTerminated() API.""" + @property + def add_listener(self) -> global___StreamingQueryManagerCommand.StreamingQueryListenerCommand: + """addListener API.""" + @property + def remove_listener( + self, + ) -> global___StreamingQueryManagerCommand.StreamingQueryListenerCommand: + """removeListener API.""" + list_listeners: builtins.bool + """listListeners() API, returns a list of streaming query listeners.""" def __init__( self, *, @@ -1379,18 +1406,29 @@ class StreamingQueryManagerCommand(google.protobuf.message.Message): await_any_termination: global___StreamingQueryManagerCommand.AwaitAnyTerminationCommand | None = ..., reset_terminated: builtins.bool = ..., + add_listener: global___StreamingQueryManagerCommand.StreamingQueryListenerCommand + | None = ..., + remove_listener: global___StreamingQueryManagerCommand.StreamingQueryListenerCommand + | None = ..., + list_listeners: builtins.bool = ..., ) -> None: ... 
def HasField( self, field_name: typing_extensions.Literal[ "active", b"active", + "add_listener", + b"add_listener", "await_any_termination", b"await_any_termination", "command", b"command", "get_query", b"get_query", + "list_listeners", + b"list_listeners", + "remove_listener", + b"remove_listener", "reset_terminated", b"reset_terminated", ], @@ -1400,12 +1438,18 @@ class StreamingQueryManagerCommand(google.protobuf.message.Message): field_name: typing_extensions.Literal[ "active", b"active", + "add_listener", + b"add_listener", "await_any_termination", b"await_any_termination", "command", b"command", "get_query", b"get_query", + "list_listeners", + b"list_listeners", + "remove_listener", + b"remove_listener", "reset_terminated", b"reset_terminated", ], @@ -1413,7 +1457,13 @@ class StreamingQueryManagerCommand(google.protobuf.message.Message): def WhichOneof( self, oneof_group: typing_extensions.Literal["command", b"command"] ) -> typing_extensions.Literal[ - "active", "get_query", "await_any_termination", "reset_terminated" + "active", + "get_query", + "await_any_termination", + "reset_terminated", + "add_listener", + "remove_listener", + "list_listeners", ] | None: ... global___StreamingQueryManagerCommand = StreamingQueryManagerCommand @@ -1487,10 +1537,49 @@ class StreamingQueryManagerCommandResult(google.protobuf.message.Message): self, field_name: typing_extensions.Literal["terminated", b"terminated"] ) -> None: ... + class StreamingQueryListenerInstance(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + LISTENER_PAYLOAD_FIELD_NUMBER: builtins.int + listener_payload: builtins.bytes + def __init__( + self, + *, + listener_payload: builtins.bytes = ..., + ) -> None: ... + def ClearField( + self, field_name: typing_extensions.Literal["listener_payload", b"listener_payload"] + ) -> None: ... + + class ListStreamingQueryListenerResult(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + LISTENERS_FIELD_NUMBER: builtins.int + @property + def listeners( + self, + ) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[ + global___StreamingQueryManagerCommandResult.StreamingQueryListenerInstance + ]: ... + def __init__( + self, + *, + listeners: collections.abc.Iterable[ + global___StreamingQueryManagerCommandResult.StreamingQueryListenerInstance + ] + | None = ..., + ) -> None: ... + def ClearField( + self, field_name: typing_extensions.Literal["listeners", b"listeners"] + ) -> None: ... + ACTIVE_FIELD_NUMBER: builtins.int QUERY_FIELD_NUMBER: builtins.int AWAIT_ANY_TERMINATION_FIELD_NUMBER: builtins.int RESET_TERMINATED_FIELD_NUMBER: builtins.int + ADD_LISTENER_FIELD_NUMBER: builtins.int + REMOVE_LISTENER_FIELD_NUMBER: builtins.int + LIST_LISTENERS_FIELD_NUMBER: builtins.int @property def active(self) -> global___StreamingQueryManagerCommandResult.ActiveResult: ... @property @@ -1500,6 +1589,12 @@ class StreamingQueryManagerCommandResult(google.protobuf.message.Message): self, ) -> global___StreamingQueryManagerCommandResult.AwaitAnyTerminationResult: ... reset_terminated: builtins.bool + add_listener: builtins.bool + remove_listener: builtins.bool + @property + def list_listeners( + self, + ) -> global___StreamingQueryManagerCommandResult.ListStreamingQueryListenerResult: ... 
def __init__( self, *, @@ -1508,16 +1603,26 @@ class StreamingQueryManagerCommandResult(google.protobuf.message.Message): await_any_termination: global___StreamingQueryManagerCommandResult.AwaitAnyTerminationResult | None = ..., reset_terminated: builtins.bool = ..., + add_listener: builtins.bool = ..., + remove_listener: builtins.bool = ..., + list_listeners: global___StreamingQueryManagerCommandResult.ListStreamingQueryListenerResult + | None = ..., ) -> None: ... def HasField( self, field_name: typing_extensions.Literal[ "active", b"active", + "add_listener", + b"add_listener", "await_any_termination", b"await_any_termination", + "list_listeners", + b"list_listeners", "query", b"query", + "remove_listener", + b"remove_listener", "reset_terminated", b"reset_terminated", "result_type", @@ -1529,10 +1634,16 @@ class StreamingQueryManagerCommandResult(google.protobuf.message.Message): field_name: typing_extensions.Literal[ "active", b"active", + "add_listener", + b"add_listener", "await_any_termination", b"await_any_termination", + "list_listeners", + b"list_listeners", "query", b"query", + "remove_listener", + b"remove_listener", "reset_terminated", b"reset_terminated", "result_type", @@ -1542,7 +1653,13 @@ class StreamingQueryManagerCommandResult(google.protobuf.message.Message): def WhichOneof( self, oneof_group: typing_extensions.Literal["result_type", b"result_type"] ) -> typing_extensions.Literal[ - "active", "query", "await_any_termination", "reset_terminated" + "active", + "query", + "await_any_termination", + "reset_terminated", + "add_listener", + "remove_listener", + "list_listeners", ] | None: ... global___StreamingQueryManagerCommandResult = StreamingQueryManagerCommandResult diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala index 5c0027895cda6..484ed0245ddf6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala @@ -34,7 +34,7 @@ import org.apache.spark.scheduler.SparkListenerEvent * @since 2.0.0 */ @Evolving -abstract class StreamingQueryListener { +abstract class StreamingQueryListener extends Serializable { import StreamingQueryListener._ @@ -106,7 +106,7 @@ private[spark] class PythonStreamingQueryListenerWrapper( * @since 2.0.0 */ @Evolving -object StreamingQueryListener { +object StreamingQueryListener extends Serializable { /** * Base type of [[StreamingQueryListener]] events @@ -128,7 +128,7 @@ object StreamingQueryListener { val id: UUID, val runId: UUID, val name: String, - val timestamp: String) extends Event { + val timestamp: String) extends Event with Serializable { def json: String = compact(render(jsonValue)) @@ -146,7 +146,8 @@ object StreamingQueryListener { * @since 2.1.0 */ @Evolving - class QueryProgressEvent private[sql](val progress: StreamingQueryProgress) extends Event { + class QueryProgressEvent private[sql](val progress: StreamingQueryProgress) extends Event + with Serializable { def json: String = compact(render(jsonValue)) @@ -165,7 +166,7 @@ object StreamingQueryListener { class QueryIdleEvent private[sql]( val id: UUID, val runId: UUID, - val timestamp: String) extends Event { + val timestamp: String) extends Event with Serializable { def json: String = compact(render(jsonValue)) @@ -195,7 +196,7 @@ object StreamingQueryListener { val id: UUID, val runId: UUID, val exception: 
Option[String], - val errorClassOnException: Option[String]) extends Event { + val errorClassOnException: Option[String]) extends Event with Serializable { // compatibility with versions in prior to 3.5.0 def this(id: UUID, runId: UUID, exception: Option[String]) = { this(id, runId, exception, None) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala index 09553cc2a9dc5..1b4c8dbd5c8d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala @@ -38,8 +38,8 @@ import org.apache.spark.util.kvstore.KVIndex * UI data for both active and inactive query. */ private[sql] class StreamingQueryStatusListener( - conf: SparkConf, - store: ElementTrackingStore) extends StreamingQueryListener { + @transient val conf: SparkConf, + @transient val store: ElementTrackingStore) extends StreamingQueryListener { private val streamingProgressRetention = conf.get(StaticSQLConf.STREAMING_UI_RETAINED_PROGRESS_UPDATES)
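Taken together, the changes above extend the Spark Connect protocol with add_listener, remove_listener, and list_listeners operations on StreamingQueryManagerCommand, carrying the listener as an opaque listener_payload byte string, while the Scala StreamingQueryListener hierarchy is made Serializable and the UI status listener's SparkConf and ElementTrackingStore are marked @transient so they are excluded when a listener is serialized. As a minimal, illustrative sketch only (the payload contents and the surrounding RPC plumbing are not part of this diff, and the placeholder bytes below are hypothetical), a Python Connect client could populate the new messages roughly as follows:

from pyspark.sql.connect.proto import commands_pb2

# Placeholder payload; in practice this would be some serialized listener object.
payload = b"<serialized-listener-bytes>"

# Wrap the payload in the new StreamingQueryListenerCommand and attach it to the
# add_listener branch of the StreamingQueryManagerCommand 'command' oneof.
add_cmd = commands_pb2.StreamingQueryManagerCommand(
    add_listener=commands_pb2.StreamingQueryManagerCommand.StreamingQueryListenerCommand(
        listener_payload=payload
    )
)
assert add_cmd.WhichOneof("command") == "add_listener"

# Listing registered listeners is a plain boolean flag on the same oneof.
list_cmd = commands_pb2.StreamingQueryManagerCommand(list_listeners=True)

The result message mirrors this shape per the stubs above: add_listener and remove_listener come back as booleans, while list_listeners returns a ListStreamingQueryListenerResult whose listeners each carry a listener_payload.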