From c9e2e8fa60d5a39f32fa5f55971c8872cf00e221 Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Fri, 16 Apr 2021 17:57:52 +0800 Subject: [PATCH] ddl: fix the covert job to rollingback job (#23903) (#24080) --- ddl/rollingback.go | 67 +++++++++++++++----------- ddl/rollingback_test.go | 101 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 141 insertions(+), 27 deletions(-) create mode 100644 ddl/rollingback_test.go diff --git a/ddl/rollingback.go b/ddl/rollingback.go index 7b44d437edfe6..845ef175f9204 100644 --- a/ddl/rollingback.go +++ b/ddl/rollingback.go @@ -17,11 +17,13 @@ import ( "fmt" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/meta" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" ) @@ -40,8 +42,11 @@ func updateColsNull2NotNull(tblInfo *model.TableInfo, indexInfo *model.IndexInfo } func convertAddIdxJob2RollbackJob(t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, indexInfo *model.IndexInfo, err error) (int64, error) { - job.State = model.JobStateRollingback - + failpoint.Inject("mockConvertAddIdxJob2RollbackJobError", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(0, errors.New("mock convert add index job to rollback job error")) + } + }) if indexInfo.Primary { nullCols, err := getNullColInfos(tblInfo, indexInfo) if err != nil { @@ -68,7 +73,7 @@ func convertAddIdxJob2RollbackJob(t *meta.Meta, job *model.Job, tblInfo *model.T if err1 != nil { return ver, errors.Trace(err1) } - + job.State = model.JobStateRollingback return ver, errors.Trace(err) } @@ -139,7 +144,6 @@ func rollingbackModifyColumn(t *meta.Meta, job *model.Job) (ver int64, err error } func rollingbackAddColumn(t *meta.Meta, job *model.Job) (ver int64, err error) { - job.State = model.JobStateRollingback tblInfo, columnInfo, col, _, _, err := checkAddColumn(t, job) if err != nil { return ver, errors.Trace(err) @@ -158,11 +162,12 @@ func rollingbackAddColumn(t *meta.Meta, job *model.Job) (ver int64, err error) { if err != nil { return ver, errors.Trace(err) } + + job.State = model.JobStateRollingback return ver, errCancelledDDLJob } func rollingbackAddColumns(t *meta.Meta, job *model.Job) (ver int64, err error) { - job.State = model.JobStateRollingback tblInfo, columnInfos, _, _, _, _, err := checkAddColumns(t, job) if err != nil { return ver, errors.Trace(err) @@ -186,11 +191,12 @@ func rollingbackAddColumns(t *meta.Meta, job *model.Job) (ver int64, err error) if err != nil { return ver, errors.Trace(err) } + job.State = model.JobStateRollingback return ver, errCancelledDDLJob } func rollingbackDropColumn(t *meta.Meta, job *model.Job) (ver int64, err error) { - tblInfo, colInfo, idxInfos, err := checkDropColumn(t, job) + _, colInfo, idxInfos, err := checkDropColumn(t, job) if err != nil { return ver, errors.Trace(err) } @@ -213,7 +219,6 @@ func rollingbackDropColumn(t *meta.Meta, job *model.Job) (ver int64, err error) // StatePublic means when the job is not running yet. if colInfo.State == model.StatePublic { job.State = model.JobStateCancelled - job.FinishTableJob(model.JobStateRollbackDone, model.StatePublic, ver, tblInfo) return ver, errCancelledDDLJob } // In the state of drop column `write only -> delete only -> reorganization`, @@ -223,7 +228,7 @@ func rollingbackDropColumn(t *meta.Meta, job *model.Job) (ver int64, err error) } func rollingbackDropColumns(t *meta.Meta, job *model.Job) (ver int64, err error) { - tblInfo, colInfos, _, idxInfos, err := checkDropColumns(t, job) + _, colInfos, _, idxInfos, err := checkDropColumns(t, job) if err != nil { return ver, errors.Trace(err) } @@ -246,7 +251,6 @@ func rollingbackDropColumns(t *meta.Meta, job *model.Job) (ver int64, err error) // StatePublic means when the job is not running yet. if colInfos[0].State == model.StatePublic { job.State = model.JobStateCancelled - job.FinishTableJob(model.JobStateRollbackDone, model.StatePublic, ver, tblInfo) return ver, errCancelledDDLJob } // In the state of drop columns `write only -> delete only -> reorganization`, @@ -256,12 +260,11 @@ func rollingbackDropColumns(t *meta.Meta, job *model.Job) (ver int64, err error) } func rollingbackDropIndex(t *meta.Meta, job *model.Job) (ver int64, err error) { - tblInfo, indexInfo, err := checkDropIndex(t, job) + _, indexInfo, err := checkDropIndex(t, job) if err != nil { return ver, errors.Trace(err) } - originalState := indexInfo.State switch indexInfo.State { case model.StateWriteOnly, model.StateDeleteOnly, model.StateDeleteReorganization, model.StateNone: // We can not rollback now, so just continue to drop index. @@ -269,20 +272,11 @@ func rollingbackDropIndex(t *meta.Meta, job *model.Job) (ver int64, err error) { job.State = model.JobStateRunning return ver, nil case model.StatePublic: - job.State = model.JobStateRollbackDone - indexInfo.State = model.StatePublic + job.State = model.JobStateCancelled + return ver, errCancelledDDLJob default: return ver, ErrInvalidDDLState.GenWithStackByArgs("index", indexInfo.State) } - - job.SchemaState = indexInfo.State - job.Args = []interface{}{indexInfo.Name} - ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != indexInfo.State) - if err != nil { - return ver, errors.Trace(err) - } - job.FinishTableJob(model.JobStateRollbackDone, model.StatePublic, ver, tblInfo) - return ver, errCancelledDDLJob } func rollingbackAddIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, isPK bool) (ver int64, err error) { @@ -300,7 +294,6 @@ func rollingbackAddIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, isP } func convertAddTablePartitionJob2RollbackJob(t *meta.Meta, job *model.Job, otherwiseErr error, tblInfo *model.TableInfo) (ver int64, err error) { - job.State = model.JobStateRollingback addingDefinitions := tblInfo.Partition.AddingDefinitions partNames := make([]string, 0, len(addingDefinitions)) for _, pd := range addingDefinitions { @@ -311,6 +304,7 @@ func convertAddTablePartitionJob2RollbackJob(t *meta.Meta, job *model.Job, other if err != nil { return ver, errors.Trace(err) } + job.State = model.JobStateRollingback return ver, errors.Trace(otherwiseErr) } @@ -447,12 +441,31 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) if job.Error == nil { job.Error = toTError(err) } - if !job.Error.Equal(errCancelledDDLJob) { - job.Error = terror.GetErrClass(job.Error).Synthesize(terror.ErrCode(job.Error.Code()), - fmt.Sprintf("DDL job rollback, error msg: %s", terror.ToSQLError(job.Error).Message)) - } job.ErrorCount++ + if errCancelledDDLJob.Equal(err) { + // The job is normally cancelled. + if !job.Error.Equal(errCancelledDDLJob) { + job.Error = terror.GetErrClass(job.Error).Synthesize(terror.ErrCode(job.Error.Code()), + fmt.Sprintf("DDL job rollback, error msg: %s", terror.ToSQLError(job.Error).Message)) + } + } else { + // A job canceling meet other error. + // + // Once `convertJob2RollbackJob` meets an error, the job state can't be set as `JobStateRollingback` since + // job state and args may not be correctly overwritten. The job will be fetched to run with the cancelling + // state again. So we should check the error count here. + if err1 := loadDDLVars(w); err1 != nil { + logutil.Logger(w.logCtx).Error("[ddl] load DDL global variable failed", zap.Error(err1)) + } + errorCount := variable.GetDDLErrorCountLimit() + if job.ErrorCount > errorCount { + logutil.Logger(w.logCtx).Warn("[ddl] rollback DDL job error count exceed the limit, cancelled it now", zap.Int64("jobID", job.ID), zap.Int64("errorCountLimit", errorCount)) + job.Error = toTError(errors.Errorf("rollback DDL job error count exceed the limit %d, cancelled it now", errorCount)) + job.State = model.JobStateCancelled + } + } + if job.State != model.JobStateRollingback && job.State != model.JobStateCancelled { logutil.Logger(w.logCtx).Error("[ddl] run DDL job failed", zap.String("job", job.String()), zap.Error(err)) } else { diff --git a/ddl/rollingback_test.go b/ddl/rollingback_test.go new file mode 100644 index 0000000000000..25c2d75dc6a30 --- /dev/null +++ b/ddl/rollingback_test.go @@ -0,0 +1,101 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl_test + +import ( + "context" + "strconv" + + . "github.com/pingcap/check" + errors2 "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/parser/model" + "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta" + "github.com/pingcap/tidb/util/sqlexec" + "github.com/pingcap/tidb/util/testkit" +) + +var _ = SerialSuites(&testRollingBackSuite{&testDBSuite{}}) + +type testRollingBackSuite struct{ *testDBSuite } + +// TestCancelJobMeetError is used to test canceling ddl job failure when convert ddl job to a rollingback job. +func (s *testRollingBackSuite) TestCancelAddIndexJobError(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk1 := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk1.MustExec("use test") + + tk.MustExec("create table t_cancel_add_index (a int)") + tk.MustExec("insert into t_cancel_add_index values(1),(2),(3)") + tk.MustExec("set @@global.tidb_ddl_error_count_limit=3") + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/ddl/mockConvertAddIdxJob2RollbackJobError", `return(true)`), IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/ddl/mockConvertAddIdxJob2RollbackJobError"), IsNil) + }() + + tbl := testGetTableByName(c, tk.Se, "test", "t_cancel_add_index") + c.Assert(tbl, NotNil) + + d := s.dom.DDL() + hook := &ddl.TestDDLCallback{Do: s.dom} + var ( + checkErr error + jobID int64 + res sqlexec.RecordSet + ) + hook.OnJobUpdatedExported = func(job *model.Job) { + if job.TableID != tbl.Meta().ID { + return + } + if job.Type != model.ActionAddIndex { + return + } + if job.SchemaState == model.StateDeleteOnly { + jobID = job.ID + res, checkErr = tk1.Exec("admin cancel ddl jobs " + strconv.Itoa(int(job.ID))) + // drain the result set here, otherwise the cancel action won't take effect immediately. + chk := res.NewChunk() + if err := res.Next(context.Background(), chk); err != nil { + checkErr = err + return + } + if err := res.Close(); err != nil { + checkErr = err + } + } + } + d.(ddl.DDLForTest).SetHook(hook) + + // This will hang on stateDeleteOnly, and the job will be canceled. + _, err := tk.Exec("alter table t_cancel_add_index add index idx(a)") + c.Assert(err, NotNil) + c.Assert(checkErr, IsNil) + c.Assert(err.Error(), Equals, "[ddl:-1]rollback DDL job error count exceed the limit 3, cancelled it now") + + // Verification of the history job state. + var job *model.Job + err = kv.RunInNewTxn(context.Background(), s.store, false, func(ctx context.Context, txn kv.Transaction) error { + t := meta.NewMeta(txn) + var err1 error + job, err1 = t.GetHistoryDDLJob(jobID) + return errors2.Trace(err1) + }) + c.Assert(err, IsNil) + c.Assert(job.ErrorCount, Equals, int64(4)) + c.Assert(job.Error.Error(), Equals, "[ddl:-1]rollback DDL job error count exceed the limit 3, cancelled it now") +}