Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cordinator(ticdc): Fix Puller Resolved TS Lag Calculation and Deprecate current_ts Field in Stats (#11624) #11646

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,11 +386,9 @@ func (p *processor) getStatsFromSourceManagerAndSinkManager(
span tablepb.Span, sinkStats sinkmanager.TableStats,
) tablepb.Stats {
pullerStats := p.sourceManager.r.GetTablePullerStats(span)
now := p.upstream.PDClock.CurrentTime()

stats := tablepb.Stats{
RegionCount: pullerStats.RegionCount,
CurrentTs: oracle.ComposeTS(oracle.GetPhysical(now), 0),
BarrierTs: sinkStats.BarrierTs,
StageCheckpoints: map[string]tablepb.Checkpoint{
"puller-ingress": {
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/tablepb/table.proto
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ message Stats {
// Number of captured regions.
uint64 region_count = 1;
// The current timestamp from the table's point of view.
uint64 current_ts = 2 [(gogoproto.casttype) = "Ts"];
uint64 current_ts = 2 [(gogoproto.casttype) = "Ts", deprecated = true]; // Deprecated: Do not use this field.
// Checkponits at each stage.
map<string, Checkpoint> stage_checkpoints = 3 [(gogoproto.nullable) = false];
// The barrier timestamp of the table.
Expand Down
8 changes: 7 additions & 1 deletion cdc/scheduler/internal/v3/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,13 @@
}
c.lastCollectTime = now

pdTime := now
// only nil in unit test
if c.pdClock != nil {
pdTime = c.pdClock.CurrentTime()
}

Check warning on line 456 in cdc/scheduler/internal/v3/coordinator.go

View check run for this annotation

Codecov / codecov/patch

cdc/scheduler/internal/v3/coordinator.go#L455-L456

Added lines #L455 - L456 were not covered by tests

c.schedulerM.CollectMetrics()
c.replicationM.CollectMetrics()
c.replicationM.CollectMetrics(pdTime)
c.captureM.CollectMetrics()
}
12 changes: 5 additions & 7 deletions cdc/scheduler/internal/v3/replication/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@
}

// CollectMetrics collects metrics.
func (r *Manager) CollectMetrics() {
func (r *Manager) CollectMetrics(currentPDTime time.Time) {

Check warning on line 776 in cdc/scheduler/internal/v3/replication/replication_manager.go

View check run for this annotation

Codecov / codecov/patch

cdc/scheduler/internal/v3/replication/replication_manager.go#L776

Added line #L776 was not covered by tests
cf := r.changefeedID
tableGauge.
WithLabelValues(cf.Namespace, cf.ID).Set(float64(r.spans.Len()))
Expand All @@ -790,13 +790,12 @@
WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyRTs))

// Slow table latency metrics.
phyCurrentTs := oracle.ExtractPhysical(table.Stats.CurrentTs)
for stage, checkpoint := range table.Stats.StageCheckpoints {
// Checkpoint ts
phyCkpTs := oracle.ExtractPhysical(checkpoint.CheckpointTs)
slowestTableStageCheckpointTsGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyCkpTs))
checkpointLag := float64(phyCurrentTs-phyCkpTs) / 1e3
checkpointLag := currentPDTime.Sub(oracle.GetTimeFromTS(checkpoint.CheckpointTs)).Seconds()

Check warning on line 798 in cdc/scheduler/internal/v3/replication/replication_manager.go

View check run for this annotation

Codecov / codecov/patch

cdc/scheduler/internal/v3/replication/replication_manager.go#L798

Added line #L798 was not covered by tests
slowestTableStageCheckpointTsLagGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(checkpointLag)
slowestTableStageCheckpointTsLagHistogramVec.
Expand All @@ -805,7 +804,7 @@
phyRTs := oracle.ExtractPhysical(checkpoint.ResolvedTs)
slowestTableStageResolvedTsGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyRTs))
resolvedTsLag := float64(phyCurrentTs-phyRTs) / 1e3
resolvedTsLag := currentPDTime.Sub(oracle.GetTimeFromTS(checkpoint.ResolvedTs)).Seconds()

Check warning on line 807 in cdc/scheduler/internal/v3/replication/replication_manager.go

View check run for this annotation

Codecov / codecov/patch

cdc/scheduler/internal/v3/replication/replication_manager.go#L807

Added line #L807 was not covered by tests
slowestTableStageResolvedTsLagGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(resolvedTsLag)
slowestTableStageResolvedTsLagHistogramVec.
Expand All @@ -816,7 +815,7 @@
phyBTs := oracle.ExtractPhysical(table.Stats.BarrierTs)
slowestTableStageResolvedTsGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyBTs))
barrierTsLag := float64(phyCurrentTs-phyBTs) / 1e3
barrierTsLag := currentPDTime.Sub(oracle.GetTimeFromTS(table.Stats.BarrierTs)).Seconds()

Check warning on line 818 in cdc/scheduler/internal/v3/replication/replication_manager.go

View check run for this annotation

Codecov / codecov/patch

cdc/scheduler/internal/v3/replication/replication_manager.go#L818

Added line #L818 was not covered by tests
slowestTableStageResolvedTsLagGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(barrierTsLag)
slowestTableStageResolvedTsLagHistogramVec.
Expand Down Expand Up @@ -867,8 +866,7 @@
phyCkptTs := oracle.ExtractPhysical(pullerCkpt.ResolvedTs)
slowestTablePullerResolvedTs.WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyCkptTs))

phyCurrentTs := oracle.ExtractPhysical(table.Stats.CurrentTs)
lag := float64(phyCurrentTs-phyCkptTs) / 1e3
lag := currentPDTime.Sub(oracle.GetTimeFromTS(pullerCkpt.ResolvedTs)).Seconds()

Check warning on line 869 in cdc/scheduler/internal/v3/replication/replication_manager.go

View check run for this annotation

Codecov / codecov/patch

cdc/scheduler/internal/v3/replication/replication_manager.go#L869

Added line #L869 was not covered by tests
slowestTablePullerResolvedTsLag.WithLabelValues(cf.Namespace, cf.ID).Set(lag)
}
}
Expand Down
Loading