Skip to content

Commit

Permalink
Owner: Fix gcttl bug (#1861)
Browse files Browse the repository at this point in the history
  • Loading branch information
asddongmen authored May 29, 2021
1 parent f30b3cb commit 714bc03
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 28 deletions.
26 changes: 22 additions & 4 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ func (o *Owner) getMinGCSafePointCache(ctx context.Context) model.Ts {
return o.minGCSafePointCache.ts
}
o.minGCSafePointCache.ts = oracle.ComposeTS(physicalTs-(o.gcTTL*1000), logicalTs)

// o.pdGCSafePoint pd is the smallest gcSafePoint across all services.
// If tikv_gc_life_time > gcTTL, means that tikv_gc_safe_point < o.minGCSafePointCache.ts here.
// It also means that pd.pdGCSafePoint < o.minGCSafePointCache.ts here, we should use its value as the min value.
// This ensures that when tikv_gc_life_time > gcTTL , cdc will not advance the gcSafePoint.
if o.pdGCSafePoint < o.minGCSafePointCache.ts {
o.minGCSafePointCache.ts = o.pdGCSafePoint
}

o.minGCSafePointCache.lastUpdated = time.Now()
}
return o.minGCSafePointCache.ts
Expand Down Expand Up @@ -127,6 +136,8 @@ type Owner struct {
gcSafepointLastUpdate time.Time
// stores the ts obtained from PD and is updated every MinGCSafePointCacheUpdateInterval.
minGCSafePointCache minGCSafePointCacheEntry
// stores the actual gcSafePoint stored in pd
pdGCSafePoint model.Ts
// record last time that flushes all changefeeds' replication status
lastFlushChangefeeds time.Time
flushChangefeedInterval time.Duration
Expand Down Expand Up @@ -753,12 +764,18 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error {
if changefeed.status.CheckpointTs < gcSafePoint {
gcSafePoint = changefeed.status.CheckpointTs
}
// If changefeed's appliedCheckpoinTs < minGCSafePoint, it means this changefeed is stagnant.
// 1. If changefeed's appliedCheckpoinTs <= minGCSafePoint, it means this changefeed is stagnant.
// They are collected into this map, and then handleStaleChangeFeed() is called to deal with these stagnant changefeed.
// A changefeed will not enter the map twice, because in run(),
// handleAdminJob() will always be executed before flushChangeFeedInfos(),
// ensuring that the previous changefeed in staleChangeFeeds has been stopped and removed from o.changeFeeds.
if changefeed.status.CheckpointTs < minGCSafePoint {
// 2. We need the `<=` check here is because when a changefeed is stagnant, its checkpointTs will be updated to pd,
// and it would be the minimum gcSafePoint across all services.
// So as described above(line 92) minGCSafePoint = gcSafePoint = CheckpointTs would happens.
// In this case, if we check `<` here , this changefeed will not be put into staleChangeFeeds, and its checkpoints will be updated to pd again and again.
// This will cause the cdc's gcSafePoint never advance.
// If we check `<=` here, when we encounter the changefeed again, we will put it into staleChangeFeeds.
if changefeed.status.CheckpointTs <= minGCSafePoint {
staleChangeFeeds[id] = changefeed.status
}

Expand All @@ -781,10 +798,10 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error {
}

for _, status := range o.stoppedFeeds {
// If a stopped changefeed's CheckpoinTs < minGCSafePoint, means this changefeed is stagnant.
// If a stopped changefeed's CheckpoinTs <= minGCSafePoint, means this changefeed is stagnant.
// It should never be resumed. This part of the logic is in newChangeFeed()
// So here we can skip it.
if status.CheckpointTs < minGCSafePoint {
if status.CheckpointTs <= minGCSafePoint {
continue
}

Expand All @@ -810,6 +827,7 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error {
return cerror.ErrUpdateServiceSafepointFailed.Wrap(err)
}
} else {
o.pdGCSafePoint = actual
o.gcSafepointLastUpdate = time.Now()
}

Expand Down
112 changes: 88 additions & 24 deletions cdc/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ import (
"golang.org/x/sync/errgroup"
)

const TiKVGCLifeTime = 10 * 60 * time.Second // 10 min

type ownerSuite struct {
e *embed.Etcd
clientURL *url.URL
Expand Down Expand Up @@ -87,9 +89,10 @@ func (s *ownerSuite) TearDownTest(c *check.C) {

type mockPDClient struct {
pd.Client
invokeCounter int
mockSafePointLost bool
mockPDFailure bool
invokeCounter int
mockSafePointLost bool
mockPDFailure bool
mockTiKVGCLifeTime bool
}

func (m *mockPDClient) GetTS(ctx context.Context) (int64, int64, error) {
Expand All @@ -111,7 +114,10 @@ func (m *mockPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID s
if m.mockPDFailure {
return 0, errors.New("injected PD failure")
}

if m.mockTiKVGCLifeTime {
Ts := oracle.GoTimeToTS(time.Now().Add(-TiKVGCLifeTime))
return Ts, nil
}
return safePoint, nil
}

Expand Down Expand Up @@ -186,18 +192,19 @@ func (s *ownerSuite) TestOwnerFlushChangeFeedInfosFailed(c *check.C) {
s.TearDownTest(c)
}

// Test whether the owner handles the stagnant task correctly, so that it can't block the update of gcSafePoint.
// If a changefeed is put into the stop queue due to stagnation, it can no longer affect the update of gcSafePoint.
// So we just need to test whether the stagnant changefeed is put into the stop queue.
func (s *ownerSuite) TestOwnerHandleStaleChangeFeed(c *check.C) {
defer testleak.AfterTest(c)()
// Test whether it is possible to successfully create a changefeed
// with startTs less than currentTs - gcTTL when tikv_gc_life_time is greater than gc-ttl
func (s *ownerSuite) TestTiKVGCLifeTimeLargeThanGCTTL(c *check.C) {
defer testleak.AfterTest(c)
mockPDCli := &mockPDClient{}
mockPDCli.mockTiKVGCLifeTime = true

changeFeeds := map[model.ChangeFeedID]*changeFeed{
"test_change_feed_1": {
info: &model.ChangeFeedInfo{State: model.StateNormal},
etcdCli: s.client,
status: &model.ChangeFeedStatus{
CheckpointTs: 1000,
CheckpointTs: oracle.GoTimeToTS(time.Now().Add(-6 * time.Second)),
},
targetTs: 2000,
ddlState: model.ChangeFeedSyncDML,
Expand All @@ -210,11 +217,59 @@ func (s *ownerSuite) TestOwnerHandleStaleChangeFeed(c *check.C) {
"capture_2": {},
},
},
"test_change_feed_2": {
}

session, err := concurrency.NewSession(s.client.Client.Unwrap(),
concurrency.WithTTL(config.GetDefaultServerConfig().CaptureSessionTTL))
c.Assert(err, check.IsNil)

mockOwner := Owner{
session: session,
pdClient: mockPDCli,
etcdClient: s.client,
lastFlushChangefeeds: time.Now(),
flushChangefeedInterval: 1 * time.Hour,
// gcSafepointLastUpdate: time.Now(),
gcTTL: 6, // 6 seconds
changeFeeds: changeFeeds,
cfRWriter: s.client,
stoppedFeeds: make(map[model.ChangeFeedID]*model.ChangeFeedStatus),
minGCSafePointCache: minGCSafePointCacheEntry{},
}

err = mockOwner.flushChangeFeedInfos(s.ctx)
c.Assert(err, check.IsNil)
c.Assert(mockPDCli.invokeCounter, check.Equals, 1)

err = mockOwner.handleAdminJob(s.ctx)
c.Assert(err, check.IsNil)
c.Assert(mockOwner.stoppedFeeds["test_change_feed_1"], check.IsNil)
c.Assert(mockOwner.changeFeeds["test_change_feed_1"].info.State, check.Equals, model.StateNormal)

time.Sleep(7 * time.Second) // wait for gcTTL time pass
err = mockOwner.flushChangeFeedInfos(s.ctx)
c.Assert(err, check.IsNil)
c.Assert(mockPDCli.invokeCounter, check.Equals, 2)

err = mockOwner.handleAdminJob(s.ctx)
c.Assert(err, check.IsNil)
c.Assert(mockOwner.stoppedFeeds["test_change_feed_1"], check.IsNil)

s.TearDownTest(c)
}

// Test whether the owner handles the stagnant task correctly, so that it can't block the update of gcSafePoint.
// If a changefeed is put into the stop queue due to stagnation, it can no longer affect the update of gcSafePoint.
// So we just need to test whether the stagnant changefeed is put into the stop queue.
func (s *ownerSuite) TestOwnerHandleStaleChangeFeed(c *check.C) {
defer testleak.AfterTest(c)()
mockPDCli := &mockPDClient{}
changeFeeds := map[model.ChangeFeedID]*changeFeed{
"test_change_feed_1": {
info: &model.ChangeFeedInfo{State: model.StateNormal},
etcdCli: s.client,
status: &model.ChangeFeedStatus{
CheckpointTs: oracle.EncodeTSO(oracle.GetPhysical(time.Now())),
CheckpointTs: 1000,
},
targetTs: 2000,
ddlState: model.ChangeFeedSyncDML,
Expand All @@ -227,11 +282,11 @@ func (s *ownerSuite) TestOwnerHandleStaleChangeFeed(c *check.C) {
"capture_2": {},
},
},
"test_change_feed_3": {
"test_change_feed_2": {
info: &model.ChangeFeedInfo{State: model.StateNormal},
etcdCli: s.client,
status: &model.ChangeFeedStatus{
CheckpointTs: 0,
CheckpointTs: oracle.EncodeTSO(oracle.GetPhysical(time.Now())),
},
targetTs: 2000,
ddlState: model.ChangeFeedSyncDML,
Expand All @@ -256,32 +311,41 @@ func (s *ownerSuite) TestOwnerHandleStaleChangeFeed(c *check.C) {
etcdClient: s.client,
lastFlushChangefeeds: time.Now(),
flushChangefeedInterval: 1 * time.Hour,
// gcSafepointLastUpdate: time.Now(),
gcTTL: 6, // 6 seconds
changeFeeds: changeFeeds,
cfRWriter: s.client,
stoppedFeeds: make(map[model.ChangeFeedID]*model.ChangeFeedStatus),
minGCSafePointCache: minGCSafePointCacheEntry{},
gcSafepointLastUpdate: time.Now().Add(-4 * time.Second),
gcTTL: 6, // 6 seconds
changeFeeds: changeFeeds,
cfRWriter: s.client,
stoppedFeeds: make(map[model.ChangeFeedID]*model.ChangeFeedStatus),
minGCSafePointCache: minGCSafePointCacheEntry{},
}

time.Sleep(3 * time.Second)
err = mockOwner.flushChangeFeedInfos(s.ctx)
c.Assert(err, check.IsNil)
c.Assert(mockPDCli.invokeCounter, check.Equals, 1)

err = mockOwner.handleAdminJob(s.ctx)
c.Assert(err, check.IsNil)

time.Sleep(2 * time.Second)
err = mockOwner.flushChangeFeedInfos(s.ctx)
c.Assert(err, check.IsNil)
c.Assert(mockPDCli.invokeCounter, check.Equals, 2)
err = mockOwner.handleAdminJob(s.ctx)
c.Assert(err, check.IsNil)

c.Assert(mockOwner.stoppedFeeds["test_change_feed_1"], check.NotNil)
c.Assert(mockOwner.stoppedFeeds["test_change_feed_3"], check.NotNil)
c.Assert(mockOwner.changeFeeds["test_change_feed_2"].info.State, check.Equals, model.StateNormal)

time.Sleep(6 * time.Second)
err = mockOwner.flushChangeFeedInfos(s.ctx)
c.Assert(err, check.IsNil)
c.Assert(mockPDCli.invokeCounter, check.Equals, 2)
c.Assert(mockPDCli.invokeCounter, check.Equals, 3)
err = mockOwner.handleAdminJob(s.ctx)
c.Assert(err, check.IsNil)

time.Sleep(2 * time.Second)
err = mockOwner.flushChangeFeedInfos(s.ctx)
c.Assert(err, check.IsNil)
c.Assert(mockPDCli.invokeCounter, check.Equals, 4)
err = mockOwner.handleAdminJob(s.ctx)
c.Assert(err, check.IsNil)
c.Assert(mockOwner.stoppedFeeds["test_change_feed_2"], check.NotNil)
Expand Down

0 comments on commit 714bc03

Please sign in to comment.