Skip to content

Commit

Permalink
owner: check update safepoint error (#1282) (#1367)
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
ti-srebot authored Jan 28, 2021
1 parent 26748b3 commit e3b93d6
Show file tree
Hide file tree
Showing 9 changed files with 280 additions and 47 deletions.
25 changes: 17 additions & 8 deletions cdc/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -987,20 +987,29 @@ func (c *changeFeed) startSyncPointTicker(ctx context.Context, interval time.Dur
}

func (c *changeFeed) Close() {
err := c.ddlHandler.Close()
if err != nil && errors.Cause(err) != context.Canceled {
log.Warn("failed to close ddl handler", zap.Error(err))
if c.ddlHandler != nil {
err := c.ddlHandler.Close()
if err != nil && errors.Cause(err) != context.Canceled {
log.Warn("failed to close ddl handler", zap.Error(err))
}
}
err = c.sink.Close()
if err != nil && errors.Cause(err) != context.Canceled {
log.Warn("failed to close owner sink", zap.Error(err))

if c.sink != nil {
err := c.sink.Close()
if err != nil && errors.Cause(err) != context.Canceled {
log.Warn("failed to close owner sink", zap.Error(err))
}
}

if c.syncpointStore != nil {
err = c.syncpointStore.Close()
err := c.syncpointStore.Close()
if err != nil && errors.Cause(err) != context.Canceled {
log.Warn("failed to close owner sink", zap.Error(err))
}
}
c.cancel()

if c.cancel != nil {
c.cancel()
}
log.Info("changefeed closed", zap.String("id", c.id))
}
42 changes: 38 additions & 4 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/pingcap/ticdc/pkg/scheduler"
"github.com/pingcap/ticdc/pkg/security"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/clientv3"
Expand Down Expand Up @@ -242,7 +241,7 @@ func (o *Owner) newChangeFeed(
}
}
failpoint.Inject("NewChangefeedNoRetryError", func() {
failpoint.Return(nil, tikv.ErrGCTooEarly.GenWithStackByArgs(checkpointTs-300, checkpointTs))
failpoint.Return(nil, cerror.ErrStartTsBeforeGC.GenWithStackByArgs(checkpointTs-300, checkpointTs))
})

failpoint.Inject("NewChangefeedRetryError", func() {
Expand Down Expand Up @@ -711,12 +710,46 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error {
}
}
if time.Since(o.gcSafepointLastUpdate) > GCSafepointUpdateInterval {
_, err := o.pdClient.UpdateServiceGCSafePoint(ctx, CDCServiceSafePointID, o.gcTTL, minCheckpointTs)
actual, err := o.pdClient.UpdateServiceGCSafePoint(ctx, CDCServiceSafePointID, o.gcTTL, minCheckpointTs)
if err != nil {
log.Warn("failed to update service safe point", zap.Error(err))
sinceLastUpdate := time.Since(o.gcSafepointLastUpdate)
log.Warn("failed to update service safe point", zap.Error(err),
zap.Duration("since-last-update", sinceLastUpdate))
// We do not throw an error unless updating GC safepoint has been failing for more than gcTTL.
if sinceLastUpdate >= time.Second*time.Duration(o.gcTTL) {
return cerror.ErrUpdateServiceSafepointFailed.Wrap(err)
}
} else {
o.gcSafepointLastUpdate = time.Now()
}

failpoint.Inject("InjectActualGCSafePoint", func(val failpoint.Value) {
actual = uint64(val.(int))
})

if actual > minCheckpointTs {
// PD reported a safepoint newer than the minCheckpointTs we tried to set, i.e. our update did not take effect.
log.Warn("updating an outdated service safe point", zap.Uint64("checkpoint-ts", minCheckpointTs), zap.Uint64("actual-safepoint", actual))

for cfID, cf := range o.changeFeeds {
if cf.status.CheckpointTs < actual {
runningError := &model.RunningError{
Addr: util.CaptureAddrFromCtx(ctx),
Code: "CDC-owner-1001",
Message: cerror.ErrServiceSafepointLost.GenWithStackByArgs(actual).Error(),
}

err := o.EnqueueJob(model.AdminJob{
CfID: cfID,
Type: model.AdminStop,
Error: runningError,
})
if err != nil {
return errors.Trace(err)
}
}
}
}
}
return nil
}
Expand Down Expand Up @@ -917,6 +950,7 @@ func (o *Owner) handleAdminJob(ctx context.Context) error {
if job.Error != nil {
cf.info.ErrorHis = append(cf.info.ErrorHis, time.Now().UnixNano()/1e6)
}

err := o.etcdClient.SaveChangeFeedInfo(ctx, cf.info, job.CfID)
if err != nil {
return errors.Trace(err)
Expand Down
128 changes: 125 additions & 3 deletions cdc/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,22 @@ func (s *ownerSuite) TearDownTest(c *check.C) {

// mockPDClient is a test double for pd.Client. It embeds pd.Client and
// overrides UpdateServiceGCSafePoint, whose behaviour is selected through the
// flags below.
type mockPDClient struct {
	pd.Client
	// invokeCounter is incremented on every UpdateServiceGCSafePoint call.
	invokeCounter int
	// mockSafePointLost makes UpdateServiceGCSafePoint return a safepoint of
	// 1000 with a nil error.
	mockSafePointLost bool
	// mockPDFailure makes UpdateServiceGCSafePoint return an error.
	mockPDFailure bool
}

// UpdateServiceGCSafePoint is the mocked PD call. It counts the invocation in
// invokeCounter and then answers according to the configured flags, checked in
// this order:
//   - mockSafePointLost: returns 1000 and a nil error, whatever safePoint is;
//   - mockPDFailure: returns 0 and an "injected PD failure" error;
//   - neither flag set: echoes safePoint back with a nil error.
//
// ctx, serviceID and ttl are accepted to satisfy the method signature and are
// otherwise unused.
func (m *mockPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
	m.invokeCounter++

	if m.mockSafePointLost {
		return 1000, nil
	}
	if m.mockPDFailure {
		return 0, errors.New("injected PD failure")
	}

	return safePoint, nil
}

func (s *ownerSuite) TestOwnerFlushChangeFeedInfos(c *check.C) {
Expand All @@ -101,13 +111,125 @@ func (s *ownerSuite) TestOwnerFlushChangeFeedInfos(c *check.C) {
gcSafepointLastUpdate: time.Now(),
}

// Owner should ignore UpdateServiceGCSafePoint error.
err := mockOwner.flushChangeFeedInfos(s.ctx)
c.Assert(err, check.IsNil)
c.Assert(mockPDCli.invokeCounter, check.Equals, 1)
s.TearDownTest(c)
}

// TestOwnerFlushChangeFeedInfosFailed checks how flushChangeFeedInfos reacts
// when every UpdateServiceGCSafePoint call fails:
//   - while the last successful update is more recent than gcTTL, the failure
//     is tolerated and no error is returned;
//   - once the last successful update is at least gcTTL old, the call returns
//     ErrUpdateServiceSafepointFailed.
//
// The age of the last successful update is simulated by backdating
// gcSafepointLastUpdate instead of sleeping, so the test does not depend on
// wall-clock waits.
func (s *ownerSuite) TestOwnerFlushChangeFeedInfosFailed(c *check.C) {
	defer testleak.AfterTest(c)()
	mockPDCli := &mockPDClient{
		mockPDFailure: true,
	}

	changeFeeds := map[model.ChangeFeedID]*changeFeed{
		"test_change_feed_1": {
			info: &model.ChangeFeedInfo{State: model.StateNormal},
			status: &model.ChangeFeedStatus{
				CheckpointTs: 100,
			},
			targetTs: 2000,
			ddlState: model.ChangeFeedSyncDML,
			taskStatus: model.ProcessorsInfos{
				"capture_1": {},
				"capture_2": {},
			},
			taskPositions: map[string]*model.TaskPosition{
				"capture_1": {},
				"capture_2": {},
			},
		},
	}

	mockOwner := Owner{
		pdClient:                mockPDCli,
		etcdClient:              s.client,
		lastFlushChangefeeds:    time.Now(),
		flushChangefeedInterval: 1 * time.Hour,
		// Last success was 3s ago: old enough for an update attempt to be
		// made, but still younger than gcTTL.
		gcSafepointLastUpdate: time.Now().Add(-3 * time.Second),
		gcTTL:                 6, // 6 seconds
		changeFeeds:           changeFeeds,
	}

	err := mockOwner.flushChangeFeedInfos(s.ctx)
	c.Assert(err, check.IsNil)
	c.Assert(mockPDCli.invokeCounter, check.Equals, 1)

	// A failed update leaves gcSafepointLastUpdate untouched, so ageing it to
	// 9s reproduces "still failing after more than gcTTL".
	mockOwner.gcSafepointLastUpdate = time.Now().Add(-9 * time.Second)
	err = mockOwner.flushChangeFeedInfos(s.ctx)
	c.Assert(err, check.ErrorMatches, ".*CDC:ErrUpdateServiceSafepointFailed.*")
	c.Assert(mockPDCli.invokeCounter, check.Equals, 2)

	s.TearDownTest(c)
}

// TestOwnerUploadGCSafePointOutdated covers the case where PD answers the
// safepoint update with a value (1000, from the mock) that is ahead of the
// owner's minimum checkpoint. After flushChangeFeedInfos and handleAdminJob
// have run, the changefeed whose checkpoint (100) is behind that safepoint
// must appear in stoppedFeeds, while the one ahead of it (1100) must still be
// in the normal state.
func (s *ownerSuite) TestOwnerUploadGCSafePointOutdated(c *check.C) {
	defer testleak.AfterTest(c)()

	// newFeed builds a changefeed fixture; the two feeds used below differ
	// only in their checkpoint.
	newFeed := func(checkpointTs uint64) *changeFeed {
		return &changeFeed{
			info:     &model.ChangeFeedInfo{State: model.StateNormal},
			etcdCli:  s.client,
			status:   &model.ChangeFeedStatus{CheckpointTs: checkpointTs},
			targetTs: 2000,
			ddlState: model.ChangeFeedSyncDML,
			taskStatus: model.ProcessorsInfos{
				"capture_1": {},
				"capture_2": {},
			},
			taskPositions: map[string]*model.TaskPosition{
				"capture_1": {},
				"capture_2": {},
			},
		}
	}
	feeds := map[model.ChangeFeedID]*changeFeed{
		"test_change_feed_1": newFeed(100),
		"test_change_feed_2": newFeed(1100),
	}

	pdCli := &mockPDClient{mockSafePointLost: true}
	owner := Owner{
		pdClient:                pdCli,
		etcdClient:              s.client,
		lastFlushChangefeeds:    time.Now(),
		flushChangefeedInterval: 1 * time.Hour,
		changeFeeds:             feeds,
		cfRWriter:               s.client,
		stoppedFeeds:            make(map[model.ChangeFeedID]*model.ChangeFeedStatus),
	}

	// The flush is expected to succeed and to hit PD exactly once.
	flushErr := owner.flushChangeFeedInfos(s.ctx)
	c.Assert(flushErr, check.IsNil)
	c.Assert(pdCli.invokeCounter, check.Equals, 1)

	// Process whatever admin jobs the flush enqueued.
	jobErr := owner.handleAdminJob(s.ctx)
	c.Assert(jobErr, check.IsNil)

	c.Assert(owner.stoppedFeeds["test_change_feed_1"], check.NotNil)
	c.Assert(feeds["test_change_feed_2"].info.State, check.Equals, model.StateNormal)
	s.TearDownTest(c)
}

/*
type handlerForPrueDMLTest struct {
mu sync.RWMutex
Expand Down
15 changes: 15 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,11 @@ error = '''
server creates pd client failed
'''

["CDC:ErrServiceSafepointLost"]
error = '''
service safepoint lost. current safepoint is %d, please remove all changefeed(s) whose checkpoints are behind the current safepoint
'''

["CDC:ErrSinkURIInvalid"]
error = '''
sink uri invalid
Expand All @@ -641,6 +646,11 @@ error = '''
table %d not found in schema snapshot
'''

["CDC:ErrStartTsBeforeGC"]
error = '''
fail to create changefeed because start-ts %d is earlier than GC safepoint at %d
'''

["CDC:ErrSupportPostOnly"]
error = '''
this api supports POST method only
Expand Down Expand Up @@ -696,6 +706,11 @@ error = '''
unmarshal failed
'''

["CDC:ErrUpdateServiceSafepointFailed"]
error = '''
updating service safepoint failed
'''

["CDC:ErrVersionIncompatible"]
error = '''
version is incompatible: %s
Expand Down
53 changes: 28 additions & 25 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,31 +150,34 @@ var (
ErrFileSorterInvalidData = errors.Normalize("invalid data", errors.RFCCodeText("CDC:ErrFileSorterInvalidData"))

// server related errors
ErrCaptureSuicide = errors.Normalize("capture suicide", errors.RFCCodeText("CDC:ErrCaptureSuicide"))
ErrNewCaptureFailed = errors.Normalize("new capture failed", errors.RFCCodeText("CDC:ErrNewCaptureFailed"))
ErrCaptureRegister = errors.Normalize("capture register to etcd failed", errors.RFCCodeText("CDC:ErrCaptureRegister"))
ErrNewProcessorFailed = errors.Normalize("new processor failed", errors.RFCCodeText("CDC:ErrNewProcessorFailed"))
ErrProcessorUnknown = errors.Normalize("processor running unknown error", errors.RFCCodeText("CDC:ErrProcessorUnknown"))
ErrProcessorTableNotFound = errors.Normalize("table not found in processor cache", errors.RFCCodeText("CDC:ErrProcessorTableNotFound"))
ErrProcessorEtcdWatch = errors.Normalize("etcd watch returns error", errors.RFCCodeText("CDC:ErrProcessorEtcdWatch"))
ErrProcessorSortDir = errors.Normalize("sort dir error", errors.RFCCodeText("CDC:ErrProcessorSortDir"))
ErrUnknownSortEngine = errors.Normalize("unknown sort engine %s", errors.RFCCodeText("CDC:ErrUnknownSortEngine"))
ErrInvalidTaskKey = errors.Normalize("invalid task key: %s", errors.RFCCodeText("CDC:ErrInvalidTaskKey"))
ErrInvalidServerOption = errors.Normalize("invalid server option", errors.RFCCodeText("CDC:ErrInvalidServerOption"))
ErrServerNewPDClient = errors.Normalize("server creates pd client failed", errors.RFCCodeText("CDC:ErrServerNewPDClient"))
ErrServeHTTP = errors.Normalize("serve http error", errors.RFCCodeText("CDC:ErrServeHTTP"))
ErrCaptureCampaignOwner = errors.Normalize("campaign owner failed", errors.RFCCodeText("CDC:ErrCaptureCampaignOwner"))
ErrCaptureResignOwner = errors.Normalize("resign owner failed", errors.RFCCodeText("CDC:ErrCaptureResignOwner"))
ErrWaitHandleOperationTimeout = errors.Normalize("waiting processor to handle the operation finished timeout", errors.RFCCodeText("CDC:ErrWaitHandleOperationTimeout"))
ErrSupportPostOnly = errors.Normalize("this api supports POST method only", errors.RFCCodeText("CDC:ErrSupportPostOnly"))
ErrAPIInvalidParam = errors.Normalize("invalid api parameter", errors.RFCCodeText("CDC:ErrAPIInvalidParam"))
ErrInternalServerError = errors.Normalize("internal server error", errors.RFCCodeText("CDC:ErrInternalServerError"))
ErrOwnerSortDir = errors.Normalize("owner sort dir", errors.RFCCodeText("CDC:ErrOwnerSortDir"))
ErrOwnerChangefeedNotFound = errors.Normalize("changefeed %s not found in owner cache", errors.RFCCodeText("CDC:ErrOwnerChangefeedNotFound"))
ErrChangefeedAbnormalState = errors.Normalize("changefeed in abnormal state: %s, replication status: %+v", errors.RFCCodeText("CDC:ErrChangefeedAbnormalState"))
ErrInvalidAdminJobType = errors.Normalize("invalid admin job type: %d", errors.RFCCodeText("CDC:ErrInvalidAdminJobType"))
ErrOwnerEtcdWatch = errors.Normalize("etcd watch returns error", errors.RFCCodeText("CDC:ErrOwnerEtcdWatch"))
ErrOwnerCampaignKeyDeleted = errors.Normalize("owner campaign key deleted", errors.RFCCodeText("CDC:ErrOwnerCampaignKeyDeleted"))
ErrCaptureSuicide = errors.Normalize("capture suicide", errors.RFCCodeText("CDC:ErrCaptureSuicide"))
ErrNewCaptureFailed = errors.Normalize("new capture failed", errors.RFCCodeText("CDC:ErrNewCaptureFailed"))
ErrCaptureRegister = errors.Normalize("capture register to etcd failed", errors.RFCCodeText("CDC:ErrCaptureRegister"))
ErrNewProcessorFailed = errors.Normalize("new processor failed", errors.RFCCodeText("CDC:ErrNewProcessorFailed"))
ErrProcessorUnknown = errors.Normalize("processor running unknown error", errors.RFCCodeText("CDC:ErrProcessorUnknown"))
ErrProcessorTableNotFound = errors.Normalize("table not found in processor cache", errors.RFCCodeText("CDC:ErrProcessorTableNotFound"))
ErrProcessorEtcdWatch = errors.Normalize("etcd watch returns error", errors.RFCCodeText("CDC:ErrProcessorEtcdWatch"))
ErrProcessorSortDir = errors.Normalize("sort dir error", errors.RFCCodeText("CDC:ErrProcessorSortDir"))
ErrUnknownSortEngine = errors.Normalize("unknown sort engine %s", errors.RFCCodeText("CDC:ErrUnknownSortEngine"))
ErrInvalidTaskKey = errors.Normalize("invalid task key: %s", errors.RFCCodeText("CDC:ErrInvalidTaskKey"))
ErrInvalidServerOption = errors.Normalize("invalid server option", errors.RFCCodeText("CDC:ErrInvalidServerOption"))
ErrServerNewPDClient = errors.Normalize("server creates pd client failed", errors.RFCCodeText("CDC:ErrServerNewPDClient"))
ErrServeHTTP = errors.Normalize("serve http error", errors.RFCCodeText("CDC:ErrServeHTTP"))
ErrCaptureCampaignOwner = errors.Normalize("campaign owner failed", errors.RFCCodeText("CDC:ErrCaptureCampaignOwner"))
ErrCaptureResignOwner = errors.Normalize("resign owner failed", errors.RFCCodeText("CDC:ErrCaptureResignOwner"))
ErrWaitHandleOperationTimeout = errors.Normalize("waiting processor to handle the operation finished timeout", errors.RFCCodeText("CDC:ErrWaitHandleOperationTimeout"))
ErrSupportPostOnly = errors.Normalize("this api supports POST method only", errors.RFCCodeText("CDC:ErrSupportPostOnly"))
ErrAPIInvalidParam = errors.Normalize("invalid api parameter", errors.RFCCodeText("CDC:ErrAPIInvalidParam"))
ErrInternalServerError = errors.Normalize("internal server error", errors.RFCCodeText("CDC:ErrInternalServerError"))
ErrOwnerSortDir = errors.Normalize("owner sort dir", errors.RFCCodeText("CDC:ErrOwnerSortDir"))
ErrOwnerChangefeedNotFound = errors.Normalize("changefeed %s not found in owner cache", errors.RFCCodeText("CDC:ErrOwnerChangefeedNotFound"))
ErrChangefeedAbnormalState = errors.Normalize("changefeed in abnormal state: %s, replication status: %+v", errors.RFCCodeText("CDC:ErrChangefeedAbnormalState"))
ErrInvalidAdminJobType = errors.Normalize("invalid admin job type: %d", errors.RFCCodeText("CDC:ErrInvalidAdminJobType"))
ErrOwnerEtcdWatch = errors.Normalize("etcd watch returns error", errors.RFCCodeText("CDC:ErrOwnerEtcdWatch"))
ErrOwnerCampaignKeyDeleted = errors.Normalize("owner campaign key deleted", errors.RFCCodeText("CDC:ErrOwnerCampaignKeyDeleted"))
ErrServiceSafepointLost = errors.Normalize("service safepoint lost. current safepoint is %d, please remove all changefeed(s) whose checkpoints are behind the current safepoint", errors.RFCCodeText("CDC:ErrServiceSafepointLost"))
ErrUpdateServiceSafepointFailed = errors.Normalize("updating service safepoint failed", errors.RFCCodeText("CDC:ErrUpdateServiceSafepointFailed"))
ErrStartTsBeforeGC = errors.Normalize("fail to create changefeed because start-ts %d is earlier than GC safepoint at %d", errors.RFCCodeText("CDC:ErrStartTsBeforeGC"))

// EtcdWorker related errors. Internal use only.
// ErrEtcdTryAgain is used by a PatchFunc to force a transaction abort.
Expand Down
6 changes: 3 additions & 3 deletions pkg/filter/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
package filter

import (
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/errors"
cerror "github.com/pingcap/ticdc/pkg/errors"
)

// ChangefeedFastFailError checks the error, returns true if it is meaningless
// to retry on this error
func ChangefeedFastFailError(err error) bool {
return terror.ErrorEqual(err, tikv.ErrGCTooEarly)
return cerror.ErrStartTsBeforeGC.Equal(errors.Cause(err))
}
4 changes: 2 additions & 2 deletions pkg/util/gc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/store/tikv"
cerrors "github.com/pingcap/ticdc/pkg/errors"
pd "github.com/tikv/pd/client"
)

Expand All @@ -37,7 +37,7 @@ func CheckSafetyOfStartTs(ctx context.Context, pdCli pd.Client, startTs uint64)
return errors.Trace(err)
}
if startTs < minServiceGCTs {
return errors.Wrap(tikv.ErrGCTooEarly.GenWithStackByArgs(startTs, minServiceGCTs), "startTs less than gcSafePoint")
return cerrors.ErrStartTsBeforeGC.GenWithStackByArgs(startTs, minServiceGCTs)
}
return nil
}
Loading

0 comments on commit e3b93d6

Please sign in to comment.