Skip to content

Commit

Permalink
etcd/client(ticdc): add retry operation for etcd transaction api (pin…
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Feb 16, 2022
1 parent 0b79db0 commit 93110ac
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 50 deletions.
18 changes: 2 additions & 16 deletions dm/pkg/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/retry"
"github.com/pingcap/tiflow/pkg/errorutil"
)

const (
Expand All @@ -48,22 +49,7 @@ var etcdDefaultTxnRetryParam = retry.Params{
FirstRetryDuration: time.Second,
BackoffStrategy: retry.Stable,
IsRetryableFn: func(retryTime int, err error) bool {
switch err {
// Etcd ResourceExhausted errors, may recover after some time
case v3rpc.ErrNoSpace, v3rpc.ErrTooManyRequests:
return true
// Etcd Unavailable errors, may be available after some time
// https://github.com/etcd-io/etcd/pull/9934/files#diff-6d8785d0c9eaf96bc3e2b29c36493c04R162-R167
// ErrStopped:
// one of the etcd nodes stopped from failure injection
// ErrNotCapable:
// capability check has not been done (in the beginning)
case v3rpc.ErrNoLeader, v3rpc.ErrLeaderChanged, v3rpc.ErrNotCapable, v3rpc.ErrStopped, v3rpc.ErrTimeout,
v3rpc.ErrTimeoutDueToLeaderFail, v3rpc.ErrGRPCTimeoutDueToConnectionLost, v3rpc.ErrUnhealthy:
return true
default:
return false
}
return errorutil.IsRetryableEtcdError(err)
},
}

Expand Down
22 changes: 22 additions & 0 deletions pkg/errorutil/ignore.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
tddl "github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/mysql"
v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
)

// IsIgnorableMySQLDDLError is used to check what error can be ignored
Expand Down Expand Up @@ -46,3 +47,24 @@ func IsIgnorableMySQLDDLError(err error) bool {
return false
}
}

func IsRetryableEtcdError(err error) bool {
etcdErr := errors.Cause(err)

switch etcdErr {
// Etcd ResourceExhausted errors, may recover after some time
case v3rpc.ErrNoSpace, v3rpc.ErrTooManyRequests:
return true
// Etcd Unavailable errors, may be available after some time
// https://github.com/etcd-io/etcd/pull/9934/files#diff-6d8785d0c9eaf96bc3e2b29c36493c04R162-R167
// ErrStopped:
// one of the etcd nodes stopped from failure injection
// ErrNotCapable:
// capability check has not been done (in the beginning)
case v3rpc.ErrNoLeader, v3rpc.ErrLeaderChanged, v3rpc.ErrNotCapable, v3rpc.ErrStopped, v3rpc.ErrTimeout,
v3rpc.ErrTimeoutDueToLeaderFail, v3rpc.ErrGRPCTimeoutDueToConnectionLost, v3rpc.ErrUnhealthy:
return true
default:
return false
}
}
23 changes: 21 additions & 2 deletions pkg/errorutil/ignore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import (
"github.com/go-sql-driver/mysql"
"github.com/pingcap/tidb/infoschema"
tmysql "github.com/pingcap/tidb/parser/mysql"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
)

func newMysqlErr(number uint16, message string) *mysql.MySQLError {
Expand All @@ -42,6 +43,24 @@ func TestIgnoreMysqlDDLError(t *testing.T) {
}

for _, item := range cases {
assert.Equal(t, item.ret, IsIgnorableMySQLDDLError(item.err))
require.Equal(t, item.ret, IsIgnorableMySQLDDLError(item.err))
}
}

func TestIsRetryableEtcdError(t *testing.T) {
cases := []struct {
err error
ret bool
}{
{nil, false},
{v3rpc.ErrCorrupt, false},

{v3rpc.ErrGRPCTimeoutDueToConnectionLost, true},
{v3rpc.ErrTimeoutDueToLeaderFail, true},
{v3rpc.ErrNoSpace, true},
}

for _, item := range cases {
require.Equal(t, item.ret, IsRetryableEtcdError(item.err))
}
}
50 changes: 37 additions & 13 deletions pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/errorutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
)
Expand All @@ -51,6 +52,15 @@ const (
etcdRequestProgressDuration = 1 * time.Second
// etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future
etcdWatchChBufferSize = 16
// etcdTxnTimeoutDuration represents the timeout duration for committing a
// transaction to Etcd
etcdTxnTimeoutDuration = 30 * time.Second
)

