Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ object MimaExcludes {
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryProgress.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryProgress.queryInfo"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryTerminated.queryInfo"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryStarted"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryStarted"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryStarted"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryProgress"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryProgress"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated"),

// [SPARK-17338][SQL] add global temp view
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.dropGlobalTempView"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class StreamExecution(
new Path(new Path(checkpointRoot), name).toUri.toString

/**
* Starts the execution. This returns only after the thread has started and [[QueryStarted]] event
* Starts the execution. This returns only after the thread has started and [[QueryStartedEvent]]
* has been posted to all the listeners.
*/
def start(): Unit = {
Expand All @@ -177,9 +177,10 @@ class StreamExecution(
/**
* Repeatedly attempts to run batches as data arrives.
*
* Note that this method ensures that [[QueryStarted]] and [[QueryTerminated]] events are posted
* such that listeners are guaranteed to get a start event before a termination. Furthermore, this
* method also ensures that [[QueryStarted]] event is posted before the `start()` method returns.
* Note that this method ensures that [[QueryStartedEvent]] and [[QueryTerminatedEvent]] are
* posted such that listeners are guaranteed to get a start event before a termination.
* Furthermore, this method also ensures that [[QueryStartedEvent]] event is posted before the
* `start()` method returns.
*/
private def runBatches(): Unit = {
try {
Expand All @@ -190,7 +191,7 @@ class StreamExecution(
sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
}
updateStatus()
postEvent(new QueryStarted(currentStatus)) // Assumption: Does not throw exception.
postEvent(new QueryStartedEvent(currentStatus)) // Assumption: Does not throw exception.

// Unblock starting thread
startLatch.countDown()
Expand Down Expand Up @@ -232,7 +233,7 @@ class StreamExecution(
// Update metrics and notify others
streamMetrics.reportTriggerFinished()
updateStatus()
postEvent(new QueryProgress(currentStatus))
postEvent(new QueryProgressEvent(currentStatus))
isTerminated
})
} catch {
Expand Down Expand Up @@ -260,7 +261,7 @@ class StreamExecution(
// Notify others
sparkSession.streams.notifyQueryTermination(StreamExecution.this)
postEvent(
new QueryTerminated(currentStatus, exception.map(_.cause).map(Utils.exceptionString)))
new QueryTerminatedEvent(currentStatus, exception.map(_.cause).map(Utils.exceptionString)))
terminationLatch.countDown()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
*/
def post(event: StreamingQueryListener.Event) {
event match {
case s: QueryStarted =>
case s: QueryStartedEvent =>
postToAll(s)
case _ =>
sparkListenerBus.post(event)
Expand All @@ -59,11 +59,11 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
listener: StreamingQueryListener,
event: StreamingQueryListener.Event): Unit = {
event match {
case queryStarted: QueryStarted =>
case queryStarted: QueryStartedEvent =>
listener.onQueryStarted(queryStarted)
case queryProgress: QueryProgress =>
case queryProgress: QueryProgressEvent =>
listener.onQueryProgress(queryProgress)
case queryTerminated: QueryTerminated =>
case queryTerminated: QueryTerminatedEvent =>
listener.onQueryTerminated(queryTerminated)
case _ =>
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,24 @@ abstract class StreamingQueryListener {
* don't block this method as it will block your query.
* @since 2.0.0
*/
def onQueryStarted(queryStarted: QueryStarted): Unit
def onQueryStarted(event: QueryStartedEvent): Unit

/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] will always be
* latest no matter when this method is called. Therefore, the status of [[StreamingQuery]]
* may be changed before/when you process the event. E.g., you may find [[StreamingQuery]]
* is terminated when you are processing [[QueryProgress]].
* is terminated when you are processing [[QueryProgressEvent]].
* @since 2.0.0
*/
def onQueryProgress(queryProgress: QueryProgress): Unit
def onQueryProgress(event: QueryProgressEvent): Unit

/**
* Called when a query is stopped, with or without error.
* @since 2.0.0
*/
def onQueryTerminated(queryTerminated: QueryTerminated): Unit
def onQueryTerminated(event: QueryTerminatedEvent): Unit
}


Expand All @@ -84,15 +84,15 @@ object StreamingQueryListener {
* @since 2.0.0
*/
@Experimental
class QueryStarted private[sql](val queryStatus: StreamingQueryStatus) extends Event
class QueryStartedEvent private[sql](val queryStatus: StreamingQueryStatus) extends Event

/**
* :: Experimental ::
* Event representing any progress updates in a query
* @since 2.0.0
*/
@Experimental
class QueryProgress private[sql](val queryStatus: StreamingQueryStatus) extends Event
class QueryProgressEvent private[sql](val queryStatus: StreamingQueryStatus) extends Event

/**
* :: Experimental ::
Expand All @@ -104,7 +104,7 @@ object StreamingQueryListener {
* @since 2.0.0
*/
@Experimental
class QueryTerminated private[sql](
class QueryTerminatedEvent private[sql](
val queryStatus: StreamingQueryStatus,
val exception: Option[String]) extends Event
}
Original file line number Diff line number Diff line change
Expand Up @@ -682,20 +682,20 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
}


override def onQueryStarted(queryStarted: QueryStarted): Unit = {
override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
asyncTestWaiter {
startStatus = queryStarted.queryStatus
}
}

override def onQueryProgress(queryProgress: QueryProgress): Unit = {
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
asyncTestWaiter {
assert(startStatus != null, "onQueryProgress called before onQueryStarted")
synchronized { progressStatuses += queryProgress.queryStatus }
}
}

override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = {
override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
asyncTestWaiter {
assert(startStatus != null, "onQueryTerminated called before onQueryStarted")
terminationStatus = queryTerminated.queryStatus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,30 +177,31 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}

test("QueryStarted serialization") {
val queryStarted = new StreamingQueryListener.QueryStarted(StreamingQueryStatus.testStatus)
val queryStarted = new StreamingQueryListener.QueryStartedEvent(StreamingQueryStatus.testStatus)
val json = JsonProtocol.sparkEventToJson(queryStarted)
val newQueryStarted = JsonProtocol.sparkEventFromJson(json)
.asInstanceOf[StreamingQueryListener.QueryStarted]
.asInstanceOf[StreamingQueryListener.QueryStartedEvent]
assertStreamingQueryInfoEquals(queryStarted.queryStatus, newQueryStarted.queryStatus)
}

test("QueryProgress serialization") {
val queryProcess = new StreamingQueryListener.QueryProgress(StreamingQueryStatus.testStatus)
val queryProcess = new StreamingQueryListener.QueryProgressEvent(
StreamingQueryStatus.testStatus)
val json = JsonProtocol.sparkEventToJson(queryProcess)
val newQueryProcess = JsonProtocol.sparkEventFromJson(json)
.asInstanceOf[StreamingQueryListener.QueryProgress]
.asInstanceOf[StreamingQueryListener.QueryProgressEvent]
assertStreamingQueryInfoEquals(queryProcess.queryStatus, newQueryProcess.queryStatus)
}

test("QueryTerminated serialization") {
val exception = new RuntimeException("exception")
val queryQueryTerminated = new StreamingQueryListener.QueryTerminated(
val queryQueryTerminated = new StreamingQueryListener.QueryTerminatedEvent(
StreamingQueryStatus.testStatus,
Some(exception.getMessage))
val json =
JsonProtocol.sparkEventToJson(queryQueryTerminated)
val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
.asInstanceOf[StreamingQueryListener.QueryTerminated]
.asInstanceOf[StreamingQueryListener.QueryTerminatedEvent]
assertStreamingQueryInfoEquals(queryQueryTerminated.queryStatus, newQueryTerminated.queryStatus)
assert(queryQueryTerminated.exception === newQueryTerminated.exception)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,11 +290,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
// A StreamingQueryListener that gets the query status after the first completed trigger
val listener = new StreamingQueryListener {
@volatile var firstStatus: StreamingQueryStatus = null
override def onQueryStarted(queryStarted: QueryStarted): Unit = { }
override def onQueryProgress(queryProgress: QueryProgress): Unit = {
override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { }
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
if (firstStatus == null) firstStatus = queryProgress.queryStatus
}
override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = { }
override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { }
}

try {
Expand Down