diff --git a/cdc/owner.go b/cdc/owner.go index a0471b813dd..315bcf810a0 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -85,6 +85,15 @@ func (o *Owner) getMinGCSafePointCache(ctx context.Context) model.Ts { return o.minGCSafePointCache.ts } o.minGCSafePointCache.ts = oracle.ComposeTS(physicalTs-(o.gcTTL*1000), logicalTs) + + // o.pdGCSafePoint pd is the smallest gcSafePoint across all services. + // If tikv_gc_life_time > gcTTL, means that tikv_gc_safe_point < o.minGCSafePointCache.ts here. + // It also means that pd.pdGCSafePoint < o.minGCSafePointCache.ts here, we should use its value as the min value. + // This ensures that when tikv_gc_life_time > gcTTL , cdc will not advance the gcSafePoint. + if o.pdGCSafePoint < o.minGCSafePointCache.ts { + o.minGCSafePointCache.ts = o.pdGCSafePoint + } + o.minGCSafePointCache.lastUpdated = time.Now() } return o.minGCSafePointCache.ts @@ -127,6 +136,8 @@ type Owner struct { gcSafepointLastUpdate time.Time // stores the ts obtained from PD and is updated every MinGCSafePointCacheUpdateInterval. minGCSafePointCache minGCSafePointCacheEntry + // stores the actual gcSafePoint stored in pd + pdGCSafePoint model.Ts // record last time that flushes all changefeeds' replication status lastFlushChangefeeds time.Time flushChangefeedInterval time.Duration @@ -753,12 +764,18 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { if changefeed.status.CheckpointTs < gcSafePoint { gcSafePoint = changefeed.status.CheckpointTs } - // If changefeed's appliedCheckpoinTs < minGCSafePoint, it means this changefeed is stagnant. + // 1. If changefeed's appliedCheckpoinTs <= minGCSafePoint, it means this changefeed is stagnant. // They are collected into this map, and then handleStaleChangeFeed() is called to deal with these stagnant changefeed. // A changefeed will not enter the map twice, because in run(), // handleAdminJob() will always be executed before flushChangeFeedInfos(), // ensuring that the previous changefeed in staleChangeFeeds has been stopped and removed from o.changeFeeds. - if changefeed.status.CheckpointTs < minGCSafePoint { + // 2. We need the `<=` check here is because when a changefeed is stagnant, its checkpointTs will be updated to pd, + // and it would be the minimum gcSafePoint across all services. + // So as described above(line 92) minGCSafePoint = gcSafePoint = CheckpointTs would happens. + // In this case, if we check `<` here , this changefeed will not be put into staleChangeFeeds, and its checkpoints will be updated to pd again and again. + // This will cause the cdc's gcSafePoint never advance. + // If we check `<=` here, when we encounter the changefeed again, we will put it into staleChangeFeeds. + if changefeed.status.CheckpointTs <= minGCSafePoint { staleChangeFeeds[id] = changefeed.status } @@ -781,10 +798,10 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { } for _, status := range o.stoppedFeeds { - // If a stopped changefeed's CheckpoinTs < minGCSafePoint, means this changefeed is stagnant. + // If a stopped changefeed's CheckpoinTs <= minGCSafePoint, means this changefeed is stagnant. // It should never be resumed. This part of the logic is in newChangeFeed() // So here we can skip it. - if status.CheckpointTs < minGCSafePoint { + if status.CheckpointTs <= minGCSafePoint { continue } @@ -810,6 +827,7 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error { return cerror.ErrUpdateServiceSafepointFailed.Wrap(err) } } else { + o.pdGCSafePoint = actual o.gcSafepointLastUpdate = time.Now() } diff --git a/cdc/owner_test.go b/cdc/owner_test.go index 84829a014e6..ef1d988ac00 100644 --- a/cdc/owner_test.go +++ b/cdc/owner_test.go @@ -49,6 +49,8 @@ import ( "golang.org/x/sync/errgroup" ) +const TiKVGCLifeTime = 10 * 60 * time.Second // 10 min + type ownerSuite struct { e *embed.Etcd clientURL *url.URL @@ -87,9 +89,10 @@ func (s *ownerSuite) TearDownTest(c *check.C) { type mockPDClient struct { pd.Client - invokeCounter int - mockSafePointLost bool - mockPDFailure bool + invokeCounter int + mockSafePointLost bool + mockPDFailure bool + mockTiKVGCLifeTime bool } func (m *mockPDClient) GetTS(ctx context.Context) (int64, int64, error) { @@ -111,7 +114,10 @@ func (m *mockPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID s if m.mockPDFailure { return 0, errors.New("injected PD failure") } - + if m.mockTiKVGCLifeTime { + Ts := oracle.GoTimeToTS(time.Now().Add(-TiKVGCLifeTime)) + return Ts, nil + } return safePoint, nil } @@ -186,18 +192,19 @@ func (s *ownerSuite) TestOwnerFlushChangeFeedInfosFailed(c *check.C) { s.TearDownTest(c) } -// Test whether the owner handles the stagnant task correctly, so that it can't block the update of gcSafePoint. -// If a changefeed is put into the stop queue due to stagnation, it can no longer affect the update of gcSafePoint. -// So we just need to test whether the stagnant changefeed is put into the stop queue. -func (s *ownerSuite) TestOwnerHandleStaleChangeFeed(c *check.C) { - defer testleak.AfterTest(c)() +// Test whether it is possible to successfully create a changefeed +// with startTs less than currentTs - gcTTL when tikv_gc_life_time is greater than gc-ttl +func (s *ownerSuite) TestTiKVGCLifeTimeLargeThanGCTTL(c *check.C) { + defer testleak.AfterTest(c) mockPDCli := &mockPDClient{} + mockPDCli.mockTiKVGCLifeTime = true + changeFeeds := map[model.ChangeFeedID]*changeFeed{ "test_change_feed_1": { info: &model.ChangeFeedInfo{State: model.StateNormal}, etcdCli: s.client, status: &model.ChangeFeedStatus{ - CheckpointTs: 1000, + CheckpointTs: oracle.GoTimeToTS(time.Now().Add(-6 * time.Second)), }, targetTs: 2000, ddlState: model.ChangeFeedSyncDML, @@ -210,11 +217,59 @@ func (s *ownerSuite) TestOwnerHandleStaleChangeFeed(c *check.C) { "capture_2": {}, }, }, - "test_change_feed_2": { + } + + session, err := concurrency.NewSession(s.client.Client.Unwrap(), + concurrency.WithTTL(config.GetDefaultServerConfig().CaptureSessionTTL)) + c.Assert(err, check.IsNil) + + mockOwner := Owner{ + session: session, + pdClient: mockPDCli, + etcdClient: s.client, + lastFlushChangefeeds: time.Now(), + flushChangefeedInterval: 1 * time.Hour, + // gcSafepointLastUpdate: time.Now(), + gcTTL: 6, // 6 seconds + changeFeeds: changeFeeds, + cfRWriter: s.client, + stoppedFeeds: make(map[model.ChangeFeedID]*model.ChangeFeedStatus), + minGCSafePointCache: minGCSafePointCacheEntry{}, + } + + err = mockOwner.flushChangeFeedInfos(s.ctx) + c.Assert(err, check.IsNil) + c.Assert(mockPDCli.invokeCounter, check.Equals, 1) + + err = mockOwner.handleAdminJob(s.ctx) + c.Assert(err, check.IsNil) + c.Assert(mockOwner.stoppedFeeds["test_change_feed_1"], check.IsNil) + c.Assert(mockOwner.changeFeeds["test_change_feed_1"].info.State, check.Equals, model.StateNormal) + + time.Sleep(7 * time.Second) // wait for gcTTL time pass + err = mockOwner.flushChangeFeedInfos(s.ctx) + c.Assert(err, check.IsNil) + c.Assert(mockPDCli.invokeCounter, check.Equals, 2) + + err = mockOwner.handleAdminJob(s.ctx) + c.Assert(err, check.IsNil) + c.Assert(mockOwner.stoppedFeeds["test_change_feed_1"], check.IsNil) + + s.TearDownTest(c) +} + +// Test whether the owner handles the stagnant task correctly, so that it can't block the update of gcSafePoint. +// If a changefeed is put into the stop queue due to stagnation, it can no longer affect the update of gcSafePoint. +// So we just need to test whether the stagnant changefeed is put into the stop queue. +func (s *ownerSuite) TestOwnerHandleStaleChangeFeed(c *check.C) { + defer testleak.AfterTest(c)() + mockPDCli := &mockPDClient{} + changeFeeds := map[model.ChangeFeedID]*changeFeed{ + "test_change_feed_1": { info: &model.ChangeFeedInfo{State: model.StateNormal}, etcdCli: s.client, status: &model.ChangeFeedStatus{ - CheckpointTs: oracle.EncodeTSO(oracle.GetPhysical(time.Now())), + CheckpointTs: 1000, }, targetTs: 2000, ddlState: model.ChangeFeedSyncDML, @@ -227,11 +282,11 @@ func (s *ownerSuite) TestOwnerHandleStaleChangeFeed(c *check.C) { "capture_2": {}, }, }, - "test_change_feed_3": { + "test_change_feed_2": { info: &model.ChangeFeedInfo{State: model.StateNormal}, etcdCli: s.client, status: &model.ChangeFeedStatus{ - CheckpointTs: 0, + CheckpointTs: oracle.EncodeTSO(oracle.GetPhysical(time.Now())), }, targetTs: 2000, ddlState: model.ChangeFeedSyncDML, @@ -256,32 +311,41 @@ func (s *ownerSuite) TestOwnerHandleStaleChangeFeed(c *check.C) { etcdClient: s.client, lastFlushChangefeeds: time.Now(), flushChangefeedInterval: 1 * time.Hour, - // gcSafepointLastUpdate: time.Now(), - gcTTL: 6, // 6 seconds - changeFeeds: changeFeeds, - cfRWriter: s.client, - stoppedFeeds: make(map[model.ChangeFeedID]*model.ChangeFeedStatus), - minGCSafePointCache: minGCSafePointCacheEntry{}, + gcSafepointLastUpdate: time.Now().Add(-4 * time.Second), + gcTTL: 6, // 6 seconds + changeFeeds: changeFeeds, + cfRWriter: s.client, + stoppedFeeds: make(map[model.ChangeFeedID]*model.ChangeFeedStatus), + minGCSafePointCache: minGCSafePointCacheEntry{}, } - time.Sleep(3 * time.Second) err = mockOwner.flushChangeFeedInfos(s.ctx) c.Assert(err, check.IsNil) c.Assert(mockPDCli.invokeCounter, check.Equals, 1) - err = mockOwner.handleAdminJob(s.ctx) c.Assert(err, check.IsNil) + + time.Sleep(2 * time.Second) + err = mockOwner.flushChangeFeedInfos(s.ctx) + c.Assert(err, check.IsNil) + c.Assert(mockPDCli.invokeCounter, check.Equals, 2) err = mockOwner.handleAdminJob(s.ctx) c.Assert(err, check.IsNil) + c.Assert(mockOwner.stoppedFeeds["test_change_feed_1"], check.NotNil) - c.Assert(mockOwner.stoppedFeeds["test_change_feed_3"], check.NotNil) c.Assert(mockOwner.changeFeeds["test_change_feed_2"].info.State, check.Equals, model.StateNormal) time.Sleep(6 * time.Second) err = mockOwner.flushChangeFeedInfos(s.ctx) c.Assert(err, check.IsNil) - c.Assert(mockPDCli.invokeCounter, check.Equals, 2) + c.Assert(mockPDCli.invokeCounter, check.Equals, 3) + err = mockOwner.handleAdminJob(s.ctx) + c.Assert(err, check.IsNil) + time.Sleep(2 * time.Second) + err = mockOwner.flushChangeFeedInfos(s.ctx) + c.Assert(err, check.IsNil) + c.Assert(mockPDCli.invokeCounter, check.Equals, 4) err = mockOwner.handleAdminJob(s.ctx) c.Assert(err, check.IsNil) c.Assert(mockOwner.stoppedFeeds["test_change_feed_2"], check.NotNil)