Skip to content

Commit

Permalink
ddl: list the retryable errors for ingest mode | tidb-test=pr/2142 (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored May 25, 2023
1 parent 904a40e commit bae2333
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 32 deletions.
8 changes: 2 additions & 6 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ func TestAddExpressionIndexRollback(t *testing.T) {
hook.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(hook)

tk.MustGetErrMsg("alter table t1 add index expr_idx ((pow(c1, c2)));", "[ddl:8202]Cannot decode index value, because [types:1690]DOUBLE value is out of range in 'pow(160, 160)'")
tk.MustGetErrMsg("alter table t1 add index expr_idx ((pow(c1, c2)));", "[types:1690]DOUBLE value is out of range in 'pow(160, 160)'")
require.NoError(t, checkErr)
tk.MustQuery("select * from t1 order by c1;").Check(testkit.Rows("2 2 2", "4 4 4", "5 80 80", "10 3 3", "20 20 20", "160 160 160"))

Expand Down Expand Up @@ -991,11 +991,7 @@ func TestAddIndexFailOnCaseWhenCanExit(t *testing.T) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int)")
tk.MustExec("insert into t values(1, 1)")
if variable.EnableDistTask.Load() {
tk.MustGetErrMsg("alter table t add index idx(b)", "[ddl:-1]job.ErrCount:0, mock unknown type: ast.whenClause.")
} else {
tk.MustGetErrMsg("alter table t add index idx(b)", "[ddl:-1]DDL job rollback, error msg: job.ErrCount:1, mock unknown type: ast.whenClause.")
}
tk.MustGetErrMsg("alter table t add index idx(b)", "[ddl:-1]job.ErrCount:0, mock unknown type: ast.whenClause.")
tk.MustExec("drop table if exists t")
}

Expand Down
20 changes: 13 additions & 7 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -946,8 +946,17 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
}

func errorIsRetryable(err error, job *model.Job) bool {
return !errors.ErrorEqual(err, dbterror.ErrReorgPanic) &&
job.ErrorCount+1 < variable.GetDDLErrorCountLimit()
if job.ErrorCount+1 >= variable.GetDDLErrorCountLimit() {
return false
}
originErr := errors.Cause(err)
if tErr, ok := originErr.(*terror.Error); ok {
sqlErr := terror.ToSQLError(tErr)
_, ok := dbterror.ReorgRetryableErrCodes[sqlErr.Code]
return ok
}
// For the unknown errors, we should retry.
return true
}

