diff --git a/dm/pkg/etcdutil/etcdutil.go b/dm/pkg/etcdutil/etcdutil.go index 2907bbe9695..f90b329e391 100644 --- a/dm/pkg/etcdutil/etcdutil.go +++ b/dm/pkg/etcdutil/etcdutil.go @@ -29,6 +29,7 @@ import ( tcontext "github.com/pingcap/tiflow/dm/pkg/context" "github.com/pingcap/tiflow/dm/pkg/log" "github.com/pingcap/tiflow/dm/pkg/retry" + "github.com/pingcap/tiflow/pkg/errorutil" ) const ( @@ -48,22 +49,7 @@ var etcdDefaultTxnRetryParam = retry.Params{ FirstRetryDuration: time.Second, BackoffStrategy: retry.Stable, IsRetryableFn: func(retryTime int, err error) bool { - switch err { - // Etcd ResourceExhausted errors, may recover after some time - case v3rpc.ErrNoSpace, v3rpc.ErrTooManyRequests: - return true - // Etcd Unavailable errors, may be available after some time - // https://github.com/etcd-io/etcd/pull/9934/files#diff-6d8785d0c9eaf96bc3e2b29c36493c04R162-R167 - // ErrStopped: - // one of the etcd nodes stopped from failure injection - // ErrNotCapable: - // capability check has not been done (in the beginning) - case v3rpc.ErrNoLeader, v3rpc.ErrLeaderChanged, v3rpc.ErrNotCapable, v3rpc.ErrStopped, v3rpc.ErrTimeout, - v3rpc.ErrTimeoutDueToLeaderFail, v3rpc.ErrGRPCTimeoutDueToConnectionLost, v3rpc.ErrUnhealthy: - return true - default: - return false - } + return errorutil.IsRetryableEtcdError(err) }, } diff --git a/pkg/errorutil/ignore.go b/pkg/errorutil/ignore.go index 6a48dd203f6..05ecf2ad36d 100644 --- a/pkg/errorutil/ignore.go +++ b/pkg/errorutil/ignore.go @@ -19,6 +19,7 @@ import ( tddl "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/parser/mysql" + v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" ) // IsIgnorableMySQLDDLError is used to check what error can be ignored @@ -46,3 +47,24 @@ func IsIgnorableMySQLDDLError(err error) bool { return false } } + +func IsRetryableEtcdError(err error) bool { + etcdErr := errors.Cause(err) + + switch etcdErr { + // Etcd ResourceExhausted errors, may recover after some time + case v3rpc.ErrNoSpace, v3rpc.ErrTooManyRequests: + return true + // Etcd Unavailable errors, may be available after some time + // https://github.com/etcd-io/etcd/pull/9934/files#diff-6d8785d0c9eaf96bc3e2b29c36493c04R162-R167 + // ErrStopped: + // one of the etcd nodes stopped from failure injection + // ErrNotCapable: + // capability check has not been done (in the beginning) + case v3rpc.ErrNoLeader, v3rpc.ErrLeaderChanged, v3rpc.ErrNotCapable, v3rpc.ErrStopped, v3rpc.ErrTimeout, + v3rpc.ErrTimeoutDueToLeaderFail, v3rpc.ErrGRPCTimeoutDueToConnectionLost, v3rpc.ErrUnhealthy: + return true + default: + return false + } +} diff --git a/pkg/errorutil/ignore_test.go b/pkg/errorutil/ignore_test.go index 7c2dbd0f7e0..825bf7d91b6 100644 --- a/pkg/errorutil/ignore_test.go +++ b/pkg/errorutil/ignore_test.go @@ -20,7 +20,8 @@ import ( "github.com/go-sql-driver/mysql" "github.com/pingcap/tidb/infoschema" tmysql "github.com/pingcap/tidb/parser/mysql" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" ) func newMysqlErr(number uint16, message string) *mysql.MySQLError { @@ -42,6 +43,24 @@ func TestIgnoreMysqlDDLError(t *testing.T) { } for _, item := range cases { - assert.Equal(t, item.ret, IsIgnorableMySQLDDLError(item.err)) + require.Equal(t, item.ret, IsIgnorableMySQLDDLError(item.err)) + } +} + +func TestIsRetryableEtcdError(t *testing.T) { + cases := []struct { + err error + ret bool + }{ + {nil, false}, + {v3rpc.ErrCorrupt, false}, + + {v3rpc.ErrGRPCTimeoutDueToConnectionLost, true}, + {v3rpc.ErrTimeoutDueToLeaderFail, true}, + {v3rpc.ErrNoSpace, true}, + } + + for _, item := range cases { + require.Equal(t, item.ret, IsRetryableEtcdError(item.err)) } } diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index b346ac6a3cb..bb4f50c197c 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -21,10 +21,11 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" cerrors "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/errorutil" "github.com/pingcap/tiflow/pkg/retry" "github.com/prometheus/client_golang/prometheus" "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" + v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" "go.uber.org/zap" "google.golang.org/grpc/codes" ) @@ -51,6 +52,15 @@ const ( etcdRequestProgressDuration = 1 * time.Second // etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future etcdWatchChBufferSize = 16 + // etcdTxnTimeoutDuration represents the timeout duration for committing a + // transaction to Etcd + etcdTxnTimeoutDuration = 30 * time.Second +) + +var ( + TxnEmptyCmps = []clientv3.Cmp{} + TxnEmptyOpsThen = []clientv3.Op{} + TxnEmptyOpsElse = []clientv3.Op{} ) // set to var instead of const for mocking the value to speedup test @@ -121,12 +131,17 @@ func (c *Client) Delete(ctx context.Context, key string, opts ...clientv3.OpOpti return c.cli.Delete(ctx, key, opts...) } -// Txn delegates request to clientv3.KV.Txn -func (c *Client) Txn(ctx context.Context) clientv3.Txn { - if metric, ok := c.metrics[EtcdTxn]; ok { - metric.Inc() - } - return c.cli.Txn(ctx) +// Txn delegates request to clientv3.KV.Txn. The error returned can only be a non-retryable error, +// such as context.Canceled, context.DeadlineExceeded, errors.ErrReachMaxTry. +func (c *Client) Txn(ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op) (resp *clientv3.TxnResponse, err error) { + txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration) + defer cancel() + err = retryRPC(EtcdTxn, c.metrics[EtcdTxn], func() error { + var inErr error + resp, inErr = c.cli.Txn(txnCtx).If(cmps...).Then(opsThen...).Else(opsElse...).Commit() + return inErr + }) + return } // Grant delegates request to clientv3.Lease.Grant @@ -144,11 +159,17 @@ func isRetryableError(rpcName string) retry.IsRetryable { if !cerrors.IsRetryableError(err) { return false } - if rpcName == EtcdRevoke { - if etcdErr, ok := err.(rpctypes.EtcdError); ok && etcdErr.Code() == codes.NotFound { - // it means the etcd lease is already expired or revoked + + switch rpcName { + case EtcdRevoke: + if etcdErr, ok := err.(v3rpc.EtcdError); ok && etcdErr.Code() == codes.NotFound { + // It means the etcd lease is already expired or revoked return false } + case EtcdTxn: + return errorutil.IsRetryableEtcdError(err) + default: + // For other types of operation, we retry directly without handling errors } return true @@ -193,7 +214,10 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR lastRevision := getRevisionFromWatchOpts(opts...) watchCtx, cancel := context.WithCancel(ctx) - defer cancel() + defer func() { + // Using closures to handle changes to the cancel function + cancel() + }() watchCh := c.cli.Watch(watchCtx, key, opts...) ticker := c.clock.Ticker(etcdRequestProgressDuration) @@ -203,7 +227,6 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR for { select { case <-ctx.Done(): - cancel() return case response := <-watchCh: lastReceivedResponseTime = c.clock.Now() @@ -217,7 +240,6 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR for { select { case <-ctx.Done(): - cancel() return case outCh <- response: // it may block here break Loop @@ -243,6 +265,8 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR zap.String("role", role)) cancel() watchCtx, cancel = context.WithCancel(ctx) + // to avoid possible context leak warning from govet + _ = cancel watchCh = c.cli.Watch(watchCtx, key, clientv3.WithPrefix(), clientv3.WithRev(lastRevision)) // we need to reset lastReceivedResponseTime after reset Watch lastReceivedResponseTime = c.clock.Now() diff --git a/pkg/etcd/client_test.go b/pkg/etcd/client_test.go index efa03c6d795..42220d48e9d 100644 --- a/pkg/etcd/client_test.go +++ b/pkg/etcd/client_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/errors" "github.com/stretchr/testify/require" "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" ) type mockClient struct { @@ -43,6 +44,10 @@ func (m *mockClient) Put(ctx context.Context, key, val string, opts ...clientv3. return nil, errors.New("mock error") } +func (m *mockClient) Txn(ctx context.Context) clientv3.Txn { + return &mockTxn{ctx: ctx} +} + type mockWatcher struct { clientv3.Watcher watchCh chan clientv3.WatchResponse @@ -82,6 +87,32 @@ func TestRetry(t *testing.T) { _, err = retrycli.Put(context.TODO(), "", "") require.NotNil(t, err) require.Containsf(t, errors.Cause(err).Error(), "mock error", "err:%v", err.Error()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Test Txn case + // case 0: normal + rsp, err := retrycli.Txn(ctx, nil, nil, nil) + require.Nil(t, err) + require.False(t, rsp.Succeeded) + + // case 1: errors.ErrReachMaxTry + _, err = retrycli.Txn(ctx, TxnEmptyCmps, nil, nil) + require.Regexp(t, ".*CDC:ErrReachMaxTry.*", err) + + // case 2: errors.ErrReachMaxTry + _, err = retrycli.Txn(ctx, nil, TxnEmptyOpsThen, nil) + require.Regexp(t, ".*CDC:ErrReachMaxTry.*", err) + + // case 3: context.DeadlineExceeded + _, err = retrycli.Txn(ctx, TxnEmptyCmps, TxnEmptyOpsThen, nil) + require.Equal(t, context.DeadlineExceeded, err) + + // other case: mock error + _, err = retrycli.Txn(ctx, TxnEmptyCmps, TxnEmptyOpsThen, TxnEmptyOpsElse) + require.Containsf(t, errors.Cause(err).Error(), "mock error", "err:%v", err.Error()) + maxTries = originValue } @@ -276,3 +307,44 @@ func TestRevisionNotFallBack(t *testing.T) { // while WatchCh was reset require.Equal(t, *watcher.rev, revision) } + +type mockTxn struct { + ctx context.Context + mode int +} + +func (txn *mockTxn) If(cs ...clientv3.Cmp) clientv3.Txn { + if cs != nil { + txn.mode += 1 + } + return txn +} + +func (txn *mockTxn) Then(ops ...clientv3.Op) clientv3.Txn { + if ops != nil { + txn.mode += 1 << 1 + } + return txn +} + +func (txn *mockTxn) Else(ops ...clientv3.Op) clientv3.Txn { + if ops != nil { + txn.mode += 1 << 2 + } + return txn +} + +func (txn *mockTxn) Commit() (*clientv3.TxnResponse, error) { + switch txn.mode { + case 0: + return &clientv3.TxnResponse{}, nil + case 1: + return nil, rpctypes.ErrNoSpace + case 2: + return nil, rpctypes.ErrTimeoutDueToLeaderFail + case 3: + return nil, context.DeadlineExceeded + default: + return nil, errors.New("mock error") + } +} diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index bd7d1f12beb..ccad20a2176 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -304,12 +304,15 @@ func (c CDCEtcdClient) CreateChangefeedInfo(ctx context.Context, info *model.Cha if err != nil { return errors.Trace(err) } - resp, err := c.Client.Txn(ctx).If( + + cmps := []clientv3.Cmp{ clientv3.Compare(clientv3.ModRevision(infoKey), "=", 0), clientv3.Compare(clientv3.ModRevision(jobKey), "=", 0), - ).Then( + } + opsThen := []clientv3.Op{ clientv3.OpPut(infoKey, value), - ).Commit() + } + resp, err := c.Client.Txn(ctx, cmps, opsThen, TxnEmptyOpsElse) if err != nil { return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) } @@ -504,10 +507,14 @@ func (c CDCEtcdClient) PutTaskPositionOnChange( } key := GetEtcdKeyTaskPosition(changefeedID, captureID) - resp, err := c.Client.Txn(ctx).If( + cmps := []clientv3.Cmp{ clientv3.Compare(clientv3.ModRevision(key), ">", 0), clientv3.Compare(clientv3.Value(key), "=", data), - ).Else(clientv3.OpPut(key, data)).Commit() + } + opsElse := []clientv3.Op{ + clientv3.OpPut(key, data), + } + resp, err := c.Client.Txn(ctx, cmps, TxnEmptyOpsThen, opsElse) if err != nil { return false, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) } diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index f5d47146445..d334ca12c1e 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -28,6 +28,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3/concurrency" + "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" @@ -36,9 +37,6 @@ import ( ) const ( - // etcdTxnTimeoutDuration represents the timeout duration for committing a - // transaction to Etcd - etcdTxnTimeoutDuration = 30 * time.Second // When EtcdWorker commits a txn to etcd or ticks its reactor // takes more than etcdWorkerLogsWarnDuration, it will print a log etcdWorkerLogsWarnDuration = 1 * time.Second @@ -205,10 +203,10 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, if len(pendingPatches) > 0 { // Here we have some patches yet to be uploaded to Etcd. pendingPatches, err = worker.applyPatchGroups(ctx, pendingPatches) + if isRetryableError(err) { + continue + } if err != nil { - if cerrors.ErrEtcdTryAgain.Equal(errors.Cause(err)) { - continue - } return errors.Trace(err) } } else { @@ -257,6 +255,18 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, } } +func isRetryableError(err error) bool { + err = errors.Cause(err) + if cerrors.ErrEtcdTryAgain.Equal(err) || + context.DeadlineExceeded == err { + return true + } + // When encountering an abnormal connection with etcd, the worker will keep retrying + // until the session is done. + _, ok := err.(rpctypes.EtcdError) + return ok +} + func (worker *EtcdWorker) handleEvent(_ context.Context, event *clientv3.Event) { if worker.isDeleteCounterKey(event.Kv.Key) { switch event.Type { @@ -351,7 +361,7 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m } cmps := make([]clientv3.Cmp, 0, len(changedState)) - ops := make([]clientv3.Op, 0, len(changedState)) + opsThen := make([]clientv3.Op, 0, len(changedState)) hasDelete := false for key, value := range changedState { @@ -373,11 +383,11 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m op = clientv3.OpDelete(key.String()) hasDelete = true } - ops = append(ops, op) + opsThen = append(opsThen, op) } if hasDelete { - ops = append(ops, clientv3.OpPut(worker.prefix.String()+deletionCounterKey, fmt.Sprint(worker.deleteCounter+1))) + opsThen = append(opsThen, clientv3.OpPut(worker.prefix.String()+deletionCounterKey, fmt.Sprint(worker.deleteCounter+1))) } if worker.deleteCounter > 0 { cmps = append(cmps, clientv3.Compare(clientv3.Value(worker.prefix.String()+deletionCounterKey), "=", fmt.Sprint(worker.deleteCounter))) @@ -389,10 +399,7 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m worker.metrics.metricEtcdTxnSize.Observe(float64(size)) startTime := time.Now() - - txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration) - resp, err := worker.client.Txn(txnCtx).If(cmps...).Then(ops...).Commit() - cancel() + resp, err := worker.client.Txn(ctx, cmps, opsThen, etcd.TxnEmptyOpsElse) // For testing the situation where we have a progress notification that // has the same revision as the committed Etcd transaction. @@ -411,7 +418,7 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m return errors.Trace(err) } - logEtcdOps(ops, resp.Succeeded) + logEtcdOps(opsThen, resp.Succeeded) if resp.Succeeded { worker.barrierRev = resp.Header.GetRevision() return nil diff --git a/pkg/orchestrator/etcd_worker_test.go b/pkg/orchestrator/etcd_worker_test.go index 6d77450a012..3a6ef683d3c 100644 --- a/pkg/orchestrator/etcd_worker_test.go +++ b/pkg/orchestrator/etcd_worker_test.go @@ -33,6 +33,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -747,3 +748,10 @@ func TestModifyAfterDelete(t *testing.T) { _ = cli1.Unwrap().Close() _ = cli2.Unwrap().Close() } + +func TestRetryableError(t *testing.T) { + require.True(t, isRetryableError(cerrors.ErrEtcdTryAgain)) + require.True(t, isRetryableError(cerrors.ErrReachMaxTry.Wrap(rpctypes.ErrTimeoutDueToLeaderFail))) + require.True(t, isRetryableError(errors.Trace(context.DeadlineExceeded))) + require.False(t, isRetryableError(context.Canceled)) +}