Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcd/client(ticdc): add retry operation for etcd transaction api (#4248) #4474

Merged
merged 17 commits into from
Feb 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 2 additions & 16 deletions dm/pkg/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/retry"
"github.com/pingcap/tiflow/pkg/errorutil"
)

const (
Expand All @@ -48,22 +49,7 @@ var etcdDefaultTxnRetryParam = retry.Params{
FirstRetryDuration: time.Second,
BackoffStrategy: retry.Stable,
IsRetryableFn: func(retryTime int, err error) bool {
switch err {
// Etcd ResourceExhausted errors, may recover after some time
case v3rpc.ErrNoSpace, v3rpc.ErrTooManyRequests:
return true
// Etcd Unavailable errors, may be available after some time
// https://github.com/etcd-io/etcd/pull/9934/files#diff-6d8785d0c9eaf96bc3e2b29c36493c04R162-R167
// ErrStopped:
// one of the etcd nodes stopped from failure injection
// ErrNotCapable:
// capability check has not been done (in the beginning)
case v3rpc.ErrNoLeader, v3rpc.ErrLeaderChanged, v3rpc.ErrNotCapable, v3rpc.ErrStopped, v3rpc.ErrTimeout,
v3rpc.ErrTimeoutDueToLeaderFail, v3rpc.ErrGRPCTimeoutDueToConnectionLost, v3rpc.ErrUnhealthy:
return true
default:
return false
}
return errorutil.IsRetryableEtcdError(err)
},
}

Expand Down
22 changes: 22 additions & 0 deletions pkg/errorutil/ignore.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
tddl "github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/mysql"
v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
)

// IsIgnorableMySQLDDLError is used to check what error can be ignored
Expand Down Expand Up @@ -46,3 +47,24 @@ func IsIgnorableMySQLDDLError(err error) bool {
return false
}
}

func IsRetryableEtcdError(err error) bool {
etcdErr := errors.Cause(err)

switch etcdErr {
// Etcd ResourceExhausted errors, may recover after some time
case v3rpc.ErrNoSpace, v3rpc.ErrTooManyRequests:
return true
// Etcd Unavailable errors, may be available after some time
// https://github.com/etcd-io/etcd/pull/9934/files#diff-6d8785d0c9eaf96bc3e2b29c36493c04R162-R167
// ErrStopped:
// one of the etcd nodes stopped from failure injection
// ErrNotCapable:
// capability check has not been done (in the beginning)
case v3rpc.ErrNoLeader, v3rpc.ErrLeaderChanged, v3rpc.ErrNotCapable, v3rpc.ErrStopped, v3rpc.ErrTimeout,
v3rpc.ErrTimeoutDueToLeaderFail, v3rpc.ErrGRPCTimeoutDueToConnectionLost, v3rpc.ErrUnhealthy:
return true
default:
return false
}
}
23 changes: 21 additions & 2 deletions pkg/errorutil/ignore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import (
"github.com/go-sql-driver/mysql"
"github.com/pingcap/tidb/infoschema"
tmysql "github.com/pingcap/tidb/parser/mysql"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
)

func newMysqlErr(number uint16, message string) *mysql.MySQLError {
Expand All @@ -42,6 +43,24 @@ func TestIgnoreMysqlDDLError(t *testing.T) {
}

for _, item := range cases {
assert.Equal(t, item.ret, IsIgnorableMySQLDDLError(item.err))
require.Equal(t, item.ret, IsIgnorableMySQLDDLError(item.err))
}
}

func TestIsRetryableEtcdError(t *testing.T) {
cases := []struct {
err error
ret bool
}{
{nil, false},
{v3rpc.ErrCorrupt, false},

{v3rpc.ErrGRPCTimeoutDueToConnectionLost, true},
{v3rpc.ErrTimeoutDueToLeaderFail, true},
{v3rpc.ErrNoSpace, true},
}

for _, item := range cases {
require.Equal(t, item.ret, IsRetryableEtcdError(item.err))
}
}
50 changes: 37 additions & 13 deletions pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/errorutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
)
Expand All @@ -51,6 +52,15 @@ const (
etcdRequestProgressDuration = 1 * time.Second
// etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future
etcdWatchChBufferSize = 16
// etcdTxnTimeoutDuration represents the timeout duration for committing a
// transaction to Etcd
etcdTxnTimeoutDuration = 30 * time.Second
)