func convertToKeyExistsErr(originErr error, idxInfo *model.IndexInfo, tblInfo *model.TableInfo) error {
Expand Down Expand Up @@ -1017,7 +1026,7 @@ func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
if common.ErrFoundDuplicateKeys.Equal(err) {
err = convertToKeyExistsErr(err, indexInfo, tbl.Meta())
}
if kv.ErrKeyExists.Equal(err) || dbterror.ErrCancelledDDLJob.Equal(err) || dbterror.ErrCantDecodeRecord.Equal(err) ||
if !errorIsRetryable(err, job) ||
// TODO: Remove this check make it can be retry. Related test is TestModifyColumnReorgInfo.
job.ReorgMeta.IsDistReorg {
logutil.BgLogger().Warn("[ddl] run add index job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err))
Expand Down Expand Up @@ -1404,10 +1413,7 @@ func (w *baseIndexWorker) getNextKey(taskRange reorgBackfillTask, taskDone bool)
func (w *baseIndexWorker) updateRowDecoder(handle kv.Handle, rawRecord []byte) error {
sysZone := w.sessCtx.GetSessionVars().StmtCtx.TimeZone
_, err := w.rowDecoder.DecodeAndEvalRowWithMap(w.sessCtx, handle, rawRecord, sysZone, w.rowMap)
if err != nil {
return errors.Trace(dbterror.ErrCantDecodeRecord.GenWithStackByArgs("index", err))
}
return nil
return errors.Trace(err)
}

// fetchRowColVals fetch w.batchCnt count records that need to reorganize indices, and build the corresponding indexRecord slice.
Expand Down
9 changes: 8 additions & 1 deletion ddl/index_cop.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,11 +441,18 @@ func (c *copContext) fetchTableScanResult(ctx context.Context, result distsql.Se
}
err = table.FillVirtualColumnValue(c.virtualColFieldTps, c.virtualColOffsets, c.expColInfos, c.colInfos, c.sessCtx, chk)
if err != nil {
return false, errors.Trace(err)
return false, completeErr(err, c.idxInfo)
}
return false, nil
}

func completeErr(err error, idxInfo *model.IndexInfo) error {
if expression.ErrInvalidJSONForFuncIndex.Equal(err) {
err = expression.ErrInvalidJSONForFuncIndex.GenWithStackByArgs(idxInfo.Name.O)
}
return errors.Trace(err)
}

func getRestoreData(tblInfo *model.TableInfo, targetIdx, pkIdx *model.IndexInfo, handleDts []types.Datum) []types.Datum {
if !collate.NewCollationEnabled() || !tblInfo.IsCommonHandle || tblInfo.CommonHandleVersion == 0 {
return nil
Expand Down
3 changes: 1 addition & 2 deletions ddl/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ go_test(
],
embed = [":ingest"],
flaky = True,
race = "on",
shard_count = 10,
shard_count = 11,
deps = [
"//config",
"//ddl/internal/session",
Expand Down
11 changes: 11 additions & 0 deletions ddl/ingest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,14 @@ func TestIngestPartitionRowCount(t *testing.T) {
require.Equal(t, "3", rowCount)
tk.MustExec("admin check table t;")
}

func TestAddIndexIngestClientError(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
defer injectMockBackendMgr(t, store)()

tk.MustExec("CREATE TABLE t1 (f1 json);")
tk.MustExec(`insert into t1(f1) values (cast("null" as json));`)
tk.MustGetErrCode("create index i1 on t1((cast(f1 as unsigned array)));", errno.ErrInvalidJSONValueForFuncIndex)
}
2 changes: 1 addition & 1 deletion ddl/mv_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestMultiValuedIndexOnlineDDL(t *testing.T) {
tk.MustExec("insert into t values (3, '[3,4,5]');")
tk.MustExec("insert into t values (4, '[-4,5,6]');")
tk.MustGetErrCode("alter table t add unique index idx((cast(a as signed array)));", errno.ErrDupEntry)
tk.MustGetErrMsg("alter table t add index idx((cast(a as unsigned array)));", "[ddl:8202]Cannot decode index value, because [types:1690]constant -4 overflows bigint")
tk.MustGetErrMsg("alter table t add index idx((cast(a as unsigned array)));", "[types:1690]constant -4 overflows bigint")

tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (pk int primary key, a json);")
Expand Down
16 changes: 1 addition & 15 deletions ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,9 @@ package ddl

import (
"fmt"
"os"
"path/filepath"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/ingest"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -78,21 +75,10 @@ func convertAddIdxJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Job, tblIn
return ver, errors.Trace(err1)
}
job.State = model.JobStateRollingback
if job.ReorgMeta != nil && job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
cleanupLocalIndexData(job.ID)
}
err = completeErr(err, indexInfo)
return ver, errors.Trace(err)
}

func cleanupLocalIndexData(jobID int64) {
sortPath := ingest.ConfigSortPath()
f := filepath.Join(sortPath, ingest.EncodeBackendTag(jobID))
err := os.RemoveAll(f)
if err != nil {
logutil.BgLogger().Error("[ddl-ingest] can not remove local index data", zap.Error(err))
}
}

// convertNotReorgAddIdxJob2RollbackJob converts the add index job that are not started workers to rollingbackJob,
// to rollback add index operations. job.SnapshotVer == 0 indicates the workers are not started.
func convertNotReorgAddIdxJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Job, occuredErr error) (ver int64, err error) {
Expand Down
21 changes: 21 additions & 0 deletions util/dbterror/ddl_terror.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,3 +443,24 @@ var (
// ErrNotSupportedYet returns when tidb does not support this feature.
ErrNotSupportedYet = ClassDDL.NewStd(mysql.ErrNotSupportedYet)
)

// ReorgRetryableErrCodes is the error codes that are retryable for reorganization.
var ReorgRetryableErrCodes = map[uint16]struct{}{
mysql.ErrPDServerTimeout: {},
mysql.ErrTiKVServerTimeout: {},
mysql.ErrTiKVServerBusy: {},
mysql.ErrResolveLockTimeout: {},
mysql.ErrRegionUnavailable: {},
mysql.ErrGCTooEarly: {},
mysql.ErrWriteConflict: {},
mysql.ErrTiKVStoreLimit: {},
mysql.ErrTiKVStaleCommand: {},
mysql.ErrTiKVMaxTimestampNotSynced: {},
mysql.ErrTiFlashServerTimeout: {},
mysql.ErrTiFlashServerBusy: {},
mysql.ErrInfoSchemaExpired: {},
mysql.ErrInfoSchemaChanged: {},
mysql.ErrWriteConflictInTiDB: {},
mysql.ErrTxnRetryable: {},
mysql.ErrNotOwner: {},
}

0 comments on commit bae2333

Please sign in to comment.