From 2c6516b6202fc5b6cdd08fd75814ad35268dec96 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 14 Oct 2024 15:06:05 +0800 Subject: [PATCH 1/4] ddl: add 'lease not found' and 'deadline exceed' to retryable errors --- pkg/ddl/backfilling_dist_executor.go | 3 +++ pkg/ddl/index.go | 3 +++ pkg/owner/manager.go | 15 +++++++++++++-- pkg/util/dbterror/ddl_terror.go | 15 +++++++++++++++ 4 files changed, 34 insertions(+), 2 deletions(-) diff --git a/pkg/ddl/backfilling_dist_executor.go b/pkg/ddl/backfilling_dist_executor.go index c9867bc228fd5..f7ee958bfda6a 100644 --- a/pkg/ddl/backfilling_dist_executor.go +++ b/pkg/ddl/backfilling_dist_executor.go @@ -225,6 +225,9 @@ func (*backfillDistExecutor) IsIdempotent(*proto.Subtask) bool { func isRetryableError(err error) bool { originErr := errors.Cause(err) + if _, ok := originErr.(dbterror.ReorgRetryableError); ok { + return true + } if tErr, ok := originErr.(*terror.Error); ok { sqlErr := terror.ToSQLError(tErr) _, ok := dbterror.ReorgRetryableErrCodes[sqlErr.Code] diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 82d570b1a8434..c40b490bd497f 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -1268,6 +1268,9 @@ func errorIsRetryable(err error, job *model.Job) bool { return false } originErr := errors.Cause(err) + if _, ok := originErr.(dbterror.ReorgRetryableError); ok { + return true + } if tErr, ok := originErr.(*terror.Error); ok { sqlErr := terror.ToSQLError(tErr) _, ok := dbterror.ReorgRetryableErrCodes[sqlErr.Code] diff --git a/pkg/owner/manager.go b/pkg/owner/manager.go index ea989b75f4856..2c70a2f33fcab 100644 --- a/pkg/owner/manager.go +++ b/pkg/owner/manager.go @@ -20,6 +20,7 @@ import ( "fmt" "os" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -30,6 +31,7 @@ import ( "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/parser/terror" util2 "github.com/pingcap/tidb/pkg/util" + "github.com/pingcap/tidb/pkg/util/dbterror" "github.com/pingcap/tidb/pkg/util/logutil" "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" @@ -519,7 +521,7 @@ func AcquireDistributedLock( ) (release func(), err error) { se, err := concurrency.NewSession(cli, concurrency.WithTTL(ttlInSec)) if err != nil { - return nil, err + return nil, wrapIntoRetryable(err) } mu := concurrency.NewMutex(se, key) maxRetryCnt := 10 @@ -535,7 +537,7 @@ func AcquireDistributedLock( if err1 != nil { logutil.Logger(ctx).Warn("close session error", zap.Error(err1)) } - return nil, err + return nil, wrapIntoRetryable(err) } logutil.Logger(ctx).Info("acquire distributed flush lock success", zap.String("key", key)) return func() { @@ -551,3 +553,12 @@ func AcquireDistributedLock( } }, nil } + +func wrapIntoRetryable(err error) error { + msg := err.Error() + if strings.Contains(msg, "requested lease not found") || + strings.Contains(msg, "context deadline exceeded") { + return dbterror.NewReorgRetryableError(msg) + } + return err +} diff --git a/pkg/util/dbterror/ddl_terror.go b/pkg/util/dbterror/ddl_terror.go index 81c8203c5c10b..b8a7561d33dfc 100644 --- a/pkg/util/dbterror/ddl_terror.go +++ b/pkg/util/dbterror/ddl_terror.go @@ -526,3 +526,18 @@ var ReorgRetryableErrCodes = map[uint16]struct{}{ // Temporary network partitioning may cause pk commit failure. uint16(terror.CodeResultUndetermined): {}, } + +// ReorgRetryableError is the retryable error during DDL reorganization. +type ReorgRetryableError struct { + message string +} + +// Error implements error interface. +func (r ReorgRetryableError) Error() string { + return r.message +} + +// NewReorgRetryableError creates a new ReorgRetryableError. +func NewReorgRetryableError(msg string) ReorgRetryableError { + return ReorgRetryableError{message: msg} +} From a48265e44c82e7dc858be25994c7e8c0100caf10 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 14 Oct 2024 15:41:38 +0800 Subject: [PATCH 2/4] add fail test for acquire dist lock --- pkg/ddl/backfilling_dist_executor.go | 10 +++++++--- pkg/ddl/index.go | 9 ++++++--- pkg/owner/manager.go | 20 +++++++------------ pkg/util/dbterror/ddl_terror.go | 19 +++++------------- .../addindextest1/disttask_test.go | 14 +++++++++++++ 5 files changed, 39 insertions(+), 33 deletions(-) diff --git a/pkg/ddl/backfilling_dist_executor.go b/pkg/ddl/backfilling_dist_executor.go index f7ee958bfda6a..f9b1ed0c2587e 100644 --- a/pkg/ddl/backfilling_dist_executor.go +++ b/pkg/ddl/backfilling_dist_executor.go @@ -17,6 +17,7 @@ package ddl import ( "context" "encoding/json" + "strings" "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/ddl/ingest" @@ -224,10 +225,13 @@ func (*backfillDistExecutor) IsIdempotent(*proto.Subtask) bool { } func isRetryableError(err error) bool { - originErr := errors.Cause(err) - if _, ok := originErr.(dbterror.ReorgRetryableError); ok { - return true + errMsg := err.Error() + for _, m := range dbterror.ReorgRetryableErrMsgs { + if strings.Contains(errMsg, m) { + return true + } } + originErr := errors.Cause(err) if tErr, ok := originErr.(*terror.Error); ok { sqlErr := terror.ToSQLError(tErr) _, ok := dbterror.ReorgRetryableErrCodes[sqlErr.Code] diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index c40b490bd497f..f54522a582382 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -1267,10 +1267,13 @@ func errorIsRetryable(err error, job *model.Job) bool { if job.ErrorCount+1 >= variable.GetDDLErrorCountLimit() { return false } - originErr := errors.Cause(err) - if _, ok := originErr.(dbterror.ReorgRetryableError); ok { - return true + errMsg := err.Error() + for _, m := range dbterror.ReorgRetryableErrMsgs { + if strings.Contains(errMsg, m) { + return true + } } + originErr := errors.Cause(err) if tErr, ok := originErr.(*terror.Error); ok { sqlErr := terror.ToSQLError(tErr) _, ok := dbterror.ReorgRetryableErrCodes[sqlErr.Code] diff --git a/pkg/owner/manager.go b/pkg/owner/manager.go index 2c70a2f33fcab..f2f091823db8a 100644 --- a/pkg/owner/manager.go +++ b/pkg/owner/manager.go @@ -20,7 +20,6 @@ import ( "fmt" "os" "strconv" - "strings" "sync" "sync/atomic" "time" @@ -31,7 +30,6 @@ import ( "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/parser/terror" util2 "github.com/pingcap/tidb/pkg/util" - "github.com/pingcap/tidb/pkg/util/dbterror" "github.com/pingcap/tidb/pkg/util/logutil" "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" @@ -521,7 +519,7 @@ func AcquireDistributedLock( ) (release func(), err error) { se, err := concurrency.NewSession(cli, concurrency.WithTTL(ttlInSec)) if err != nil { - return nil, wrapIntoRetryable(err) + return nil, err } mu := concurrency.NewMutex(se, key) maxRetryCnt := 10 @@ -532,12 +530,17 @@ func AcquireDistributedLock( } return false, nil }) + failpoint.Inject("mockAcquireDistLockFailed", func(val failpoint.Value) { + if ok := val.(bool); ok { + err = errors.Errorf("requested lease not found") + } + }) if err != nil { err1 := se.Close() if err1 != nil { logutil.Logger(ctx).Warn("close session error", zap.Error(err1)) } - return nil, wrapIntoRetryable(err) + return nil, err } logutil.Logger(ctx).Info("acquire distributed flush lock success", zap.String("key", key)) return func() { @@ -553,12 +556,3 @@ func AcquireDistributedLock( } }, nil } - -func wrapIntoRetryable(err error) error { - msg := err.Error() - if strings.Contains(msg, "requested lease not found") || - strings.Contains(msg, "context deadline exceeded") { - return dbterror.NewReorgRetryableError(msg) - } - return err -} diff --git a/pkg/util/dbterror/ddl_terror.go b/pkg/util/dbterror/ddl_terror.go index b8a7561d33dfc..11dd9caf781ec 100644 --- a/pkg/util/dbterror/ddl_terror.go +++ b/pkg/util/dbterror/ddl_terror.go @@ -503,7 +503,7 @@ var ( ErrWarnGlobalIndexNeedManuallyAnalyze = ClassDDL.NewStd(mysql.ErrWarnGlobalIndexNeedManuallyAnalyze) ) -// ReorgRetryableErrCodes is the error codes that are retryable for reorganization. +// ReorgRetryableErrCodes are the error codes that are retryable for reorganization. var ReorgRetryableErrCodes = map[uint16]struct{}{ mysql.ErrPDServerTimeout: {}, mysql.ErrTiKVServerTimeout: {}, @@ -527,17 +527,8 @@ var ReorgRetryableErrCodes = map[uint16]struct{}{ uint16(terror.CodeResultUndetermined): {}, } -// ReorgRetryableError is the retryable error during DDL reorganization. -type ReorgRetryableError struct { - message string -} - -// Error implements error interface. -func (r ReorgRetryableError) Error() string { - return r.message -} - -// NewReorgRetryableError creates a new ReorgRetryableError. -func NewReorgRetryableError(msg string) ReorgRetryableError { - return ReorgRetryableError{message: msg} +// ReorgRetryableErrMsgs are the error messages that are retryable for reorganization. +var ReorgRetryableErrMsgs = []string{ + "context deadline exceeded", + "requested lease not found", } diff --git a/tests/realtikvtest/addindextest1/disttask_test.go b/tests/realtikvtest/addindextest1/disttask_test.go index c5ddb4a1ac24b..8e5b008c4f7eb 100644 --- a/tests/realtikvtest/addindextest1/disttask_test.go +++ b/tests/realtikvtest/addindextest1/disttask_test.go @@ -286,3 +286,17 @@ func TestAddUKErrorMessage(t *testing.T) { err := tk.ExecToErr("alter table t add unique index uk(b);") require.ErrorContains(t, err, "Duplicate entry '1' for key 't.uk'") } + +func TestAddIndexDistLockAcquireFailed(t *testing.T) { + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set global tidb_enable_dist_task = on;") + t.Cleanup(func() { + tk.MustExec("set global tidb_enable_dist_task = off;") + }) + tk.MustExec("create table t (a int, b int);") + tk.MustExec("insert into t values (1, 1);") + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/owner/mockAcquireDistLockFailed", "1*return(true)") + tk.MustExec("alter table t add index idx(b);") +} From 53627b28cb694ee80f95c0a265b05b7c43f0deb3 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 14 Oct 2024 15:53:54 +0800 Subject: [PATCH 3/4] unify isRetryableError function --- pkg/ddl/backfilling_dist_executor.go | 20 -------------------- pkg/ddl/index.go | 16 ++++++++++------ pkg/ddl/job_worker.go | 2 +- pkg/owner/manager.go | 6 ++---- 4 files changed, 13 insertions(+), 31 deletions(-) diff --git a/pkg/ddl/backfilling_dist_executor.go b/pkg/ddl/backfilling_dist_executor.go index f9b1ed0c2587e..d9b5e6c062273 100644 --- a/pkg/ddl/backfilling_dist_executor.go +++ b/pkg/ddl/backfilling_dist_executor.go @@ -17,7 +17,6 @@ package ddl import ( "context" "encoding/json" - "strings" "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/ddl/ingest" @@ -28,10 +27,8 @@ import ( "github.com/pingcap/tidb/pkg/lightning/backend/external" "github.com/pingcap/tidb/pkg/lightning/common" "github.com/pingcap/tidb/pkg/meta/model" - "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" - "github.com/pingcap/tidb/pkg/util/dbterror" "github.com/tikv/client-go/v2/tikv" "go.uber.org/zap" ) @@ -224,23 +221,6 @@ func (*backfillDistExecutor) IsIdempotent(*proto.Subtask) bool { return true } -func isRetryableError(err error) bool { - errMsg := err.Error() - for _, m := range dbterror.ReorgRetryableErrMsgs { - if strings.Contains(errMsg, m) { - return true - } - } - originErr := errors.Cause(err) - if tErr, ok := originErr.(*terror.Error); ok { - sqlErr := terror.ToSQLError(tErr) - _, ok := dbterror.ReorgRetryableErrCodes[sqlErr.Code] - return ok - } - // can't retry Unknown err. - return false -} - func (*backfillDistExecutor) IsRetryableError(err error) bool { return common.IsRetryableError(err) || isRetryableError(err) } diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index f54522a582382..c00a3ef274af6 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -832,7 +832,7 @@ func (w *worker) checkVectorIndexProcessOnTiFlash(jobCtx *jobContext, job *model if dbterror.ErrWaitReorgTimeout.Equal(err) { return false, ver, nil } - if !errorIsRetryable(err, job) { + if !isRetryableJobError(err, job.ErrorCount) { logutil.DDLLogger().Warn("run add vector index job failed, convert job to rollback", zap.Stringer("job", job), zap.Error(err)) ver, err = convertAddIdxJob2RollbackJob(jobCtx, job, tbl.Meta(), []*model.IndexInfo{indexInfo}, err) } @@ -970,7 +970,7 @@ SwitchIndexState: var reorgTp model.ReorgType reorgTp, err = pickBackfillType(job) if err != nil { - if !errorIsRetryable(err, job) { + if !isRetryableJobError(err, job.ErrorCount) { job.State = model.JobStateCancelled } return ver, err @@ -1250,7 +1250,7 @@ func runIngestReorgJob(w *worker, jobCtx *jobContext, job *model.Job, if kv.ErrKeyExists.Equal(err) { logutil.DDLLogger().Warn("import index duplicate key, convert job to rollback", zap.Stringer("job", job), zap.Error(err)) ver, err = convertAddIdxJob2RollbackJob(jobCtx, job, tbl.Meta(), allIndexInfos, err) - } else if !errorIsRetryable(err, job) { + } else if !isRetryableJobError(err, job.ErrorCount) { logutil.DDLLogger().Warn("run reorg job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err)) ver, err = convertAddIdxJob2RollbackJob(jobCtx, job, tbl.Meta(), allIndexInfos, err) @@ -1263,10 +1263,14 @@ func runIngestReorgJob(w *worker, jobCtx *jobContext, job *model.Job, return done, ver, nil } -func errorIsRetryable(err error, job *model.Job) bool { - if job.ErrorCount+1 >= variable.GetDDLErrorCountLimit() { +func isRetryableJobError(err error, jobErrCnt int64) bool { + if jobErrCnt+1 >= variable.GetDDLErrorCountLimit() { return false } + return isRetryableError(err) +} + +func isRetryableError(err error) bool { errMsg := err.Error() for _, m := range dbterror.ReorgRetryableErrMsgs { if strings.Contains(errMsg, m) { @@ -1342,7 +1346,7 @@ func runReorgJobAndHandleErr( } // TODO(tangenta): get duplicate column and match index. err = ingest.TryConvertToKeyExistsErr(err, allIndexInfos[0], tbl.Meta()) - if !errorIsRetryable(err, job) { + if !isRetryableJobError(err, job.ErrorCount) { logutil.DDLLogger().Warn("run add index job failed, convert job to rollback", zap.Stringer("job", job), zap.Error(err)) ver, err = convertAddIdxJob2RollbackJob(jobCtx, job, tbl.Meta(), allIndexInfos, err) if err1 := rh.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil { diff --git a/pkg/ddl/job_worker.go b/pkg/ddl/job_worker.go index dea816eb8c043..5eaf7b155b964 100644 --- a/pkg/ddl/job_worker.go +++ b/pkg/ddl/job_worker.go @@ -604,7 +604,7 @@ func (w *worker) transitOneJobStep(jobCtx *jobContext, job *model.Job) (int64, e jobCtx.addUnSynced(job.ID) // If error is non-retryable, we can ignore the sleep. - if runJobErr != nil && errorIsRetryable(runJobErr, job) { + if runJobErr != nil && isRetryableJobError(runJobErr, job.ErrorCount) { jobCtx.logger.Info("run DDL job failed, sleeps a while then retries it.", zap.Duration("waitTime", GetWaitTimeWhenErrorOccurred()), zap.Error(runJobErr)) // wait a while to retry again. If we don't wait here, DDL will retry this job immediately, diff --git a/pkg/owner/manager.go b/pkg/owner/manager.go index f2f091823db8a..e4d467b16b1db 100644 --- a/pkg/owner/manager.go +++ b/pkg/owner/manager.go @@ -530,10 +530,8 @@ func AcquireDistributedLock( } return false, nil }) - failpoint.Inject("mockAcquireDistLockFailed", func(val failpoint.Value) { - if ok := val.(bool); ok { - err = errors.Errorf("requested lease not found") - } + failpoint.Inject("mockAcquireDistLockFailed", func() { + err = errors.Errorf("requested lease not found") }) if err != nil { err1 := se.Close() From d68df9bbcb1d441340bb501bd9291e0d158a40ac Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 15 Oct 2024 11:43:46 +0800 Subject: [PATCH 4/4] fix TestAddIndexIngestFailures --- tests/realtikvtest/addindextest3/ingest_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/realtikvtest/addindextest3/ingest_test.go b/tests/realtikvtest/addindextest3/ingest_test.go index 36020ff7c01b3..5e8d773ac284e 100644 --- a/tests/realtikvtest/addindextest3/ingest_test.go +++ b/tests/realtikvtest/addindextest3/ingest_test.go @@ -538,14 +538,14 @@ func TestAddIndexIngestFailures(t *testing.T) { tk.MustExec("insert into t values (1, 1, 1);") // Test precheck failed. - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/ingest/mockIngestCheckEnvFailed", "return")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/ingest/mockIngestCheckEnvFailed", "1*return")) tk.MustGetErrMsg("alter table t add index idx(b);", "[ddl:8256]Check ingest environment failed: mock error") require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/ingest/mockIngestCheckEnvFailed")) tk.MustExec(`set global tidb_enable_dist_task=on;`) // Test reset engine failed. - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/ingest/mockResetEngineFailed", "return")) - tk.MustGetErrMsg("alter table t add index idx(b);", "[0]mock reset engine failed") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/ingest/mockResetEngineFailed", "1*return")) + tk.MustExec("alter table t add index idx(b);") require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/ingest/mockResetEngineFailed")) tk.MustExec(`set global tidb_enable_dist_task=off;`) }