Skip to content

Commit

Permalink
puller(cdc): add metrics for slowest changefeed puller (pingcap#10054) (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored and hicqu committed Nov 27, 2023
1 parent a26d8d9 commit 7242ec0
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 10 deletions.
18 changes: 18 additions & 0 deletions cdc/scheduler/internal/v3/replication/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,21 @@ var (
Name: "slow_table_region_count",
Help: "The number of regions captured by the slowest table",
}, []string{"namespace", "changefeed"})

slowestTablePullerResolvedTs = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "scheduler",
Name: "slow_table_puller_resolved_ts",
Help: "Puller Slowest ResolvedTs",
}, []string{"namespace", "changefeed"})
slowestTablePullerResolvedTsLag = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "scheduler",
Name: "slow_table_puller_resolved_ts_lag",
Help: "Puller Slowest ResolvedTs lag",
}, []string{"namespace", "changefeed"})
)

// InitMetrics registers all metrics used in scheduler
Expand All @@ -144,4 +159,7 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(slowestTableStageCheckpointTsLagHistogramVec)
registry.MustRegister(slowestTableStageResolvedTsLagHistogramVec)
registry.MustRegister(slowestTableRegionGaugeVec)

registry.MustRegister(slowestTablePullerResolvedTs)
registry.MustRegister(slowestTablePullerResolvedTsLag)
}
35 changes: 27 additions & 8 deletions cdc/scheduler/internal/v3/replication/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ type Manager struct { //nolint:revive
maxTaskConcurrency int

changefeedID model.ChangeFeedID
slowestTableID model.TableID
slowestPuller model.TableID
slowestSink model.TableID
slowTableHeap SetHeap
acceptAddTableTask int
acceptRemoveTableTask int
Expand Down Expand Up @@ -527,8 +528,11 @@ func (r *Manager) AdvanceCheckpoint(
}
}()

r.slowestPuller = model.TableID(0)
r.slowestSink = model.TableID(0)
var slowestPullerResolvedTs uint64 = math.MaxUint64

newCheckpointTs, newResolvedTs = math.MaxUint64, math.MaxUint64
slowestTableID := int64(0)
for _, tableID := range currentTables {
table, ok := r.tables[tableID]
if !ok {
Expand All @@ -551,14 +555,18 @@ func (r *Manager) AdvanceCheckpoint(
// Find the minimum checkpoint ts and resolved ts.
if newCheckpointTs > table.Checkpoint.CheckpointTs {
newCheckpointTs = table.Checkpoint.CheckpointTs
slowestTableID = tableID
r.slowestSink = tableID
}
if newResolvedTs > table.Checkpoint.ResolvedTs {
newResolvedTs = table.Checkpoint.ResolvedTs
}
}
if slowestTableID != 0 {
r.slowestTableID = slowestTableID
// Find the minimum puller resolved ts.
if pullerCkpt, ok := table.Stats.StageCheckpoints["puller-egress"]; ok {
if slowestPullerResolvedTs > pullerCkpt.ResolvedTs {
slowestPullerResolvedTs = pullerCkpt.ResolvedTs
r.slowestPuller = tableID
}
}
}

// If currentTables is empty, we should advance newResolvedTs to global barrier ts and
Expand Down Expand Up @@ -649,9 +657,9 @@ func (r *Manager) CollectMetrics() {
cf := r.changefeedID
tableGauge.
WithLabelValues(cf.Namespace, cf.ID).Set(float64(len(r.tables)))
if table, ok := r.tables[r.slowestTableID]; ok {
if table, ok := r.tables[r.slowestSink]; ok {
slowestTableIDGauge.
WithLabelValues(cf.Namespace, cf.ID).Set(float64(r.slowestTableID))
WithLabelValues(cf.Namespace, cf.ID).Set(float64(r.slowestSink))
slowestTableStateGauge.
WithLabelValues(cf.Namespace, cf.ID).Set(float64(table.State))
phyCkpTs := oracle.ExtractPhysical(table.Checkpoint.CheckpointTs)
Expand Down Expand Up @@ -732,6 +740,17 @@ func (r *Manager) CollectMetrics() {
WithLabelValues(cf.Namespace, cf.ID, ReplicationSetState(s).String()).
Set(float64(counter))
}

if table, ok := r.tables[r.slowestSink]; ok {
if pullerCkpt, ok := table.Stats.StageCheckpoints["puller-egress"]; ok {
phyCkptTs := oracle.ExtractPhysical(pullerCkpt.ResolvedTs)
slowestTablePullerResolvedTs.WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyCkptTs))

phyCurrentTs := oracle.ExtractPhysical(table.Stats.CurrentTs)
lag := float64(phyCurrentTs-phyCkptTs) / 1e3
slowestTablePullerResolvedTsLag.WithLabelValues(cf.Namespace, cf.ID).Set(lag)
}
}
}

// CleanMetrics cleans metrics.
Expand Down
20 changes: 18 additions & 2 deletions metrics/grafana/ticdc.json
Original file line number Diff line number Diff line change
Expand Up @@ -3426,8 +3426,16 @@
"hide": false,
"interval": "",
"intervalFactor": 1,
"legendFormat": "{{changefeed}}",
"legendFormat": "{{changefeed}}-barrier",
"refId": "C"
},
{
"exemplar": true,
"expr": "max(ticdc_scheduler_slow_table_puller_resolved_ts{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}) by (changefeed)",
"hide": false,
"interval": "",
"legendFormat": "{{changefeed}}-puller",
"refId": "B"
}
],
"thresholds": [],
Expand Down Expand Up @@ -3632,8 +3640,16 @@
"format": "time_series",
"interval": "",
"intervalFactor": 1,
"legendFormat": "{{changefeed}}",
"legendFormat": "{{changefeed}}-barrier",
"refId": "C"
},
{
"exemplar": true,
"expr": "max(ticdc_scheduler_slow_table_puller_resolved_ts_lag{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}) by (changefeed)",
"hide": false,
"interval": "",
"legendFormat": "{{changefeed}}-puller",
"refId": "A"
}
],
"thresholds": [],
Expand Down

0 comments on commit 7242ec0

Please sign in to comment.