diff --git a/cdc/owner.go b/cdc/owner.go index 17c2c81f94c..315bcf810a0 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -86,6 +86,10 @@ func (o *Owner) getMinGCSafePointCache(ctx context.Context) model.Ts { } o.minGCSafePointCache.ts = oracle.ComposeTS(physicalTs-(o.gcTTL*1000), logicalTs) + // o.pdGCSafePoint pd is the smallest gcSafePoint across all services. + // If tikv_gc_life_time > gcTTL, means that tikv_gc_safe_point < o.minGCSafePointCache.ts here. + // It also means that pd.pdGCSafePoint < o.minGCSafePointCache.ts here, we should use its value as the min value. + // This ensures that when tikv_gc_life_time > gcTTL , cdc will not advance the gcSafePoint. if o.pdGCSafePoint < o.minGCSafePointCache.ts { o.minGCSafePointCache.ts = o.pdGCSafePoint } @@ -132,7 +136,7 @@ type Owner struct { gcSafepointLastUpdate time.Time // stores the ts obtained from PD and is updated every MinGCSafePointCacheUpdateInterval. minGCSafePointCache minGCSafePointCacheEntry - // stores the actual gcSafePoint store in pd + // stores the actual gcSafePoint stored in pd pdGCSafePoint model.Ts // record last time that flushes all changefeeds' replication status lastFlushChangefeeds time.Time @@ -765,9 +769,12 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { // A changefeed will not enter the map twice, because in run(), // handleAdminJob() will always be executed before flushChangeFeedInfos(), // ensuring that the previous changefeed in staleChangeFeeds has been stopped and removed from o.changeFeeds. - // 2. We need the <= check here is because when a changefeed is stagnant, its checkpointTs will be updated to pd, - // and it would be the minimum gcSafePoint of cdc. In order to avoid circular dependence, it is necessary to check for equality. - // Checking whether they are equal will make the loop assignment to happen only once, and then it will be skipped in next time. + // 2. We need the `<=` check here is because when a changefeed is stagnant, its checkpointTs will be updated to pd, + // and it would be the minimum gcSafePoint across all services. + // So as described above(line 92) minGCSafePoint = gcSafePoint = CheckpointTs would happens. + // In this case, if we check `<` here , this changefeed will not be put into staleChangeFeeds, and its checkpoints will be updated to pd again and again. + // This will cause the cdc's gcSafePoint never advance. + // If we check `<=` here, when we encounter the changefeed again, we will put it into staleChangeFeeds. if changefeed.status.CheckpointTs <= minGCSafePoint { staleChangeFeeds[id] = changefeed.status }