var (
TxnEmptyCmps = []clientv3.Cmp{}
TxnEmptyOpsThen = []clientv3.Op{}
TxnEmptyOpsElse = []clientv3.Op{}
)
amyangfei marked this conversation as resolved.
Show resolved Hide resolved

// set to var instead of const for mocking the value to speedup test
Expand Down Expand Up @@ -121,12 +131,17 @@ func (c *Client) Delete(ctx context.Context, key string, opts ...clientv3.OpOpti
return c.cli.Delete(ctx, key, opts...)
}

// Txn delegates request to clientv3.KV.Txn
func (c *Client) Txn(ctx context.Context) clientv3.Txn {
if metric, ok := c.metrics[EtcdTxn]; ok {
metric.Inc()
}
return c.cli.Txn(ctx)
// Txn delegates request to clientv3.KV.Txn. The error returned can only be a non-retryable error,
// such as context.Canceled, context.DeadlineExceeded, errors.ErrReachMaxTry.
func (c *Client) Txn(ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op) (resp *clientv3.TxnResponse, err error) {
txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration)
defer cancel()
err = retryRPC(EtcdTxn, c.metrics[EtcdTxn], func() error {
var inErr error
resp, inErr = c.cli.Txn(txnCtx).If(cmps...).Then(opsThen...).Else(opsElse...).Commit()
return inErr
})
return
}

