Skip to content

Comments

[SPARK-44439][CONNECT][SS]Fixed listListeners to only send ids back to client#42013

Closed
bogao007 wants to merge 5 commits intoapache:masterfrom
bogao007:list-listener
Closed

[SPARK-44439][CONNECT][SS]Fixed listListeners to only send ids back to client#42013
bogao007 wants to merge 5 commits intoapache:masterfrom
bogao007:list-listener

Conversation

@bogao007
Copy link
Contributor

What changes were proposed in this pull request?

This PR fixes listListener that's being implemented in a previous PR that sends listener class payload to client. We will only send ids as references of listener classes to client side instead.

Why are the changes needed?

We should only return what users added previously in listListeners API. Only sending ids would be much more light-weighted.

Does this PR introduce any user-facing change?

yes

How was this patch tested?

Unit tested.
Also performed manual integration test

@ import scala.collection.mutable
import scala.collection.mutable

@ import org.apache.spark.sql.streaming.StreamingQueryProgress
import org.apache.spark.sql.streaming.StreamingQueryProgress

@ import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener

@ import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryIdleEvent, QueryStartedEvent, QueryTerminatedEvent}
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryIdleEvent, QueryStartedEvent, QueryTerminatedEvent}

@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

@ class EventCollector extends StreamingQueryListener {
    @volatile var startEvent: QueryStartedEvent = null
    @volatile var terminationEvent: QueryTerminatedEvent = null
    @volatile var idleEvent: QueryIdleEvent = null

    private val _progressEvents = new mutable.Queue[StreamingQueryProgress]

    def progressEvents: Seq[StreamingQueryProgress] = _progressEvents.synchronized {
      _progressEvents.clone().toSeq
    }

    override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
      startEvent = event
      val spark = SparkSession.builder().getOrCreate()
      val df = spark.createDataFrame(Seq((1, 2), (4, 5)))
      df.write.saveAsTable("my_listener_table")
    }

    override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
      _progressEvents += event.progress
    }

    override def onQueryIdle(event: StreamingQueryListener.QueryIdleEvent): Unit = {
      idleEvent = event
    }

    override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
      terminationEvent = event
    }
  }
defined class EventCollector

@ spark.streams.listListeners()
res6: Array[StreamingQueryListener] = Array()

@ val listener = new EventCollector
listener: EventCollector = ammonite.$sess.cmd5$Helper$EventCollector@1cd4082a

@ spark.streams.addListener(listener)


@ spark.streams.listListeners()
res9: Array[StreamingQueryListener] = Array(ammonite.$sess.cmd5$Helper$EventCollector@1cd4082a)

@ spark.streams.listListeners()
res10: Array[StreamingQueryListener] = Array(ammonite.$sess.cmd5$Helper$EventCollector@1cd4082a)

@ val listener1 = new EventCollector
listener1: EventCollector = ammonite.$sess.cmd5$Helper$EventCollector@3c251258

@ spark.streams.addListener(listener1)


@ spark.streams.listListeners()
res13: Array[StreamingQueryListener] = Array(ammonite.$sess.cmd5$Helper$EventCollector@1cd4082a, ammonite.$sess.cmd5$Helper$EventCollector@3c251258)

@ spark.streams.listListeners()
res14: Array[StreamingQueryListener] = Array(ammonite.$sess.cmd5$Helper$EventCollector@1cd4082a, ammonite.$sess.cmd5$Helper$EventCollector@3c251258)

@ val q = spark.readStream.format("rate").load().writeStream.format("console").start()
q: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.streaming.RemoteStreamingQuery@42be0eca

@ q.stop()


@ spark.streams.listListeners()
res17: Array[StreamingQueryListener] = Array(ammonite.$sess.cmd5$Helper$EventCollector@1cd4082a, ammonite.$sess.cmd5$Helper$EventCollector@3c251258)

@ spark.table("my_listener_table").show()
23/07/14 16:41:36 INFO CodeGenerator: Code generated in 98.321167 ms
+---+---+
| _1| _2|
+---+---+
|  4|  5|
|  1|  2|
+---+---+

private[sql] class StreamingQueryStatusListener(
@transient val conf: SparkConf,
@transient val store: ElementTrackingStore) extends StreamingQueryListener {
val conf: SparkConf,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reverts the change in previous PR

Copy link

@rangadi rangadi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for removing a lot of code.
Suggested a change for protobuf field. LGTM with that.

@HyukjinKwon
Copy link
Member

Merged to master.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants