Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#4474
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
CharlesCheung96 authored and ti-chi-bot committed Feb 8, 2022
1 parent 8b83d27 commit 81b0541
Show file tree
Hide file tree
Showing 8 changed files with 983 additions and 27 deletions.
150 changes: 150 additions & 0 deletions dm/pkg/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

// learn from https://github.com/pingcap/pd/blob/v3.0.5/pkg/etcdutil/etcdutil.go.

package etcdutil

import (
"context"
"crypto/tls"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"go.etcd.io/etcd/clientv3"
v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.uber.org/zap"

tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/retry"
"github.com/pingcap/tiflow/pkg/errorutil"
)

const (
// DefaultDialTimeout is the maximum amount of time a dial will wait for a
// connection to setup. 30s is long enough for most of the network conditions.
DefaultDialTimeout = 30 * time.Second

// DefaultRequestTimeout 10s is long enough for most of etcd clusters.
DefaultRequestTimeout = 10 * time.Second

// DefaultRevokeLeaseTimeout is the maximum amount of time waiting for revoke etcd lease.
DefaultRevokeLeaseTimeout = 3 * time.Second
)

var etcdDefaultTxnRetryParam = retry.Params{
RetryCount: 5,
FirstRetryDuration: time.Second,
BackoffStrategy: retry.Stable,
IsRetryableFn: func(retryTime int, err error) bool {
return errorutil.IsRetryableEtcdError(err)
},
}

var etcdDefaultTxnStrategy = retry.FiniteRetryStrategy{}

// CreateClient creates an etcd client with some default config items.
func CreateClient(endpoints []string, tlsCfg *tls.Config) (*clientv3.Client, error) {
return clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: DefaultDialTimeout,
TLS: tlsCfg,
})
}

// ListMembers returns a list of internal etcd members.
func ListMembers(client *clientv3.Client) (*clientv3.MemberListResponse, error) {
ctx, cancel := context.WithTimeout(client.Ctx(), DefaultRequestTimeout)
defer cancel()
return client.MemberList(ctx)
}

// AddMember adds an etcd member.
func AddMember(client *clientv3.Client, peerAddrs []string) (*clientv3.MemberAddResponse, error) {
ctx, cancel := context.WithTimeout(client.Ctx(), DefaultRequestTimeout)
defer cancel()
return client.MemberAdd(ctx, peerAddrs)
}

// RemoveMember removes an etcd member by the given id.
func RemoveMember(client *clientv3.Client, id uint64) (*clientv3.MemberRemoveResponse, error) {
ctx, cancel := context.WithTimeout(client.Ctx(), DefaultRequestTimeout)
defer cancel()
return client.MemberRemove(ctx, id)
}

// DoOpsInOneTxnWithRetry do multiple etcd operations in one txn.
// TODO: add unit test to test encountered an retryable error first but then recovered.
func DoOpsInOneTxnWithRetry(cli *clientv3.Client, ops ...clientv3.Op) (*clientv3.TxnResponse, int64, error) {
ctx, cancel := context.WithTimeout(cli.Ctx(), DefaultRequestTimeout)
defer cancel()
tctx := tcontext.NewContext(ctx, log.L())
ret, _, err := etcdDefaultTxnStrategy.Apply(tctx, etcdDefaultTxnRetryParam, func(t *tcontext.Context) (ret interface{}, err error) {
resp, err := cli.Txn(ctx).Then(ops...).Commit()
if err != nil {
return nil, errors.Trace(err)
}
return resp, nil
})
if err != nil {
return nil, 0, err
}
resp := ret.(*clientv3.TxnResponse)
return resp, resp.Header.Revision, nil
}

// DoOpsInOneCmpsTxnWithRetry do multiple etcd operations in one txn and with comparisons.
func DoOpsInOneCmpsTxnWithRetry(cli *clientv3.Client, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op) (*clientv3.TxnResponse, int64, error) {
ctx, cancel := context.WithTimeout(cli.Ctx(), DefaultRequestTimeout)
defer cancel()
tctx := tcontext.NewContext(ctx, log.L())

ret, _, err := etcdDefaultTxnStrategy.Apply(tctx, etcdDefaultTxnRetryParam, func(t *tcontext.Context) (ret interface{}, err error) {
failpoint.Inject("ErrNoSpace", func() {
tctx.L().Info("fail to do ops in etcd", zap.String("failpoint", "ErrNoSpace"))
failpoint.Return(nil, v3rpc.ErrNoSpace)
})
resp, err := cli.Txn(ctx).If(cmps...).Then(opsThen...).Else(opsElse...).Commit()
if err != nil {
return nil, err
}
return resp, nil
})
if err != nil {
return nil, 0, err
}
resp := ret.(*clientv3.TxnResponse)
return resp, resp.Header.Revision, nil
}

