
[SPARK-44201][CONNECT][SS] Add support for Streaming Listener in Scala for Spark Connect #41752

Closed
bogao007 wants to merge 15 commits into apache:master from bogao007:sc-streaming-listener

Conversation

@bogao007
Contributor

@bogao007 bogao007 commented Jun 27, 2023

What changes were proposed in this pull request?

Add support for Streaming Listener in Scala for Spark Connect

Why are the changes needed?

Streaming Listener is a key feature for monitoring Structured Streaming and we want it to be available in Spark Connect.

Does this PR introduce any user-facing change?

Yes

How was this patch tested?

Added unit test.

Also manually tested:

• Define the custom listener class and add it
```
@ import scala.collection.mutable
import scala.collection.mutable

@ import org.apache.spark.sql.streaming.StreamingQueryProgress
import org.apache.spark.sql.streaming.StreamingQueryProgress

@ import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener

@ import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryIdleEvent, QueryStartedEvent, QueryTerminatedEvent}
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryIdleEvent, QueryStartedEvent, QueryTerminatedEvent}

@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

@ class EventCollector extends StreamingQueryListener {
  @volatile var startEvent: QueryStartedEvent = null
  @volatile var terminationEvent: QueryTerminatedEvent = null
  @volatile var idleEvent: QueryIdleEvent = null

  private val _progressEvents = new mutable.Queue[StreamingQueryProgress]

  def progressEvents: Seq[StreamingQueryProgress] = _progressEvents.synchronized {
    _progressEvents.clone().toSeq
  }

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    startEvent = event
    val spark = SparkSession.builder.getOrCreate()
    val df = spark.createDataFrame(Seq((1, 2), (4, 5)))
    df.write.saveAsTable("listener_table")
  }

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    _progressEvents += event.progress
  }

  override def onQueryIdle(event: StreamingQueryListener.QueryIdleEvent): Unit = {
    idleEvent = event
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    terminationEvent = event
  }
}
defined class EventCollector

@ val listener = new EventCollector
listener: EventCollector = ammonite.$sess.cmd4$Helper$EventCollector@e6fbf82

@ spark.streams.addListener(listener)


@ val q = spark.readStream.format("rate").load().writeStream.format("console").start()
q: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.streaming.RemoteStreamingQuery@71eb4ea4

@ q.stop()
```
• List and remove listeners
```
@ spark.streams.listListeners()
res9: Array[StreamingQueryListener] = Array(org.apache.spark.sql.streaming.ui.StreamingQueryStatusListener@6cd31e5f, ammonite.$sess.cmd4$Helper$EventCollector@57be0d93)

@ spark.streams.removeListener(listener)


@ spark.streams.listListeners()
res14: Array[StreamingQueryListener] = Array(org.apache.spark.sql.streaming.ui.StreamingQueryStatusListener@79b3f1c6)
```
• Validate spark.write in listener
```
@ spark.table("listener_table").show()
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
23/07/11 11:19:02 INFO CodeGenerator: Code generated in 104.03775 ms
+---+---+
| _1| _2|
+---+---+
|  1|  2|
|  4|  5|
+---+---+
```

```
 */
 @Evolving
-abstract class StreamingQueryListener {
+abstract class StreamingQueryListener extends Serializable {
```
Contributor Author

Serializing the entire class requires updating the StreamingQueryListener on the server side to extend Serializable


Client side class is different from server side, right?

Contributor Author

They are the same; in order to serialize the class, we'll need to have both extend Serializable


I see. Sure.

Contributor

@vicennial vicennial left a comment

Remove and list again, there's no difference since the class id has changed after serialization and deserialization

Spark internally uses reference equality checks to find and remove a specific listener object. The issue, in this case, is that when the StreamingQueryListener from the remove operation is deserialised on the server, it's a copy of the original instance and does not refer to the same object.
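To make the reference-equality problem concrete, here is a minimal, REPL-style round trip (the `roundTrip` helper and `DemoListener` class are hypothetical, for illustration only):

```
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical helper: serialize an object and read back a copy,
// mimicking what happens when a listener crosses the Connect boundary.
def roundTrip[T <: java.io.Serializable](obj: T): T = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(obj)
  out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  in.readObject().asInstanceOf[T]
}

class DemoListener extends Serializable

val original = new DemoListener
val copy = roundTrip(original)
// The deserialized copy is a distinct object, so a removal that relies on
// reference equality (`eq`) against `original` can never match `copy`.
assert(!(copy eq original))
```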

I see a couple of ways to address this:

  • For the list operation, instead of returning just the StreamingQueryListener instance, we can return a wrapper class with the instance + a UUID (OR modify the proto to include the UUID).
    • For the remove operation, we can use the UUID to refer to the listener.
    • On the server side, this means that we cache the mapping between UUID and the actual instance. This way, we can reuse the streams.remove() method without any intrusive changes.
  • We could modify the removeListener method in StreamingQueryManager to first find the matching StreamingQueryListener from a map of active listeners using an equality check to find the "original" instance
    • We might have to define custom equals and hashCode methods for the class, and potentially add something like a classUUID that is generated in the constructor. (Since deserialization does not go through the constructor, the deserialized object would retain the same classUUID.)

Custom variables defined in EventCollector don't get serialized

The example is missing a cast to EventCollector. For example, the output that I get when casting:

```
@ val listener = new EventCollector
listener: EventCollector = ammonite.$sess.cmd4$Helper$EventCollector@63318b56

@ spark.streams.addListener(listener)

@ val q = spark.readStream.format("rate").load().writeStream.format("console").start()
q: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.streaming.RemoteStreamingQuery@b9c9bf5

@ q.stop()

@ print(listener.startEvent)
null

@ val currListener = spark.streams.listListeners()(1)
currListener: StreamingQueryListener = ammonite.$sess.cmd4$Helper$EventCollector@6fb2b972

@ print(currListener.asInstanceOf[EventCollector].startEvent)
org.apache.spark.sql.streaming.StreamingQueryListener$QueryStartedEvent@31887330
```

@WweiL
Contributor

WweiL commented Jul 11, 2023

Thanks! I'll take a look later. Just a quick sanity check, can you test if you could do spark.write in the listener and in the client verify if the write is there? For example:

```
>>> class MyListener(ConnectStreamingQueryListener):
...     def onQueryStarted(self, event: QueryStartedEvent) -> None:
...         print("hi, event query id is: " + str(event.id))
...         df = spark.createDataFrame(["10", "11", "13"], "string").toDF("age")
...         df.write.saveAsTable("tbllistener")
```

and do a spark.read.table() in client repl


```
.asInstanceOf[StreamingQueryListener]
val id: String = listenerPacket.id
sessionHolder.cacheListenerById(id, listener)
session.streams.addListener(listener)
```

So, from this point on this listener is treated as a legacy StreamingQueryListener, i.e. it is not a Connect streaming listener. Is that fine? Wondering what happens if a user tries to run a Spark command inside the listener (added a comment about this in the unit test).
For the first version here it might be fine, but I think we should have a clearly defined TODO for this improvement. cc: @WweiL

Contributor Author

@bogao007 bogao007 Jul 11, 2023

Are you talking about the cache we introduced here? We'll have to do this for removeListener() to work properly. spark.write() inside the listener is working as expected in my manual testing; I will update the unit test to cover it.


The cache is ok. What happens if users want to access the Spark session? How do they do that? If we create a session and make it available, would that be a Connect session or a legacy session?
We don't necessarily need to solve it in this PR, but I think we should have a detailed TODO here (and a follow-up task).

Contributor Author

I see, will add a TODO for that. What do we do for Python Listener @WweiL?


Python is adding a new method on the listener to get the session (which would be a connect remote session).
This API might change.

Contributor Author

Added a TODO for this. Also created Jira SPARK-44400. I might not have permission for our streaming Spark Connect epic, so I'm not able to add the new task under it.


Thanks. I moved it to the Epic.

@bogao007
Contributor Author

bogao007 commented Jul 11, 2023

Thanks! I'll take a look later. Just a quick sanity check, can you test if you could do spark.write in the listener and in the client verify if the write is there? For example:

```
>>> class MyListener(ConnectStreamingQueryListener):
...     def onQueryStarted(self, event: QueryStartedEvent) -> None:
...         print("hi, event query id is: " + str(event.id))
...         df = spark.createDataFrame(["10", "11", "13"], "string").toDF("age")
...         df.write.saveAsTable("tbllistener")
```

and do a spark.read.table() in client repl

Sure, will do that, thanks!

Updated the manual test and unit test to include the spark.write() case.

```
// StreamingQueryListener on server side. This is used by removeListener() to find the id
// of previously added StreamingQueryListener and pass it to server side to find the
// corresponding listener on server side.
private lazy val listenerCache: ConcurrentMap[StreamingQueryListener, String] =
```
Contributor

Do we have a test case here that checks the behaviour when two instances of a StreamingQueryListener whose constructor values were the same are inserted into this Map?

Looking at how the default hashes of an object are created here, different instances should have different hashes, but the docs also specify the following:

This is typically implemented by converting the internal address of the object into an integer, but this implementation technique is not required by the Java™ programming language.

Let's make sure that there aren't hash collisions in this case
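
A quick illustration of the concern (a hypothetical snippet; the `PlainListener` stand-in is not from the PR):

```
class PlainListener  // stand-in for a listener with identical constructor state

val a = new PlainListener
val b = new PlainListener
// Default Object.hashCode is identity-based: these usually differ, but the
// JVM gives no uniqueness guarantee, so a map keyed by listeners could collide.
println(System.identityHashCode(a) == System.identityHashCode(b)) // almost always false
```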

Contributor Author

Thanks, I changed the client-side cache to an id -> listener mapping to make sure there's no hash collision, and updated the test case to cover this scenario. There were also some issues when the same listener instance was added and removed multiple times; this change should fix those as well.
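
For reference, a minimal sketch of what an id -> listener cache along these lines could look like (the `ListenerIdCache` wrapper and its method names are assumptions for illustration, not the PR's exact code):

```
import java.util.UUID
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import org.apache.spark.sql.streaming.StreamingQueryListener

class ListenerIdCache {
  // id -> listener: random UUID keys avoid any reliance on listener
  // hashCodes, so "equal" or colliding listeners can't clobber each other.
  private val cache: ConcurrentMap[String, StreamingQueryListener] =
    new ConcurrentHashMap[String, StreamingQueryListener]()

  def add(listener: StreamingQueryListener): String = {
    val id = UUID.randomUUID().toString
    cache.put(id, listener)
    id
  }

  // Linear scan with reference equality: finds the exact instance the user
  // added, even if the same listener was added and removed multiple times.
  def idFor(listener: StreamingQueryListener): Option[String] = {
    import scala.jdk.CollectionConverters._
    cache.asScala.collectFirst { case (id, l) if l eq listener => id }
  }

  def remove(id: String): Option[StreamingQueryListener] =
    Option(cache.remove(id))
}
```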

Contributor

@zhenlineo zhenlineo left a comment

Can you enable the compatibility tests for the APIs you added for streaming? Both in checkMiMaCompatibilityWithSqlModule and checkMiMaCompatibilityWithReversedSqlModule?

Right now we skip a lot, e.g. org.apache.spark.sql.streaming.ui, StreamingQueryListener, etc. I would like to make sure that the API is compatible and, if not, that you did it on purpose.

@rangadi

rangadi commented Jul 13, 2023

@bogao007 could you add foreachBatch() API to that as well? I forgot in my PR.

@bogao007
Contributor Author

@bogao007 could you add foreachBatch() API to that as well? I forgot in my PR.

Will do

@bogao007
Contributor Author

bogao007 commented Jul 13, 2023

Can you enable the compatibility tests for the APIs you added for streaming? Both in checkMiMaCompatibilityWithSqlModule and checkMiMaCompatibilityWithReversedSqlModule?
Right now we skip a lot, e.g. org.apache.spark.sql.streaming.ui, StreamingQueryListener, etc. I would like to make sure that the API is compatible and, if not, that you did it on purpose.

Sure, will do that

@zhenlineo Updated, could you help to take another look? Thanks! (commit)

@zhenlineo
Contributor

LGTM

@HyukjinKwon
Member

Merged to master.

asl3 pushed a commit to asl3/spark that referenced this pull request Jul 17, 2023
…a for Spark Connect

Closes apache#41752 from bogao007/sc-streaming-listener.

Authored-by: bogao007 <bo.gao@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
HyukjinKwon pushed a commit that referenced this pull request Jul 17, 2023
…to client

### What changes were proposed in this pull request?

This PR fixes listListeners, implemented in a previous [PR](#41752), which sends the listener class payload to the client. Instead, we will only send ids as references to the listener classes on the client side.

### Why are the changes needed?

We should only return what users previously added via the `listListeners` API. Only sending ids is also much more lightweight.

### Does this PR introduce _any_ user-facing change?

Yes

### How was this patch tested?

Unit tested.
Also performed a manual integration test:
```
@ import scala.collection.mutable
import scala.collection.mutable

@ import org.apache.spark.sql.streaming.StreamingQueryProgress
import org.apache.spark.sql.streaming.StreamingQueryProgress

@ import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener

@ import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryIdleEvent, QueryStartedEvent, QueryTerminatedEvent}
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryIdleEvent, QueryStartedEvent, QueryTerminatedEvent}

@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

@ class EventCollector extends StreamingQueryListener {
    @volatile var startEvent: QueryStartedEvent = null
    @volatile var terminationEvent: QueryTerminatedEvent = null
    @volatile var idleEvent: QueryIdleEvent = null

    private val _progressEvents = new mutable.Queue[StreamingQueryProgress]

    def progressEvents: Seq[StreamingQueryProgress] = _progressEvents.synchronized {
      _progressEvents.clone().toSeq
    }

    override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
      startEvent = event
      val spark = SparkSession.builder().getOrCreate()
      val df = spark.createDataFrame(Seq((1, 2), (4, 5)))
      df.write.saveAsTable("my_listener_table")
    }

    override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
      _progressEvents += event.progress
    }

    override def onQueryIdle(event: StreamingQueryListener.QueryIdleEvent): Unit = {
      idleEvent = event
    }

    override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
      terminationEvent = event
    }
  }
defined class EventCollector

@ spark.streams.listListeners()
res6: Array[StreamingQueryListener] = Array()

@ val listener = new EventCollector
listener: EventCollector = ammonite.$sess.cmd5$Helper$EventCollector@1cd4082a

@ spark.streams.addListener(listener)

@ spark.streams.listListeners()
res9: Array[StreamingQueryListener] = Array(ammonite.$sess.cmd5$Helper$EventCollector@1cd4082a)

@ spark.streams.listListeners()
res10: Array[StreamingQueryListener] = Array(ammonite.$sess.cmd5$Helper$EventCollector@1cd4082a)

@ val listener1 = new EventCollector
listener1: EventCollector = ammonite.$sess.cmd5$Helper$EventCollector@3c251258

@ spark.streams.addListener(listener1)

@ spark.streams.listListeners()
res13: Array[StreamingQueryListener] = Array(ammonite.$sess.cmd5$Helper$EventCollector@1cd4082a, ammonite.$sess.cmd5$Helper$EventCollector@3c251258)

@ spark.streams.listListeners()
res14: Array[StreamingQueryListener] = Array(ammonite.$sess.cmd5$Helper$EventCollector@1cd4082a, ammonite.$sess.cmd5$Helper$EventCollector@3c251258)

@ val q = spark.readStream.format("rate").load().writeStream.format("console").start()
q: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.streaming.RemoteStreamingQuery@42be0eca

@ q.stop()

@ spark.streams.listListeners()
res17: Array[StreamingQueryListener] = Array(ammonite.$sess.cmd5$Helper$EventCollector@1cd4082a, ammonite.$sess.cmd5$Helper$EventCollector@3c251258)

@ spark.table("my_listener_table").show()
23/07/14 16:41:36 INFO CodeGenerator: Code generated in 98.321167 ms
+---+---+
| _1| _2|
+---+---+
|  4|  5|
|  1|  2|
+---+---+
```

Closes #42013 from bogao007/list-listener.

Authored-by: bogao007 <bo.gao@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
dongjoon-hyun pushed a commit that referenced this pull request Dec 10, 2025
…rom `connect.StreamingQueryManager`

### What changes were proposed in this pull request?
#41752 introduced a `listenerCache` and related private methods (`cacheListenerById`, `getIdByListener`, and `removeCachedListener`) for `connect.StreamingQueryManager`. However, in #46287, the usage of `listenerCache` was replaced by `streamingQueryListenerBus`. As a result, `listenerCache` and its associated private methods are no longer in use; this PR cleans them up.

### Why are the changes needed?
Code cleanup.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GitHub Actions.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #53420 from LuciferYang/StreamingQueryManager.

Lead-authored-by: yangjie01 <yangjie01@baidu.com>
Co-authored-by: YangJie <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>