Skip to content

Commit

Permalink
cdc: add slow table barrier ts
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus committed Oct 13, 2022
1 parent 02f6abd commit 945c4e1
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 41 deletions.
1 change: 1 addition & 0 deletions cdc/processor/pipeline/table_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@ func (t *tableActor) Stats() tablepb.Stats {
return tablepb.Stats{
RegionCount: pullerStats.RegionCount,
CurrentTs: oracle.ComposeTS(oracle.GetPhysical(now), 0),
BarrierTs: sinkStats.BarrierTs,
StageCheckpoints: map[string]tablepb.Checkpoint{
"puller-ingress": {
CheckpointTs: pullerStats.CheckpointTsIngress,
Expand Down
115 changes: 76 additions & 39 deletions cdc/processor/tablepb/table.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions cdc/processor/tablepb/table.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ message Stats {
uint64 current_ts = 2 [(gogoproto.casttype) = "github.com/pingcap/tiflow/cdc/model.Ts"];
// Checkponits at each stage.
map<string, Checkpoint> stage_checkpoints = 3 [(gogoproto.nullable) = false];
// The barrier timestamp of the table.
uint64 barrier_ts = 4 [(gogoproto.casttype) = "github.com/pingcap/tiflow/cdc/model.Ts"];
}

// TableStatus is the running status of a table.
Expand Down
4 changes: 2 additions & 2 deletions cdc/scheduler/internal/v3/replication/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,15 @@ var (
Subsystem: "scheduler",
Name: "slow_table_stage_checkpoint_ts_lag_histogram",
Help: "Histogram of the slowest table checkpoint ts lag of each stage",
Buckets: prometheus.LinearBuckets(0.5, 1, 16),
Buckets: prometheus.LinearBuckets(0.5, 0.5, 36),
}, []string{"namespace", "changefeed", "stage"})
slowestTableStageResolvedTsLagHistogramVec = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "scheduler",
Name: "slow_table_stage_resolved_ts_lag_histogram",
Help: "Histogram of the slowest table resolved ts lag of each stage",
Buckets: prometheus.LinearBuckets(0.5, 1, 16),
Buckets: prometheus.LinearBuckets(0.5, 0.5, 36),
}, []string{"namespace", "changefeed", "stage"})
slowestTableRegionGaugeVec = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Expand Down
13 changes: 13 additions & 0 deletions cdc/scheduler/internal/v3/replication/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,8 @@ func (r *Manager) CollectMetrics() {
phyRTs := oracle.ExtractPhysical(table.Checkpoint.ResolvedTs)
slowestTableResolvedTsGauge.
WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyRTs))

// Slow table latency metrics.
phyCurrentTs := oracle.ExtractPhysical(table.Stats.CurrentTs)
for stage, checkpoint := range table.Stats.StageCheckpoints {
// Checkpoint ts
Expand All @@ -539,6 +541,17 @@ func (r *Manager) CollectMetrics() {
slowestTableStageResolvedTsLagHistogramVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Observe(resolvedTsLag)
}
// Barrier ts
stage := "barrier"
phyBTs := oracle.ExtractPhysical(table.Stats.BarrierTs)
slowestTableStageResolvedTsGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyBTs))
barrierTsLag := float64(phyCurrentTs-phyBTs) / 1e3
slowestTableStageResolvedTsLagGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(barrierTsLag)
slowestTableStageResolvedTsLagHistogramVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Observe(barrierTsLag)
// Region count
slowestTableRegionGaugeVec.
WithLabelValues(cf.Namespace, cf.ID).Set(float64(table.Stats.RegionCount))
}
Expand Down

0 comments on commit 945c4e1

Please sign in to comment.