Skip to content

Commit

Permalink
owner(cdc): backport stale metrics fix to 5.3 (#5894)
Browse files Browse the repository at this point in the history
ref #4774
  • Loading branch information
liuzix committed Jun 16, 2022
1 parent 97e9cde commit dcb5b39
Showing 1 changed file with 15 additions and 0 deletions.
15 changes: 15 additions & 0 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ func (o *Owner) WriteDebugInfo(w io.Writer) {
// AsyncStop stops the owner asynchronously
func (o *Owner) AsyncStop() {
atomic.StoreInt32(&o.closed, 1)
o.cleanStaleMetrics()
}

func (o *Owner) cleanUpChangefeed(state *orchestrator.ChangefeedReactorState) {
Expand Down Expand Up @@ -273,6 +274,7 @@ func (o *Owner) cleanUpChangefeed(state *orchestrator.ChangefeedReactorState) {
// Bootstrap checks if the state contains incompatible or incorrect information and tries to fix it.
func (o *Owner) Bootstrap(state *orchestrator.GlobalReactorState) {
log.Info("Start bootstrapping", zap.Any("state", state))
o.cleanStaleMetrics()
fixChangefeedInfos(state)
}

Expand All @@ -291,6 +293,18 @@ func fixChangefeedInfos(state *orchestrator.GlobalReactorState) {
}
}

func (o *Owner) cleanStaleMetrics() {
// The gauge metrics of the Owner should be reset
// each time a new owner is launched, in case the previous owner
// has crashed and has not cleaned up the stale metrics values.
changefeedCheckpointTsGauge.Reset()
changefeedCheckpointTsLagGauge.Reset()
changefeedResolvedTsGauge.Reset()
changefeedResolvedTsLagGauge.Reset()
ownerMaintainTableNumGauge.Reset()
changefeedStatusGauge.Reset()
}

func (o *Owner) updateMetrics(state *orchestrator.GlobalReactorState) {
// Keep the value of prometheus expression `rate(counter)` = 1
// Please also change alert rule in ticdc.rules.yml when change the expression value.
Expand All @@ -300,6 +314,7 @@ func (o *Owner) updateMetrics(state *orchestrator.GlobalReactorState) {

ownerMaintainTableNumGauge.Reset()
changefeedStatusGauge.Reset()

for changefeedID, changefeedState := range state.Changefeeds {
for captureID, captureInfo := range state.Captures {
taskStatus, exist := changefeedState.TaskStatuses[captureID]
Expand Down

0 comments on commit dcb5b39

Please sign in to comment.