From 12dc9f8c9edaf044e50d1c00e9488297fc40a0d6 Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Thu, 27 May 2021 17:13:42 +0800 Subject: [PATCH 1/7] owner: fix bug : Can not create changefeed when start ts is less than current ts - gcttl --- cdc/owner.go | 17 +++++++-- cdc/owner_test.go | 87 +++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 95 insertions(+), 9 deletions(-) diff --git a/cdc/owner.go b/cdc/owner.go index a0471b813dd..d5ecea845bf 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -85,6 +85,15 @@ func (o *Owner) getMinGCSafePointCache(ctx context.Context) model.Ts { return o.minGCSafePointCache.ts } o.minGCSafePointCache.ts = oracle.ComposeTS(physicalTs-(o.gcTTL*1000), logicalTs) + // get minimum safepoint across all services + actualGCSafePoint, err := o.pdClient.UpdateServiceGCSafePoint(ctx, CDCMinGCSafePointCacheServiceID, CDCMinGCSafePointCacheServiceTTL, o.minGCSafePointCache.ts) + if err != nil { + log.Warn("Fail to get actualGCSafePoint from PD.", zap.Error(err)) + } + if actualGCSafePoint < o.minGCSafePointCache.ts { + o.minGCSafePointCache.ts = actualGCSafePoint + } + o.minGCSafePointCache.lastUpdated = time.Now() } return o.minGCSafePointCache.ts @@ -136,6 +145,10 @@ type Owner struct { const ( // CDCServiceSafePointID is the ID of CDC service in pd.UpdateServiceGCSafePoint. CDCServiceSafePointID = "ticdc" + // minGCSafePointCacheServiceID is this service GC safe point ID + CDCMinGCSafePointCacheServiceID = "ticdc-minGCSafePoint-cache" + //CDCMinGCSafePointCacheServiceTTL is this service GC safe point TTL + CDCMinGCSafePointCacheServiceTTL = 10 * 60 // 10mins // GCSafepointUpdateInterval is the minimual interval that CDC can update gc safepoint GCSafepointUpdateInterval = time.Duration(2 * time.Second) // MinGCSafePointCacheUpdateInterval is the interval that update minGCSafePointCache @@ -758,7 +771,7 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { // A changefeed will not enter the map twice, because in run(), // handleAdminJob() will always be executed before flushChangeFeedInfos(), // ensuring that the previous changefeed in staleChangeFeeds has been stopped and removed from o.changeFeeds. - if changefeed.status.CheckpointTs < minGCSafePoint { + if changefeed.status.CheckpointTs <= minGCSafePoint { staleChangeFeeds[id] = changefeed.status } @@ -784,7 +797,7 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { // If a stopped changefeed's CheckpoinTs < minGCSafePoint, means this changefeed is stagnant. // It should never be resumed. This part of the logic is in newChangeFeed() // So here we can skip it. 
- if status.CheckpointTs < minGCSafePoint { + if status.CheckpointTs <= minGCSafePoint { continue } diff --git a/cdc/owner_test.go b/cdc/owner_test.go index 84829a014e6..ebe3f570bd8 100644 --- a/cdc/owner_test.go +++ b/cdc/owner_test.go @@ -17,6 +17,8 @@ import ( "bytes" "context" "fmt" + "github.com/pingcap/log" + "go.uber.org/zap" "net/url" "sync" "sync/atomic" @@ -49,6 +51,8 @@ import ( "golang.org/x/sync/errgroup" ) +const TiKVGCLifeTime = 10 * 60 * time.Second // 10 min + type ownerSuite struct { e *embed.Etcd clientURL *url.URL @@ -87,9 +91,10 @@ func (s *ownerSuite) TearDownTest(c *check.C) { type mockPDClient struct { pd.Client - invokeCounter int - mockSafePointLost bool - mockPDFailure bool + invokeCounter int + mockSafePointLost bool + mockPDFailure bool + mockTiKVGCLifeTime bool } func (m *mockPDClient) GetTS(ctx context.Context) (int64, int64, error) { @@ -111,7 +116,11 @@ func (m *mockPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID s if m.mockPDFailure { return 0, errors.New("injected PD failure") } - + if m.mockTiKVGCLifeTime { + Ts := oracle.GoTimeToTS(time.Now().Add(-TiKVGCLifeTime)) + log.Info("actual ts is", zap.Time("actual", oracle.GetTimeFromTS(Ts))) + return Ts, nil + } return safePoint, nil } @@ -186,6 +195,70 @@ func (s *ownerSuite) TestOwnerFlushChangeFeedInfosFailed(c *check.C) { s.TearDownTest(c) } +func (s *ownerSuite) TestTiKVGCLifeTimeLargeThanGCTTL(c *check.C) { + defer testleak.AfterTest(c) + mockPDCli := &mockPDClient{} + mockPDCli.mockTiKVGCLifeTime = true + + changeFeeds := map[model.ChangeFeedID]*changeFeed{ + "test_change_feed_1": { + info: &model.ChangeFeedInfo{State: model.StateNormal}, + etcdCli: s.client, + status: &model.ChangeFeedStatus{ + CheckpointTs: oracle.GoTimeToTS(time.Now()), + }, + targetTs: 2000, + ddlState: model.ChangeFeedSyncDML, + taskStatus: model.ProcessorsInfos{ + "capture_1": {}, + "capture_2": {}, + }, + taskPositions: map[string]*model.TaskPosition{ + "capture_1": {}, + "capture_2": {}, + }, + }, + } + + session, err := concurrency.NewSession(s.client.Client.Unwrap(), + concurrency.WithTTL(config.GetDefaultServerConfig().CaptureSessionTTL)) + c.Assert(err, check.IsNil) + + mockOwner := Owner{ + session: session, + pdClient: mockPDCli, + etcdClient: s.client, + lastFlushChangefeeds: time.Now(), + flushChangefeedInterval: 1 * time.Hour, + // gcSafepointLastUpdate: time.Now(), + gcTTL: 6, // 6 seconds + changeFeeds: changeFeeds, + cfRWriter: s.client, + stoppedFeeds: make(map[model.ChangeFeedID]*model.ChangeFeedStatus), + minGCSafePointCache: minGCSafePointCacheEntry{}, + } + + err = mockOwner.flushChangeFeedInfos(s.ctx) + c.Assert(err, check.IsNil) + c.Assert(mockPDCli.invokeCounter, check.Equals, 2) + + err = mockOwner.handleAdminJob(s.ctx) + c.Assert(mockOwner.stoppedFeeds["test_change_feed_1"], check.IsNil) + c.Assert(mockOwner.changeFeeds["test_change_feed_1"].info.State, check.Equals, model.StateNormal) + + time.Sleep(7 * time.Second) // wait for gcTTL time pass + err = mockOwner.flushChangeFeedInfos(s.ctx) + c.Assert(err, check.IsNil) + c.Assert(mockPDCli.invokeCounter, check.Equals, 4) + + err = mockOwner.handleAdminJob(s.ctx) + c.Assert(err, check.IsNil) + c.Assert(mockOwner.stoppedFeeds["test_change_feed_1"], check.IsNil) + + s.TearDownTest(c) + +} + // Test whether the owner handles the stagnant task correctly, so that it can't block the update of gcSafePoint. // If a changefeed is put into the stop queue due to stagnation, it can no longer affect the update of gcSafePoint. 
// So we just need to test whether the stagnant changefeed is put into the stop queue. @@ -267,7 +340,7 @@ func (s *ownerSuite) TestOwnerHandleStaleChangeFeed(c *check.C) { time.Sleep(3 * time.Second) err = mockOwner.flushChangeFeedInfos(s.ctx) c.Assert(err, check.IsNil) - c.Assert(mockPDCli.invokeCounter, check.Equals, 1) + c.Assert(mockPDCli.invokeCounter, check.Equals, 2) err = mockOwner.handleAdminJob(s.ctx) c.Assert(err, check.IsNil) @@ -280,7 +353,7 @@ func (s *ownerSuite) TestOwnerHandleStaleChangeFeed(c *check.C) { time.Sleep(6 * time.Second) err = mockOwner.flushChangeFeedInfos(s.ctx) c.Assert(err, check.IsNil) - c.Assert(mockPDCli.invokeCounter, check.Equals, 2) + c.Assert(mockPDCli.invokeCounter, check.Equals, 4) err = mockOwner.handleAdminJob(s.ctx) c.Assert(err, check.IsNil) @@ -349,7 +422,7 @@ func (s *ownerSuite) TestOwnerUploadGCSafePointOutdated(c *check.C) { err = mockOwner.flushChangeFeedInfos(s.ctx) c.Assert(err, check.IsNil) - c.Assert(mockPDCli.invokeCounter, check.Equals, 1) + c.Assert(mockPDCli.invokeCounter, check.Equals, 2) err = mockOwner.handleAdminJob(s.ctx) c.Assert(err, check.IsNil) From c3816bb70de97c22a9138899352de35fd8b0a715 Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Thu, 27 May 2021 17:23:14 +0800 Subject: [PATCH 2/7] owner: update unit test --- cdc/owner.go | 4 ++-- cdc/owner_test.go | 11 +++++++---- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/cdc/owner.go b/cdc/owner.go index d5ecea845bf..e16546ab19a 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -145,9 +145,9 @@ type Owner struct { const ( // CDCServiceSafePointID is the ID of CDC service in pd.UpdateServiceGCSafePoint. CDCServiceSafePointID = "ticdc" - // minGCSafePointCacheServiceID is this service GC safe point ID + // CDCMinGCSafePointCacheServiceID is this service GC safe point ID CDCMinGCSafePointCacheServiceID = "ticdc-minGCSafePoint-cache" - //CDCMinGCSafePointCacheServiceTTL is this service GC safe point TTL + // CDCMinGCSafePointCacheServiceTTL is this service GC safe point TTL CDCMinGCSafePointCacheServiceTTL = 10 * 60 // 10mins // GCSafepointUpdateInterval is the minimual interval that CDC can update gc safepoint GCSafepointUpdateInterval = time.Duration(2 * time.Second) diff --git a/cdc/owner_test.go b/cdc/owner_test.go index ebe3f570bd8..e0ab46ccc1d 100644 --- a/cdc/owner_test.go +++ b/cdc/owner_test.go @@ -17,13 +17,14 @@ import ( "bytes" "context" "fmt" - "github.com/pingcap/log" - "go.uber.org/zap" "net/url" "sync" "sync/atomic" "time" + "github.com/pingcap/log" + "go.uber.org/zap" + "github.com/google/uuid" "github.com/pingcap/check" "github.com/pingcap/errors" @@ -195,6 +196,8 @@ func (s *ownerSuite) TestOwnerFlushChangeFeedInfosFailed(c *check.C) { s.TearDownTest(c) } +// Test whether it is possible to successfully create a changefeed +// with startTs less than currentTs - gcTTL when tikv_gc_life_time is greater than gc-ttl func (s *ownerSuite) TestTiKVGCLifeTimeLargeThanGCTTL(c *check.C) { defer testleak.AfterTest(c) mockPDCli := &mockPDClient{} @@ -205,7 +208,7 @@ func (s *ownerSuite) TestTiKVGCLifeTimeLargeThanGCTTL(c *check.C) { info: &model.ChangeFeedInfo{State: model.StateNormal}, etcdCli: s.client, status: &model.ChangeFeedStatus{ - CheckpointTs: oracle.GoTimeToTS(time.Now()), + CheckpointTs: oracle.GoTimeToTS(time.Now().Add(-6 * time.Second)), }, targetTs: 2000, ddlState: model.ChangeFeedSyncDML, @@ -243,6 +246,7 @@ func (s *ownerSuite) TestTiKVGCLifeTimeLargeThanGCTTL(c *check.C) { c.Assert(mockPDCli.invokeCounter, check.Equals, 2) 
err = mockOwner.handleAdminJob(s.ctx) + c.Assert(err, check.IsNil) c.Assert(mockOwner.stoppedFeeds["test_change_feed_1"], check.IsNil) c.Assert(mockOwner.changeFeeds["test_change_feed_1"].info.State, check.Equals, model.StateNormal) @@ -256,7 +260,6 @@ func (s *ownerSuite) TestTiKVGCLifeTimeLargeThanGCTTL(c *check.C) { c.Assert(mockOwner.stoppedFeeds["test_change_feed_1"], check.IsNil) s.TearDownTest(c) - } // Test whether the owner handles the stagnant task correctly, so that it can't block the update of gcSafePoint. From 68e7303e4cbfb4bec636e540efb6d24134a1b684 Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Thu, 27 May 2021 17:34:19 +0800 Subject: [PATCH 3/7] owner: delete useless log --- cdc/owner_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/cdc/owner_test.go b/cdc/owner_test.go index e0ab46ccc1d..596472e3622 100644 --- a/cdc/owner_test.go +++ b/cdc/owner_test.go @@ -22,9 +22,6 @@ import ( "sync/atomic" "time" - "github.com/pingcap/log" - "go.uber.org/zap" - "github.com/google/uuid" "github.com/pingcap/check" "github.com/pingcap/errors" @@ -119,7 +116,6 @@ func (m *mockPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID s } if m.mockTiKVGCLifeTime { Ts := oracle.GoTimeToTS(time.Now().Add(-TiKVGCLifeTime)) - log.Info("actual ts is", zap.Time("actual", oracle.GetTimeFromTS(Ts))) return Ts, nil } return safePoint, nil From 48c592c4902cb54cf7f5aac48091c1eb2a11f4ff Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Thu, 27 May 2021 22:08:17 +0800 Subject: [PATCH 4/7] owner: Modify the way of obtaining pdGCSafePoint --- cdc/owner.go | 21 +++++++----------- cdc/owner_test.go | 54 ++++++++++++++++++++--------------------------- 2 files changed, 31 insertions(+), 44 deletions(-) diff --git a/cdc/owner.go b/cdc/owner.go index e16546ab19a..62130bc3aae 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -85,13 +85,9 @@ func (o *Owner) getMinGCSafePointCache(ctx context.Context) model.Ts { return o.minGCSafePointCache.ts } o.minGCSafePointCache.ts = oracle.ComposeTS(physicalTs-(o.gcTTL*1000), logicalTs) - // get minimum safepoint across all services - actualGCSafePoint, err := o.pdClient.UpdateServiceGCSafePoint(ctx, CDCMinGCSafePointCacheServiceID, CDCMinGCSafePointCacheServiceTTL, o.minGCSafePointCache.ts) - if err != nil { - log.Warn("Fail to get actualGCSafePoint from PD.", zap.Error(err)) - } - if actualGCSafePoint < o.minGCSafePointCache.ts { - o.minGCSafePointCache.ts = actualGCSafePoint + + if o.pdGCSafePoint < o.minGCSafePointCache.ts { + o.minGCSafePointCache.ts = o.pdGCSafePoint } o.minGCSafePointCache.lastUpdated = time.Now() @@ -136,6 +132,8 @@ type Owner struct { gcSafepointLastUpdate time.Time // stores the ts obtained from PD and is updated every MinGCSafePointCacheUpdateInterval. minGCSafePointCache minGCSafePointCacheEntry + // stores the actual gcSafePoint store in pd + pdGCSafePoint model.Ts // record last time that flushes all changefeeds' replication status lastFlushChangefeeds time.Time flushChangefeedInterval time.Duration @@ -145,10 +143,6 @@ type Owner struct { const ( // CDCServiceSafePointID is the ID of CDC service in pd.UpdateServiceGCSafePoint. 
CDCServiceSafePointID = "ticdc" - // CDCMinGCSafePointCacheServiceID is this service GC safe point ID - CDCMinGCSafePointCacheServiceID = "ticdc-minGCSafePoint-cache" - // CDCMinGCSafePointCacheServiceTTL is this service GC safe point TTL - CDCMinGCSafePointCacheServiceTTL = 10 * 60 // 10mins // GCSafepointUpdateInterval is the minimual interval that CDC can update gc safepoint GCSafepointUpdateInterval = time.Duration(2 * time.Second) // MinGCSafePointCacheUpdateInterval is the interval that update minGCSafePointCache @@ -771,7 +765,7 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { // A changefeed will not enter the map twice, because in run(), // handleAdminJob() will always be executed before flushChangeFeedInfos(), // ensuring that the previous changefeed in staleChangeFeeds has been stopped and removed from o.changeFeeds. - if changefeed.status.CheckpointTs <= minGCSafePoint { + if minGCSafePoint != 0 && changefeed.status.CheckpointTs <= minGCSafePoint { staleChangeFeeds[id] = changefeed.status } @@ -797,7 +791,7 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { // If a stopped changefeed's CheckpoinTs < minGCSafePoint, means this changefeed is stagnant. // It should never be resumed. This part of the logic is in newChangeFeed() // So here we can skip it. - if status.CheckpointTs <= minGCSafePoint { + if minGCSafePoint != 0 && status.CheckpointTs <= minGCSafePoint { continue } @@ -823,6 +817,7 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { return cerror.ErrUpdateServiceSafepointFailed.Wrap(err) } } else { + o.pdGCSafePoint = actual o.gcSafepointLastUpdate = time.Now() } diff --git a/cdc/owner_test.go b/cdc/owner_test.go index 596472e3622..ef1d988ac00 100644 --- a/cdc/owner_test.go +++ b/cdc/owner_test.go @@ -239,7 +239,7 @@ func (s *ownerSuite) TestTiKVGCLifeTimeLargeThanGCTTL(c *check.C) { err = mockOwner.flushChangeFeedInfos(s.ctx) c.Assert(err, check.IsNil) - c.Assert(mockPDCli.invokeCounter, check.Equals, 2) + c.Assert(mockPDCli.invokeCounter, check.Equals, 1) err = mockOwner.handleAdminJob(s.ctx) c.Assert(err, check.IsNil) @@ -249,7 +249,7 @@ func (s *ownerSuite) TestTiKVGCLifeTimeLargeThanGCTTL(c *check.C) { time.Sleep(7 * time.Second) // wait for gcTTL time pass err = mockOwner.flushChangeFeedInfos(s.ctx) c.Assert(err, check.IsNil) - c.Assert(mockPDCli.invokeCounter, check.Equals, 4) + c.Assert(mockPDCli.invokeCounter, check.Equals, 2) err = mockOwner.handleAdminJob(s.ctx) c.Assert(err, check.IsNil) @@ -299,23 +299,6 @@ func (s *ownerSuite) TestOwnerHandleStaleChangeFeed(c *check.C) { "capture_2": {}, }, }, - "test_change_feed_3": { - info: &model.ChangeFeedInfo{State: model.StateNormal}, - etcdCli: s.client, - status: &model.ChangeFeedStatus{ - CheckpointTs: 0, - }, - targetTs: 2000, - ddlState: model.ChangeFeedSyncDML, - taskStatus: model.ProcessorsInfos{ - "capture_1": {}, - "capture_2": {}, - }, - taskPositions: map[string]*model.TaskPosition{ - "capture_1": {}, - "capture_2": {}, - }, - }, } session, err := concurrency.NewSession(s.client.Client.Unwrap(), @@ -328,32 +311,41 @@ func (s *ownerSuite) TestOwnerHandleStaleChangeFeed(c *check.C) { etcdClient: s.client, lastFlushChangefeeds: time.Now(), flushChangefeedInterval: 1 * time.Hour, - // gcSafepointLastUpdate: time.Now(), - gcTTL: 6, // 6 seconds - changeFeeds: changeFeeds, - cfRWriter: s.client, - stoppedFeeds: make(map[model.ChangeFeedID]*model.ChangeFeedStatus), - minGCSafePointCache: minGCSafePointCacheEntry{}, + gcSafepointLastUpdate: time.Now().Add(-4 * 
time.Second), + gcTTL: 6, // 6 seconds + changeFeeds: changeFeeds, + cfRWriter: s.client, + stoppedFeeds: make(map[model.ChangeFeedID]*model.ChangeFeedStatus), + minGCSafePointCache: minGCSafePointCacheEntry{}, } - time.Sleep(3 * time.Second) err = mockOwner.flushChangeFeedInfos(s.ctx) c.Assert(err, check.IsNil) - c.Assert(mockPDCli.invokeCounter, check.Equals, 2) - + c.Assert(mockPDCli.invokeCounter, check.Equals, 1) err = mockOwner.handleAdminJob(s.ctx) c.Assert(err, check.IsNil) + + time.Sleep(2 * time.Second) + err = mockOwner.flushChangeFeedInfos(s.ctx) + c.Assert(err, check.IsNil) + c.Assert(mockPDCli.invokeCounter, check.Equals, 2) err = mockOwner.handleAdminJob(s.ctx) c.Assert(err, check.IsNil) + c.Assert(mockOwner.stoppedFeeds["test_change_feed_1"], check.NotNil) - c.Assert(mockOwner.stoppedFeeds["test_change_feed_3"], check.NotNil) c.Assert(mockOwner.changeFeeds["test_change_feed_2"].info.State, check.Equals, model.StateNormal) time.Sleep(6 * time.Second) err = mockOwner.flushChangeFeedInfos(s.ctx) c.Assert(err, check.IsNil) - c.Assert(mockPDCli.invokeCounter, check.Equals, 4) + c.Assert(mockPDCli.invokeCounter, check.Equals, 3) + err = mockOwner.handleAdminJob(s.ctx) + c.Assert(err, check.IsNil) + time.Sleep(2 * time.Second) + err = mockOwner.flushChangeFeedInfos(s.ctx) + c.Assert(err, check.IsNil) + c.Assert(mockPDCli.invokeCounter, check.Equals, 4) err = mockOwner.handleAdminJob(s.ctx) c.Assert(err, check.IsNil) c.Assert(mockOwner.stoppedFeeds["test_change_feed_2"], check.NotNil) @@ -421,7 +413,7 @@ func (s *ownerSuite) TestOwnerUploadGCSafePointOutdated(c *check.C) { err = mockOwner.flushChangeFeedInfos(s.ctx) c.Assert(err, check.IsNil) - c.Assert(mockPDCli.invokeCounter, check.Equals, 2) + c.Assert(mockPDCli.invokeCounter, check.Equals, 1) err = mockOwner.handleAdminJob(s.ctx) c.Assert(err, check.IsNil) From 4a825f1a9d06c7c2faf086c05b0cbc996d51f0be Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Fri, 28 May 2021 11:02:44 +0800 Subject: [PATCH 5/7] owner: add some comments --- cdc/owner.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cdc/owner.go b/cdc/owner.go index 62130bc3aae..e74f357970f 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -760,11 +760,16 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { if changefeed.status.CheckpointTs < gcSafePoint { gcSafePoint = changefeed.status.CheckpointTs } - // If changefeed's appliedCheckpoinTs < minGCSafePoint, it means this changefeed is stagnant. + // 1. If changefeed's appliedCheckpoinTs <= minGCSafePoint, it means this changefeed is stagnant. // They are collected into this map, and then handleStaleChangeFeed() is called to deal with these stagnant changefeed. // A changefeed will not enter the map twice, because in run(), // handleAdminJob() will always be executed before flushChangeFeedInfos(), // ensuring that the previous changefeed in staleChangeFeeds has been stopped and removed from o.changeFeeds. + // 2. We need to check minGCSafePoint != 0 here because o.pdGCSafePoint may be 0 at the beginning, + // resulting in minGCSafePoint being 0 and will stop all changefeed. + // 3. We need the <= check here is because when a changefeed is stagnant, its checkpointTs will be updated to pd, + // and it would be the minimum gcSafePoint of cdc. In order to avoid circular dependence, it is necessary to check for equality. + // Checking whether they are equal will make the loop assignment to happen only once, and then it will be skipped in next time. 
if minGCSafePoint != 0 && changefeed.status.CheckpointTs <= minGCSafePoint { staleChangeFeeds[id] = changefeed.status } From 8c1bf0db20ebb3e361bc1b97175e4f642dcda420 Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Fri, 28 May 2021 11:18:02 +0800 Subject: [PATCH 6/7] owner: Delete a redundant check --- cdc/owner.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/cdc/owner.go b/cdc/owner.go index e74f357970f..17c2c81f94c 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -765,12 +765,10 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { // A changefeed will not enter the map twice, because in run(), // handleAdminJob() will always be executed before flushChangeFeedInfos(), // ensuring that the previous changefeed in staleChangeFeeds has been stopped and removed from o.changeFeeds. - // 2. We need to check minGCSafePoint != 0 here because o.pdGCSafePoint may be 0 at the beginning, - // resulting in minGCSafePoint being 0 and will stop all changefeed. - // 3. We need the <= check here is because when a changefeed is stagnant, its checkpointTs will be updated to pd, + // 2. We need the <= check here is because when a changefeed is stagnant, its checkpointTs will be updated to pd, // and it would be the minimum gcSafePoint of cdc. In order to avoid circular dependence, it is necessary to check for equality. // Checking whether they are equal will make the loop assignment to happen only once, and then it will be skipped in next time. - if minGCSafePoint != 0 && changefeed.status.CheckpointTs <= minGCSafePoint { + if changefeed.status.CheckpointTs <= minGCSafePoint { staleChangeFeeds[id] = changefeed.status } @@ -793,10 +791,10 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { } for _, status := range o.stoppedFeeds { - // If a stopped changefeed's CheckpoinTs < minGCSafePoint, means this changefeed is stagnant. + // If a stopped changefeed's CheckpoinTs <= minGCSafePoint, means this changefeed is stagnant. // It should never be resumed. This part of the logic is in newChangeFeed() // So here we can skip it. - if minGCSafePoint != 0 && status.CheckpointTs <= minGCSafePoint { + if status.CheckpointTs <= minGCSafePoint { continue } From 213bb2d96b207919a9d436106d49015aa8fd4346 Mon Sep 17 00:00:00 2001 From: asddongmen <414110582@qq.com> Date: Fri, 28 May 2021 14:45:27 +0800 Subject: [PATCH 7/7] owner: add some comments --- cdc/owner.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/cdc/owner.go b/cdc/owner.go index 17c2c81f94c..315bcf810a0 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -86,6 +86,10 @@ func (o *Owner) getMinGCSafePointCache(ctx context.Context) model.Ts { } o.minGCSafePointCache.ts = oracle.ComposeTS(physicalTs-(o.gcTTL*1000), logicalTs) + // o.pdGCSafePoint pd is the smallest gcSafePoint across all services. + // If tikv_gc_life_time > gcTTL, means that tikv_gc_safe_point < o.minGCSafePointCache.ts here. + // It also means that pd.pdGCSafePoint < o.minGCSafePointCache.ts here, we should use its value as the min value. + // This ensures that when tikv_gc_life_time > gcTTL , cdc will not advance the gcSafePoint. if o.pdGCSafePoint < o.minGCSafePointCache.ts { o.minGCSafePointCache.ts = o.pdGCSafePoint } @@ -132,7 +136,7 @@ type Owner struct { gcSafepointLastUpdate time.Time // stores the ts obtained from PD and is updated every MinGCSafePointCacheUpdateInterval. 
minGCSafePointCache minGCSafePointCacheEntry - // stores the actual gcSafePoint store in pd + // stores the actual gcSafePoint stored in pd pdGCSafePoint model.Ts // record last time that flushes all changefeeds' replication status lastFlushChangefeeds time.Time @@ -765,9 +769,12 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { // A changefeed will not enter the map twice, because in run(), // handleAdminJob() will always be executed before flushChangeFeedInfos(), // ensuring that the previous changefeed in staleChangeFeeds has been stopped and removed from o.changeFeeds. - // 2. We need the <= check here is because when a changefeed is stagnant, its checkpointTs will be updated to pd, - // and it would be the minimum gcSafePoint of cdc. In order to avoid circular dependence, it is necessary to check for equality. - // Checking whether they are equal will make the loop assignment to happen only once, and then it will be skipped in next time. + // 2. We need the `<=` check here is because when a changefeed is stagnant, its checkpointTs will be updated to pd, + // and it would be the minimum gcSafePoint across all services. + // So as described above(line 92) minGCSafePoint = gcSafePoint = CheckpointTs would happens. + // In this case, if we check `<` here , this changefeed will not be put into staleChangeFeeds, and its checkpoints will be updated to pd again and again. + // This will cause the cdc's gcSafePoint never advance. + // If we check `<=` here, when we encounter the changefeed again, we will put it into staleChangeFeeds. if changefeed.status.CheckpointTs <= minGCSafePoint { staleChangeFeeds[id] = changefeed.status }
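---

Below is a minimal, self-contained Go sketch of the behaviour this series converges on in PATCH 7/7, for illustration only; it is not the TiCDC implementation. The names Owner, pdGCSafePoint, minGCSafePointCache, gcTTL and the `<=` staleness check mirror cdc/owner.go, but composeTS (a stand-in for oracle.ComposeTS), the 2-minute cache refresh interval (a stand-in for MinGCSafePointCacheUpdateInterval) and all concrete values are simplified assumptions made for the example.

package main

import (
	"fmt"
	"time"
)

type Ts = uint64

type minGCSafePointCacheEntry struct {
	ts          Ts
	lastUpdated time.Time
}

type Owner struct {
	gcTTL               int64 // gc-ttl in seconds
	pdGCSafePoint       Ts    // actual GC safepoint last returned by PD
	minGCSafePointCache minGCSafePointCacheEntry
}

// composeTS packs a physical timestamp (ms) into the high bits of a TSO,
// standing in for oracle.ComposeTS.
func composeTS(physicalMs int64) Ts { return Ts(physicalMs) << 18 }

// getMinGCSafePointCache mirrors the post-series logic: start from
// "now - gcTTL", then clamp to the safepoint PD actually holds, so a large
// tikv_gc_life_time cannot be overtaken by the cdc-side estimate.
func (o *Owner) getMinGCSafePointCache(now time.Time) Ts {
	if time.Since(o.minGCSafePointCache.lastUpdated) < 2*time.Minute {
		return o.minGCSafePointCache.ts
	}
	physicalMs := now.UnixNano() / int64(time.Millisecond)
	o.minGCSafePointCache.ts = composeTS(physicalMs - o.gcTTL*1000)
	if o.pdGCSafePoint < o.minGCSafePointCache.ts {
		o.minGCSafePointCache.ts = o.pdGCSafePoint
	}
	o.minGCSafePointCache.lastUpdated = now
	return o.minGCSafePointCache.ts
}

func main() {
	now := time.Now()
	o := &Owner{
		gcTTL: 86400, // 24h gc-ttl
		// Pretend PD's GC safepoint lags because tikv_gc_life_time is large.
		pdGCSafePoint: composeTS(now.Add(-48*time.Hour).UnixNano() / int64(time.Millisecond)),
	}
	minGCSafePoint := o.getMinGCSafePointCache(now)

	// A changefeed whose checkpoint sits between PD's safepoint and
	// "now - gcTTL" is not treated as stale, so it can still be created.
	checkpointTs := composeTS(now.Add(-36*time.Hour).UnixNano() / int64(time.Millisecond))
	stale := checkpointTs <= minGCSafePoint // the `<=` check from flushChangeFeedInfos
	fmt.Println("stale:", stale)            // prints "stale: false"
}

With a 24h gc-ttl and PD's safepoint lagging at now-48h (as when tikv_gc_life_time is larger than gc-ttl), a checkpoint at now-36h is older than the cdc-side estimate but newer than the clamped minGCSafePoint, so the changefeed is not flagged stale; this is the scenario TestTiKVGCLifeTimeLargeThanGCTTL exercises. The `<=` comparison (rather than `<`) means a stagnant changefeed whose checkpoint equals the safepoint is stopped once instead of repeatedly pinning the cdc service safepoint.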