From b778779b44c924056ae60c29588388bc6ee3656b Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Tue, 25 Jan 2022 10:38:05 +0800 Subject: [PATCH 1/9] optimize WatchWithChan method in pkg/etcd/client.go --- pkg/etcd/client.go | 9 ++++++--- pkg/orchestrator/etcd_worker.go | 1 + 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index b346ac6a3cb..11ebe533543 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -193,7 +193,10 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR lastRevision := getRevisionFromWatchOpts(opts...) watchCtx, cancel := context.WithCancel(ctx) - defer cancel() + defer func() { + // Using closures to handle changes to the cancel function + cancel() + }() watchCh := c.cli.Watch(watchCtx, key, opts...) ticker := c.clock.Ticker(etcdRequestProgressDuration) @@ -203,7 +206,6 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR for { select { case <-ctx.Done(): - cancel() return case response := <-watchCh: lastReceivedResponseTime = c.clock.Now() @@ -217,7 +219,6 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR for { select { case <-ctx.Done(): - cancel() return case outCh <- response: // it may block here break Loop @@ -243,6 +244,8 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR zap.String("role", role)) cancel() watchCtx, cancel = context.WithCancel(ctx) + // to avoid possible context leak warning from govet + _ = cancel watchCh = c.cli.Watch(watchCtx, key, clientv3.WithPrefix(), clientv3.WithRev(lastRevision)) // we need to reset lastReceivedResponseTime after reset Watch lastReceivedResponseTime = c.clock.Now() diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index f5d47146445..17859cf7d49 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -206,6 +206,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, // Here we have some patches yet to be uploaded to Etcd. pendingPatches, err = worker.applyPatchGroups(ctx, pendingPatches) if err != nil { + // etcd client 报错为 ErrTimeoutDueToLeaderFail 时也应该 continue 重试 if cerrors.ErrEtcdTryAgain.Equal(errors.Cause(err)) { continue } From e2255a2b6a0d0c8719a90d4a0d417efc2c9b2d0d Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Tue, 25 Jan 2022 13:54:15 +0800 Subject: [PATCH 2/9] etcd/client(ticdc): add retry operation for etcd transaction api (#4248) --- pkg/etcd/client.go | 10 ++++++++++ pkg/etcd/etcd.go | 20 +++++++++++++++----- pkg/orchestrator/etcd_worker.go | 23 ++++++++++++++--------- 3 files changed, 39 insertions(+), 14 deletions(-) diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index 11ebe533543..c4f44195bc2 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -129,6 +129,16 @@ func (c *Client) Txn(ctx context.Context) clientv3.Txn { return c.cli.Txn(ctx) } +// Txn delegates request to clientv3.KV.Txn +func (c *Client) Txn1(ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op) (resp *clientv3.TxnResponse, err error) { + err = retryRPC(EtcdTxn, c.metrics[EtcdTxn], func() error { + var inErr error + resp, inErr = c.cli.Txn(ctx).If(cmps...).Then(opsThen...).Else(opsElse...).Commit() + return inErr + }) + return +} + // Grant delegates request to clientv3.Lease.Grant func (c *Client) Grant(ctx context.Context, ttl int64) (resp *clientv3.LeaseGrantResponse, err error) { err = retryRPC(EtcdGrant, c.metrics[EtcdGrant], func() error { diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index bd7d1f12beb..3c77a81ebc8 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -304,12 +304,17 @@ func (c CDCEtcdClient) CreateChangefeedInfo(ctx context.Context, info *model.Cha if err != nil { return errors.Trace(err) } - resp, err := c.Client.Txn(ctx).If( + + cmps := []clientv3.Cmp{ clientv3.Compare(clientv3.ModRevision(infoKey), "=", 0), clientv3.Compare(clientv3.ModRevision(jobKey), "=", 0), - ).Then( + } + opsThen := []clientv3.Op{ clientv3.OpPut(infoKey, value), - ).Commit() + } + opsElse := make([]clientv3.Op, 0) + resp, err := c.Client.Txn1(ctx, cmps, opsThen, opsElse) + if err != nil { return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) } @@ -504,10 +509,15 @@ func (c CDCEtcdClient) PutTaskPositionOnChange( } key := GetEtcdKeyTaskPosition(changefeedID, captureID) - resp, err := c.Client.Txn(ctx).If( + cmps := []clientv3.Cmp{ clientv3.Compare(clientv3.ModRevision(key), ">", 0), clientv3.Compare(clientv3.Value(key), "=", data), - ).Else(clientv3.OpPut(key, data)).Commit() + } + opsThen := make([]clientv3.Op, 0) + opsElse := []clientv3.Op{ + clientv3.OpPut(key, data), + } + resp, err := c.Client.Txn1(ctx, cmps, opsThen, opsElse) if err != nil { return false, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) } diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index 17859cf7d49..cdd5cb3f6dd 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -205,11 +205,10 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, if len(pendingPatches) > 0 { // Here we have some patches yet to be uploaded to Etcd. pendingPatches, err = worker.applyPatchGroups(ctx, pendingPatches) + if isRetryableError(err) { + continue + } if err != nil { - // etcd client 报错为 ErrTimeoutDueToLeaderFail 时也应该 continue 重试 - if cerrors.ErrEtcdTryAgain.Equal(errors.Cause(err)) { - continue - } return errors.Trace(err) } } else { @@ -258,6 +257,10 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, } } +func isRetryableError(err error) bool { + return cerrors.ErrEtcdTryAgain.Equal(errors.Cause(err)) +} + func (worker *EtcdWorker) handleEvent(_ context.Context, event *clientv3.Event) { if worker.isDeleteCounterKey(event.Kv.Key) { switch event.Type { @@ -352,7 +355,8 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m } cmps := make([]clientv3.Cmp, 0, len(changedState)) - ops := make([]clientv3.Op, 0, len(changedState)) + opsThen := make([]clientv3.Op, 0, len(changedState)) + opsElse := make([]clientv3.Op, 0) hasDelete := false for key, value := range changedState { @@ -374,11 +378,11 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m op = clientv3.OpDelete(key.String()) hasDelete = true } - ops = append(ops, op) + opsThen = append(opsThen, op) } if hasDelete { - ops = append(ops, clientv3.OpPut(worker.prefix.String()+deletionCounterKey, fmt.Sprint(worker.deleteCounter+1))) + opsThen = append(opsThen, clientv3.OpPut(worker.prefix.String()+deletionCounterKey, fmt.Sprint(worker.deleteCounter+1))) } if worker.deleteCounter > 0 { cmps = append(cmps, clientv3.Compare(clientv3.Value(worker.prefix.String()+deletionCounterKey), "=", fmt.Sprint(worker.deleteCounter))) @@ -392,7 +396,8 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m startTime := time.Now() txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration) - resp, err := worker.client.Txn(txnCtx).If(cmps...).Then(ops...).Commit() + resp, err := worker.client.Txn1(txnCtx, cmps, opsThen, opsElse) + // resp, err = worker.client.Txn(txnCtx).If(cmps...).Then(opsThen...).Commit() cancel() // For testing the situation where we have a progress notification that @@ -412,7 +417,7 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m return errors.Trace(err) } - logEtcdOps(ops, resp.Succeeded) + logEtcdOps(opsThen, resp.Succeeded) if resp.Succeeded { worker.barrierRev = resp.Header.GetRevision() return nil From 0d9b7849c567f32e792658b17a82a6e617869b73 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Tue, 25 Jan 2022 14:13:33 +0800 Subject: [PATCH 3/9] add retryable control to etcd Txn --- pkg/etcd/client.go | 49 ++++++++++++++++++++++++--------- pkg/etcd/etcd.go | 7 ++--- pkg/orchestrator/etcd_worker.go | 13 ++------- 3 files changed, 41 insertions(+), 28 deletions(-) diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index c4f44195bc2..e9c75fbd198 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -24,7 +24,7 @@ import ( "github.com/pingcap/tiflow/pkg/retry" "github.com/prometheus/client_golang/prometheus" "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" + v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" "go.uber.org/zap" "google.golang.org/grpc/codes" ) @@ -51,6 +51,15 @@ const ( etcdRequestProgressDuration = 1 * time.Second // etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future etcdWatchChBufferSize = 16 + // etcdTxnTimeoutDuration represents the timeout duration for committing a + // transaction to Etcd + etcdTxnTimeoutDuration = 30 * time.Second +) + +var ( + TxnEmptyCmps = []clientv3.Cmp{} + TxnEmptyOpsThen = []clientv3.Op{} + TxnEmptyOpsElse = []clientv3.Op{} ) // set to var instead of const for mocking the value to speedup test @@ -121,19 +130,14 @@ func (c *Client) Delete(ctx context.Context, key string, opts ...clientv3.OpOpti return c.cli.Delete(ctx, key, opts...) } -// Txn delegates request to clientv3.KV.Txn -func (c *Client) Txn(ctx context.Context) clientv3.Txn { - if metric, ok := c.metrics[EtcdTxn]; ok { - metric.Inc() - } - return c.cli.Txn(ctx) -} - -// Txn delegates request to clientv3.KV.Txn -func (c *Client) Txn1(ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op) (resp *clientv3.TxnResponse, err error) { +// Txn delegates request to clientv3.KV.Txn. The error returned can only be a non-retryable error, +// such as context.Canceled, context.DeadlineExceeded, errors.ErrReachMaxTry. +func (c *Client) Txn(ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op) (resp *clientv3.TxnResponse, err error) { + txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration) + defer cancel() err = retryRPC(EtcdTxn, c.metrics[EtcdTxn], func() error { var inErr error - resp, inErr = c.cli.Txn(ctx).If(cmps...).Then(opsThen...).Else(opsElse...).Commit() + resp, inErr = c.cli.Txn(txnCtx).If(cmps...).Then(opsThen...).Else(opsElse...).Commit() return inErr }) return @@ -155,12 +159,31 @@ func isRetryableError(rpcName string) retry.IsRetryable { return false } if rpcName == EtcdRevoke { - if etcdErr, ok := err.(rpctypes.EtcdError); ok && etcdErr.Code() == codes.NotFound { + if etcdErr, ok := err.(v3rpc.EtcdError); ok && etcdErr.Code() == codes.NotFound { // it means the etcd lease is already expired or revoked return false } } + if rpcName == EtcdTxn { + switch err { + // Etcd ResourceExhausted errors, may recover after some time + case v3rpc.ErrNoSpace, v3rpc.ErrTooManyRequests: + return true + // Etcd Unavailable errors, may be available after some time + // https://github.com/etcd-io/etcd/pull/9934/files#diff-6d8785d0c9eaf96bc3e2b29c36493c04R162-R167 + // ErrStopped: + // one of the etcd nodes stopped from failure injection + // ErrNotCapable: + // capability check has not been done (in the beginning) + case v3rpc.ErrNoLeader, v3rpc.ErrLeaderChanged, v3rpc.ErrNotCapable, v3rpc.ErrStopped, v3rpc.ErrTimeout, + v3rpc.ErrTimeoutDueToLeaderFail, v3rpc.ErrGRPCTimeoutDueToConnectionLost, v3rpc.ErrUnhealthy: + return true + default: + return false + } + } + return true } } diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index 3c77a81ebc8..ccad20a2176 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -312,9 +312,7 @@ func (c CDCEtcdClient) CreateChangefeedInfo(ctx context.Context, info *model.Cha opsThen := []clientv3.Op{ clientv3.OpPut(infoKey, value), } - opsElse := make([]clientv3.Op, 0) - resp, err := c.Client.Txn1(ctx, cmps, opsThen, opsElse) - + resp, err := c.Client.Txn(ctx, cmps, opsThen, TxnEmptyOpsElse) if err != nil { return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) } @@ -513,11 +511,10 @@ func (c CDCEtcdClient) PutTaskPositionOnChange( clientv3.Compare(clientv3.ModRevision(key), ">", 0), clientv3.Compare(clientv3.Value(key), "=", data), } - opsThen := make([]clientv3.Op, 0) opsElse := []clientv3.Op{ clientv3.OpPut(key, data), } - resp, err := c.Client.Txn1(ctx, cmps, opsThen, opsElse) + resp, err := c.Client.Txn(ctx, cmps, TxnEmptyOpsThen, opsElse) if err != nil { return false, cerror.WrapError(cerror.ErrPDEtcdAPIError, err) } diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index cdd5cb3f6dd..dbcec40ce2e 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -36,9 +36,6 @@ import ( ) const ( - // etcdTxnTimeoutDuration represents the timeout duration for committing a - // transaction to Etcd - etcdTxnTimeoutDuration = 30 * time.Second // When EtcdWorker commits a txn to etcd or ticks its reactor // takes more than etcdWorkerLogsWarnDuration, it will print a log etcdWorkerLogsWarnDuration = 1 * time.Second @@ -258,7 +255,8 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, } func isRetryableError(err error) bool { - return cerrors.ErrEtcdTryAgain.Equal(errors.Cause(err)) + return cerrors.ErrEtcdTryAgain.Equal(errors.Cause(err)) || + errors.Cause(err) == context.DeadlineExceeded } func (worker *EtcdWorker) handleEvent(_ context.Context, event *clientv3.Event) { @@ -356,7 +354,6 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m cmps := make([]clientv3.Cmp, 0, len(changedState)) opsThen := make([]clientv3.Op, 0, len(changedState)) - opsElse := make([]clientv3.Op, 0) hasDelete := false for key, value := range changedState { @@ -394,11 +391,7 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m worker.metrics.metricEtcdTxnSize.Observe(float64(size)) startTime := time.Now() - - txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration) - resp, err := worker.client.Txn1(txnCtx, cmps, opsThen, opsElse) - // resp, err = worker.client.Txn(txnCtx).If(cmps...).Then(opsThen...).Commit() - cancel() + resp, err := worker.client.Txn(ctx, cmps, opsThen, etcd.TxnEmptyOpsElse) // For testing the situation where we have a progress notification that // has the same revision as the committed Etcd transaction. From 3afb408277466aa17bd982bc40b1d75452e70a98 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Tue, 25 Jan 2022 15:06:06 +0800 Subject: [PATCH 4/9] optimize isRetryableError in etcd_worker.go --- pkg/orchestrator/etcd_worker.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index dbcec40ce2e..823f7c37608 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -255,8 +255,10 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, } func isRetryableError(err error) bool { - return cerrors.ErrEtcdTryAgain.Equal(errors.Cause(err)) || - errors.Cause(err) == context.DeadlineExceeded + err = errors.Cause(err) + return cerrors.ErrEtcdTryAgain.Equal(err) || + cerrors.ErrReachMaxTry.Equal(err) || + context.DeadlineExceeded == err } func (worker *EtcdWorker) handleEvent(_ context.Context, event *clientv3.Event) { From e66739b5ec3f0c922c869983a9afe112145a6ba7 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Fri, 28 Jan 2022 17:35:58 +0800 Subject: [PATCH 5/9] optimize isRetryableError in etcd_worker.go && add unit tests --- pkg/etcd/client_test.go | 72 ++++++++++++++++++++++++++++ pkg/orchestrator/etcd_worker.go | 23 +++++---- pkg/orchestrator/etcd_worker_test.go | 8 ++++ 3 files changed, 95 insertions(+), 8 deletions(-) diff --git a/pkg/etcd/client_test.go b/pkg/etcd/client_test.go index efa03c6d795..42220d48e9d 100644 --- a/pkg/etcd/client_test.go +++ b/pkg/etcd/client_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/errors" "github.com/stretchr/testify/require" "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" ) type mockClient struct { @@ -43,6 +44,10 @@ func (m *mockClient) Put(ctx context.Context, key, val string, opts ...clientv3. return nil, errors.New("mock error") } +func (m *mockClient) Txn(ctx context.Context) clientv3.Txn { + return &mockTxn{ctx: ctx} +} + type mockWatcher struct { clientv3.Watcher watchCh chan clientv3.WatchResponse @@ -82,6 +87,32 @@ func TestRetry(t *testing.T) { _, err = retrycli.Put(context.TODO(), "", "") require.NotNil(t, err) require.Containsf(t, errors.Cause(err).Error(), "mock error", "err:%v", err.Error()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Test Txn case + // case 0: normal + rsp, err := retrycli.Txn(ctx, nil, nil, nil) + require.Nil(t, err) + require.False(t, rsp.Succeeded) + + // case 1: errors.ErrReachMaxTry + _, err = retrycli.Txn(ctx, TxnEmptyCmps, nil, nil) + require.Regexp(t, ".*CDC:ErrReachMaxTry.*", err) + + // case 2: errors.ErrReachMaxTry + _, err = retrycli.Txn(ctx, nil, TxnEmptyOpsThen, nil) + require.Regexp(t, ".*CDC:ErrReachMaxTry.*", err) + + // case 3: context.DeadlineExceeded + _, err = retrycli.Txn(ctx, TxnEmptyCmps, TxnEmptyOpsThen, nil) + require.Equal(t, context.DeadlineExceeded, err) + + // other case: mock error + _, err = retrycli.Txn(ctx, TxnEmptyCmps, TxnEmptyOpsThen, TxnEmptyOpsElse) + require.Containsf(t, errors.Cause(err).Error(), "mock error", "err:%v", err.Error()) + maxTries = originValue } @@ -276,3 +307,44 @@ func TestRevisionNotFallBack(t *testing.T) { // while WatchCh was reset require.Equal(t, *watcher.rev, revision) } + +type mockTxn struct { + ctx context.Context + mode int +} + +func (txn *mockTxn) If(cs ...clientv3.Cmp) clientv3.Txn { + if cs != nil { + txn.mode += 1 + } + return txn +} + +func (txn *mockTxn) Then(ops ...clientv3.Op) clientv3.Txn { + if ops != nil { + txn.mode += 1 << 1 + } + return txn +} + +func (txn *mockTxn) Else(ops ...clientv3.Op) clientv3.Txn { + if ops != nil { + txn.mode += 1 << 2 + } + return txn +} + +func (txn *mockTxn) Commit() (*clientv3.TxnResponse, error) { + switch txn.mode { + case 0: + return &clientv3.TxnResponse{}, nil + case 1: + return nil, rpctypes.ErrNoSpace + case 2: + return nil, rpctypes.ErrTimeoutDueToLeaderFail + case 3: + return nil, context.DeadlineExceeded + default: + return nil, errors.New("mock error") + } +} diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index 823f7c37608..16b96fc4008 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -28,6 +28,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3/concurrency" + "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" @@ -256,9 +257,15 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, func isRetryableError(err error) bool { err = errors.Cause(err) - return cerrors.ErrEtcdTryAgain.Equal(err) || - cerrors.ErrReachMaxTry.Equal(err) || - context.DeadlineExceeded == err + if cerrors.ErrEtcdTryAgain.Equal(err) || + context.DeadlineExceeded == err { + return true + } else { + // When encountering an abnormal connection with etcd, the worker will keep retrying + // until the session is done. + _, ok := err.(rpctypes.EtcdError) + return ok + } } func (worker *EtcdWorker) handleEvent(_ context.Context, event *clientv3.Event) { @@ -409,18 +416,18 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m } worker.metrics.metricEtcdTxnDuration.Observe(costTime.Seconds()) if err != nil { - return errors.Trace(err) + return cerrors.WrapError(cerrors.ErrEtcdTryAgain, err) } logEtcdOps(opsThen, resp.Succeeded) if resp.Succeeded { worker.barrierRev = resp.Header.GetRevision() return nil + } else { + // Logs the conditions for the failed Etcd transaction. + worker.logEtcdCmps(cmps) + return cerrors.ErrEtcdTryAgain.GenWithStackByArgs() } - - // Logs the conditions for the failed Etcd transaction. - worker.logEtcdCmps(cmps) - return cerrors.ErrEtcdTryAgain.GenWithStackByArgs() } func (worker *EtcdWorker) applyUpdates() error { diff --git a/pkg/orchestrator/etcd_worker_test.go b/pkg/orchestrator/etcd_worker_test.go index 6d77450a012..3a6ef683d3c 100644 --- a/pkg/orchestrator/etcd_worker_test.go +++ b/pkg/orchestrator/etcd_worker_test.go @@ -33,6 +33,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -747,3 +748,10 @@ func TestModifyAfterDelete(t *testing.T) { _ = cli1.Unwrap().Close() _ = cli2.Unwrap().Close() } + +func TestRetryableError(t *testing.T) { + require.True(t, isRetryableError(cerrors.ErrEtcdTryAgain)) + require.True(t, isRetryableError(cerrors.ErrReachMaxTry.Wrap(rpctypes.ErrTimeoutDueToLeaderFail))) + require.True(t, isRetryableError(errors.Trace(context.DeadlineExceeded))) + require.False(t, isRetryableError(context.Canceled)) +} From d3df93928829d7c559aaaadb8c6d602440f5dcb6 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Fri, 28 Jan 2022 20:39:25 +0800 Subject: [PATCH 6/9] update errorutil --- dm/pkg/etcdutil/etcdutil.go | 18 ++---------------- pkg/errorutil/ignore.go | 22 ++++++++++++++++++++++ pkg/errorutil/ignore_test.go | 23 +++++++++++++++++++++-- pkg/etcd/client.go | 20 ++------------------ 4 files changed, 47 insertions(+), 36 deletions(-) diff --git a/dm/pkg/etcdutil/etcdutil.go b/dm/pkg/etcdutil/etcdutil.go index 2907bbe9695..f90b329e391 100644 --- a/dm/pkg/etcdutil/etcdutil.go +++ b/dm/pkg/etcdutil/etcdutil.go @@ -29,6 +29,7 @@ import ( tcontext "github.com/pingcap/tiflow/dm/pkg/context" "github.com/pingcap/tiflow/dm/pkg/log" "github.com/pingcap/tiflow/dm/pkg/retry" + "github.com/pingcap/tiflow/pkg/errorutil" ) const ( @@ -48,22 +49,7 @@ var etcdDefaultTxnRetryParam = retry.Params{ FirstRetryDuration: time.Second, BackoffStrategy: retry.Stable, IsRetryableFn: func(retryTime int, err error) bool { - switch err { - // Etcd ResourceExhausted errors, may recover after some time - case v3rpc.ErrNoSpace, v3rpc.ErrTooManyRequests: - return true - // Etcd Unavailable errors, may be available after some time - // https://github.com/etcd-io/etcd/pull/9934/files#diff-6d8785d0c9eaf96bc3e2b29c36493c04R162-R167 - // ErrStopped: - // one of the etcd nodes stopped from failure injection - // ErrNotCapable: - // capability check has not been done (in the beginning) - case v3rpc.ErrNoLeader, v3rpc.ErrLeaderChanged, v3rpc.ErrNotCapable, v3rpc.ErrStopped, v3rpc.ErrTimeout, - v3rpc.ErrTimeoutDueToLeaderFail, v3rpc.ErrGRPCTimeoutDueToConnectionLost, v3rpc.ErrUnhealthy: - return true - default: - return false - } + return errorutil.IsRetryableEtcdError(err) }, } diff --git a/pkg/errorutil/ignore.go b/pkg/errorutil/ignore.go index 6a48dd203f6..05ecf2ad36d 100644 --- a/pkg/errorutil/ignore.go +++ b/pkg/errorutil/ignore.go @@ -19,6 +19,7 @@ import ( tddl "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/parser/mysql" + v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" ) // IsIgnorableMySQLDDLError is used to check what error can be ignored @@ -46,3 +47,24 @@ func IsIgnorableMySQLDDLError(err error) bool { return false } } + +func IsRetryableEtcdError(err error) bool { + etcdErr := errors.Cause(err) + + switch etcdErr { + // Etcd ResourceExhausted errors, may recover after some time + case v3rpc.ErrNoSpace, v3rpc.ErrTooManyRequests: + return true + // Etcd Unavailable errors, may be available after some time + // https://github.com/etcd-io/etcd/pull/9934/files#diff-6d8785d0c9eaf96bc3e2b29c36493c04R162-R167 + // ErrStopped: + // one of the etcd nodes stopped from failure injection + // ErrNotCapable: + // capability check has not been done (in the beginning) + case v3rpc.ErrNoLeader, v3rpc.ErrLeaderChanged, v3rpc.ErrNotCapable, v3rpc.ErrStopped, v3rpc.ErrTimeout, + v3rpc.ErrTimeoutDueToLeaderFail, v3rpc.ErrGRPCTimeoutDueToConnectionLost, v3rpc.ErrUnhealthy: + return true + default: + return false + } +} diff --git a/pkg/errorutil/ignore_test.go b/pkg/errorutil/ignore_test.go index 7c2dbd0f7e0..825bf7d91b6 100644 --- a/pkg/errorutil/ignore_test.go +++ b/pkg/errorutil/ignore_test.go @@ -20,7 +20,8 @@ import ( "github.com/go-sql-driver/mysql" "github.com/pingcap/tidb/infoschema" tmysql "github.com/pingcap/tidb/parser/mysql" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" ) func newMysqlErr(number uint16, message string) *mysql.MySQLError { @@ -42,6 +43,24 @@ func TestIgnoreMysqlDDLError(t *testing.T) { } for _, item := range cases { - assert.Equal(t, item.ret, IsIgnorableMySQLDDLError(item.err)) + require.Equal(t, item.ret, IsIgnorableMySQLDDLError(item.err)) + } +} + +func TestIsRetryableEtcdError(t *testing.T) { + cases := []struct { + err error + ret bool + }{ + {nil, false}, + {v3rpc.ErrCorrupt, false}, + + {v3rpc.ErrGRPCTimeoutDueToConnectionLost, true}, + {v3rpc.ErrTimeoutDueToLeaderFail, true}, + {v3rpc.ErrNoSpace, true}, + } + + for _, item := range cases { + require.Equal(t, item.ret, IsRetryableEtcdError(item.err)) } } diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index e9c75fbd198..c79bca04aa9 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" cerrors "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/errorutil" "github.com/pingcap/tiflow/pkg/retry" "github.com/prometheus/client_golang/prometheus" "go.etcd.io/etcd/clientv3" @@ -164,26 +165,9 @@ func isRetryableError(rpcName string) retry.IsRetryable { return false } } - if rpcName == EtcdTxn { - switch err { - // Etcd ResourceExhausted errors, may recover after some time - case v3rpc.ErrNoSpace, v3rpc.ErrTooManyRequests: - return true - // Etcd Unavailable errors, may be available after some time - // https://github.com/etcd-io/etcd/pull/9934/files#diff-6d8785d0c9eaf96bc3e2b29c36493c04R162-R167 - // ErrStopped: - // one of the etcd nodes stopped from failure injection - // ErrNotCapable: - // capability check has not been done (in the beginning) - case v3rpc.ErrNoLeader, v3rpc.ErrLeaderChanged, v3rpc.ErrNotCapable, v3rpc.ErrStopped, v3rpc.ErrTimeout, - v3rpc.ErrTimeoutDueToLeaderFail, v3rpc.ErrGRPCTimeoutDueToConnectionLost, v3rpc.ErrUnhealthy: - return true - default: - return false - } + return errorutil.IsRetryableEtcdError(err) } - return true } } From 233a1e39027f1f429a36806f5ec5f027ee99b6f9 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Fri, 28 Jan 2022 21:09:50 +0800 Subject: [PATCH 7/9] handle ling error --- pkg/orchestrator/etcd_worker.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index 16b96fc4008..5a7b3c15dd8 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -260,12 +260,11 @@ func isRetryableError(err error) bool { if cerrors.ErrEtcdTryAgain.Equal(err) || context.DeadlineExceeded == err { return true - } else { - // When encountering an abnormal connection with etcd, the worker will keep retrying - // until the session is done. - _, ok := err.(rpctypes.EtcdError) - return ok } + // When encountering an abnormal connection with etcd, the worker will keep retrying + // until the session is done. + _, ok := err.(rpctypes.EtcdError) + return ok } func (worker *EtcdWorker) handleEvent(_ context.Context, event *clientv3.Event) { @@ -423,11 +422,11 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m if resp.Succeeded { worker.barrierRev = resp.Header.GetRevision() return nil - } else { - // Logs the conditions for the failed Etcd transaction. - worker.logEtcdCmps(cmps) - return cerrors.ErrEtcdTryAgain.GenWithStackByArgs() } + + // Logs the conditions for the failed Etcd transaction. + worker.logEtcdCmps(cmps) + return cerrors.ErrEtcdTryAgain.GenWithStackByArgs() } func (worker *EtcdWorker) applyUpdates() error { From c6b761fa32199cdc6788b1676b9995e37987af99 Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Mon, 7 Feb 2022 10:30:47 +0800 Subject: [PATCH 8/9] fix some error --- pkg/orchestrator/etcd_worker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index 5a7b3c15dd8..d334ca12c1e 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -415,7 +415,7 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m } worker.metrics.metricEtcdTxnDuration.Observe(costTime.Seconds()) if err != nil { - return cerrors.WrapError(cerrors.ErrEtcdTryAgain, err) + return errors.Trace(err) } logEtcdOps(opsThen, resp.Succeeded) From bb39701c1060b39b57f2c4bab8020f199e20c68b Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Mon, 7 Feb 2022 14:00:15 +0800 Subject: [PATCH 9/9] address comment --- pkg/etcd/client.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index c79bca04aa9..bb4f50c197c 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -159,15 +159,19 @@ func isRetryableError(rpcName string) retry.IsRetryable { if !cerrors.IsRetryableError(err) { return false } - if rpcName == EtcdRevoke { + + switch rpcName { + case EtcdRevoke: if etcdErr, ok := err.(v3rpc.EtcdError); ok && etcdErr.Code() == codes.NotFound { - // it means the etcd lease is already expired or revoked + // It means the etcd lease is already expired or revoked return false } - } - if rpcName == EtcdTxn { + case EtcdTxn: return errorutil.IsRetryableEtcdError(err) + default: + // For other types of operation, we retry directly without handling errors } + return true } }