Skip to content

Commit

Permalink
etcd/client(ticdc): add retry operation for etcd transaction api (pin…
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Feb 24, 2022
1 parent 19b9177 commit be88e9d
Show file tree
Hide file tree
Showing 9 changed files with 304 additions and 26 deletions.
10 changes: 5 additions & 5 deletions cdc/kv/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (c CDCEtcdClient) CreateChangefeedInfo(ctx context.Context, info *model.Cha
if err != nil {
return errors.Trace(err)
}
resp, err := c.Client.Txn(ctx).If(
resp, err := c.Client.TxnWithoutRetry(ctx).If(
clientv3.Compare(clientv3.ModRevision(infoKey), "=", 0),
clientv3.Compare(clientv3.ModRevision(jobKey), "=", 0),
).Then(
Expand Down Expand Up @@ -674,7 +674,7 @@ func (c CDCEtcdClient) AtomicPutTaskStatus(
return errors.Trace(err)
}

resp, err := c.Client.Txn(ctx).If(writeCmp).Then(
resp, err := c.Client.TxnWithoutRetry(ctx).If(writeCmp).Then(
clientv3.OpPut(key, value),
).Commit()
if err != nil {
Expand Down Expand Up @@ -731,7 +731,7 @@ func (c CDCEtcdClient) PutTaskPositionOnChange(
}

key := GetEtcdKeyTaskPosition(changefeedID, captureID)
resp, err := c.Client.Txn(ctx).If(
resp, err := c.Client.TxnWithoutRetry(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), ">", 0),
clientv3.Compare(clientv3.Value(key), "=", data),
).Else(clientv3.OpPut(key, data)).Commit()
Expand Down Expand Up @@ -799,7 +799,7 @@ func (c CDCEtcdClient) SetChangeFeedStatusTTL(
// PutAllChangeFeedStatus puts ChangeFeedStatus of each changefeed into etcd
func (c CDCEtcdClient) PutAllChangeFeedStatus(ctx context.Context, infos map[model.ChangeFeedID]*model.ChangeFeedStatus) error {
var (
txn = c.Client.Txn(ctx)
txn = c.Client.TxnWithoutRetry(ctx)
ops = make([]clientv3.Op, 0, embed.DefaultMaxTxnOps)
)
for changefeedID, info := range infos {
Expand All @@ -814,7 +814,7 @@ func (c CDCEtcdClient) PutAllChangeFeedStatus(ctx context.Context, infos map[mod
if err != nil {
return cerror.WrapError(cerror.ErrPDEtcdAPIError, err)
}
txn = c.Client.Txn(ctx)
txn = c.Client.TxnWithoutRetry(ctx)
ops = ops[:0]
}
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ require (
github.com/pingcap/parser v0.0.0-20210831085004-b5390aa83f65
github.com/pingcap/tidb v1.1.0-beta.0.20210907130457-cd8fb24c5f7e
github.com/pingcap/tidb-tools v5.0.3+incompatible
github.com/pingcap/tidb/parser v0.0.0-20220224040743-5af053e9d314
github.com/prometheus/client_golang v1.5.1
github.com/r3labs/diff v1.1.0
github.com/spf13/cobra v1.0.0
Expand Down
18 changes: 17 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,8 @@ github.com/pingcap/tidb-dashboard v0.0.0-20210312062513-eef5d6404638/go.mod h1:O
github.com/pingcap/tidb-dashboard v0.0.0-20210716172320-2226872e3296/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ=
github.com/pingcap/tidb-tools v5.0.3+incompatible h1:vYMrW9ux+3HRMeRZ1fUOjy2nyiodtuVyAyK270EKBEs=
github.com/pingcap/tidb-tools v5.0.3+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb/parser v0.0.0-20220224040743-5af053e9d314 h1:acXc/9Fs1/L78dKmMlENohrBXTLq6pXB7FYJ4L/gnAo=
github.com/pingcap/tidb/parser v0.0.0-20220224040743-5af053e9d314/go.mod h1:ElJiub4lRy6UZDb+0JHDkGEdr6aOli+ykhyej7VCLoI=
github.com/pingcap/tipb v0.0.0-20210708040514-0f154bb0dc0f h1:q6WgGOeY+hbkvtKLyi6nAew7Ptl5vXyeI61VJuJdXnQ=
github.com/pingcap/tipb v0.0.0-20210708040514-0f154bb0dc0f/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -939,6 +941,7 @@ golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNm
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20181106170214-d68db9428509/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down Expand Up @@ -1295,8 +1298,21 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9
honnef.co/go/tools v0.2.0 h1:ws8AfbgTX3oIczLPNPCu5166oBg9ST2vNs0rcht+mDE=
honnef.co/go/tools v0.2.0/go.mod h1:lPVVZ2BS5TfnjLyizF7o7hv7j9/L+8cZY2hLyjP9cGY=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
modernc.org/mathutil v1.2.2 h1:+yFk8hBprV+4c0U9GjFtL+dV3N8hOJ8JCituQcMShFY=
modernc.org/fileutil v1.0.0/go.mod h1:JHsWpkrk/CnVV1H/eGlFf85BEpfkrp56ro8nojIq9Q8=
modernc.org/golex v1.0.1/go.mod h1:QCA53QtsT1NdGkaZZkF5ezFwk4IXh4BGNafAARTC254=
modernc.org/lex v1.0.0/go.mod h1:G6rxMTy3cH2iA0iXL/HRRv4Znu8MK4higxph/lE7ypk=
modernc.org/lexer v1.0.0/go.mod h1:F/Dld0YKYdZCLQ7bD0USbWL4YKCyTDRDHiDTOs0q0vk=
modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03k=
modernc.org/mathutil v1.2.2/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
modernc.org/mathutil v1.4.1 h1:ij3fYGe8zBF4Vu+g0oT7mB06r8sqGWKuJu1yXeR4by8=
modernc.org/mathutil v1.4.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
modernc.org/parser v1.0.0/go.mod h1:H20AntYJ2cHHL6MHthJ8LZzXCdDCHMWt1KZXtIMjejA=
modernc.org/parser v1.0.2/go.mod h1:TXNq3HABP3HMaqLK7brD1fLA/LfN0KS6JxZn71QdDqs=
modernc.org/scanner v1.0.1/go.mod h1:OIzD2ZtjYk6yTuyqZr57FmifbM9fIH74SumloSsajuE=
modernc.org/sortutil v1.0.0/go.mod h1:1QO0q8IlIlmjBIwm6t/7sof874+xCfZouyqZMLIAtxM=
modernc.org/strutil v1.0.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs=
modernc.org/strutil v1.1.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs=
modernc.org/y v1.0.1/go.mod h1:Ho86I+LVHEI+LYXoUKlmOMAM1JTXOCfj8qi1T8PsClE=
moul.io/zapgorm2 v1.1.0/go.mod h1:emRfKjNqSzVj5lcgasBdovIXY1jSOwFz2GQZn1Rddks=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
Expand Down
71 changes: 71 additions & 0 deletions pkg/errorutil/ignore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package errorutil

import (
dmysql "github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
tddl "github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/mysql"
v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
)

// IsIgnorableMySQLDDLError is used to check what error can be ignored
// we can get error code from:
// infoschema's error definition: https://github.com/pingcap/tidb/blob/master/infoschema/infoschema.go
// DDL's error definition: https://github.com/pingcap/tidb/blob/master/ddl/ddl.go
// tidb/mysql error code definition: https://github.com/pingcap/tidb/blob/master/mysql/errcode.go
func IsIgnorableMySQLDDLError(err error) bool {
err = errors.Cause(err)
mysqlErr, ok := err.(*dmysql.MySQLError)
if !ok {
return false
}

errCode := errors.ErrCode(mysqlErr.Number)
switch errCode {
case infoschema.ErrDatabaseExists.Code(), infoschema.ErrDatabaseDropExists.Code(),
infoschema.ErrTableExists.Code(), infoschema.ErrTableDropExists.Code(),
infoschema.ErrColumnExists.Code(), infoschema.ErrIndexExists.Code(),
infoschema.ErrKeyNotExists.Code(), tddl.ErrCantDropFieldOrKey.Code(),
mysql.ErrDupKeyName, mysql.ErrSameNamePartition,
mysql.ErrDropPartitionNonExistent, mysql.ErrMultiplePriKey:
return true
default:
return false
}
}

// IsRetryableEtcdError is used to check what error can be ignored for an etcd Txn
func IsRetryableEtcdError(err error) bool {
etcdErr := errors.Cause(err)

switch etcdErr {
// Etcd ResourceExhausted errors, may recover after some time
case v3rpc.ErrNoSpace, v3rpc.ErrTooManyRequests:
return true
// Etcd Unavailable errors, may be available after some time
// https://github.com/etcd-io/etcd/pull/9934/files#diff-6d8785d0c9eaf96bc3e2b29c36493c04R162-R167
// ErrStopped:
// one of the etcd nodes stopped from failure injection
// ErrNotCapable:
// capability check has not been done (in the beginning)
case v3rpc.ErrNoLeader, v3rpc.ErrLeaderChanged, v3rpc.ErrNotCapable, v3rpc.ErrStopped, v3rpc.ErrTimeout,
v3rpc.ErrTimeoutDueToLeaderFail, v3rpc.ErrGRPCTimeoutDueToConnectionLost, v3rpc.ErrUnhealthy:
return true
default:
return false
}
}
66 changes: 66 additions & 0 deletions pkg/errorutil/ignore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package errorutil

import (
"errors"
"testing"

"github.com/go-sql-driver/mysql"
"github.com/pingcap/tidb/infoschema"
tmysql "github.com/pingcap/tidb/parser/mysql"
"github.com/stretchr/testify/require"
v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
)

func newMysqlErr(number uint16, message string) *mysql.MySQLError {
return &mysql.MySQLError{
Number: number,
Message: message,
}
}

func TestIgnoreMysqlDDLError(t *testing.T) {
cases := []struct {
err error
ret bool
}{
{errors.New("raw error"), false},
{newMysqlErr(tmysql.ErrDupKeyName, "Error: Duplicate key name 'some_key'"), true},
{newMysqlErr(uint16(infoschema.ErrDatabaseExists.Code()), "Can't create database"), true},
{newMysqlErr(uint16(infoschema.ErrAccessDenied.Code()), "Access denied for user"), false},
}

for _, item := range cases {
require.Equal(t, item.ret, IsIgnorableMySQLDDLError(item.err))
}
}

func TestIsRetryableEtcdError(t *testing.T) {
cases := []struct {
err error
ret bool
}{
{nil, false},
{v3rpc.ErrCorrupt, false},

{v3rpc.ErrGRPCTimeoutDueToConnectionLost, true},
{v3rpc.ErrTimeoutDueToLeaderFail, true},
{v3rpc.ErrNoSpace, true},
}

for _, item := range cases {
require.Equal(t, item.ret, IsRetryableEtcdError(item.err))
}
}
51 changes: 42 additions & 9 deletions pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/errorutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
)
Expand All @@ -51,6 +52,18 @@ const (
etcdRequestProgressDuration = 1 * time.Second
// etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future
etcdWatchChBufferSize = 16
// etcdTxnTimeoutDuration represents the timeout duration for committing a
// transaction to Etcd
etcdTxnTimeoutDuration = 30 * time.Second
)

var (
// TxnEmptyCmps represents empty compare-opration of an etcd Txn
TxnEmptyCmps = []clientv3.Cmp{}
// TxnEmptyOpsThen represents empty then-opration of an etcd Txn
TxnEmptyOpsThen = []clientv3.Op{}
// TxnEmptyOpsElse represents empty else-opration of an etcd Txn
TxnEmptyOpsElse = []clientv3.Op{}
)

// set to var instead of const for mocking the value to speedup test
Expand Down Expand Up @@ -121,14 +134,27 @@ func (c *Client) Delete(ctx context.Context, key string, opts ...clientv3.OpOpti
return c.cli.Delete(ctx, key, opts...)
}

// Txn delegates request to clientv3.KV.Txn
func (c *Client) Txn(ctx context.Context) clientv3.Txn {
// TxnWithoutRetry delegates request to clientv3.KV.Txn
func (c *Client) TxnWithoutRetry(ctx context.Context) clientv3.Txn {
if metric, ok := c.metrics[EtcdTxn]; ok {
metric.Inc()
}
return c.cli.Txn(ctx)
}

// Txn delegates request to clientv3.KV.Txn. The error returned can only be a non-retryable error,
// such as context.Canceled, context.DeadlineExceeded, errors.ErrReachMaxTry.
func (c *Client) Txn(ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op) (resp *clientv3.TxnResponse, err error) {
txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration)
defer cancel()
err = retryRPC(EtcdTxn, c.metrics[EtcdTxn], func() error {
var inErr error
resp, inErr = c.cli.Txn(txnCtx).If(cmps...).Then(opsThen...).Else(opsElse...).Commit()
return inErr
})
return
}

// Grant delegates request to clientv3.Lease.Grant
func (c *Client) Grant(ctx context.Context, ttl int64) (resp *clientv3.LeaseGrantResponse, err error) {
err = retryRPC(EtcdGrant, c.metrics[EtcdGrant], func() error {
Expand All @@ -144,11 +170,17 @@ func isRetryableError(rpcName string) retry.IsRetryable {
if !cerrors.IsRetryableError(err) {
return false
}
if rpcName == EtcdRevoke {
if etcdErr, ok := err.(rpctypes.EtcdError); ok && etcdErr.Code() == codes.NotFound {
// it means the etcd lease is already expired or revoked

switch rpcName {
case EtcdRevoke:
if etcdErr, ok := err.(v3rpc.EtcdError); ok && etcdErr.Code() == codes.NotFound {
// It means the etcd lease is already expired or revoked
return false
}
case EtcdTxn:
return errorutil.IsRetryableEtcdError(err)
default:
// For other types of operation, we retry directly without handling errors
}

return true
Expand Down Expand Up @@ -190,7 +222,10 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
}()
var lastRevision int64
watchCtx, cancel := context.WithCancel(ctx)
defer cancel()
defer func() {
// Using closures to handle changes to the cancel function
cancel()
}()
watchCh := c.cli.Watch(watchCtx, key, opts...)

ticker := c.clock.Ticker(etcdRequestProgressDuration)
Expand All @@ -200,7 +235,6 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
for {
select {
case <-ctx.Done():
cancel()
return
case response := <-watchCh:
lastReceivedResponseTime = c.clock.Now()
Expand All @@ -214,7 +248,6 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
for {
select {
case <-ctx.Done():
cancel()
return
case outCh <- response: // it may block here
break Loop
Expand Down
Loading

0 comments on commit be88e9d

Please sign in to comment.