@@ -154,7 +154,7 @@ trait ProgressReporter extends Logging {
assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets != null)
currentTriggerEndTimestamp = triggerClock.getTimeMillis()

- val executionStats = extractExecutionStats(hasNewData)
+ val executionStats = extractExecutionStats(hasNewData, hasExecuted)
val processingTimeMills = currentTriggerEndTimestamp - currentTriggerStartTimestamp
val processingTimeSec = Math.max(1L, processingTimeMills).toDouble / MILLIS_PER_SECOND

@@ -215,26 +215,26 @@ trait ProgressReporter extends Logging {
}

/** Extract statistics about stateful operators from the executed query plan. */
- private def extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress] = {
+ private def extractStateOperatorMetrics(hasExecuted: Boolean): Seq[StateOperatorProgress] = {
if (lastExecution == null) return Nil
- // lastExecution could belong to one of the previous triggers if `!hasNewData`.
+ // lastExecution could belong to one of the previous triggers if `!hasExecuted`.
// Walking the plan again should be inexpensive.
lastExecution.executedPlan.collect {
case p if p.isInstanceOf[StateStoreWriter] =>
val progress = p.asInstanceOf[StateStoreWriter].getProgress()
- if (hasNewData) progress else progress.copy(newNumRowsUpdated = 0)
+ if (hasExecuted) progress else progress.copy(newNumRowsUpdated = 0)
}
}

/** Extracts statistics from the most recent query execution. */
- private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
+ private def extractExecutionStats(hasNewData: Boolean, hasExecuted: Boolean): ExecutionStats = {
val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty
val watermarkTimestamp =
if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
else Map.empty[String, String]

// SPARK-19378: Still report metrics even though no data was processed while reporting progress.
- val stateOperators = extractStateOperatorMetrics(hasNewData)
+ val stateOperators = extractStateOperatorMetrics(hasExecuted)

if (!hasNewData) {
return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
@@ -798,7 +798,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
}
},
CheckNewAnswer(("c", "-1")),
- assertNumStateRows(total = 0, updated = 0)
+ assertNumStateRows(total = 0, updated = 1)
@HeartSaVioR (Contributor Author), Oct 1, 2019:

This change shows that the patch is the correct fix, though total = 0, updated = 1 may look odd. The reason is that FlatMapGroupsWithState counts the deletion of state as an "update", whereas native streaming aggregation does not.

Contributor:

Are there other tests where we can check this fix as well?

@HeartSaVioR (Contributor Author), Apr 8, 2020:

Actually, the chance of encountering the issue is small, as the bug affects the metric only when a no-data batch updates a state row. This does not occur in non-arbitrary stateful operations, because the only possible state change in that case is eviction, and state row eviction is not counted as an update.

It can occur in arbitrary stateful operations, because a timer can fire in a no-data batch and the query can then update or delete a state row, which increments the update count. This test verifies one of these cases (deleting state), hence I didn't add a new test. If we would like another test in which the state function updates the state row on timeout, I can add one, but most of it would be redundant.

)
}
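
To make the effect discussed in the thread above concrete, here is a minimal, self-contained sketch of the gating logic. It is not Spark code: `OperatorProgress`, `ProgressSketch`, `report`, and `reportOld` are hypothetical names, and the sketch assumes that `hasExecuted` is true whenever a batch actually ran for the trigger, including a no-data batch driven by a timeout.

```scala
// Hypothetical stand-in for StateOperatorProgress; not Spark's real class.
final case class OperatorProgress(numRowsTotal: Long, numRowsUpdated: Long)

object ProgressSketch {
  // Patched behavior: keep the update count whenever a batch executed, and
  // zero it only when the cached progress belongs to an earlier trigger.
  def report(last: OperatorProgress, hasExecuted: Boolean): OperatorProgress =
    if (hasExecuted) last else last.copy(numRowsUpdated = 0)

  // Pre-patch behavior for comparison: gated on hasNewData instead.
  def reportOld(last: OperatorProgress, hasNewData: Boolean): OperatorProgress =
    if (hasNewData) last else last.copy(numRowsUpdated = 0)

  // A no-data batch in which a timeout removed the only state row.
  // Assumption: the removal is counted as one update, as the thread
  // describes for FlatMapGroupsWithState.
  val afterTimeout: OperatorProgress =
    OperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)

  val patched: OperatorProgress = report(afterTimeout, hasExecuted = true)
  val unpatched: OperatorProgress = reportOld(afterTimeout, hasNewData = false)

  def main(args: Array[String]): Unit = {
    println(s"patched:   $patched")
    println(s"unpatched: $unpatched")
  }
}
```

In this sketch the patched path keeps the update that the timeout produced, while the pre-patch path drops it because the batch had no new input. That is the difference between `updated = 1` and `updated = 0` in the test expectation.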
