From 7e02bbb1f5c34329c00c8ee093c3121a6d8407b0 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 18 Oct 2016 01:06:18 -0700 Subject: [PATCH 1/5] Refactored APIs --- .../execution/streaming/StreamExecution.scala | 15 ++++++++------- .../streaming/StreamingQueryListenerBus.scala | 8 ++++---- .../streaming/StreamingQueryListener.scala | 18 +++++++++--------- .../spark/sql/streaming/StreamTest.scala | 12 ++++++------ .../StreamingQueryListenerSuite.scala | 19 ++++++++++--------- .../sql/streaming/StreamingQuerySuite.scala | 8 ++++---- 6 files changed, 41 insertions(+), 39 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 6330e0a911f4..627b87b8955d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -165,7 +165,7 @@ class StreamExecution( new Path(new Path(checkpointRoot), name).toUri.toString /** - * Starts the execution. This returns only after the thread has started and [[QueryStarted]] event + * Starts the execution. This returns only after the thread has started and [[QueryStartedEvent]] * has been posted to all the listeners. */ def start(): Unit = { @@ -177,9 +177,10 @@ class StreamExecution( /** * Repeatedly attempts to run batches as data arrives. * - * Note that this method ensures that [[QueryStarted]] and [[QueryTerminated]] events are posted - * such that listeners are guaranteed to get a start event before a termination. Furthermore, this - * method also ensures that [[QueryStarted]] event is posted before the `start()` method returns. + * Note that this method ensures that [[QueryStartedEvent]] and [[QueryTerminatedEvent]] are + * posted such that listeners are guaranteed to get a start event before a termination. + * Furthermore, this method also ensures that [[QueryStartedEvent]] event is posted before the + * `start()` method returns. */ private def runBatches(): Unit = { try { @@ -190,7 +191,7 @@ class StreamExecution( sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics) } updateStatus() - postEvent(new QueryStarted(currentStatus)) // Assumption: Does not throw exception. + postEvent(new QueryStartedEvent(currentStatus)) // Assumption: Does not throw exception. // Unblock starting thread startLatch.countDown() @@ -232,7 +233,7 @@ class StreamExecution( // Update metrics and notify others streamMetrics.reportTriggerFinished() updateStatus() - postEvent(new QueryProgress(currentStatus)) + postEvent(new QueryProgressEvent(currentStatus)) isTerminated }) } catch { @@ -260,7 +261,7 @@ class StreamExecution( // Notify others sparkSession.streams.notifyQueryTermination(StreamExecution.this) postEvent( - new QueryTerminated(currentStatus, exception.map(_.cause).map(Utils.exceptionString))) + new QueryTerminatedEvent(currentStatus, exception.map(_.cause).map(Utils.exceptionString))) terminationLatch.countDown() } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala index 1e663956f980..fc2190d39da4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala @@ -40,7 +40,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) */ def post(event: StreamingQueryListener.Event) { event match { - case s: QueryStarted => + case s: QueryStartedEvent => postToAll(s) case _ => sparkListenerBus.post(event) @@ -59,11 +59,11 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) listener: StreamingQueryListener, event: StreamingQueryListener.Event): Unit = { event match { - case queryStarted: QueryStarted => + case queryStarted: QueryStartedEvent => listener.onQueryStarted(queryStarted) - case queryProgress: QueryProgress => + case queryProgress: QueryProgressEvent => listener.onQueryProgress(queryProgress) - case queryTerminated: QueryTerminated => + case queryTerminated: QueryTerminatedEvent => listener.onQueryTerminated(queryTerminated) case _ => } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala index 69790e33b216..a213a3c8896f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala @@ -41,7 +41,7 @@ abstract class StreamingQueryListener { * don't block this method as it will block your query. * @since 2.0.0 */ - def onQueryStarted(queryStarted: QueryStarted): Unit + def onQueryStarted(queryStarted: QueryStartedEvent): Unit /** * Called when there is some status update (ingestion rate updated, etc.) @@ -49,16 +49,16 @@ abstract class StreamingQueryListener { * @note This method is asynchronous. The status in [[StreamingQuery]] will always be * latest no matter when this method is called. Therefore, the status of [[StreamingQuery]] * may be changed before/when you process the event. E.g., you may find [[StreamingQuery]] - * is terminated when you are processing [[QueryProgress]]. + * is terminated when you are processing [[QueryProgressEvent]]. * @since 2.0.0 */ - def onQueryProgress(queryProgress: QueryProgress): Unit + def onQueryProgress(queryProgress: QueryProgressEvent): Unit /** * Called when a query is stopped, with or without error. * @since 2.0.0 */ - def onQueryTerminated(queryTerminated: QueryTerminated): Unit + def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit } @@ -84,7 +84,7 @@ object StreamingQueryListener { * @since 2.0.0 */ @Experimental - class QueryStarted private[sql](val queryStatus: StreamingQueryStatus) extends Event + class QueryStartedEvent private[sql](val status: StreamingQueryStatus) extends Event /** * :: Experimental :: @@ -92,19 +92,19 @@ object StreamingQueryListener { * @since 2.0.0 */ @Experimental - class QueryProgress private[sql](val queryStatus: StreamingQueryStatus) extends Event + class QueryProgressEvent private[sql](val status: StreamingQueryStatus) extends Event /** * :: Experimental :: * Event representing that termination of a query * - * @param queryStatus Information about the status of the query. + * @param status Information about the status of the query. * @param exception The exception message of the [[StreamingQuery]] if the query was terminated * with an exception. Otherwise, it will be `None`. * @since 2.0.0 */ @Experimental - class QueryTerminated private[sql]( - val queryStatus: StreamingQueryStatus, + class QueryTerminatedEvent private[sql]( + val status: StreamingQueryStatus, val exception: Option[String]) extends Event } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 04e20c7284d9..080edd1f94b7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -674,23 +674,23 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { } - override def onQueryStarted(queryStarted: QueryStarted): Unit = { + override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { asyncTestWaiter { - startStatus = queryStarted.queryStatus + startStatus = queryStarted.status } } - override def onQueryProgress(queryProgress: QueryProgress): Unit = { + override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = { asyncTestWaiter { assert(startStatus != null, "onQueryProgress called before onQueryStarted") - synchronized { progressStatuses += queryProgress.queryStatus } + synchronized { progressStatuses += queryProgress.status } } } - override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = { + override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { asyncTestWaiter { assert(startStatus != null, "onQueryTerminated called before onQueryStarted") - terminationStatus = queryTerminated.queryStatus + terminationStatus = queryTerminated.status terminationException = queryTerminated.exception } asyncTestWaiter.dismiss() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 623f66a778ea..1c273e836c2f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -177,31 +177,32 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } test("QueryStarted serialization") { - val queryStarted = new StreamingQueryListener.QueryStarted(StreamingQueryStatus.testStatus) + val queryStarted = new StreamingQueryListener.QueryStartedEvent(StreamingQueryStatus.testStatus) val json = JsonProtocol.sparkEventToJson(queryStarted) val newQueryStarted = JsonProtocol.sparkEventFromJson(json) - .asInstanceOf[StreamingQueryListener.QueryStarted] - assertStreamingQueryInfoEquals(queryStarted.queryStatus, newQueryStarted.queryStatus) + .asInstanceOf[StreamingQueryListener.QueryStartedEvent] + assertStreamingQueryInfoEquals(queryStarted.status, newQueryStarted.status) } test("QueryProgress serialization") { - val queryProcess = new StreamingQueryListener.QueryProgress(StreamingQueryStatus.testStatus) + val queryProcess = new StreamingQueryListener.QueryProgressEvent( + StreamingQueryStatus.testStatus) val json = JsonProtocol.sparkEventToJson(queryProcess) val newQueryProcess = JsonProtocol.sparkEventFromJson(json) - .asInstanceOf[StreamingQueryListener.QueryProgress] - assertStreamingQueryInfoEquals(queryProcess.queryStatus, newQueryProcess.queryStatus) + .asInstanceOf[StreamingQueryListener.QueryProgressEvent] + assertStreamingQueryInfoEquals(queryProcess.status, newQueryProcess.status) } test("QueryTerminated serialization") { val exception = new RuntimeException("exception") - val queryQueryTerminated = new StreamingQueryListener.QueryTerminated( + val queryQueryTerminated = new StreamingQueryListener.QueryTerminatedEvent( StreamingQueryStatus.testStatus, Some(exception.getMessage)) val json = JsonProtocol.sparkEventToJson(queryQueryTerminated) val newQueryTerminated = JsonProtocol.sparkEventFromJson(json) - .asInstanceOf[StreamingQueryListener.QueryTerminated] - assertStreamingQueryInfoEquals(queryQueryTerminated.queryStatus, newQueryTerminated.queryStatus) + .asInstanceOf[StreamingQueryListener.QueryTerminatedEvent] + assertStreamingQueryInfoEquals(queryQueryTerminated.status, newQueryTerminated.status) assert(queryQueryTerminated.exception === newQueryTerminated.exception) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 9f8e2db96636..54aa242e11b6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -290,11 +290,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { // A StreamingQueryListener that gets the query status after the first completed trigger val listener = new StreamingQueryListener { @volatile var firstStatus: StreamingQueryStatus = null - override def onQueryStarted(queryStarted: QueryStarted): Unit = { } - override def onQueryProgress(queryProgress: QueryProgress): Unit = { - if (firstStatus == null) firstStatus = queryProgress.queryStatus + override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { } + override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = { + if (firstStatus == null) firstStatus = queryProgress.status } - override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = { } + override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { } } try { From 0fe56d2960fa53aeacadcc0edd08164899f9a9d6 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 18 Oct 2016 13:24:52 -0700 Subject: [PATCH 2/5] More refactoring --- .../spark/sql/streaming/StreamingQueryListener.scala | 10 +++++----- .../org/apache/spark/sql/streaming/StreamTest.scala | 4 ++-- .../sql/streaming/StreamingQueryListenerSuite.scala | 4 ++-- .../spark/sql/streaming/StreamingQuerySuite.scala | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala index a213a3c8896f..c13afbdc409b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala @@ -41,7 +41,7 @@ abstract class StreamingQueryListener { * don't block this method as it will block your query. * @since 2.0.0 */ - def onQueryStarted(queryStarted: QueryStartedEvent): Unit + def onQueryStarted(event: QueryStartedEvent): Unit /** * Called when there is some status update (ingestion rate updated, etc.) @@ -52,13 +52,13 @@ abstract class StreamingQueryListener { * is terminated when you are processing [[QueryProgressEvent]]. * @since 2.0.0 */ - def onQueryProgress(queryProgress: QueryProgressEvent): Unit + def onQueryProgress(event: QueryProgressEvent): Unit /** * Called when a query is stopped, with or without error. * @since 2.0.0 */ - def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit + def onQueryTerminated(event: QueryTerminatedEvent): Unit } @@ -84,7 +84,7 @@ object StreamingQueryListener { * @since 2.0.0 */ @Experimental - class QueryStartedEvent private[sql](val status: StreamingQueryStatus) extends Event + class QueryStartedEvent private[sql](val queryStatus: StreamingQueryStatus) extends Event /** * :: Experimental :: @@ -92,7 +92,7 @@ object StreamingQueryListener { * @since 2.0.0 */ @Experimental - class QueryProgressEvent private[sql](val status: StreamingQueryStatus) extends Event + class QueryProgressEvent private[sql](val queryStatus: StreamingQueryStatus) extends Event /** * :: Experimental :: diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 080edd1f94b7..f5128a3ebc06 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -676,14 +676,14 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { asyncTestWaiter { - startStatus = queryStarted.status + startStatus = queryStarted.queryStatus } } override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = { asyncTestWaiter { assert(startStatus != null, "onQueryProgress called before onQueryStarted") - synchronized { progressStatuses += queryProgress.status } + synchronized { progressStatuses += queryProgress.queryStatus } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 1c273e836c2f..6bda03fdeef6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -181,7 +181,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { val json = JsonProtocol.sparkEventToJson(queryStarted) val newQueryStarted = JsonProtocol.sparkEventFromJson(json) .asInstanceOf[StreamingQueryListener.QueryStartedEvent] - assertStreamingQueryInfoEquals(queryStarted.status, newQueryStarted.status) + assertStreamingQueryInfoEquals(queryStarted.queryStatus, newQueryStarted.queryStatus) } test("QueryProgress serialization") { @@ -190,7 +190,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { val json = JsonProtocol.sparkEventToJson(queryProcess) val newQueryProcess = JsonProtocol.sparkEventFromJson(json) .asInstanceOf[StreamingQueryListener.QueryProgressEvent] - assertStreamingQueryInfoEquals(queryProcess.status, newQueryProcess.status) + assertStreamingQueryInfoEquals(queryProcess.queryStatus, newQueryProcess.queryStatus) } test("QueryTerminated serialization") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 54aa242e11b6..92020be9789f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -292,7 +292,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { @volatile var firstStatus: StreamingQueryStatus = null override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { } override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = { - if (firstStatus == null) firstStatus = queryProgress.status + if (firstStatus == null) firstStatus = queryProgress.queryStatus } override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { } } From 9549e580f78ac17830e03c075bdc5445329c42c8 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 18 Oct 2016 13:50:16 -0700 Subject: [PATCH 3/5] MimaExcludes --- project/MimaExcludes.scala | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index ddf53bbce65b..e384825341fb 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -796,7 +796,16 @@ object MimaExcludes { ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryTerminated.queryInfo"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.SourceStatus.offsetDesc"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.SourceStatus.this"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.status") + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.status"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryStarted"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryStarted"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryStarted"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryProgress"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryProgress"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated") ) } From bb1b2344b7f9ab4df550254830af42e91a3790a3 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 18 Oct 2016 14:37:36 -0700 Subject: [PATCH 4/5] Added missingn comment --- project/MimaExcludes.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index e384825341fb..ee6e31a0ea50 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -788,6 +788,7 @@ object MimaExcludes { // SPARK-16240: ML persistence backward compatibility for LDA ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.clustering.LDA$") ) ++ Seq( + // [SPARK-17731][SQL][Streaming] Metrics for structured streaming ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryInfo"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryStarted.this"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryStarted.queryInfo"), From 4cac044cd2931748148cc3470ac78a0cb1341dd8 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 18 Oct 2016 14:59:44 -0700 Subject: [PATCH 5/5] Missed refactoring --- .../apache/spark/sql/streaming/StreamingQueryListener.scala | 4 ++-- .../scala/org/apache/spark/sql/streaming/StreamTest.scala | 2 +- .../spark/sql/streaming/StreamingQueryListenerSuite.scala | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala index c13afbdc409b..9e311fae842b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala @@ -98,13 +98,13 @@ object StreamingQueryListener { * :: Experimental :: * Event representing that termination of a query * - * @param status Information about the status of the query. + * @param queryStatus Information about the status of the query. * @param exception The exception message of the [[StreamingQuery]] if the query was terminated * with an exception. Otherwise, it will be `None`. * @since 2.0.0 */ @Experimental class QueryTerminatedEvent private[sql]( - val status: StreamingQueryStatus, + val queryStatus: StreamingQueryStatus, val exception: Option[String]) extends Event } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index f5128a3ebc06..35db0f6707bf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -690,7 +690,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { asyncTestWaiter { assert(startStatus != null, "onQueryTerminated called before onQueryStarted") - terminationStatus = queryTerminated.status + terminationStatus = queryTerminated.queryStatus terminationException = queryTerminated.exception } asyncTestWaiter.dismiss() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 6bda03fdeef6..ff843865a017 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -202,7 +202,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { JsonProtocol.sparkEventToJson(queryQueryTerminated) val newQueryTerminated = JsonProtocol.sparkEventFromJson(json) .asInstanceOf[StreamingQueryListener.QueryTerminatedEvent] - assertStreamingQueryInfoEquals(queryQueryTerminated.status, newQueryTerminated.status) + assertStreamingQueryInfoEquals(queryQueryTerminated.queryStatus, newQueryTerminated.queryStatus) assert(queryQueryTerminated.exception === newQueryTerminated.exception) }