Skip to content

Commit

Permalink
[SPARK-50378][SS] Add custom metric for tracking spent for proc initi…
Browse files Browse the repository at this point in the history
…al state in transformWithState

### What changes were proposed in this pull request?
Add a custom metric for tracking the time spent processing initial state in transformWithState

### Why are the changes needed?
Adds tracking for time spent in populating initial state

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests

```
[info] Run completed in 2 minutes, 38 seconds.
[info] Total number of tests run: 22
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 22, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #48913 from anishshri-db/task/SPARK-50378.

Authored-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
  • Loading branch information
anishshri-db authored and HeartSaVioR committed Nov 22, 2024
1 parent 190c504 commit d3119fa
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,9 @@ case class TransformWithStateExec(
// operator specific metrics
override def customStatefulOperatorMetrics: Seq[StatefulOperatorCustomMetric] = {
Seq(
// metrics around initial state
StatefulOperatorCustomSumMetric("initialStateProcessingTimeMs",
"Number of milliseconds taken to process all initial state"),
// metrics around state variables
StatefulOperatorCustomSumMetric("numValueStateVars", "Number of value state variables"),
StatefulOperatorCustomSumMetric("numListStateVars", "Number of list state variables"),
Expand Down Expand Up @@ -655,6 +658,8 @@ case class TransformWithStateExec(
statefulProcessor.init(outputMode, timeMode)
processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)

val initialStateProcTimeMs = longMetric("initialStateProcessingTimeMs")
val initialStateStartTimeNs = System.nanoTime
// Check if this is the first batch
// Only process initial states for first batch
if (processorHandle.getQueryInfo().getBatchId == 0) {
Expand All @@ -667,6 +672,7 @@ case class TransformWithStateExec(
processInitialStateRows(keyRow.asInstanceOf[UnsafeRow], valueRowIter)
}
}
initialStateProcTimeMs += NANOSECONDS.toMillis(System.nanoTime - initialStateStartTimeNs)

processDataWithPartition(childDataIterator, store, processorHandle)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,10 @@ class TransformWithStateInitialStateSuite extends StateStoreMetricsTest
AddData(inputData, InitInputRow("k2", "update", 40.0)),
AddData(inputData, InitInputRow("non-exist", "getOption", -1.0)),
CheckNewAnswer(("non-exist", "getOption", -1.0)),
Execute { q =>
assert(q.lastProgress
.stateOperators(0).customMetrics.get("initialStateProcessingTimeMs") > 0)
},
AddData(inputData, InitInputRow("k1", "appendList", 37.0)),
AddData(inputData, InitInputRow("k2", "appendList", 40.0)),
AddData(inputData, InitInputRow("non-exist", "getList", -1.0)),
Expand Down Expand Up @@ -514,6 +518,10 @@ class TransformWithStateInitialStateSuite extends StateStoreMetricsTest
AdvanceManualClock(1 * 1000),
// registered timer for "a" and "b" is 6000, first batch is processed at ts = 1000
CheckNewAnswer(("c", "1")),
Execute { q =>
assert(q.lastProgress
.stateOperators(0).customMetrics.get("initialStateProcessingTimeMs") > 0)
},

AddData(inputData, "c"),
AdvanceManualClock(6 * 1000), // ts = 7000, "a" expires
Expand Down

0 comments on commit d3119fa

Please sign in to comment.