// Grant delegates request to clientv3.Lease.Grant
Expand All @@ -144,11 +159,17 @@ func isRetryableError(rpcName string) retry.IsRetryable {
if !cerrors.IsRetryableError(err) {
return false
}
if rpcName == EtcdRevoke {
if etcdErr, ok := err.(rpctypes.EtcdError); ok && etcdErr.Code() == codes.NotFound {
// it means the etcd lease is already expired or revoked

Copy link
Contributor Author

@CharlesCheung96 CharlesCheung96 Jan 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can extract a function, such as EtcdRetryableError() bool in the future, similar to the work in #3242

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, PTAL

switch rpcName {
case EtcdRevoke:
if etcdErr, ok := err.(v3rpc.EtcdError); ok && etcdErr.Code() == codes.NotFound {
// It means the etcd lease is already expired or revoked
return false
}
case EtcdTxn:
return errorutil.IsRetryableEtcdError(err)
default:
// For other types of operation, we retry directly without handling errors
}

return true
Expand Down Expand Up @@ -193,7 +214,10 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
lastRevision := getRevisionFromWatchOpts(opts...)

watchCtx, cancel := context.WithCancel(ctx)
defer cancel()
defer func() {
// Using closures to handle changes to the cancel function
cancel()
}()
watchCh := c.cli.Watch(watchCtx, key, opts...)

ticker := c.clock.Ticker(etcdRequestProgressDuration)
Expand All @@ -203,7 +227,6 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
for {
select {
case <-ctx.Done():
cancel()
return
case response := <-watchCh:
lastReceivedResponseTime = c.clock.Now()
Expand All @@ -217,7 +240,6 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
for {
select {
case <-ctx.Done():
cancel()
return
case outCh <- response: // it may block here
break Loop
Expand All @@ -243,6 +265,8 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
zap.String("role", role))
cancel()
watchCtx, cancel = context.WithCancel(ctx)
// to avoid possible context leak warning from govet
_ = cancel
watchCh = c.cli.Watch(watchCtx, key, clientv3.WithPrefix(), clientv3.WithRev(lastRevision))
// we need to reset lastReceivedResponseTime after reset Watch
lastReceivedResponseTime = c.clock.Now()
Expand Down
72 changes: 72 additions & 0 deletions pkg/etcd/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/errors"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
)

type mockClient struct {
Expand All @@ -43,6 +44,10 @@ func (m *mockClient) Put(ctx context.Context, key, val string, opts ...clientv3.
return nil, errors.New("mock error")
}

func (m *mockClient) Txn(ctx context.Context) clientv3.Txn {
return &mockTxn{ctx: ctx}
}

type mockWatcher struct {
clientv3.Watcher
watchCh chan clientv3.WatchResponse
Expand Down Expand Up @@ -82,6 +87,32 @@ func TestRetry(t *testing.T) {
_, err = retrycli.Put(context.TODO(), "", "")
require.NotNil(t, err)
require.Containsf(t, errors.Cause(err).Error(), "mock error", "err:%v", err.Error())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Test Txn case
// case 0: normal
rsp, err := retrycli.Txn(ctx, nil, nil, nil)
require.Nil(t, err)
require.False(t, rsp.Succeeded)

// case 1: errors.ErrReachMaxTry
_, err = retrycli.Txn(ctx, TxnEmptyCmps, nil, nil)
require.Regexp(t, ".*CDC:ErrReachMaxTry.*", err)

// case 2: errors.ErrReachMaxTry
_, err = retrycli.Txn(ctx, nil, TxnEmptyOpsThen, nil)
require.Regexp(t, ".*CDC:ErrReachMaxTry.*", err)

// case 3: context.DeadlineExceeded
_, err = retrycli.Txn(ctx, TxnEmptyCmps, TxnEmptyOpsThen, nil)
require.Equal(t, context.DeadlineExceeded, err)

// other case: mock error
_, err = retrycli.Txn(ctx, TxnEmptyCmps, TxnEmptyOpsThen, TxnEmptyOpsElse)
require.Containsf(t, errors.Cause(err).Error(), "mock error", "err:%v", err.Error())

maxTries = originValue
}

Expand Down Expand Up @@ -276,3 +307,44 @@ func TestRevisionNotFallBack(t *testing.T) {
// while WatchCh was reset
require.Equal(t, *watcher.rev, revision)
}

type mockTxn struct {
ctx context.Context
mode int
}

func (txn *mockTxn) If(cs ...clientv3.Cmp) clientv3.Txn {
if cs != nil {
txn.mode += 1
}
return txn
}

func (txn *mockTxn) Then(ops ...clientv3.Op) clientv3.Txn {
if ops != nil {
txn.mode += 1 << 1
}
return txn
}

func (txn *mockTxn) Else(ops ...clientv3.Op) clientv3.Txn {
if ops != nil {
txn.mode += 1 << 2
}
return txn
}

func (txn *mockTxn) Commit() (*clientv3.TxnResponse, error) {
switch txn.mode {
case 0:
return &clientv3.TxnResponse{}, nil
case 1:
return nil, rpctypes.ErrNoSpace
case 2:
return nil, rpctypes.ErrTimeoutDueToLeaderFail
case 3:
return nil, context.DeadlineExceeded
default:
return nil, errors.New("mock error")
}
}
17 changes: 12 additions & 5 deletions pkg/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,12 +304,15 @@ func (c CDCEtcdClient) CreateChangefeedInfo(ctx context.Context, info *model.Cha
if err != nil {
return errors.Trace(err)
}
resp, err := c.Client.Txn(ctx).If(

cmps := []clientv3.Cmp{
clientv3.Compare(clientv3.ModRevision(infoKey), "=", 0),
clientv3.Compare(clientv3.ModRevision(jobKey), "=", 0),
).Then(
}
opsThen := []clientv3.Op{
clientv3.OpPut(infoKey, value),
).Commit()
}
resp, err := c.Client.Txn(ctx, cmps, opsThen, TxnEmptyOpsElse)
if err != nil {
return cerror.WrapError(cerror.ErrPDEtcdAPIError, err)
}
Expand Down Expand Up @@ -504,10 +507,14 @@ func (c CDCEtcdClient) PutTaskPositionOnChange(
}

key := GetEtcdKeyTaskPosition(changefeedID, captureID)
resp, err := c.Client.Txn(ctx).If(
cmps := []clientv3.Cmp{
clientv3.Compare(clientv3.ModRevision(key), ">", 0),
clientv3.Compare(clientv3.Value(key), "=", data),
).Else(clientv3.OpPut(key, data)).Commit()
}
opsElse := []clientv3.Op{
clientv3.OpPut(key, data),
}
resp, err := c.Client.Txn(ctx, cmps, TxnEmptyOpsThen, opsElse)
if err != nil {
return false, cerror.WrapError(cerror.ErrPDEtcdAPIError, err)
}
Expand Down
Loading