diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index d1086cd42d133..0dff1c2fe5768 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -154,7 +154,7 @@ trait ProgressReporter extends Logging {
     assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets != null)
     currentTriggerEndTimestamp = triggerClock.getTimeMillis()
 
-    val executionStats = extractExecutionStats(hasNewData)
+    val executionStats = extractExecutionStats(hasNewData, hasExecuted)
     val processingTimeMills = currentTriggerEndTimestamp - currentTriggerStartTimestamp
     val processingTimeSec = Math.max(1L, processingTimeMills).toDouble / MILLIS_PER_SECOND
 
@@ -215,26 +215,26 @@ trait ProgressReporter extends Logging {
   }
 
   /** Extract statistics about stateful operators from the executed query plan. */
-  private def extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress] = {
+  private def extractStateOperatorMetrics(hasExecuted: Boolean): Seq[StateOperatorProgress] = {
     if (lastExecution == null) return Nil
-    // lastExecution could belong to one of the previous triggers if `!hasNewData`.
+    // lastExecution could belong to one of the previous triggers if `!hasExecuted`.
     // Walking the plan again should be inexpensive.
     lastExecution.executedPlan.collect {
       case p if p.isInstanceOf[StateStoreWriter] =>
         val progress = p.asInstanceOf[StateStoreWriter].getProgress()
-        if (hasNewData) progress else progress.copy(newNumRowsUpdated = 0)
+        if (hasExecuted) progress else progress.copy(newNumRowsUpdated = 0)
     }
   }
 
   /** Extracts statistics from the most recent query execution. */
-  private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
+  private def extractExecutionStats(hasNewData: Boolean, hasExecuted: Boolean): ExecutionStats = {
     val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty
     val watermarkTimestamp =
       if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
       else Map.empty[String, String]
 
     // SPARK-19378: Still report metrics even though no data was processed while reporting progress.
-    val stateOperators = extractStateOperatorMetrics(hasNewData)
+    val stateOperators = extractStateOperatorMetrics(hasExecuted)
 
     if (!hasNewData) {
       return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index d36c64f61a726..b04f8b0d4d174 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -798,7 +798,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
         }
       },
       CheckNewAnswer(("c", "-1")),
-      assertNumStateRows(total = 0, updated = 0)
+      assertNumStateRows(total = 0, updated = 1)
     )
   }
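Why the rename matters: a trigger can execute a batch even when no new data arrived, for example a no-data batch run solely to fire event-time timeouts once the watermark advances. That is exactly what the FlatMapGroupsWithStateSuite change above exercises: the timed-out state row for "c" is removed, so the trigger now reports `updated = 1` instead of `0`. Branching the state metrics on `hasNewData` therefore under-reported `numRowsUpdated` for such triggers, while `hasExecuted` captures both cases. Below is a minimal, hedged sketch of the reporting rule this patch installs; the names (`ReportedProgress`, `chooseReported`, `Demo`) are hypothetical and are not Spark APIs:

```scala
// Hedged sketch of the decision the patch makes in extractStateOperatorMetrics.
// ReportedProgress and chooseReported are illustrative names, not Spark code.
case class ReportedProgress(numRowsTotal: Long, numRowsUpdated: Long)

// hasExecuted is true whenever a batch actually ran this trigger, whether it
// had new input or was a no-data batch (e.g. one fired to handle timeouts).
// hasNewData alone misses the no-data case.
def chooseReported(hasExecuted: Boolean, latest: ReportedProgress): ReportedProgress =
  if (hasExecuted) latest                 // a batch ran: report its real numbers
  else latest.copy(numRowsUpdated = 0)    // stale plan from an earlier trigger: zero out updates

object Demo extends App {
  // Timeout-only trigger: no new input, but the batch removed one state row.
  val afterTimeout = ReportedProgress(numRowsTotal = 0, numRowsUpdated = 1)
  assert(chooseReported(hasExecuted = true, afterTimeout).numRowsUpdated == 1)
  // Before the patch, the equivalent branch tested hasNewData (false here),
  // which would have wrongly reported numRowsUpdated = 0 for this trigger.
  assert(chooseReported(hasExecuted = false, afterTimeout).numRowsUpdated == 0)
  println("ok")
}
```

Note the asymmetry the patch preserves: `extractExecutionStats` still takes `hasNewData` as well, because row-rate statistics should only be reported for triggers that actually consumed input, while state operator metrics must be reported for any trigger that executed.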