
Commit

owner: Modified the update strategy of gcSafePoint (#1731) (#1756)
asddongmen authored May 11, 2021
1 parent dfd4b81 commit 2f9064f
Showing 4 changed files with 206 additions and 10 deletions.
92 changes: 84 additions & 8 deletions cdc/owner.go
@@ -72,6 +72,24 @@ func (o *ownership) inc() {
}
}

type minGCSafePointCacheEntry struct {
ts model.Ts
lastUpdated time.Time
}

func (o *Owner) getMinGCSafePointCache(ctx context.Context) model.Ts {
if time.Now().After(o.minGCSafePointCache.lastUpdated.Add(MinGCSafePointCacheUpdateInterval)) {
physicalTs, logicalTs, err := o.pdClient.GetTS(ctx)
if err != nil {
log.Warn("failed to update minGCSafePointCache", zap.Error(err))
return o.minGCSafePointCache.ts
}
o.minGCSafePointCache.ts = oracle.ComposeTS(physicalTs-(o.gcTTL*1000), logicalTs)
o.minGCSafePointCache.lastUpdated = time.Now()
}
return o.minGCSafePointCache.ts
}
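
For context (not part of this commit): a minimal, self-contained sketch of the timestamp arithmetic that getMinGCSafePointCache relies on. PD physical timestamps are in milliseconds while gcTTL is in seconds, hence the gcTTL*1000 shift before the TSO is recomposed; the gcTTL value and the standalone main below are illustrative assumptions only.

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/tidb/store/tikv/oracle"
)

func main() {
	// Stand-ins for the physical/logical parts that pdClient.GetTS would return.
	physicalTs := oracle.GetPhysical(time.Now())
	logicalTs := int64(0)
	gcTTL := int64(86400) // assumed TTL of 24 hours, expressed in seconds

	// Shift the physical part back by gcTTL seconds (converted to milliseconds)
	// and recompose a TSO; this is the value cached in minGCSafePointCache.ts.
	minGCSafePoint := oracle.ComposeTS(physicalTs-gcTTL*1000, logicalTs)
	fmt.Println(minGCSafePoint)
}

Because the cache is refreshed only after MinGCSafePointCacheUpdateInterval has elapsed since lastUpdated, getMinGCSafePointCache issues at most one GetTS call per interval, and on a PD error it falls back to the previously cached value.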

// Owner manages the cdc cluster
type Owner struct {
done chan struct{}
@@ -107,6 +125,8 @@ type Owner struct {
gcTTL int64
// last time the gc safepoint was updated; a zero time means it has not been updated yet or has been cleared
gcSafepointLastUpdate time.Time
// caches the ts obtained from PD; it is refreshed at most once per MinGCSafePointCacheUpdateInterval.
minGCSafePointCache minGCSafePointCacheEntry
// record last time that flushes all changefeeds' replication status
lastFlushChangefeeds time.Time
flushChangefeedInterval time.Duration
@@ -118,6 +138,8 @@ const (
CDCServiceSafePointID = "ticdc"
// GCSafepointUpdateInterval is the minimal interval at which CDC can update the gc safepoint
GCSafepointUpdateInterval = time.Duration(2 * time.Second)
// MinGCSafePointCacheUpdateInterval is the interval at which minGCSafePointCache is updated
MinGCSafePointCacheUpdateInterval = time.Second * 2
)

// NewOwner creates a new Owner instance
@@ -715,13 +737,26 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error {
return nil
}

minCheckpointTs := uint64(math.MaxUint64)
staleChangeFeeds := make(map[model.ChangeFeedID]*model.ChangeFeedStatus, len(o.changeFeeds))
gcSafePoint := uint64(math.MaxUint64)

// get the lower bound of gcSafePoint
minGCSafePoint := o.getMinGCSafePointCache(ctx)

if len(o.changeFeeds) > 0 {
snapshot := make(map[model.ChangeFeedID]*model.ChangeFeedStatus, len(o.changeFeeds))
for id, changefeed := range o.changeFeeds {
snapshot[id] = changefeed.status
if changefeed.appliedCheckpointTs < minCheckpointTs {
minCheckpointTs = changefeed.appliedCheckpointTs
if changefeed.status.CheckpointTs < gcSafePoint {
gcSafePoint = changefeed.status.CheckpointTs
}
// If a changefeed's CheckpointTs < minGCSafePoint, it means this changefeed is stagnant.
// Such changefeeds are collected into this map, and handleStaleChangeFeed() is then called to deal with them.
// A changefeed will not enter the map twice, because in run()
// handleAdminJob() is always executed before flushChangeFeedInfos(),
// ensuring that any changefeed previously put into staleChangeFeeds has been stopped and removed from o.changeFeeds.
if changefeed.status.CheckpointTs < minGCSafePoint {
staleChangeFeeds[id] = changefeed.status
}

phyTs := oracle.ExtractPhysical(changefeed.status.CheckpointTs)
@@ -741,13 +776,28 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error {
o.lastFlushChangefeeds = time.Now()
}
}

for _, status := range o.stoppedFeeds {
if status.CheckpointTs < minCheckpointTs {
minCheckpointTs = status.CheckpointTs
// If a stopped changefeed's CheckpointTs < minGCSafePoint, this changefeed is stagnant.
// It should never be resumed; that logic lives in newChangeFeed(),
// so we can skip it here.
if status.CheckpointTs < minGCSafePoint {
continue
}

if status.CheckpointTs < gcSafePoint {
gcSafePoint = status.CheckpointTs
}
}

// handle the stagnant changefeeds collected above
err := o.handleStaleChangeFeed(ctx, staleChangeFeeds, minGCSafePoint)
if err != nil {
log.Warn("failed to handleStaleChangeFeed", zap.Error(err))
}

if time.Since(o.gcSafepointLastUpdate) > GCSafepointUpdateInterval {
actual, err := o.pdClient.UpdateServiceGCSafePoint(ctx, CDCServiceSafePointID, o.gcTTL, minCheckpointTs)
actual, err := o.pdClient.UpdateServiceGCSafePoint(ctx, CDCServiceSafePointID, o.gcTTL, gcSafePoint)
if err != nil {
sinceLastUpdate := time.Since(o.gcSafepointLastUpdate)
log.Warn("failed to update service safe point", zap.Error(err),
@@ -764,9 +814,9 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error {
actual = uint64(val.(int))
})

if actual > minCheckpointTs {
if actual > gcSafePoint {
// UpdateServiceGCSafePoint has failed.
log.Warn("updating an outdated service safe point", zap.Uint64("checkpoint-ts", minCheckpointTs), zap.Uint64("actual-safepoint", actual))
log.Warn("updating an outdated service safe point", zap.Uint64("checkpoint-ts", gcSafePoint), zap.Uint64("actual-safepoint", actual))

for cfID, cf := range o.changeFeeds {
if cf.status.CheckpointTs < actual {
@@ -871,6 +921,7 @@ func (o *Owner) dispatchJob(ctx context.Context, job model.AdminJob) error {
// For `AdminResume`, we remove stopped feed in changefeed initialization phase.
// For `AdminRemove`, we need to update stoppedFeeds when removing a stopped changefeed.
if job.Type == model.AdminStop {
log.Debug("put changefeed into stoppedFeeds queue", zap.String("changefeed", job.CfID))
o.stoppedFeeds[job.CfID] = cf.status
}
for captureID := range cf.taskStatus {
@@ -1623,3 +1674,28 @@ func (o *Owner) startCaptureWatcher(ctx context.Context) {
}
}()
}

// handleStaleChangeFeed handles stagnant changefeeds by enqueueing an AdminStop job
// with an error code indicating that the changefeed is stagnant.
func (o *Owner) handleStaleChangeFeed(ctx context.Context, staleChangeFeeds map[model.ChangeFeedID]*model.ChangeFeedStatus, minGCSafePoint uint64) error {
for id, status := range staleChangeFeeds {
message := cerror.ErrSnapshotLostByGC.GenWithStackByArgs(status.CheckpointTs, minGCSafePoint).Error()
log.Warn("changefeed checkpoint is lagging too much, so it will be stopped.", zap.String("changefeed", id), zap.String("Error message", message))
runningError := &model.RunningError{
Addr: util.CaptureAddrFromCtx(ctx),
Code: string(cerror.ErrSnapshotLostByGC.RFCCode()), // changefeed is stagnant
Message: message,
}

err := o.EnqueueJob(model.AdminJob{
CfID: id,
Type: model.AdminStop,
Error: runningError,
})
if err != nil {
return errors.Trace(err)
}
delete(staleChangeFeeds, id)
}
return nil
}
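
Not part of the commit, but a hedged sketch of how the new ErrSnapshotLostByGC error (added in pkg/errors/errors.go and errors.toml below) renders the message that handleStaleChangeFeed stores in the changefeed's RunningError; the checkpoint and safepoint values are made up for illustration.

package main

import (
	"fmt"

	cerror "github.com/pingcap/ticdc/pkg/errors"
)

func main() {
	checkpointTs := uint64(1000)   // assumed stagnant checkpoint-ts
	minGCSafePoint := uint64(5000) // assumed lower bound from getMinGCSafePointCache
	err := cerror.ErrSnapshotLostByGC.GenWithStackByArgs(checkpointTs, minGCSafePoint)
	fmt.Println(cerror.ErrSnapshotLostByGC.RFCCode()) // CDC:ErrSnapshotLostByGC
	fmt.Println(err.Error())                          // message ends with "checkpoint-ts 1000 is earlier than GC safepoint at 5000"
}

handleStaleChangeFeed then enqueues an AdminStop job carrying this error; handleAdminJob later stops the changefeed and records it in stoppedFeeds, which is what the new TestOwnerHandleStaleChangeFeed below asserts.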
117 changes: 116 additions & 1 deletion cdc/owner_test.go
@@ -41,6 +41,7 @@ import (
"github.com/pingcap/ticdc/pkg/util/testleak"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/tikv/oracle"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
@@ -91,6 +92,16 @@ type mockPDClient struct {
mockPDFailure bool
}

func (m *mockPDClient) GetTS(ctx context.Context) (int64, int64, error) {
if m.mockPDFailure {
return 0, 0, errors.New("injected PD failure")
}
if m.mockSafePointLost {
return 0, 0, nil
}
return oracle.GetPhysical(time.Now()), 0, nil
}

func (m *mockPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
m.invokeCounter++

@@ -173,12 +184,114 @@ func (s *ownerSuite) TestOwnerFlushChangeFeedInfosFailed(c *check.C) {
s.TearDownTest(c)
}

// Test whether the owner handles stagnant changefeeds correctly, so that they cannot block the update of gcSafePoint.
// If a changefeed is put into the stop queue due to stagnation, it can no longer affect the update of gcSafePoint,
// so we only need to test whether stagnant changefeeds are put into the stop queue.
func (s *ownerSuite) TestOwnerHandleStaleChangeFeed(c *check.C) {
defer testleak.AfterTest(c)()
mockPDCli := &mockPDClient{}
changeFeeds := map[model.ChangeFeedID]*changeFeed{
"test_change_feed_1": {
info: &model.ChangeFeedInfo{State: model.StateNormal},
etcdCli: s.client,
status: &model.ChangeFeedStatus{
CheckpointTs: 1000,
},
targetTs: 2000,
ddlState: model.ChangeFeedSyncDML,
taskStatus: model.ProcessorsInfos{
"capture_1": {},
"capture_2": {},
},
taskPositions: map[string]*model.TaskPosition{
"capture_1": {},
"capture_2": {},
},
},
"test_change_feed_2": {
info: &model.ChangeFeedInfo{State: model.StateNormal},
etcdCli: s.client,
status: &model.ChangeFeedStatus{
CheckpointTs: oracle.EncodeTSO(oracle.GetPhysical(time.Now())),
},
targetTs: 2000,
ddlState: model.ChangeFeedSyncDML,
taskStatus: model.ProcessorsInfos{
"capture_1": {},
"capture_2": {},
},
taskPositions: map[string]*model.TaskPosition{
"capture_1": {},
"capture_2": {},
},
},
"test_change_feed_3": {
info: &model.ChangeFeedInfo{State: model.StateNormal},
etcdCli: s.client,
status: &model.ChangeFeedStatus{
CheckpointTs: 0,
},
targetTs: 2000,
ddlState: model.ChangeFeedSyncDML,
taskStatus: model.ProcessorsInfos{
"capture_1": {},
"capture_2": {},
},
taskPositions: map[string]*model.TaskPosition{
"capture_1": {},
"capture_2": {},
},
},
}

session, err := concurrency.NewSession(s.client.Client.Unwrap(),
concurrency.WithTTL(defaultCaptureSessionTTL))
c.Assert(err, check.IsNil)

mockOwner := Owner{
session: session,
pdClient: mockPDCli,
etcdClient: s.client,
lastFlushChangefeeds: time.Now(),
flushChangefeedInterval: 1 * time.Hour,
// gcSafepointLastUpdate: time.Now(),
gcTTL: 6, // 6 seconds
changeFeeds: changeFeeds,
cfRWriter: s.client,
stoppedFeeds: make(map[model.ChangeFeedID]*model.ChangeFeedStatus),
minGCSafePointCache: minGCSafePointCacheEntry{},
}

time.Sleep(3 * time.Second)
err = mockOwner.flushChangeFeedInfos(s.ctx)
c.Assert(err, check.IsNil)
c.Assert(mockPDCli.invokeCounter, check.Equals, 1)

err = mockOwner.handleAdminJob(s.ctx)
c.Assert(err, check.IsNil)
err = mockOwner.handleAdminJob(s.ctx)
c.Assert(err, check.IsNil)
c.Assert(mockOwner.stoppedFeeds["test_change_feed_1"], check.NotNil)
c.Assert(mockOwner.stoppedFeeds["test_change_feed_3"], check.NotNil)
c.Assert(mockOwner.changeFeeds["test_change_feed_2"].info.State, check.Equals, model.StateNormal)

time.Sleep(6 * time.Second)
err = mockOwner.flushChangeFeedInfos(s.ctx)
c.Assert(err, check.IsNil)
c.Assert(mockPDCli.invokeCounter, check.Equals, 2)

err = mockOwner.handleAdminJob(s.ctx)
c.Assert(err, check.IsNil)
c.Assert(mockOwner.stoppedFeeds["test_change_feed_2"], check.NotNil)

s.TearDownTest(c)
}

func (s *ownerSuite) TestOwnerUploadGCSafePointOutdated(c *check.C) {
defer testleak.AfterTest(c)()
mockPDCli := &mockPDClient{
mockSafePointLost: true,
}

changeFeeds := map[model.ChangeFeedID]*changeFeed{
"test_change_feed_1": {
info: &model.ChangeFeedInfo{State: model.StateNormal},
@@ -219,6 +332,7 @@ func (s *ownerSuite) TestOwnerUploadGCSafePointOutdated(c *check.C) {
session, err := concurrency.NewSession(s.client.Client.Unwrap(),
concurrency.WithTTL(defaultCaptureSessionTTL))
c.Assert(err, check.IsNil)

mockOwner := Owner{
pdClient: mockPDCli,
session: session,
@@ -228,6 +342,7 @@ func (s *ownerSuite) TestOwnerUploadGCSafePointOutdated(c *check.C) {
changeFeeds: changeFeeds,
cfRWriter: s.client,
stoppedFeeds: make(map[model.ChangeFeedID]*model.ChangeFeedStatus),
minGCSafePointCache: minGCSafePointCacheEntry{},
}

err = mockOwner.flushChangeFeedInfos(s.ctx)
5 changes: 5 additions & 0 deletions errors.toml
@@ -651,6 +651,11 @@ error = '''
sink uri invalid
'''

["CDC:ErrSnapshotLostByGC"]
error = '''
fail to create or maintain changefeed due to snapshot loss caused by GC. checkpoint-ts %d is earlier than GC safepoint at %d
'''

["CDC:ErrSnapshotSchemaExists"]
error = '''
schema %s(%d) already exists
2 changes: 1 addition & 1 deletion pkg/errors/errors.go
@@ -178,7 +178,7 @@ var (
ErrServiceSafepointLost = errors.Normalize("service safepoint lost. current safepoint is %d, please remove all changefeed(s) whose checkpoints are behind the current safepoint", errors.RFCCodeText("CDC:ErrServiceSafepointLost"))
ErrUpdateServiceSafepointFailed = errors.Normalize("updating service safepoint failed", errors.RFCCodeText("CDC:ErrUpdateServiceSafepointFailed"))
ErrStartTsBeforeGC = errors.Normalize("fail to create changefeed because start-ts %d is earlier than GC safepoint at %d", errors.RFCCodeText("CDC:ErrStartTsBeforeGC"))

ErrSnapshotLostByGC = errors.Normalize("fail to create or maintain changefeed due to snapshot loss caused by GC. checkpoint-ts %d is earlier than GC safepoint at %d", errors.RFCCodeText("CDC:ErrSnapshotLostByGC"))
// EtcdWorker related errors. Internal use only.
// ErrEtcdTryAgain is used by a PatchFunc to force a transaction abort.
ErrEtcdTryAgain = errors.Normalize("the etcd txn should be aborted and retried immediately", errors.RFCCodeText("CDC:ErrEtcdTryAgain"))
