From cc5c8d578d85f6c706964d81b777e3bc106203b8 Mon Sep 17 00:00:00 2001 From: fengou1 Date: Mon, 27 Sep 2021 19:33:55 +0800 Subject: [PATCH 1/7] fix issue 27015 --- br/pkg/conn/conn.go | 7 ++++ br/pkg/conn/conn_test.go | 85 +++++++++++++++++++++++++++++++++++++++- br/pkg/utils/retry.go | 6 +++ go.mod | 1 + 4 files changed, 97 insertions(+), 2 deletions(-) diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go index ac37dc5f582c5..67db05219e510 100755 --- a/br/pkg/conn/conn.go +++ b/br/pkg/conn/conn.go @@ -184,6 +184,13 @@ func GetAllTiKVStoresWithRetry(ctx context.Context, } }) + failpoint.Inject("hint-GetAllTiKVStores-cancel", func(val failpoint.Value) { + if val.(bool) { + logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-cancel injected.") + err = status.Error(codes.Canceled, "Cancel Retry") + } + }) + return errors.Trace(err) }, utils.NewPDReqBackoffer(), diff --git a/br/pkg/conn/conn_test.go b/br/pkg/conn/conn_test.go index 22e434d3e8d6f..89f0d80900ce2 100644 --- a/br/pkg/conn/conn_test.go +++ b/br/pkg/conn/conn_test.go @@ -6,10 +6,14 @@ import ( "context" "testing" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/br/pkg/pdutil" + "github.com/pkg/errors" "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type fakePDClient struct { @@ -21,9 +25,86 @@ func (c fakePDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*me return append([]*metapb.Store{}, c.stores...), nil } -func TestCheckStoresAlive(t *testing.T) { - t.Parallel() +func TestGetAllTiKVStoresWithRetryCancel(t *testing.T) { + _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-cancel", "return(true)") + defer func() { + _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-cancel") + }() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + stores := []*metapb.Store{ + { + 
Id: 1, + State: metapb.StoreState_Up, + Labels: []*metapb.StoreLabel{ + { + Key: "engine", + Value: "tiflash", + }, + }, + }, + { + Id: 2, + State: metapb.StoreState_Offline, + Labels: []*metapb.StoreLabel{ + { + Key: "engine", + Value: "tiflash", + }, + }, + }, + } + + fpdc := fakePDClient{ + stores: stores, + } + + kvStores, err := GetAllTiKVStoresWithRetry(ctx, fpdc, SkipTiFlash) + require.Len(t, kvStores, 0) + require.Equal(t, codes.Canceled, status.Code(errors.Cause(err))) +} +func TestGetAllTiKVStoresWithUnknown(t *testing.T) { + _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-error", "return(true)") + defer func() { + _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-error") + }() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + stores := []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + Labels: []*metapb.StoreLabel{ + { + Key: "engine", + Value: "tiflash", + }, + }, + }, + { + Id: 2, + State: metapb.StoreState_Offline, + Labels: []*metapb.StoreLabel{ + { + Key: "engine", + Value: "tiflash", + }, + }, + }, + } + + fpdc := fakePDClient{ + stores: stores, + } + + kvStores, err := GetAllTiKVStoresWithRetry(ctx, fpdc, SkipTiFlash) + require.Len(t, kvStores, 0) + require.Equal(t, codes.Unknown, status.Code(errors.Cause(err))) +} +func TestCheckStoresAlive(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go index 38e19fbc87e11..2ce397edbd8fd 100644 --- a/br/pkg/utils/retry.go +++ b/br/pkg/utils/retry.go @@ -7,7 +7,10 @@ import ( "strings" "time" + serrors "github.com/pingcap/errors" "go.uber.org/multierr" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) var retryableServerError = []string{ @@ -47,6 +50,9 @@ func WithRetry( err := retryableFunc() if err != nil { allErrors = multierr.Append(allErrors, err) + if status.Code(serrors.Cause(err)) == 
codes.Canceled { // current context cancelled, stop retry + return allErrors + } select { case <-ctx.Done(): return allErrors // nolint:wrapcheck diff --git a/go.mod b/go.mod index 707903cfffd65..ac37b562af69e 100644 --- a/go.mod +++ b/go.mod @@ -53,6 +53,7 @@ require ( github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5 github.com/pingcap/tidb-tools v5.0.3+incompatible github.com/pingcap/tipb v0.0.0-20210802080519-94b831c6db55 + github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.5.1 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.9.1 From bd531c8e7cc7ec09e0d2e5ea42806722720a876a Mon Sep 17 00:00:00 2001 From: fengou1 Date: Tue, 28 Sep 2021 09:49:39 +0800 Subject: [PATCH 2/7] fix comments - add general function isRetryableError --- br/pkg/conn/conn_test.go | 8 +++--- br/pkg/utils/retry.go | 55 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 57 insertions(+), 6 deletions(-) diff --git a/br/pkg/conn/conn_test.go b/br/pkg/conn/conn_test.go index 89f0d80900ce2..ceb3b33077549 100644 --- a/br/pkg/conn/conn_test.go +++ b/br/pkg/conn/conn_test.go @@ -60,8 +60,8 @@ func TestGetAllTiKVStoresWithRetryCancel(t *testing.T) { stores: stores, } - kvStores, err := GetAllTiKVStoresWithRetry(ctx, fpdc, SkipTiFlash) - require.Len(t, kvStores, 0) + _, err := GetAllTiKVStoresWithRetry(ctx, fpdc, SkipTiFlash) + require.Error(t, err) require.Equal(t, codes.Canceled, status.Code(errors.Cause(err))) } @@ -100,8 +100,8 @@ func TestGetAllTiKVStoresWithUnknown(t *testing.T) { stores: stores, } - kvStores, err := GetAllTiKVStoresWithRetry(ctx, fpdc, SkipTiFlash) - require.Len(t, kvStores, 0) + _, err := GetAllTiKVStoresWithRetry(ctx, fpdc, SkipTiFlash) + require.Error(t, err) require.Equal(t, codes.Unknown, status.Code(errors.Cause(err))) } func TestCheckStoresAlive(t *testing.T) { diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go index 2ce397edbd8fd..1368dd3b92035 100644 --- a/br/pkg/utils/retry.go +++ 
b/br/pkg/utils/retry.go @@ -4,10 +4,14 @@ package utils import ( "context" + "io" + "net" "strings" "time" - serrors "github.com/pingcap/errors" + "github.com/go-sql-driver/mysql" + "github.com/pingcap/errors" + terror "github.com/pingcap/tidb/errno" "go.uber.org/multierr" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -50,9 +54,11 @@ func WithRetry( err := retryableFunc() if err != nil { allErrors = multierr.Append(allErrors, err) - if status.Code(serrors.Cause(err)) == codes.Canceled { // current context cancelled, stop retry + retry := isRetryableError(err) + if !retry { // exited retry return allErrors } + select { case <-ctx.Done(): return allErrors // nolint:wrapcheck @@ -76,3 +82,48 @@ func MessageIsRetryableStorageError(msg string) bool { } return false } + +// IsRetryableError returns whether the error is transient (e.g. network +// connection dropped) or irrecoverable (e.g. user pressing Ctrl+C). This +// function returns `false` (irrecoverable) if `err == nil`. +// +// If the error is a multierr, returns true only if all suberrors are retryable. 
+func isRetryableError(err error) bool { + for _, singleError := range errors.Errors(err) { + if !isSingleRetryableError(singleError) { + return false + } + } + return true +} + +func isSingleRetryableError(err error) bool { + err = errors.Cause(err) + + switch err { + case nil, context.Canceled, context.DeadlineExceeded, io.EOF: + return false + } + + switch nerr := err.(type) { + case net.Error: + return nerr.Timeout() + case *mysql.MySQLError: + switch nerr.Number { + // ErrLockDeadlock can retry to commit while meet deadlock + case terror.ErrUnknown, terror.ErrLockDeadlock, terror.ErrWriteConflictInTiDB, terror.ErrPDServerTimeout, terror.ErrTiKVServerTimeout, terror.ErrTiKVServerBusy, terror.ErrResolveLockTimeout, terror.ErrRegionUnavailable: + return true + default: + return false + } + default: + switch status.Code(err) { + case codes.DeadlineExceeded, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss: + return true + case codes.Unknown: + return true + default: + return false + } + } +} From 0fbd620dce789a56ec910482bf73ab7c47854728 Mon Sep 17 00:00:00 2001 From: fengou1 Date: Tue, 28 Sep 2021 10:49:52 +0800 Subject: [PATCH 3/7] remove unnecessary package errors --- br/pkg/conn/conn_test.go | 2 +- go.mod | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/br/pkg/conn/conn_test.go b/br/pkg/conn/conn_test.go index ceb3b33077549..2f77803fc3f78 100644 --- a/br/pkg/conn/conn_test.go +++ b/br/pkg/conn/conn_test.go @@ -6,10 +6,10 @@ import ( "context" "testing" + "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/br/pkg/pdutil" - "github.com/pkg/errors" "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" "google.golang.org/grpc/codes" diff --git a/go.mod b/go.mod index ac37b562af69e..707903cfffd65 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,6 @@ require ( 
github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5 github.com/pingcap/tidb-tools v5.0.3+incompatible github.com/pingcap/tipb v0.0.0-20210802080519-94b831c6db55 - github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.5.1 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.9.1 From 6dcdbdbbb107353d3750d95d145ef3fb017837bc Mon Sep 17 00:00:00 2001 From: fengou1 Date: Wed, 29 Sep 2021 10:18:42 +0800 Subject: [PATCH 4/7] reused the retry code from lightning --- br/pkg/utils/retry.go | 55 ++----------------------------------------- 1 file changed, 2 insertions(+), 53 deletions(-) diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go index 1368dd3b92035..6b2c787bf060d 100644 --- a/br/pkg/utils/retry.go +++ b/br/pkg/utils/retry.go @@ -4,17 +4,11 @@ package utils import ( "context" - "io" - "net" "strings" "time" - "github.com/go-sql-driver/mysql" - "github.com/pingcap/errors" - terror "github.com/pingcap/tidb/errno" + "github.com/pingcap/tidb/br/pkg/lightning/common" "go.uber.org/multierr" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) var retryableServerError = []string{ @@ -54,7 +48,7 @@ func WithRetry( err := retryableFunc() if err != nil { allErrors = multierr.Append(allErrors, err) - retry := isRetryableError(err) + retry := common.IsRetryableError(err) if !retry { // exited retry return allErrors } @@ -82,48 +76,3 @@ func MessageIsRetryableStorageError(msg string) bool { } return false } - -// IsRetryableError returns whether the error is transient (e.g. network -// connection dropped) or irrecoverable (e.g. user pressing Ctrl+C). This -// function returns `false` (irrecoverable) if `err == nil`. -// -// If the error is a multierr, returns true only if all suberrors are retryable. 
-func isRetryableError(err error) bool { - for _, singleError := range errors.Errors(err) { - if !isSingleRetryableError(singleError) { - return false - } - } - return true -} - -func isSingleRetryableError(err error) bool { - err = errors.Cause(err) - - switch err { - case nil, context.Canceled, context.DeadlineExceeded, io.EOF: - return false - } - - switch nerr := err.(type) { - case net.Error: - return nerr.Timeout() - case *mysql.MySQLError: - switch nerr.Number { - // ErrLockDeadlock can retry to commit while meet deadlock - case terror.ErrUnknown, terror.ErrLockDeadlock, terror.ErrWriteConflictInTiDB, terror.ErrPDServerTimeout, terror.ErrTiKVServerTimeout, terror.ErrTiKVServerBusy, terror.ErrResolveLockTimeout, terror.ErrRegionUnavailable: - return true - default: - return false - } - default: - switch status.Code(err) { - case codes.DeadlineExceeded, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss: - return true - case codes.Unknown: - return true - default: - return false - } - } -} From e97b450976d26decfa4272f889e8b97bd385bb55 Mon Sep 17 00:00:00 2001 From: fengou1 Date: Thu, 7 Oct 2021 14:10:00 +0800 Subject: [PATCH 5/7] refactoring retryable --- br/pkg/lightning/backend/backend.go | 4 +- br/pkg/lightning/backend/importer/importer.go | 3 +- br/pkg/lightning/backend/tidb/tidb.go | 4 +- br/pkg/lightning/common/util.go | 65 +---------------- br/pkg/lightning/common/util_test.go | 49 ------------- br/pkg/lightning/restore/checksum.go | 2 +- br/pkg/utils/retry.go | 69 ++++++++++++++++++- br/pkg/utils/retry_test.go | 62 +++++++++++++++++ 8 files changed, 137 insertions(+), 121 deletions(-) create mode 100644 br/pkg/utils/retry_test.go diff --git a/br/pkg/lightning/backend/backend.go b/br/pkg/lightning/backend/backend.go index 839928966d3c2..aa656a6576653 100644 --- a/br/pkg/lightning/backend/backend.go +++ b/br/pkg/lightning/backend/backend.go @@ -26,10 
+26,10 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" - "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/lightning/metric" "github.com/pingcap/tidb/br/pkg/lightning/mydump" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/table" "go.uber.org/zap" ) @@ -442,7 +442,7 @@ func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize int64) e for i := 0; i < importMaxRetryTimes; i++ { task := engine.logger.With(zap.Int("retryCnt", i)).Begin(zap.InfoLevel, "import") err = engine.backend.ImportEngine(ctx, engine.uuid, regionSplitSize) - if !common.IsRetryableError(err) { + if !utils.IsRetryableError(err) { task.End(zap.ErrorLevel, err) return err } diff --git a/br/pkg/lightning/backend/importer/importer.go b/br/pkg/lightning/backend/importer/importer.go index 828cb05c9e90e..3e06fa58034be 100644 --- a/br/pkg/lightning/backend/importer/importer.go +++ b/br/pkg/lightning/backend/importer/importer.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/lightning/tikv" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/br/pkg/version" "github.com/pingcap/tidb/table" "github.com/tikv/client-go/v2/oracle" @@ -249,7 +250,7 @@ outside: switch { case err == nil: continue outside - case common.IsRetryableError(err): + case utils.IsRetryableError(err): // retry next loop default: return err diff --git a/br/pkg/lightning/backend/tidb/tidb.go b/br/pkg/lightning/backend/tidb/tidb.go index 8c8e524933c3c..b592c1acd0dcf 100644 --- a/br/pkg/lightning/backend/tidb/tidb.go +++ b/br/pkg/lightning/backend/tidb/tidb.go @@ -416,7 +416,7 @@ rowLoop: switch { case err == nil: continue rowLoop - case common.IsRetryableError(err): + case utils.IsRetryableError(err): // 
retry next loop default: // WriteBatchRowsToDB failed in the batch mode and can not be retried, @@ -529,7 +529,7 @@ func (be *tidbBackend) execStmts(ctx context.Context, stmtTasks []stmtTask, tabl return errors.Trace(err) } // Retry the non-batch insert here if this is not the last retry. - if common.IsRetryableError(err) && i != writeRowsMaxRetryTimes-1 { + if utils.IsRetryableError(err) && i != writeRowsMaxRetryTimes-1 { continue } firstRow := stmtTask.rows[0] diff --git a/br/pkg/lightning/common/util.go b/br/pkg/lightning/common/util.go index 95b0934ac9843..8e056265b08cf 100644 --- a/br/pkg/lightning/common/util.go +++ b/br/pkg/lightning/common/util.go @@ -18,28 +18,20 @@ import ( "context" "database/sql" "encoding/json" - stderrors "errors" "fmt" "io" - "net" "net/http" "net/url" "os" - "reflect" - "regexp" "strings" "syscall" "time" - "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/utils" - tmysql "github.com/pingcap/tidb/errno" "go.uber.org/zap" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) const ( @@ -130,7 +122,7 @@ outside: // do not retry NotFound error case errors.IsNotFound(err): break outside - case IsRetryableError(err): + case utils.IsRetryableError(err): logger.Warn(purpose+" failed but going to try again", log.ShortError(err)) continue default: @@ -193,61 +185,6 @@ func (t SQLWithRetry) Exec(ctx context.Context, purpose string, query string, ar }) } -// sqlmock uses fmt.Errorf to produce expectation failures, which will cause -// unnecessary retry if not specially handled >:( -var stdFatalErrorsRegexp = regexp.MustCompile( - `^call to (?s:.*) was not expected|arguments do not match:|could not match actual sql|mock non-retryable error`, -) -var stdErrorType = reflect.TypeOf(stderrors.New("")) - -// IsRetryableError returns whether the error is transient (e.g. 
network -// connection dropped) or irrecoverable (e.g. user pressing Ctrl+C). This -// function returns `false` (irrecoverable) if `err == nil`. -// -// If the error is a multierr, returns true only if all suberrors are retryable. -func IsRetryableError(err error) bool { - for _, singleError := range errors.Errors(err) { - if !isSingleRetryableError(singleError) { - return false - } - } - return true -} - -func isSingleRetryableError(err error) bool { - err = errors.Cause(err) - - switch err { - case nil, context.Canceled, context.DeadlineExceeded, io.EOF, sql.ErrNoRows: - return false - } - - switch nerr := err.(type) { - case net.Error: - return nerr.Timeout() - case *mysql.MySQLError: - switch nerr.Number { - // ErrLockDeadlock can retry to commit while meet deadlock - case tmysql.ErrUnknown, tmysql.ErrLockDeadlock, tmysql.ErrWriteConflictInTiDB, tmysql.ErrPDServerTimeout, tmysql.ErrTiKVServerTimeout, tmysql.ErrTiKVServerBusy, tmysql.ErrResolveLockTimeout, tmysql.ErrRegionUnavailable: - return true - default: - return false - } - default: - switch status.Code(err) { - case codes.DeadlineExceeded, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss: - return true - case codes.Unknown: - if reflect.TypeOf(err) == stdErrorType { - return !stdFatalErrorsRegexp.MatchString(err.Error()) - } - return true - default: - return false - } - } -} - // IsContextCanceledError returns whether the error is caused by context // cancellation. This function should only be used when the code logic is // affected by whether the error is canceling or not. 
diff --git a/br/pkg/lightning/common/util_test.go b/br/pkg/lightning/common/util_test.go index 04a1ceecf45b1..60812841ff259 100644 --- a/br/pkg/lightning/common/util_test.go +++ b/br/pkg/lightning/common/util_test.go @@ -17,23 +17,16 @@ package common_test import ( "context" "encoding/json" - "fmt" "io" - "net" "net/http" "net/http/httptest" "time" sqlmock "github.com/DATA-DOG/go-sqlmock" - "github.com/go-sql-driver/mysql" . "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/log" - tmysql "github.com/pingcap/tidb/errno" - "go.uber.org/multierr" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) type utilSuite struct{} @@ -85,48 +78,6 @@ func (s *utilSuite) TestGetJSON(c *C) { c.Assert(err, ErrorMatches, ".*http status code != 200.*") } -func (s *utilSuite) TestIsRetryableError(c *C) { - c.Assert(common.IsRetryableError(context.Canceled), IsFalse) - c.Assert(common.IsRetryableError(context.DeadlineExceeded), IsFalse) - c.Assert(common.IsRetryableError(io.EOF), IsFalse) - c.Assert(common.IsRetryableError(&net.AddrError{}), IsFalse) - c.Assert(common.IsRetryableError(&net.DNSError{}), IsFalse) - c.Assert(common.IsRetryableError(&net.DNSError{IsTimeout: true}), IsTrue) - - // MySQL Errors - c.Assert(common.IsRetryableError(&mysql.MySQLError{}), IsFalse) - c.Assert(common.IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrUnknown}), IsTrue) - c.Assert(common.IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrLockDeadlock}), IsTrue) - c.Assert(common.IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrPDServerTimeout}), IsTrue) - c.Assert(common.IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrTiKVServerTimeout}), IsTrue) - c.Assert(common.IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrTiKVServerBusy}), IsTrue) - c.Assert(common.IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrResolveLockTimeout}), IsTrue) - 
c.Assert(common.IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrRegionUnavailable}), IsTrue) - c.Assert(common.IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrWriteConflictInTiDB}), IsTrue) - - // gRPC Errors - c.Assert(common.IsRetryableError(status.Error(codes.Canceled, "")), IsFalse) - c.Assert(common.IsRetryableError(status.Error(codes.Unknown, "")), IsTrue) - c.Assert(common.IsRetryableError(status.Error(codes.DeadlineExceeded, "")), IsTrue) - c.Assert(common.IsRetryableError(status.Error(codes.NotFound, "")), IsTrue) - c.Assert(common.IsRetryableError(status.Error(codes.AlreadyExists, "")), IsTrue) - c.Assert(common.IsRetryableError(status.Error(codes.PermissionDenied, "")), IsTrue) - c.Assert(common.IsRetryableError(status.Error(codes.ResourceExhausted, "")), IsTrue) - c.Assert(common.IsRetryableError(status.Error(codes.Aborted, "")), IsTrue) - c.Assert(common.IsRetryableError(status.Error(codes.OutOfRange, "")), IsTrue) - c.Assert(common.IsRetryableError(status.Error(codes.Unavailable, "")), IsTrue) - c.Assert(common.IsRetryableError(status.Error(codes.DataLoss, "")), IsTrue) - - // sqlmock errors - c.Assert(common.IsRetryableError(fmt.Errorf("call to database Close was not expected")), IsFalse) - c.Assert(common.IsRetryableError(errors.New("call to database Close was not expected")), IsTrue) - - // multierr - c.Assert(common.IsRetryableError(multierr.Combine(context.Canceled, context.Canceled)), IsFalse) - c.Assert(common.IsRetryableError(multierr.Combine(&net.DNSError{IsTimeout: true}, &net.DNSError{IsTimeout: true})), IsTrue) - c.Assert(common.IsRetryableError(multierr.Combine(context.Canceled, &net.DNSError{IsTimeout: true})), IsFalse) -} - func (s *utilSuite) TestToDSN(c *C) { param := common.MySQLConnectParam{ Host: "127.0.0.1", diff --git a/br/pkg/lightning/restore/checksum.go b/br/pkg/lightning/restore/checksum.go index c634ee48729f5..5b7a5b1501af4 100644 --- a/br/pkg/lightning/restore/checksum.go +++ b/br/pkg/lightning/restore/checksum.go 
@@ -314,7 +314,7 @@ func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo zap.Int("concurrency", distSQLScanConcurrency), zap.Int("retry", i)) // do not retry context.Canceled error - if !common.IsRetryableError(err) { + if !utils.IsRetryableError(err) { break } if distSQLScanConcurrency > minDistSQLScanConcurrency { diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go index 6b2c787bf060d..b5898360bf4c5 100644 --- a/br/pkg/utils/retry.go +++ b/br/pkg/utils/retry.go @@ -4,11 +4,21 @@ package utils import ( "context" + "database/sql" + stderrors "errors" + "io" + "net" + "reflect" + "regexp" "strings" "time" - "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/go-sql-driver/mysql" + "github.com/pingcap/errors" + tmysql "github.com/pingcap/tidb/errno" "go.uber.org/multierr" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) var retryableServerError = []string{ @@ -48,7 +58,7 @@ func WithRetry( err := retryableFunc() if err != nil { allErrors = multierr.Append(allErrors, err) - retry := common.IsRetryableError(err) + retry := IsRetryableError(err) if !retry { // exited retry return allErrors } @@ -76,3 +86,58 @@ func MessageIsRetryableStorageError(msg string) bool { } return false } + +// sqlmock uses fmt.Errorf to produce expectation failures, which will cause +// unnecessary retry if not specially handled >:( +var stdFatalErrorsRegexp = regexp.MustCompile( + `^call to (?s:.*) was not expected|arguments do not match:|could not match actual sql|mock non-retryable error`, +) +var stdErrorType = reflect.TypeOf(stderrors.New("")) + +// IsRetryableError returns whether the error is transient (e.g. network +// connection dropped) or irrecoverable (e.g. user pressing Ctrl+C). This +// function returns `false` (irrecoverable) if `err == nil`. +// +// If the error is a multierr, returns true only if all suberrors are retryable. 
+func IsRetryableError(err error) bool { + for _, singleError := range errors.Errors(err) { + if !isSingleRetryableError(singleError) { + return false + } + } + return true +} + +func isSingleRetryableError(err error) bool { + err = errors.Cause(err) + + switch err { + case nil, context.Canceled, context.DeadlineExceeded, io.EOF, sql.ErrNoRows: + return false + } + + switch nerr := err.(type) { + case net.Error: + return nerr.Timeout() + case *mysql.MySQLError: + switch nerr.Number { + // ErrLockDeadlock can retry to commit while meet deadlock + case tmysql.ErrUnknown, tmysql.ErrLockDeadlock, tmysql.ErrWriteConflictInTiDB, tmysql.ErrPDServerTimeout, tmysql.ErrTiKVServerTimeout, tmysql.ErrTiKVServerBusy, tmysql.ErrResolveLockTimeout, tmysql.ErrRegionUnavailable: + return true + default: + return false + } + default: + switch status.Code(err) { + case codes.DeadlineExceeded, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss: + return true + case codes.Unknown: + if reflect.TypeOf(err) == stdErrorType { + return !stdFatalErrorsRegexp.MatchString(err.Error()) + } + return true + default: + return false + } + } +} diff --git a/br/pkg/utils/retry_test.go b/br/pkg/utils/retry_test.go new file mode 100644 index 0000000000000..b5c54287f1cce --- /dev/null +++ b/br/pkg/utils/retry_test.go @@ -0,0 +1,62 @@ +package utils + +import ( + "context" + "fmt" + "io" + "net" + + "github.com/go-sql-driver/mysql" + . 
"github.com/pingcap/check" + "github.com/pingcap/errors" + tmysql "github.com/pingcap/tidb/errno" + "go.uber.org/multierr" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type utilSuite struct{} + +var _ = Suite(&utilSuite{}) + +func (s *utilSuite) TestIsRetryableError(c *C) { + c.Assert(IsRetryableError(context.Canceled), IsFalse) + c.Assert(IsRetryableError(context.DeadlineExceeded), IsFalse) + c.Assert(IsRetryableError(io.EOF), IsFalse) + c.Assert(IsRetryableError(&net.AddrError{}), IsFalse) + c.Assert(IsRetryableError(&net.DNSError{}), IsFalse) + c.Assert(IsRetryableError(&net.DNSError{IsTimeout: true}), IsTrue) + + // MySQL Errors + c.Assert(IsRetryableError(&mysql.MySQLError{}), IsFalse) + c.Assert(IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrUnknown}), IsTrue) + c.Assert(IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrLockDeadlock}), IsTrue) + c.Assert(IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrPDServerTimeout}), IsTrue) + c.Assert(IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrTiKVServerTimeout}), IsTrue) + c.Assert(IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrTiKVServerBusy}), IsTrue) + c.Assert(IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrResolveLockTimeout}), IsTrue) + c.Assert(IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrRegionUnavailable}), IsTrue) + c.Assert(IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrWriteConflictInTiDB}), IsTrue) + + // gRPC Errors + c.Assert(IsRetryableError(status.Error(codes.Canceled, "")), IsFalse) + c.Assert(IsRetryableError(status.Error(codes.Unknown, "")), IsTrue) + c.Assert(IsRetryableError(status.Error(codes.DeadlineExceeded, "")), IsTrue) + c.Assert(IsRetryableError(status.Error(codes.NotFound, "")), IsTrue) + c.Assert(IsRetryableError(status.Error(codes.AlreadyExists, "")), IsTrue) + c.Assert(IsRetryableError(status.Error(codes.PermissionDenied, "")), IsTrue) + c.Assert(IsRetryableError(status.Error(codes.ResourceExhausted, "")), 
IsTrue) + c.Assert(IsRetryableError(status.Error(codes.Aborted, "")), IsTrue) + c.Assert(IsRetryableError(status.Error(codes.OutOfRange, "")), IsTrue) + c.Assert(IsRetryableError(status.Error(codes.Unavailable, "")), IsTrue) + c.Assert(IsRetryableError(status.Error(codes.DataLoss, "")), IsTrue) + + // sqlmock errors + c.Assert(IsRetryableError(fmt.Errorf("call to database Close was not expected")), IsFalse) + c.Assert(IsRetryableError(errors.New("call to database Close was not expected")), IsTrue) + + // multierr + c.Assert(IsRetryableError(multierr.Combine(context.Canceled, context.Canceled)), IsFalse) + c.Assert(IsRetryableError(multierr.Combine(&net.DNSError{IsTimeout: true}, &net.DNSError{IsTimeout: true})), IsTrue) + c.Assert(IsRetryableError(multierr.Combine(context.Canceled, &net.DNSError{IsTimeout: true})), IsFalse) +} From 296b954ab1bc378a1d81cd4924b96f572060a642 Mon Sep 17 00:00:00 2001 From: fengou1 Date: Sat, 27 Nov 2021 13:01:33 +0800 Subject: [PATCH 6/7] add group cancel handling --- br/pkg/restore/pipeline_items.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/br/pkg/restore/pipeline_items.go b/br/pkg/restore/pipeline_items.go index 1bd7502f30642..97c6cf90e89b0 100644 --- a/br/pkg/restore/pipeline_items.go +++ b/br/pkg/restore/pipeline_items.go @@ -323,7 +323,7 @@ func (b *tikvSender) registerTableIsRestoring(ts []CreatedTable) func() { // waitTablesDone block the current goroutine, // till all tables provided are no more ‘current restoring’. 
-func (b *tikvSender) waitTablesDone(ts []CreatedTable) { +func (b *tikvSender) waitTablesDone(ctx context.Context, ts []CreatedTable) { for _, t := range ts { wg, ok := b.tableWaiters.LoadAndDelete(t.Table.ID) if !ok { @@ -331,7 +331,12 @@ func (b *tikvSender) waitTablesDone(ts []CreatedTable) { zap.Any("wait-table-map", b.tableWaiters), zap.Stringer("table", t.Table.Name)) } - wg.(*sync.WaitGroup).Wait() + select { + case <-ctx.Done(): // error group cancel, in this circumstance do not sync wait. + return + default: + wg.(*sync.WaitGroup).Wait() + } } } @@ -364,7 +369,7 @@ func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan drainResul } log.Info("restore batch done", rtree.ZapRanges(r.result.Ranges)) r.done() - b.waitTablesDone(r.result.BlankTablesAfterSend) + b.waitTablesDone(ectx, r.result.BlankTablesAfterSend) b.sink.EmitTables(r.result.BlankTablesAfterSend...) return nil }) From 81b61f7997df6fc60c9944e9f773f098c330e9e6 Mon Sep 17 00:00:00 2001 From: fengou1 Date: Mon, 29 Nov 2021 14:18:41 +0800 Subject: [PATCH 7/7] new method to do it --- br/pkg/restore/pipeline_items.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/br/pkg/restore/pipeline_items.go b/br/pkg/restore/pipeline_items.go index 97c6cf90e89b0..ce476b1963fa5 100644 --- a/br/pkg/restore/pipeline_items.go +++ b/br/pkg/restore/pipeline_items.go @@ -323,7 +323,7 @@ func (b *tikvSender) registerTableIsRestoring(ts []CreatedTable) func() { // waitTablesDone block the current goroutine, // till all tables provided are no more ‘current restoring’. 
-func (b *tikvSender) waitTablesDone(ctx context.Context, ts []CreatedTable) { +func (b *tikvSender) waitTablesDone(ts []CreatedTable) { for _, t := range ts { wg, ok := b.tableWaiters.LoadAndDelete(t.Table.ID) if !ok { @@ -331,12 +331,7 @@ func (b *tikvSender) waitTablesDone(ctx context.Context, ts []CreatedTable) { zap.Any("wait-table-map", b.tableWaiters), zap.Stringer("table", t.Table.Name)) } - select { - case <-ctx.Done(): // error group cancel, in this circumstance do not sync wait. - return - default: - wg.(*sync.WaitGroup).Wait() - } + wg.(*sync.WaitGroup).Wait() } } @@ -365,11 +360,12 @@ func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan drainResul eg.Go(func() error { e := b.client.RestoreFiles(ectx, files, r.result.RewriteRules, b.updateCh) if e != nil { + r.done() return e } log.Info("restore batch done", rtree.ZapRanges(r.result.Ranges)) r.done() - b.waitTablesDone(ectx, r.result.BlankTablesAfterSend) + b.waitTablesDone(r.result.BlankTablesAfterSend) b.sink.EmitTables(r.result.BlankTablesAfterSend...) return nil })