owner: reset owner gauge metrics on Bootstrap and AsyncStop

Adds Owner.cleanStaleMetrics, which calls Reset() on the owner's gauge
metrics, and invokes it from Bootstrap and AsyncStop so that values left
behind by a previous owner are not kept.

diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go
index c563dd67ae8..348bbccbd9a 100644
--- a/cdc/owner/owner.go
+++ b/cdc/owner/owner.go
@@ -244,6 +244,7 @@ func (o *Owner) WriteDebugInfo(w io.Writer) {
 // AsyncStop stops the owner asynchronously
 func (o *Owner) AsyncStop() {
 	atomic.StoreInt32(&o.closed, 1)
+	o.cleanStaleMetrics()
 }
 
 func (o *Owner) cleanUpChangefeed(state *orchestrator.ChangefeedReactorState) {
@@ -273,6 +274,7 @@ func (o *Owner) cleanUpChangefeed(state *orchestrator.ChangefeedReactorState) {
 // Bootstrap checks if the state contains incompatible or incorrect information and tries to fix it.
 func (o *Owner) Bootstrap(state *orchestrator.GlobalReactorState) {
 	log.Info("Start bootstrapping", zap.Any("state", state))
+	o.cleanStaleMetrics()
 	fixChangefeedInfos(state)
 }
 
@@ -291,6 +293,18 @@ func fixChangefeedInfos(state *orchestrator.GlobalReactorState) {
 	}
 }
 
+func (o *Owner) cleanStaleMetrics() {
+	// The gauge metrics of the Owner should be reset
+	// each time a new owner is launched, in case the previous owner
+	// has crashed and has not cleaned up the stale metrics values.
+	changefeedCheckpointTsGauge.Reset()
+	changefeedCheckpointTsLagGauge.Reset()
+	changefeedResolvedTsGauge.Reset()
+	changefeedResolvedTsLagGauge.Reset()
+	ownerMaintainTableNumGauge.Reset()
+	changefeedStatusGauge.Reset()
+}
+
 func (o *Owner) updateMetrics(state *orchestrator.GlobalReactorState) {
 	// Keep the value of prometheus expression `rate(counter)` = 1
 	// Please also change alert rule in ticdc.rules.yml when change the expression value.
@@ -300,6 +314,7 @@ func (o *Owner) updateMetrics(state *orchestrator.GlobalReactorState) {
 
 	ownerMaintainTableNumGauge.Reset()
 	changefeedStatusGauge.Reset()
+
 	for changefeedID, changefeedState := range state.Changefeeds {
 		for captureID, captureInfo := range state.Captures {
 			taskStatus, exist := changefeedState.TaskStatuses[captureID]