diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index 7e9bed7ac89..a8a3700a913 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "io" + "strings" "sync" "time" @@ -417,11 +418,13 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error { } // Campaign to be the owner, it blocks until it been elected. if err := c.campaign(ctx); err != nil { - switch errors.Cause(err) { - case context.Canceled: + + rootErr := errors.Cause(err) + if rootErr == context.Canceled { return nil - case mvcc.ErrCompacted: - // the revision we requested is compacted, just retry + } else if rootErr == mvcc.ErrCompacted || isErrCompacted(rootErr) { + log.Warn("campaign owner failed due to etcd revision "+ + "has been compacted, retry later", zap.Error(err)) continue } log.Warn("campaign owner failed", @@ -564,9 +567,6 @@ func (c *captureImpl) GetOwner() (owner.Owner, error) { // campaign to be an owner. func (c *captureImpl) campaign(ctx context.Context) error { - failpoint.Inject("capture-campaign-compacted-error", func() { - failpoint.Return(errors.Trace(mvcc.ErrCompacted)) - }) // TODO: `Campaign` will get stuck when send SIGSTOP to pd leader. // For `Campaign`, when send SIGSTOP to pd leader, cdc maybe call `cancel` // (cause by `processor routine` exit). And inside `Campaign`, the routine @@ -728,3 +728,7 @@ func (c *captureImpl) StatusProvider() owner.StatusProvider { func (c *captureImpl) IsReady() bool { return c.migrator.IsMigrateDone() } + +func isErrCompacted(err error) bool { + return strings.Contains(err.Error(), "required revision has been compacted") +} diff --git a/cdc/capture/election.go b/cdc/capture/election.go index 139c849cdba..9012d78e596 100644 --- a/cdc/capture/election.go +++ b/cdc/capture/election.go @@ -16,7 +16,10 @@ package capture import ( "context" + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "go.etcd.io/etcd/client/v3/concurrency" + "go.etcd.io/etcd/server/v3/mvcc" ) // election wraps the owner election methods. @@ -37,6 +40,9 @@ func newElection(sess *concurrency.Session, key string) election { } func (e *electionImpl) campaign(ctx context.Context, key string) error { + failpoint.Inject("capture-campaign-compacted-error", func() { + failpoint.Return(errors.Trace(mvcc.ErrCompacted)) + }) return e.election.Campaign(ctx, key) } diff --git a/tests/integration_tests/availability/owner.sh b/tests/integration_tests/availability/owner.sh index f81ac4f136c..b78c0ecd78e 100755 --- a/tests/integration_tests/availability/owner.sh +++ b/tests/integration_tests/availability/owner.sh @@ -137,6 +137,7 @@ function test_owner_cleanup_stale_tasks() { function test_owner_retryable_error() { echo "run test case test_owner_retryable_error" export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/capture/capture-campaign-compacted-error=1*return(true)' + # start a capture server run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix test_owner_retryable_error.server1 # ensure the server become the owner