Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

owner: check update safepoint error (#1282) #1367

Merged
merged 2 commits into from
Jan 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions cdc/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -987,20 +987,29 @@ func (c *changeFeed) startSyncPointTicker(ctx context.Context, interval time.Dur
}

func (c *changeFeed) Close() {
err := c.ddlHandler.Close()
if err != nil && errors.Cause(err) != context.Canceled {
log.Warn("failed to close ddl handler", zap.Error(err))
if c.ddlHandler != nil {
err := c.ddlHandler.Close()
if err != nil && errors.Cause(err) != context.Canceled {
log.Warn("failed to close ddl handler", zap.Error(err))
}
}
err = c.sink.Close()
if err != nil && errors.Cause(err) != context.Canceled {
log.Warn("failed to close owner sink", zap.Error(err))

if c.sink != nil {
err := c.sink.Close()
if err != nil && errors.Cause(err) != context.Canceled {
log.Warn("failed to close owner sink", zap.Error(err))
}
}

if c.syncpointStore != nil {
err = c.syncpointStore.Close()
err := c.syncpointStore.Close()
if err != nil && errors.Cause(err) != context.Canceled {
log.Warn("failed to close owner sink", zap.Error(err))
}
}
c.cancel()

if c.cancel != nil {
c.cancel()
}
log.Info("changefeed closed", zap.String("id", c.id))
}
42 changes: 38 additions & 4 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/pingcap/ticdc/pkg/scheduler"
"github.com/pingcap/ticdc/pkg/security"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/clientv3"
Expand Down Expand Up @@ -242,7 +241,7 @@ func (o *Owner) newChangeFeed(
}
}
failpoint.Inject("NewChangefeedNoRetryError", func() {
failpoint.Return(nil, tikv.ErrGCTooEarly.GenWithStackByArgs(checkpointTs-300, checkpointTs))
failpoint.Return(nil, cerror.ErrStartTsBeforeGC.GenWithStackByArgs(checkpointTs-300, checkpointTs))
})

failpoint.Inject("NewChangefeedRetryError", func() {
Expand Down Expand Up @@ -711,12 +710,46 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error {
}
}
if time.Since(o.gcSafepointLastUpdate) > GCSafepointUpdateInterval {
_, err := o.pdClient.UpdateServiceGCSafePoint(ctx, CDCServiceSafePointID, o.gcTTL, minCheckpointTs)
actual, err := o.pdClient.UpdateServiceGCSafePoint(ctx, CDCServiceSafePointID, o.gcTTL, minCheckpointTs)
if err != nil {
log.Warn("failed to update service safe point", zap.Error(err))
sinceLastUpdate := time.Since(o.gcSafepointLastUpdate)
log.Warn("failed to update service safe point", zap.Error(err),
zap.Duration("since-last-update", sinceLastUpdate))
// We do not throw an error unless updating GC safepoint has been failing for more than gcTTL.
if sinceLastUpdate >= time.Second*time.Duration(o.gcTTL) {
return cerror.ErrUpdateServiceSafepointFailed.Wrap(err)
}
} else {
o.gcSafepointLastUpdate = time.Now()
}

failpoint.Inject("InjectActualGCSafePoint", func(val failpoint.Value) {
actual = uint64(val.(int))
})

if actual > minCheckpointTs {
// UpdateServiceGCSafePoint has failed.
log.Warn("updating an outdated service safe point", zap.Uint64("checkpoint-ts", minCheckpointTs), zap.Uint64("actual-safepoint", actual))

for cfID, cf := range o.changeFeeds {
if cf.status.CheckpointTs < actual {
runningError := &model.RunningError{
Addr: util.CaptureAddrFromCtx(ctx),
Code: "CDC-owner-1001",
Message: cerror.ErrServiceSafepointLost.GenWithStackByArgs(actual).Error(),
}

err := o.EnqueueJob(model.AdminJob{
CfID: cfID,
Type: model.AdminStop,
Error: runningError,
})
if err != nil {
return errors.Trace(err)
}
}
}
}
}
return nil
}
Expand Down Expand Up @@ -917,6 +950,7 @@ func (o *Owner) handleAdminJob(ctx context.Context) error {
if job.Error != nil {
cf.info.ErrorHis = append(cf.info.ErrorHis, time.Now().UnixNano()/1e6)
}

err := o.etcdClient.SaveChangeFeedInfo(ctx, cf.info, job.CfID)
if err != nil {
return errors.Trace(err)
Expand Down
128 changes: 125 additions & 3 deletions cdc/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,22 @@ func (s *ownerSuite) TearDownTest(c *check.C) {

type mockPDClient struct {
pd.Client
invokeCounter int
invokeCounter int
mockSafePointLost bool
mockPDFailure bool
}

func (m *mockPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
m.invokeCounter++
return 0, errors.New("mock error")

if m.mockSafePointLost {
return 1000, nil
}
if m.mockPDFailure {
return 0, errors.New("injected PD failure")
}

return safePoint, nil
}

func (s *ownerSuite) TestOwnerFlushChangeFeedInfos(c *check.C) {
Expand All @@ -101,13 +111,125 @@ func (s *ownerSuite) TestOwnerFlushChangeFeedInfos(c *check.C) {
gcSafepointLastUpdate: time.Now(),
}

// Owner should ignore UpdateServiceGCSafePoint error.
err := mockOwner.flushChangeFeedInfos(s.ctx)
c.Assert(err, check.IsNil)
c.Assert(mockPDCli.invokeCounter, check.Equals, 1)
s.TearDownTest(c)
}

func (s *ownerSuite) TestOwnerFlushChangeFeedInfosFailed(c *check.C) {
defer testleak.AfterTest(c)()
mockPDCli := &mockPDClient{
mockPDFailure: true,
}

changeFeeds := map[model.ChangeFeedID]*changeFeed{
"test_change_feed_1": {
info: &model.ChangeFeedInfo{State: model.StateNormal},
status: &model.ChangeFeedStatus{
CheckpointTs: 100,
},
targetTs: 2000,
ddlState: model.ChangeFeedSyncDML,
taskStatus: model.ProcessorsInfos{
"capture_1": {},
"capture_2": {},
},
taskPositions: map[string]*model.TaskPosition{
"capture_1": {},
"capture_2": {},
},
},
}

mockOwner := Owner{
pdClient: mockPDCli,
etcdClient: s.client,
lastFlushChangefeeds: time.Now(),
flushChangefeedInterval: 1 * time.Hour,
gcSafepointLastUpdate: time.Now(),
gcTTL: 6, // 6 seconds
changeFeeds: changeFeeds,
}

time.Sleep(3 * time.Second)
err := mockOwner.flushChangeFeedInfos(s.ctx)
c.Assert(err, check.IsNil)
c.Assert(mockPDCli.invokeCounter, check.Equals, 1)

time.Sleep(6 * time.Second)
err = mockOwner.flushChangeFeedInfos(s.ctx)
c.Assert(err, check.ErrorMatches, ".*CDC:ErrUpdateServiceSafepointFailed.*")
c.Assert(mockPDCli.invokeCounter, check.Equals, 2)

s.TearDownTest(c)
}

func (s *ownerSuite) TestOwnerUploadGCSafePointOutdated(c *check.C) {
defer testleak.AfterTest(c)()
mockPDCli := &mockPDClient{
mockSafePointLost: true,
}

changeFeeds := map[model.ChangeFeedID]*changeFeed{
"test_change_feed_1": {
info: &model.ChangeFeedInfo{State: model.StateNormal},
etcdCli: s.client,
status: &model.ChangeFeedStatus{
CheckpointTs: 100,
},
targetTs: 2000,
ddlState: model.ChangeFeedSyncDML,
taskStatus: model.ProcessorsInfos{
"capture_1": {},
"capture_2": {},
},
taskPositions: map[string]*model.TaskPosition{
"capture_1": {},
"capture_2": {},
},
},
"test_change_feed_2": {
info: &model.ChangeFeedInfo{State: model.StateNormal},
etcdCli: s.client,
status: &model.ChangeFeedStatus{
CheckpointTs: 1100,
},
targetTs: 2000,
ddlState: model.ChangeFeedSyncDML,
taskStatus: model.ProcessorsInfos{
"capture_1": {},
"capture_2": {},
},
taskPositions: map[string]*model.TaskPosition{
"capture_1": {},
"capture_2": {},
},
},
}

mockOwner := Owner{
pdClient: mockPDCli,
etcdClient: s.client,
lastFlushChangefeeds: time.Now(),
flushChangefeedInterval: 1 * time.Hour,
changeFeeds: changeFeeds,
cfRWriter: s.client,
stoppedFeeds: make(map[model.ChangeFeedID]*model.ChangeFeedStatus),
}

err := mockOwner.flushChangeFeedInfos(s.ctx)
c.Assert(err, check.IsNil)
c.Assert(mockPDCli.invokeCounter, check.Equals, 1)

err = mockOwner.handleAdminJob(s.ctx)
c.Assert(err, check.IsNil)

c.Assert(mockOwner.stoppedFeeds["test_change_feed_1"], check.NotNil)
c.Assert(changeFeeds["test_change_feed_2"].info.State, check.Equals, model.StateNormal)
s.TearDownTest(c)
}

/*
type handlerForPrueDMLTest struct {
mu sync.RWMutex
Expand Down
15 changes: 15 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,11 @@ error = '''
server creates pd client failed
'''

["CDC:ErrServiceSafepointLost"]
error = '''
service safepoint lost. current safepoint is %d, please remove all changefeed(s) whose checkpoints are behind the current safepoint
'''

["CDC:ErrSinkURIInvalid"]
error = '''
sink uri invalid
Expand All @@ -641,6 +646,11 @@ error = '''
table %d not found in schema snapshot
'''

["CDC:ErrStartTsBeforeGC"]
error = '''
fail to create changefeed because start-ts %d is earlier than GC safepoint at %d
'''

["CDC:ErrSupportPostOnly"]
error = '''
this api supports POST method only
Expand Down Expand Up @@ -696,6 +706,11 @@ error = '''
unmarshal failed
'''

["CDC:ErrUpdateServiceSafepointFailed"]
error = '''
updating service safepoint failed
'''

["CDC:ErrVersionIncompatible"]
error = '''
version is incompatible: %s
Expand Down
53 changes: 28 additions & 25 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,31 +150,34 @@ var (
ErrFileSorterInvalidData = errors.Normalize("invalid data", errors.RFCCodeText("CDC:ErrFileSorterInvalidData"))

// server related errors
ErrCaptureSuicide = errors.Normalize("capture suicide", errors.RFCCodeText("CDC:ErrCaptureSuicide"))
ErrNewCaptureFailed = errors.Normalize("new capture failed", errors.RFCCodeText("CDC:ErrNewCaptureFailed"))
ErrCaptureRegister = errors.Normalize("capture register to etcd failed", errors.RFCCodeText("CDC:ErrCaptureRegister"))
ErrNewProcessorFailed = errors.Normalize("new processor failed", errors.RFCCodeText("CDC:ErrNewProcessorFailed"))
ErrProcessorUnknown = errors.Normalize("processor running unknown error", errors.RFCCodeText("CDC:ErrProcessorUnknown"))
ErrProcessorTableNotFound = errors.Normalize("table not found in processor cache", errors.RFCCodeText("CDC:ErrProcessorTableNotFound"))
ErrProcessorEtcdWatch = errors.Normalize("etcd watch returns error", errors.RFCCodeText("CDC:ErrProcessorEtcdWatch"))
ErrProcessorSortDir = errors.Normalize("sort dir error", errors.RFCCodeText("CDC:ErrProcessorSortDir"))
ErrUnknownSortEngine = errors.Normalize("unknown sort engine %s", errors.RFCCodeText("CDC:ErrUnknownSortEngine"))
ErrInvalidTaskKey = errors.Normalize("invalid task key: %s", errors.RFCCodeText("CDC:ErrInvalidTaskKey"))
ErrInvalidServerOption = errors.Normalize("invalid server option", errors.RFCCodeText("CDC:ErrInvalidServerOption"))
ErrServerNewPDClient = errors.Normalize("server creates pd client failed", errors.RFCCodeText("CDC:ErrServerNewPDClient"))
ErrServeHTTP = errors.Normalize("serve http error", errors.RFCCodeText("CDC:ErrServeHTTP"))
ErrCaptureCampaignOwner = errors.Normalize("campaign owner failed", errors.RFCCodeText("CDC:ErrCaptureCampaignOwner"))
ErrCaptureResignOwner = errors.Normalize("resign owner failed", errors.RFCCodeText("CDC:ErrCaptureResignOwner"))
ErrWaitHandleOperationTimeout = errors.Normalize("waiting processor to handle the operation finished timeout", errors.RFCCodeText("CDC:ErrWaitHandleOperationTimeout"))
ErrSupportPostOnly = errors.Normalize("this api supports POST method only", errors.RFCCodeText("CDC:ErrSupportPostOnly"))
ErrAPIInvalidParam = errors.Normalize("invalid api parameter", errors.RFCCodeText("CDC:ErrAPIInvalidParam"))
ErrInternalServerError = errors.Normalize("internal server error", errors.RFCCodeText("CDC:ErrInternalServerError"))
ErrOwnerSortDir = errors.Normalize("owner sort dir", errors.RFCCodeText("CDC:ErrOwnerSortDir"))
ErrOwnerChangefeedNotFound = errors.Normalize("changefeed %s not found in owner cache", errors.RFCCodeText("CDC:ErrOwnerChangefeedNotFound"))
ErrChangefeedAbnormalState = errors.Normalize("changefeed in abnormal state: %s, replication status: %+v", errors.RFCCodeText("CDC:ErrChangefeedAbnormalState"))
ErrInvalidAdminJobType = errors.Normalize("invalid admin job type: %d", errors.RFCCodeText("CDC:ErrInvalidAdminJobType"))
ErrOwnerEtcdWatch = errors.Normalize("etcd watch returns error", errors.RFCCodeText("CDC:ErrOwnerEtcdWatch"))
ErrOwnerCampaignKeyDeleted = errors.Normalize("owner campaign key deleted", errors.RFCCodeText("CDC:ErrOwnerCampaignKeyDeleted"))
ErrCaptureSuicide = errors.Normalize("capture suicide", errors.RFCCodeText("CDC:ErrCaptureSuicide"))
ErrNewCaptureFailed = errors.Normalize("new capture failed", errors.RFCCodeText("CDC:ErrNewCaptureFailed"))
ErrCaptureRegister = errors.Normalize("capture register to etcd failed", errors.RFCCodeText("CDC:ErrCaptureRegister"))
ErrNewProcessorFailed = errors.Normalize("new processor failed", errors.RFCCodeText("CDC:ErrNewProcessorFailed"))
ErrProcessorUnknown = errors.Normalize("processor running unknown error", errors.RFCCodeText("CDC:ErrProcessorUnknown"))
ErrProcessorTableNotFound = errors.Normalize("table not found in processor cache", errors.RFCCodeText("CDC:ErrProcessorTableNotFound"))
ErrProcessorEtcdWatch = errors.Normalize("etcd watch returns error", errors.RFCCodeText("CDC:ErrProcessorEtcdWatch"))
ErrProcessorSortDir = errors.Normalize("sort dir error", errors.RFCCodeText("CDC:ErrProcessorSortDir"))
ErrUnknownSortEngine = errors.Normalize("unknown sort engine %s", errors.RFCCodeText("CDC:ErrUnknownSortEngine"))
ErrInvalidTaskKey = errors.Normalize("invalid task key: %s", errors.RFCCodeText("CDC:ErrInvalidTaskKey"))
ErrInvalidServerOption = errors.Normalize("invalid server option", errors.RFCCodeText("CDC:ErrInvalidServerOption"))
ErrServerNewPDClient = errors.Normalize("server creates pd client failed", errors.RFCCodeText("CDC:ErrServerNewPDClient"))
ErrServeHTTP = errors.Normalize("serve http error", errors.RFCCodeText("CDC:ErrServeHTTP"))
ErrCaptureCampaignOwner = errors.Normalize("campaign owner failed", errors.RFCCodeText("CDC:ErrCaptureCampaignOwner"))
ErrCaptureResignOwner = errors.Normalize("resign owner failed", errors.RFCCodeText("CDC:ErrCaptureResignOwner"))
ErrWaitHandleOperationTimeout = errors.Normalize("waiting processor to handle the operation finished timeout", errors.RFCCodeText("CDC:ErrWaitHandleOperationTimeout"))
ErrSupportPostOnly = errors.Normalize("this api supports POST method only", errors.RFCCodeText("CDC:ErrSupportPostOnly"))
ErrAPIInvalidParam = errors.Normalize("invalid api parameter", errors.RFCCodeText("CDC:ErrAPIInvalidParam"))
ErrInternalServerError = errors.Normalize("internal server error", errors.RFCCodeText("CDC:ErrInternalServerError"))
ErrOwnerSortDir = errors.Normalize("owner sort dir", errors.RFCCodeText("CDC:ErrOwnerSortDir"))
ErrOwnerChangefeedNotFound = errors.Normalize("changefeed %s not found in owner cache", errors.RFCCodeText("CDC:ErrOwnerChangefeedNotFound"))
ErrChangefeedAbnormalState = errors.Normalize("changefeed in abnormal state: %s, replication status: %+v", errors.RFCCodeText("CDC:ErrChangefeedAbnormalState"))
ErrInvalidAdminJobType = errors.Normalize("invalid admin job type: %d", errors.RFCCodeText("CDC:ErrInvalidAdminJobType"))
ErrOwnerEtcdWatch = errors.Normalize("etcd watch returns error", errors.RFCCodeText("CDC:ErrOwnerEtcdWatch"))
ErrOwnerCampaignKeyDeleted = errors.Normalize("owner campaign key deleted", errors.RFCCodeText("CDC:ErrOwnerCampaignKeyDeleted"))
ErrServiceSafepointLost = errors.Normalize("service safepoint lost. current safepoint is %d, please remove all changefeed(s) whose checkpoints are behind the current safepoint", errors.RFCCodeText("CDC:ErrServiceSafepointLost"))
ErrUpdateServiceSafepointFailed = errors.Normalize("updating service safepoint failed", errors.RFCCodeText("CDC:ErrUpdateServiceSafepointFailed"))
ErrStartTsBeforeGC = errors.Normalize("fail to create changefeed because start-ts %d is earlier than GC safepoint at %d", errors.RFCCodeText("CDC:ErrStartTsBeforeGC"))

// EtcdWorker related errors. Internal use only.
// ErrEtcdTryAgain is used by a PatchFunc to force a transaction abort.
Expand Down
6 changes: 3 additions & 3 deletions pkg/filter/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
package filter

import (
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/errors"
cerror "github.com/pingcap/ticdc/pkg/errors"
)

// ChangefeedFastFailError checks the error, returns true if it is meaningless
// to retry on this error
func ChangefeedFastFailError(err error) bool {
return terror.ErrorEqual(err, tikv.ErrGCTooEarly)
return cerror.ErrStartTsBeforeGC.Equal(errors.Cause(err))
}
4 changes: 2 additions & 2 deletions pkg/util/gc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/store/tikv"
cerrors "github.com/pingcap/ticdc/pkg/errors"
pd "github.com/tikv/pd/client"
)

Expand All @@ -37,7 +37,7 @@ func CheckSafetyOfStartTs(ctx context.Context, pdCli pd.Client, startTs uint64)
return errors.Trace(err)
}
if startTs < minServiceGCTs {
return errors.Wrap(tikv.ErrGCTooEarly.GenWithStackByArgs(startTs, minServiceGCTs), "startTs less than gcSafePoint")
return cerrors.ErrStartTsBeforeGC.GenWithStackByArgs(startTs, minServiceGCTs)
}
return nil
}
Loading