Skip to content

Commit

Permalink
owner: add some comments
Browse files Browse the repository at this point in the history
  • Loading branch information
asddongmen authored and ti-chi-bot committed May 29, 2021
1 parent 13a6eeb commit a6af59d
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func (o *Owner) getMinGCSafePointCache(ctx context.Context) model.Ts {
}
o.minGCSafePointCache.ts = oracle.ComposeTS(physicalTs-(o.gcTTL*1000), logicalTs)

// o.pdGCSafePoint pd is the smallest gcSafePoint across all services.
// If tikv_gc_life_time > gcTTL, means that tikv_gc_safe_point < o.minGCSafePointCache.ts here.
// It also means that pd.pdGCSafePoint < o.minGCSafePointCache.ts here, we should use its value as the min value.
// This ensures that when tikv_gc_life_time > gcTTL , cdc will not advance the gcSafePoint.
if o.pdGCSafePoint < o.minGCSafePointCache.ts {
o.minGCSafePointCache.ts = o.pdGCSafePoint
}
Expand Down Expand Up @@ -132,7 +136,7 @@ type Owner struct {
gcSafepointLastUpdate time.Time
// stores the ts obtained from PD and is updated every MinGCSafePointCacheUpdateInterval.
minGCSafePointCache minGCSafePointCacheEntry
// stores the actual gcSafePoint store in pd
// stores the actual gcSafePoint stored in pd
pdGCSafePoint model.Ts
// record last time that flushes all changefeeds' replication status
lastFlushChangefeeds time.Time
Expand Down Expand Up @@ -765,9 +769,12 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error {
// A changefeed will not enter the map twice, because in run(),
// handleAdminJob() will always be executed before flushChangeFeedInfos(),
// ensuring that the previous changefeed in staleChangeFeeds has been stopped and removed from o.changeFeeds.
// 2. We need the <= check here is because when a changefeed is stagnant, its checkpointTs will be updated to pd,
// and it would be the minimum gcSafePoint of cdc. In order to avoid circular dependence, it is necessary to check for equality.
// Checking whether they are equal will make the loop assignment to happen only once, and then it will be skipped in next time.
// 2. We need the `<=` check here is because when a changefeed is stagnant, its checkpointTs will be updated to pd,
// and it would be the minimum gcSafePoint across all services.
// So as described above(line 92) minGCSafePoint = gcSafePoint = CheckpointTs would happens.
// In this case, if we check `<` here , this changefeed will not be put into staleChangeFeeds, and its checkpoints will be updated to pd again and again.
// This will cause the cdc's gcSafePoint never advance.
// If we check `<=` here, when we encounter the changefeed again, we will put it into staleChangeFeeds.
if changefeed.status.CheckpointTs <= minGCSafePoint {
staleChangeFeeds[id] = changefeed.status
}
Expand Down

0 comments on commit a6af59d

Please sign in to comment.