var (
TxnEmptyCmps = []clientv3.Cmp{}
TxnEmptyOpsThen = []clientv3.Op{}
TxnEmptyOpsElse = []clientv3.Op{}
)

// set to var instead of const for mocking the value to speedup test
Expand Down Expand Up @@ -121,12 +131,17 @@ func (c *Client) Delete(ctx context.Context, key string, opts ...clientv3.OpOpti
return c.cli.Delete(ctx, key, opts...)
}

// Txn delegates request to clientv3.KV.Txn
func (c *Client) Txn(ctx context.Context) clientv3.Txn {
if metric, ok := c.metrics[EtcdTxn]; ok {
metric.Inc()
}
return c.cli.Txn(ctx)
// Txn delegates request to clientv3.KV.Txn. The error returned can only be a non-retryable error,
// such as context.Canceled, context.DeadlineExceeded, errors.ErrReachMaxTry.
func (c *Client) Txn(ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op) (resp *clientv3.TxnResponse, err error) {
txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration)
defer cancel()
err = retryRPC(EtcdTxn, c.metrics[EtcdTxn], func() error {
var inErr error
resp, inErr = c.cli.Txn(txnCtx).If(cmps...).Then(opsThen...).Else(opsElse...).Commit()
return inErr
})
return
}

// Grant delegates request to clientv3.Lease.Grant
Expand All @@ -144,11 +159,17 @@ func isRetryableError(rpcName string) retry.IsRetryable {
if !cerrors.IsRetryableError(err) {
return false
}
if rpcName == EtcdRevoke {
if etcdErr, ok := err.(rpctypes.EtcdError); ok && etcdErr.Code() == codes.NotFound {
// it means the etcd lease is already expired or revoked

switch rpcName {
case EtcdRevoke:
if etcdErr, ok := err.(v3rpc.EtcdError); ok && etcdErr.Code() == codes.NotFound {
// It means the etcd lease is already expired or revoked
return false
}
case EtcdTxn:
return errorutil.IsRetryableEtcdError(err)
default:
// For other types of operation, we retry directly without handling errors
}

return true
Expand Down Expand Up @@ -193,7 +214,10 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
lastRevision := getRevisionFromWatchOpts(opts...)

watchCtx, cancel := context.WithCancel(ctx)
defer cancel()
defer func() {
// Using closures to handle changes to the cancel function
cancel()
}()
watchCh := c.cli.Watch(watchCtx, key, opts...)

ticker := c.clock.Ticker(etcdRequestProgressDuration)
Expand All @@ -203,7 +227,6 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
for {
select {
case <-ctx.Done():
cancel()
return
case response := <-watchCh:
lastReceivedResponseTime = c.clock.Now()
Expand All @@ -217,7 +240,6 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
for {
select {
case <-ctx.Done():
cancel()
return
case outCh <- response: // it may block here
break Loop
Expand All @@ -238,6 +260,8 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
log.Warn("etcd client watchCh blocking too long, reset the watchCh", zap.Duration("duration", c.clock.Since(lastReceivedResponseTime)), zap.Stack("stack"))
cancel()
watchCtx, cancel = context.WithCancel(ctx)
// to avoid possible context leak warning from govet
_ = cancel
watchCh = c.cli.Watch(watchCtx, key, clientv3.WithPrefix(), clientv3.WithRev(lastRevision))
// we need to reset lastReceivedResponseTime after reset Watch
lastReceivedResponseTime = c.clock.Now()
Expand Down
72 changes: 72 additions & 0 deletions pkg/etcd/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/errors"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
)

type mockClient struct {
Expand All @@ -43,6 +44,10 @@ func (m *mockClient) Put(ctx context.Context, key, val string, opts ...clientv3.
return nil, errors.New("mock error")
}

func (m *mockClient) Txn(ctx context.Context) clientv3.Txn {
return &mockTxn{ctx: ctx}
}

type mockWatcher struct {
clientv3.Watcher
watchCh chan clientv3.WatchResponse
Expand Down Expand Up @@ -82,6 +87,32 @@ func TestRetry(t *testing.T) {
_, err = retrycli.Put(context.TODO(), "", "")
require.NotNil(t, err)
require.Containsf(t, errors.Cause(err).Error(), "mock error", "err:%v", err.Error())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Test Txn case
// case 0: normal
rsp, err := retrycli.Txn(ctx, nil, nil, nil)
require.Nil(t, err)
require.False(t, rsp.Succeeded)

// case 1: errors.ErrReachMaxTry
_, err = retrycli.Txn(ctx, TxnEmptyCmps, nil, nil)
require.Regexp(t, ".*CDC:ErrReachMaxTry.*", err)

// case 2: errors.ErrReachMaxTry
_, err = retrycli.Txn(ctx, nil, TxnEmptyOpsThen, nil)
require.Regexp(t, ".*CDC:ErrReachMaxTry.*", err)

// case 3: context.DeadlineExceeded
_, err = retrycli.Txn(ctx, TxnEmptyCmps, TxnEmptyOpsThen, nil)
require.Equal(t, context.DeadlineExceeded, err)

// other case: mock error
_, err = retrycli.Txn(ctx, TxnEmptyCmps, TxnEmptyOpsThen, TxnEmptyOpsElse)
require.Containsf(t, errors.Cause(err).Error(), "mock error", "err:%v", err.Error())

maxTries = originValue
}

Expand Down Expand Up @@ -276,3 +307,44 @@ func TestRevisionNotFallBack(t *testing.T) {
// while WatchCh was reset
require.Equal(t, *watcher.rev, revision)
}

type mockTxn struct {
ctx context.Context
mode int
}

func (txn *mockTxn) If(cs ...clientv3.Cmp) clientv3.Txn {
if cs != nil {
txn.mode += 1
}
return txn
}

func (txn *mockTxn) Then(ops ...clientv3.Op) clientv3.Txn {
if ops != nil {
txn.mode += 1 << 1
}
return txn
}

func (txn *mockTxn) Else(ops ...clientv3.Op) clientv3.Txn {
if ops != nil {
txn.mode += 1 << 2
}
return txn
}

func (txn *mockTxn) Commit() (*clientv3.TxnResponse, error) {
switch txn.mode {
case 0:
return &clientv3.TxnResponse{}, nil
case 1:
return nil, rpctypes.ErrNoSpace
case 2:
return nil, rpctypes.ErrTimeoutDueToLeaderFail
case 3:
return nil, context.DeadlineExceeded
default:
return nil, errors.New("mock error")
}
}
17 changes: 12 additions & 5 deletions pkg/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,12 +304,15 @@ func (c CDCEtcdClient) CreateChangefeedInfo(ctx context.Context, info *model.Cha
if err != nil {
return errors.Trace(err)
}
resp, err := c.Client.Txn(ctx).If(

cmps := []clientv3.Cmp{
clientv3.Compare(clientv3.ModRevision(infoKey), "=", 0),
clientv3.Compare(clientv3.ModRevision(jobKey), "=", 0),
).Then(
}
opsThen := []clientv3.Op{
clientv3.OpPut(infoKey, value),
).Commit()
}
resp, err := c.Client.Txn(ctx, cmps, opsThen, TxnEmptyOpsElse)
if err != nil {
return cerror.WrapError(cerror.ErrPDEtcdAPIError, err)
}
Expand Down Expand Up @@ -504,10 +507,14 @@ func (c CDCEtcdClient) PutTaskPositionOnChange(
}

key := GetEtcdKeyTaskPosition(changefeedID, captureID)
resp, err := c.Client.Txn(ctx).If(
cmps := []clientv3.Cmp{
clientv3.Compare(clientv3.ModRevision(key), ">", 0),
clientv3.Compare(clientv3.Value(key), "=", data),
).Else(clientv3.OpPut(key, data)).Commit()
}
opsElse := []clientv3.Op{
clientv3.OpPut(key, data),
}
resp, err := c.Client.Txn(ctx, cmps, TxnEmptyOpsThen, opsElse)
if err != nil {
return false, cerror.WrapError(cerror.ErrPDEtcdAPIError, err)
}
Expand Down
Loading

0 comments on commit 93110ac

Please sign in to comment.