Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: add lease not found and deadline exceed to retryable errors #56630

Merged
merged 4 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions pkg/ddl/backfilling_dist_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@ import (
"github.com/pingcap/tidb/pkg/lightning/backend/external"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -223,17 +221,6 @@ func (*backfillDistExecutor) IsIdempotent(*proto.Subtask) bool {
return true
}

func isRetryableError(err error) bool {
originErr := errors.Cause(err)
if tErr, ok := originErr.(*terror.Error); ok {
sqlErr := terror.ToSQLError(tErr)
_, ok := dbterror.ReorgRetryableErrCodes[sqlErr.Code]
return ok
}
// can't retry Unknown err.
return false
}

func (*backfillDistExecutor) IsRetryableError(err error) bool {
return common.IsRetryableError(err) || isRetryableError(err)
}
Expand Down
22 changes: 16 additions & 6 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ func (w *worker) checkVectorIndexProcessOnTiFlash(jobCtx *jobContext, job *model
if dbterror.ErrWaitReorgTimeout.Equal(err) {
return false, ver, nil
}
if !errorIsRetryable(err, job) {
if !isRetryableJobError(err, job.ErrorCount) {
logutil.DDLLogger().Warn("run add vector index job failed, convert job to rollback", zap.Stringer("job", job), zap.Error(err))
ver, err = convertAddIdxJob2RollbackJob(jobCtx, job, tbl.Meta(), []*model.IndexInfo{indexInfo}, err)
}
Expand Down Expand Up @@ -970,7 +970,7 @@ SwitchIndexState:
var reorgTp model.ReorgType
reorgTp, err = pickBackfillType(job)
if err != nil {
if !errorIsRetryable(err, job) {
if !isRetryableJobError(err, job.ErrorCount) {
job.State = model.JobStateCancelled
}
return ver, err
Expand Down Expand Up @@ -1250,7 +1250,7 @@ func runIngestReorgJob(w *worker, jobCtx *jobContext, job *model.Job,
if kv.ErrKeyExists.Equal(err) {
logutil.DDLLogger().Warn("import index duplicate key, convert job to rollback", zap.Stringer("job", job), zap.Error(err))
ver, err = convertAddIdxJob2RollbackJob(jobCtx, job, tbl.Meta(), allIndexInfos, err)
} else if !errorIsRetryable(err, job) {
} else if !isRetryableJobError(err, job.ErrorCount) {
logutil.DDLLogger().Warn("run reorg job failed, convert job to rollback",
zap.String("job", job.String()), zap.Error(err))
ver, err = convertAddIdxJob2RollbackJob(jobCtx, job, tbl.Meta(), allIndexInfos, err)
Expand All @@ -1263,10 +1263,20 @@ func runIngestReorgJob(w *worker, jobCtx *jobContext, job *model.Job,
return done, ver, nil
}

func errorIsRetryable(err error, job *model.Job) bool {
if job.ErrorCount+1 >= variable.GetDDLErrorCountLimit() {
func isRetryableJobError(err error, jobErrCnt int64) bool {
if jobErrCnt+1 >= variable.GetDDLErrorCountLimit() {
return false
}
return isRetryableError(err)
}

func isRetryableError(err error) bool {
errMsg := err.Error()
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
for _, m := range dbterror.ReorgRetryableErrMsgs {
if strings.Contains(errMsg, m) {
return true
}
}
originErr := errors.Cause(err)
if tErr, ok := originErr.(*terror.Error); ok {
sqlErr := terror.ToSQLError(tErr)
Expand Down Expand Up @@ -1336,7 +1346,7 @@ func runReorgJobAndHandleErr(
}
// TODO(tangenta): get duplicate column and match index.
err = ingest.TryConvertToKeyExistsErr(err, allIndexInfos[0], tbl.Meta())
if !errorIsRetryable(err, job) {
if !isRetryableJobError(err, job.ErrorCount) {
logutil.DDLLogger().Warn("run add index job failed, convert job to rollback", zap.Stringer("job", job), zap.Error(err))
ver, err = convertAddIdxJob2RollbackJob(jobCtx, job, tbl.Meta(), allIndexInfos, err)
if err1 := rh.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/job_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ func (w *worker) transitOneJobStep(jobCtx *jobContext, job *model.Job) (int64, e
jobCtx.addUnSynced(job.ID)

// If error is non-retryable, we can ignore the sleep.
if runJobErr != nil && errorIsRetryable(runJobErr, job) {
if runJobErr != nil && isRetryableJobError(runJobErr, job.ErrorCount) {
jobCtx.logger.Info("run DDL job failed, sleeps a while then retries it.",
zap.Duration("waitTime", GetWaitTimeWhenErrorOccurred()), zap.Error(runJobErr))
// wait a while to retry again. If we don't wait here, DDL will retry this job immediately,
Expand Down
3 changes: 3 additions & 0 deletions pkg/owner/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,9 @@ func AcquireDistributedLock(
}
return false, nil
})
failpoint.Inject("mockAcquireDistLockFailed", func() {
err = errors.Errorf("requested lease not found")
})
if err != nil {
err1 := se.Close()
if err1 != nil {
Expand Down
8 changes: 7 additions & 1 deletion pkg/util/dbterror/ddl_terror.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ var (
ErrWarnGlobalIndexNeedManuallyAnalyze = ClassDDL.NewStd(mysql.ErrWarnGlobalIndexNeedManuallyAnalyze)
)

// ReorgRetryableErrCodes is the error codes that are retryable for reorganization.
// ReorgRetryableErrCodes are the error codes that are retryable for reorganization.
var ReorgRetryableErrCodes = map[uint16]struct{}{
mysql.ErrPDServerTimeout: {},
mysql.ErrTiKVServerTimeout: {},
Expand All @@ -526,3 +526,9 @@ var ReorgRetryableErrCodes = map[uint16]struct{}{
// Temporary network partitioning may cause pk commit failure.
uint16(terror.CodeResultUndetermined): {},
}

// ReorgRetryableErrMsgs are the error messages that are retryable for reorganization.
var ReorgRetryableErrMsgs = []string{
"context deadline exceeded",
"requested lease not found",
}
14 changes: 14 additions & 0 deletions tests/realtikvtest/addindextest1/disttask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,3 +286,17 @@ func TestAddUKErrorMessage(t *testing.T) {
err := tk.ExecToErr("alter table t add unique index uk(b);")
require.ErrorContains(t, err, "Duplicate entry '1' for key 't.uk'")
}

func TestAddIndexDistLockAcquireFailed(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set global tidb_enable_dist_task = on;")
t.Cleanup(func() {
tk.MustExec("set global tidb_enable_dist_task = off;")
})
tk.MustExec("create table t (a int, b int);")
tk.MustExec("insert into t values (1, 1);")
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/owner/mockAcquireDistLockFailed", "1*return(true)")
tk.MustExec("alter table t add index idx(b);")
}
6 changes: 3 additions & 3 deletions tests/realtikvtest/addindextest3/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,14 +538,14 @@ func TestAddIndexIngestFailures(t *testing.T) {
tk.MustExec("insert into t values (1, 1, 1);")

// Test precheck failed.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/ingest/mockIngestCheckEnvFailed", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/ingest/mockIngestCheckEnvFailed", "1*return"))
tk.MustGetErrMsg("alter table t add index idx(b);", "[ddl:8256]Check ingest environment failed: mock error")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/ingest/mockIngestCheckEnvFailed"))

tk.MustExec(`set global tidb_enable_dist_task=on;`)
// Test reset engine failed.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/ingest/mockResetEngineFailed", "return"))
tk.MustGetErrMsg("alter table t add index idx(b);", "[0]mock reset engine failed")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/ingest/mockResetEngineFailed", "1*return"))
tk.MustExec("alter table t add index idx(b);")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/ingest/mockResetEngineFailed"))
tk.MustExec(`set global tidb_enable_dist_task=off;`)
}
Expand Down