From 601c89c4cf3a14b6577e59d3de44abfc530c083c Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 18 May 2023 16:25:36 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #43782 Signed-off-by: ti-chi-bot --- ddl/db_test.go | 8 ++------ ddl/index.go | 24 +++++++++++++++++++----- ddl/index_cop.go | 9 ++++++++- ddl/ingest/BUILD.bazel | 4 ++++ ddl/ingest/integration_test.go | 11 +++++++++++ ddl/mv_index_test.go | 2 +- ddl/rollingback.go | 4 ++++ util/dbterror/ddl_terror.go | 21 +++++++++++++++++++++ 8 files changed, 70 insertions(+), 13 deletions(-) diff --git a/ddl/db_test.go b/ddl/db_test.go index 61f5b64d571e4..d8ad57baf215a 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -611,7 +611,7 @@ func TestAddExpressionIndexRollback(t *testing.T) { hook.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) d.SetHook(hook) - tk.MustGetErrMsg("alter table t1 add index expr_idx ((pow(c1, c2)));", "[ddl:8202]Cannot decode index value, because [types:1690]DOUBLE value is out of range in 'pow(160, 160)'") + tk.MustGetErrMsg("alter table t1 add index expr_idx ((pow(c1, c2)));", "[types:1690]DOUBLE value is out of range in 'pow(160, 160)'") require.NoError(t, checkErr) tk.MustQuery("select * from t1 order by c1;").Check(testkit.Rows("2 2 2", "4 4 4", "5 80 80", "10 3 3", "20 20 20", "160 160 160")) @@ -991,11 +991,7 @@ func TestAddIndexFailOnCaseWhenCanExit(t *testing.T) { tk.MustExec("drop table if exists t") tk.MustExec("create table t(a int, b int)") tk.MustExec("insert into t values(1, 1)") - if variable.EnableDistTask.Load() { - tk.MustGetErrMsg("alter table t add index idx(b)", "[ddl:-1]job.ErrCount:0, mock unknown type: ast.whenClause.") - } else { - tk.MustGetErrMsg("alter table t add index idx(b)", "[ddl:-1]DDL job rollback, error msg: job.ErrCount:1, mock unknown type: ast.whenClause.") - } + tk.MustGetErrMsg("alter table t add index idx(b)", "[ddl:-1]job.ErrCount:0, mock unknown type: ast.whenClause.") tk.MustExec("drop table if exists t") } diff --git a/ddl/index.go b/ddl/index.go index 41d3f6c8e7f7b..5e01e0d45a4d6 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -957,6 +957,23 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, return true, ver, nil } +<<<<<<< HEAD +======= +func errorIsRetryable(err error, job *model.Job) bool { + if job.ErrorCount+1 >= variable.GetDDLErrorCountLimit() { + return false + } + originErr := errors.Cause(err) + if tErr, ok := originErr.(*terror.Error); ok { + sqlErr := terror.ToSQLError(tErr) + _, ok := dbterror.ReorgRetryableErrCodes[sqlErr.Code] + return ok + } + // For the unknown errors, we should retry. + return true +} + +>>>>>>> f636e442e31 (ddl: list the retryable errors for ingest mode (#43782)) func convertToKeyExistsErr(originErr error, idxInfo *model.IndexInfo, tblInfo *model.TableInfo) error { tErr, ok := errors.Cause(originErr).(*terror.Error) if !ok { @@ -1020,7 +1037,7 @@ func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, if common.ErrFoundDuplicateKeys.Equal(err) { err = convertToKeyExistsErr(err, indexInfo, tbl.Meta()) } - if kv.ErrKeyExists.Equal(err) || dbterror.ErrCancelledDDLJob.Equal(err) || dbterror.ErrCantDecodeRecord.Equal(err) || + if !errorIsRetryable(err, job) || // TODO: Remove this check make it can be retry. Related test is TestModifyColumnReorgInfo. job.ReorgMeta.IsDistReorg { logutil.BgLogger().Warn("[ddl] run add index job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err)) @@ -1407,10 +1424,7 @@ func (w *baseIndexWorker) getNextKey(taskRange reorgBackfillTask, taskDone bool) func (w *baseIndexWorker) updateRowDecoder(handle kv.Handle, rawRecord []byte) error { sysZone := w.sessCtx.GetSessionVars().StmtCtx.TimeZone _, err := w.rowDecoder.DecodeAndEvalRowWithMap(w.sessCtx, handle, rawRecord, sysZone, w.rowMap) - if err != nil { - return errors.Trace(dbterror.ErrCantDecodeRecord.GenWithStackByArgs("index", err)) - } - return nil + return errors.Trace(err) } // fetchRowColVals fetch w.batchCnt count records that need to reorganize indices, and build the corresponding indexRecord slice. diff --git a/ddl/index_cop.go b/ddl/index_cop.go index 124858988c427..8098855cb79ef 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -441,11 +441,18 @@ func (c *copContext) fetchTableScanResult(ctx context.Context, result distsql.Se } err = table.FillVirtualColumnValue(c.virtualColFieldTps, c.virtualColOffsets, c.expColInfos, c.colInfos, c.sessCtx, chk) if err != nil { - return false, errors.Trace(err) + return false, completeErr(err, c.idxInfo) } return false, nil } +func completeErr(err error, idxInfo *model.IndexInfo) error { + if expression.ErrInvalidJSONForFuncIndex.Equal(err) { + err = expression.ErrInvalidJSONForFuncIndex.GenWithStackByArgs(idxInfo.Name.O) + } + return errors.Trace(err) +} + func getRestoreData(tblInfo *model.TableInfo, targetIdx, pkIdx *model.IndexInfo, handleDts []types.Datum) []types.Datum { if !collate.NewCollationEnabled() || !tblInfo.IsCommonHandle || tblInfo.CommonHandleVersion == 0 { return nil diff --git a/ddl/ingest/BUILD.bazel b/ddl/ingest/BUILD.bazel index 0a372b4bb116b..4fd3a5555d56e 100644 --- a/ddl/ingest/BUILD.bazel +++ b/ddl/ingest/BUILD.bazel @@ -60,7 +60,11 @@ go_test( embed = [":ingest"], flaky = True, race = "on", +<<<<<<< HEAD shard_count = 9, +======= + shard_count = 12, +>>>>>>> f636e442e31 (ddl: list the retryable errors for ingest mode (#43782)) deps = [ "//config", "//ddl/internal/session", diff --git a/ddl/ingest/integration_test.go b/ddl/ingest/integration_test.go index 4800ff62e0226..5b89462c282af 100644 --- a/ddl/ingest/integration_test.go +++ b/ddl/ingest/integration_test.go @@ -135,3 +135,14 @@ func TestIngestPartitionRowCount(t *testing.T) { require.Equal(t, "3", rowCount) tk.MustExec("admin check table t;") } + +func TestAddIndexIngestClientError(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + defer injectMockBackendMgr(t, store)() + + tk.MustExec("CREATE TABLE t1 (f1 json);") + tk.MustExec(`insert into t1(f1) values (cast("null" as json));`) + tk.MustGetErrCode("create index i1 on t1((cast(f1 as unsigned array)));", errno.ErrInvalidJSONValueForFuncIndex) +} diff --git a/ddl/mv_index_test.go b/ddl/mv_index_test.go index 18d162ef14717..e25ac3bdd10f1 100644 --- a/ddl/mv_index_test.go +++ b/ddl/mv_index_test.go @@ -67,7 +67,7 @@ func TestMultiValuedIndexOnlineDDL(t *testing.T) { tk.MustExec("insert into t values (3, '[3,4,5]');") tk.MustExec("insert into t values (4, '[-4,5,6]');") tk.MustGetErrCode("alter table t add unique index idx((cast(a as signed array)));", errno.ErrDupEntry) - tk.MustGetErrMsg("alter table t add index idx((cast(a as unsigned array)));", "[ddl:8202]Cannot decode index value, because [types:1690]constant -4 overflows bigint") + tk.MustGetErrMsg("alter table t add index idx((cast(a as unsigned array)));", "[types:1690]constant -4 overflows bigint") tk.MustExec("drop table if exists t;") tk.MustExec("create table t (pk int primary key, a json);") diff --git a/ddl/rollingback.go b/ddl/rollingback.go index 1578c7d4b3937..0e5f0d8fffc6d 100644 --- a/ddl/rollingback.go +++ b/ddl/rollingback.go @@ -78,9 +78,13 @@ func convertAddIdxJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Job, tblIn return ver, errors.Trace(err1) } job.State = model.JobStateRollingback +<<<<<<< HEAD if job.ReorgMeta != nil && job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge { cleanupLocalIndexData(job.ID) } +======= + err = completeErr(err, indexInfo) +>>>>>>> f636e442e31 (ddl: list the retryable errors for ingest mode (#43782)) return ver, errors.Trace(err) } diff --git a/util/dbterror/ddl_terror.go b/util/dbterror/ddl_terror.go index 079b9f533cf54..6b181e54ecca6 100644 --- a/util/dbterror/ddl_terror.go +++ b/util/dbterror/ddl_terror.go @@ -443,3 +443,24 @@ var ( // ErrNotSupportedYet returns when tidb does not support this feature. ErrNotSupportedYet = ClassDDL.NewStd(mysql.ErrNotSupportedYet) ) + +// ReorgRetryableErrCodes is the error codes that are retryable for reorganization. +var ReorgRetryableErrCodes = map[uint16]struct{}{ + mysql.ErrPDServerTimeout: {}, + mysql.ErrTiKVServerTimeout: {}, + mysql.ErrTiKVServerBusy: {}, + mysql.ErrResolveLockTimeout: {}, + mysql.ErrRegionUnavailable: {}, + mysql.ErrGCTooEarly: {}, + mysql.ErrWriteConflict: {}, + mysql.ErrTiKVStoreLimit: {}, + mysql.ErrTiKVStaleCommand: {}, + mysql.ErrTiKVMaxTimestampNotSynced: {}, + mysql.ErrTiFlashServerTimeout: {}, + mysql.ErrTiFlashServerBusy: {}, + mysql.ErrInfoSchemaExpired: {}, + mysql.ErrInfoSchemaChanged: {}, + mysql.ErrWriteConflictInTiDB: {}, + mysql.ErrTxnRetryable: {}, + mysql.ErrNotOwner: {}, +} From 13b1cc226f97850c30e47c220ce814f2c610bb4c Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 25 May 2023 18:22:05 +0800 Subject: [PATCH 2/2] resolve conflict --- ddl/rollingback.go | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/ddl/rollingback.go b/ddl/rollingback.go index 093ee8317db2f..100ea0a1d88f8 100644 --- a/ddl/rollingback.go +++ b/ddl/rollingback.go @@ -16,12 +16,9 @@ package ddl import ( "fmt" - "os" - "path/filepath" "github.com/pingcap/errors" "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/ddl/ingest" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" @@ -78,25 +75,10 @@ func convertAddIdxJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Job, tblIn return ver, errors.Trace(err1) } job.State = model.JobStateRollingback -<<<<<<< HEAD - if job.ReorgMeta != nil && job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge { - cleanupLocalIndexData(job.ID) - } -======= err = completeErr(err, indexInfo) ->>>>>>> f636e442e31 (ddl: list the retryable errors for ingest mode (#43782)) return ver, errors.Trace(err) } -func cleanupLocalIndexData(jobID int64) { - sortPath := ingest.ConfigSortPath() - f := filepath.Join(sortPath, ingest.EncodeBackendTag(jobID)) - err := os.RemoveAll(f) - if err != nil { - logutil.BgLogger().Error("[ddl-ingest] can not remove local index data", zap.Error(err)) - } -} - // convertNotReorgAddIdxJob2RollbackJob converts the add index job that are not started workers to rollingbackJob, // to rollback add index operations. job.SnapshotVer == 0 indicates the workers are not started. func convertNotReorgAddIdxJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Job, occuredErr error) (ver int64, err error) {