diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index ba77e7c7bf2b..b6a603fa8cee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.streaming._
+import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
 import org.apache.spark.util.Clock
 
 /**
@@ -55,6 +56,7 @@ trait ProgressReporter extends Logging {
   protected def streamExecutionMetadata: StreamExecutionMetadata
   protected def currentBatchId: Long
   protected def sparkSession: SparkSession
+  protected def postEvent(event: StreamingQueryListener.Event): Unit
 
   // Local timestamps and counters.
   private var currentTriggerStartTimestamp = -1L
@@ -69,6 +71,12 @@ trait ProgressReporter extends Logging {
   /** Holds the most recent query progress updates. Accesses must lock on the queue itself. */
   private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
 
+  private val noDataProgressEventInterval =
+    sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
+
+  // Timestamp of the last progress event reported for a trigger with no input data
+  private var lastNoDataProgressEventTime = Long.MinValue
+
   @volatile
   protected var currentStatus: StreamingQueryStatus = {
     new StreamingQueryStatus(
@@ -99,6 +107,17 @@ trait ProgressReporter extends Logging {
     currentDurationsMs.clear()
   }
 
+  private def updateProgress(newProgress: StreamingQueryProgress): Unit = {
+    progressBuffer.synchronized {
+      progressBuffer += newProgress
+      while (progressBuffer.length >= sparkSession.sqlContext.conf.streamingProgressRetention) {
+        progressBuffer.dequeue()
+      }
+    }
+    postEvent(new QueryProgressEvent(newProgress))
+    logInfo(s"Streaming query made progress: $newProgress")
+  }
+
   /** Finalizes the query progress and adds it to list of recent status updates. */
   protected def finishTrigger(hasNewData: Boolean): Unit = {
     currentTriggerEndTimestamp = triggerClock.getTimeMillis()
@@ -143,14 +162,18 @@ trait ProgressReporter extends Logging {
       sources = sourceProgress.toArray,
       sink = sinkProgress)
 
-    progressBuffer.synchronized {
-      progressBuffer += newProgress
-      while (progressBuffer.length >= sparkSession.sqlContext.conf.streamingProgressRetention) {
-        progressBuffer.dequeue()
+    if (hasNewData) {
+      // Reset lastNoDataProgressEventTime if we processed any data
+      lastNoDataProgressEventTime = Long.MinValue
+      updateProgress(newProgress)
+    } else {
+      val now = triggerClock.getTimeMillis()
+      if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
+        lastNoDataProgressEventTime = now
+        updateProgress(newProgress)
       }
     }
-    logInfo(s"Streaming query made progress: $newProgress")
 
     currentStatus = currentStatus.copy(isTriggerActive = false)
   }
 
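The ProgressReporter changes above centralize progress-buffer retention and event posting in a single `updateProgress` helper, and throttle no-data progress events to at most one per `noDataProgressEventInterval`. A minimal, self-contained sketch of that throttling pattern, with hypothetical names (`NoDataThrottle`, `report`, `post`) that are not part of the patch:

```scala
// Illustrative sketch of the no-data throttling used in finishTrigger above.
// NoDataThrottle, report, and post are invented names for this example only.
class NoDataThrottle(intervalMs: Long, clock: () => Long = () => System.currentTimeMillis()) {
  private var lastNoDataEventTime = Long.MinValue

  def report(hasNewData: Boolean)(post: => Unit): Unit = {
    if (hasNewData) {
      // Real progress resets the throttle so the next idle trigger reports immediately.
      lastNoDataEventTime = Long.MinValue
      post
    } else {
      val now = clock()
      // An idle trigger reports at most once per intervalMs.
      if (now - intervalMs >= lastNoDataEventTime) {
        lastNoDataEventTime = now
        post
      }
    }
  }
}
```

Note that `Long.MinValue` as the initial value guarantees the very first no-data trigger is always reported, since `now - intervalMs` can never be smaller than it.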
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 6b1c01ab2a06..9c2bc29170dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -63,9 +63,6 @@ class StreamExecution(
 
   private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
 
-  private val noDataProgressEventInterval =
-    sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
-
   /**
    * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
    */
@@ -199,9 +196,6 @@ class StreamExecution(
       // While active, repeatedly attempt to run batches.
       SparkSession.setActiveSession(sparkSession)
 
-      // The timestamp we report an event that has no input data
-      var lastNoDataProgressEventTime = Long.MinValue
-
       triggerExecutor.execute(() => {
         startTrigger()
 
@@ -224,18 +218,6 @@ class StreamExecution(
 
           // Report trigger as finished and construct progress object.
          finishTrigger(dataAvailable)
-          if (dataAvailable) {
-            // Reset noDataEventTimestamp if we processed any data
-            lastNoDataProgressEventTime = Long.MinValue
-            postEvent(new QueryProgressEvent(lastProgress))
-          } else {
-            val now = triggerClock.getTimeMillis()
-            if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
-              lastNoDataProgressEventTime = now
-              postEvent(new QueryProgressEvent(lastProgress))
-            }
-          }
-
          if (dataAvailable) {
            // We'll increase currentBatchId after we complete processing current batch's data
            currentBatchId += 1
@@ -486,7 +468,7 @@ class StreamExecution(
     }
   }
 
-  private def postEvent(event: StreamingQueryListener.Event) {
+  override protected def postEvent(event: StreamingQueryListener.Event): Unit = {
     sparkSession.streams.postListenerEvent(event)
   }
 
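On the StreamExecution side the duplicated throttling state and loop-local logic are deleted; the class now only supplies the event-delivery implementation for the abstract `postEvent` the trait declares, while the trait decides when to post. A simplified stand-in for that template-method split (`Reporter`, `Execution`, and the `String` event type are stand-ins, not Spark's types):

```scala
// The trait owns the reporting policy and declares postEvent;
// the concrete class supplies only the delivery mechanism.
trait Reporter {
  protected def postEvent(event: String): Unit

  protected def updateProgress(progress: String): Unit = {
    // Policy (buffering, throttling) lives here; delivery is deferred.
    postEvent(s"progress: $progress")
  }
}

class Execution extends Reporter {
  override protected def postEvent(event: String): Unit = println(event)

  def runTrigger(): Unit = updateProgress("batch 0 complete")
}
```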
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 3086abf03cd6..9b42cd91c1fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -225,6 +225,10 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
       }
       true
     }
+    // `recentProgresses` should not receive too many no-data events
+    actions += AssertOnQuery { q =>
+      q.recentProgresses.size > 1 && q.recentProgresses.size <= 11
+    }
     testStream(input.toDS)(actions: _*)
     spark.sparkContext.listenerBus.waitUntilEmpty(10000)
     // 11 is the max value of the possible numbers of events.
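The events being throttled ultimately reach `StreamingQueryListener.onQueryProgress`, which is what the test above bounds. For context, a minimal listener that counts progress events; `CountingListener` is an illustrative name, `spark` is assumed to be an active SparkSession, and the throttle interval should be the one read as `streamingNoDataProgressEventInterval` above (the `spark.sql.streaming.noDataProgressEventInterval` conf):

```scala
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Counts QueryProgressEvents as they arrive; with the patch above, an idle
// query posts at most one such event per no-data progress interval.
class CountingListener extends StreamingQueryListener {
  val progressEvents = new AtomicInteger(0)

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    progressEvents.incrementAndGet()
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}

// Usage sketch, assuming an active SparkSession named `spark`:
//   spark.streams.addListener(new CountingListener)
```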