// IsRetryableError check whether error is retryable error for etcd to build again.
func IsRetryableError(err error) bool {
switch errors.Cause(err) {
case v3rpc.ErrCompacted, v3rpc.ErrNoLeader, v3rpc.ErrNoSpace, context.DeadlineExceeded:
return true
default:
return false
}
}

// IsLimitedRetryableError check whether error is retryable error for etcd to build again in a limited number of times.
func IsLimitedRetryableError(err error) bool {
switch errors.Cause(err) {
case v3rpc.ErrNoSpace, context.DeadlineExceeded:
return true
default:
return false
}
}
70 changes: 70 additions & 0 deletions pkg/errorutil/ignore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package errorutil

import (
dmysql "github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
tddl "github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/mysql"
v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
)

// IsIgnorableMySQLDDLError is used to check what error can be ignored
// we can get error code from:
// infoschema's error definition: https://github.com/pingcap/tidb/blob/master/infoschema/infoschema.go
// DDL's error definition: https://github.com/pingcap/tidb/blob/master/ddl/ddl.go
// tidb/mysql error code definition: https://github.com/pingcap/tidb/blob/master/mysql/errcode.go
func IsIgnorableMySQLDDLError(err error) bool {
err = errors.Cause(err)
mysqlErr, ok := err.(*dmysql.MySQLError)
if !ok {
return false
}

errCode := errors.ErrCode(mysqlErr.Number)
switch errCode {
case infoschema.ErrDatabaseExists.Code(), infoschema.ErrDatabaseDropExists.Code(),
infoschema.ErrTableExists.Code(), infoschema.ErrTableDropExists.Code(),
infoschema.ErrColumnExists.Code(), infoschema.ErrIndexExists.Code(),
infoschema.ErrKeyNotExists.Code(), tddl.ErrCantDropFieldOrKey.Code(),
mysql.ErrDupKeyName, mysql.ErrSameNamePartition,
mysql.ErrDropPartitionNonExistent, mysql.ErrMultiplePriKey:
return true
default:
return false
}
}

func IsRetryableEtcdError(err error) bool {
etcdErr := errors.Cause(err)

switch etcdErr {
// Etcd ResourceExhausted errors, may recover after some time
case v3rpc.ErrNoSpace, v3rpc.ErrTooManyRequests:
return true
// Etcd Unavailable errors, may be available after some time
// https://github.com/etcd-io/etcd/pull/9934/files#diff-6d8785d0c9eaf96bc3e2b29c36493c04R162-R167
// ErrStopped:
// one of the etcd nodes stopped from failure injection
// ErrNotCapable:
// capability check has not been done (in the beginning)
case v3rpc.ErrNoLeader, v3rpc.ErrLeaderChanged, v3rpc.ErrNotCapable, v3rpc.ErrStopped, v3rpc.ErrTimeout,
v3rpc.ErrTimeoutDueToLeaderFail, v3rpc.ErrGRPCTimeoutDueToConnectionLost, v3rpc.ErrUnhealthy:
return true
default:
return false
}
}
66 changes: 66 additions & 0 deletions pkg/errorutil/ignore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package errorutil

import (
"errors"
"testing"

"github.com/go-sql-driver/mysql"
"github.com/pingcap/tidb/infoschema"
tmysql "github.com/pingcap/tidb/parser/mysql"
"github.com/stretchr/testify/require"
v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
)

func newMysqlErr(number uint16, message string) *mysql.MySQLError {
return &mysql.MySQLError{
Number: number,
Message: message,
}
}

func TestIgnoreMysqlDDLError(t *testing.T) {
cases := []struct {
err error
ret bool
}{
{errors.New("raw error"), false},
{newMysqlErr(tmysql.ErrDupKeyName, "Error: Duplicate key name 'some_key'"), true},
{newMysqlErr(uint16(infoschema.ErrDatabaseExists.Code()), "Can't create database"), true},
{newMysqlErr(uint16(infoschema.ErrAccessDenied.Code()), "Access denied for user"), false},
}

for _, item := range cases {
require.Equal(t, item.ret, IsIgnorableMySQLDDLError(item.err))
}
}

func TestIsRetryableEtcdError(t *testing.T) {
cases := []struct {
err error
ret bool
}{
{nil, false},
{v3rpc.ErrCorrupt, false},

{v3rpc.ErrGRPCTimeoutDueToConnectionLost, true},
{v3rpc.ErrTimeoutDueToLeaderFail, true},
{v3rpc.ErrNoSpace, true},
}

for _, item := range cases {
require.Equal(t, item.ret, IsRetryableEtcdError(item.err))
}
}
Loading

0 comments on commit 81b0541

Please sign in to comment.