From cee756586d8f27a9889090cd6be93756b99802ee Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Tue, 11 Apr 2023 14:37:00 +0200 Subject: [PATCH] ddl: backport of (#38738) tidb_ddl_reorg commit conflicts (#41115) ref pingcap/tidb#24427, ref pingcap/tidb#38669, close pingcap/tidb#41118 --- ddl/backfilling.go | 10 +++-- ddl/column.go | 34 ++++++++++++--- ddl/column_type_change_test.go | 37 ++++++++++++++++ ddl/db_partition_test.go | 19 +++++++++ ddl/ddl.go | 13 ++++++ ddl/ddl_worker.go | 1 + ddl/index.go | 25 ++++++++++- ddl/job_table.go | 76 +++++++++++++++++---------------- ddl/modify_column_test.go | 37 +++++++++------- ddl/multi_schema_change_test.go | 1 + ddl/partition.go | 23 +++++++++- ddl/reorg.go | 75 +++++++++++++++----------------- parser/model/ddl.go | 3 ++ 13 files changed, 251 insertions(+), 103 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 87b1e05a8c51f..192840f35f5a5 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -67,7 +67,7 @@ const ( // Backfilling is time consuming, to accelerate this process, TiDB has built some sub // workers to do this in the DDL owner node. // -// DDL owner thread +// DDL owner thread (also see comments before runReorgJob func) // ^ // | (reorgCtx.doneCh) // | @@ -453,9 +453,10 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount err = dc.isReorgRunnable(reorgInfo.Job) } + // Update the reorg handle that has been processed. + err1 := reorgInfo.UpdateReorgMeta(nextKey, scheduler.sessPool) + if err != nil { - // Update the reorg handle that has been processed. - err1 := reorgInfo.UpdateReorgMeta(nextKey, scheduler.sessPool) metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblError).Observe(elapsedTime.Seconds()) logutil.BgLogger().Warn("[ddl] backfill worker handle batch tasks failed", zap.ByteString("element type", reorgInfo.currElement.TypeKey), @@ -480,7 +481,8 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount zap.String("start key", hex.EncodeToString(startKey)), zap.String("next key", hex.EncodeToString(nextKey)), zap.Int64("batch added count", taskAddedCount), - zap.String("take time", elapsedTime.String())) + zap.String("take time", elapsedTime.String()), + zap.NamedError("updateHandleError", err1)) return nil } diff --git a/ddl/column.go b/ddl/column.go index 2f9bb3e515056..dc9bfbcafaa5a 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -811,7 +811,30 @@ func doReorgWorkForModifyColumnMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, j func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, oldCol, changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) (done bool, ver int64, err error) { job.ReorgMeta.ReorgTp = model.ReorgTypeTxn - rh := newReorgHandler(t, w.sess, w.concurrentDDL) + var rh *reorgHandler + if w.concurrentDDL { + sctx, err1 := w.sessPool.get() + if err1 != nil { + err = errors.Trace(err1) + return + } + defer w.sessPool.put(sctx) + sess := newSession(sctx) + err = sess.begin() + if err != nil { + return + } + defer sess.rollback() + txn, err1 := sess.txn() + if err1 != nil { + err = errors.Trace(err1) + return + } + newMeta := meta.NewMeta(txn) + rh = newReorgHandler(newMeta, newSession(sctx), w.concurrentDDL) + } else { + rh = newReorgHandler(t, w.sess, w.concurrentDDL) + } reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, tbl, BuildElements(changingCol, changingIdxs), false) if err != nil || reorgInfo.first { // If we run reorg firstly, we should update the job snapshot version @@ -1273,8 +1296,8 @@ func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, ra if err != nil { return w.reformatErrors(err) } - if w.sessCtx.GetSessionVars().StmtCtx.GetWarnings() != nil && len(w.sessCtx.GetSessionVars().StmtCtx.GetWarnings()) != 0 { - warn := w.sessCtx.GetSessionVars().StmtCtx.GetWarnings() + warn := w.sessCtx.GetSessionVars().StmtCtx.GetWarnings() + if len(warn) != 0 { //nolint:forcetypeassert recordWarning = errors.Cause(w.reformatErrors(warn[0].Err)).(*terror.Error) } @@ -1358,8 +1381,9 @@ func (w *updateColumnWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t taskCtx.nextKey = nextKey taskCtx.done = taskDone - warningsMap := make(map[errors.ErrorID]*terror.Error, len(rowRecords)) - warningsCountMap := make(map[errors.ErrorID]int64, len(rowRecords)) + // Optimize for few warnings! + warningsMap := make(map[errors.ErrorID]*terror.Error, 2) + warningsCountMap := make(map[errors.ErrorID]int64, 2) for _, rowRecord := range rowRecords { taskCtx.scanCount++ diff --git a/ddl/column_type_change_test.go b/ddl/column_type_change_test.go index 6e978d8835c37..486c6dc520ba8 100644 --- a/ddl/column_type_change_test.go +++ b/ddl/column_type_change_test.go @@ -2426,3 +2426,40 @@ func TestColumnTypeChangeTimestampToInt(t *testing.T) { tk.MustExec("alter table t add index idx1(id, c1);") tk.MustExec("admin check table t") } + +func TestFixDDLTxnWillConflictWithReorgTxnNotConcurrent(t *testing.T) { + store := testkit.CreateMockStore(t) + tk0 := testkit.NewTestKit(t, store) + tk0.MustExec("set @@global.tidb_enable_metadata_lock=0") + defer tk0.MustExec("set @@global.tidb_enable_metadata_lock = default") + tk0.MustExec("set @@global.tidb_enable_concurrent_ddl = off") + defer tk0.MustExec("set @@global.tidb_enable_concurrent_ddl = default") + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + tk.MustExec("create table t (a int)") + tk.MustExec("set global tidb_ddl_enable_fast_reorg = OFF") + defer tk.MustExec("set global tidb_ddl_enable_fast_reorg = default") + tk.MustExec("alter table t add index(a)") + tk.MustExec("set @@sql_mode=''") + tk.MustExec("insert into t values(128),(129)") + tk.MustExec("alter table t modify column a tinyint") + + tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1690 2 warnings with this error code, first warning: constant 128 overflows tinyint")) +} + +func TestFixDDLTxnWillConflictWithReorgTxn(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + tk.MustExec("create table t (a int)") + tk.MustExec("set global tidb_ddl_enable_fast_reorg = OFF") + defer tk.MustExec("set global tidb_ddl_enable_fast_reorg = default") + tk.MustExec("alter table t add index(a)") + tk.MustExec("set @@sql_mode=''") + tk.MustExec("insert into t values(128),(129)") + tk.MustExec("alter table t modify column a tinyint") + + tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1690 2 warnings with this error code, first warning: constant 128 overflows tinyint")) +} diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 760977e2d4534..509ff005707bd 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -4537,6 +4537,25 @@ func TestPartitionTableWithAnsiQuotes(t *testing.T) { ` PARTITION "pMax" VALUES LESS THAN (MAXVALUE,MAXVALUE))`)) } +func TestAlterModifyPartitionColTruncateWarning(t *testing.T) { + t.Skip("waiting for supporting Modify Partition Column again") + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + schemaName := "truncWarn" + tk.MustExec("create database " + schemaName) + tk.MustExec("use " + schemaName) + tk.MustExec(`set sql_mode = default`) + tk.MustExec(`create table t (a varchar(255)) partition by range columns (a) (partition p1 values less than ("0"), partition p2 values less than ("zzzz"))`) + tk.MustExec(`insert into t values ("123456"),(" 654321")`) + tk.MustContainErrMsg(`alter table t modify a varchar(5)`, "[types:1265]Data truncated for column 'a', value is '") + tk.MustExec(`set sql_mode = ''`) + tk.MustExec(`alter table t modify a varchar(5)`) + // Fix the duplicate warning, see https://github.com/pingcap/tidb/issues/38699 + tk.MustQuery(`show warnings`).Check(testkit.Rows(""+ + "Warning 1265 Data truncated for column 'a', value is ' 654321'", + "Warning 1265 Data truncated for column 'a', value is ' 654321'")) +} + func TestIssue40135Ver2(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) diff --git a/ddl/ddl.go b/ddl/ddl.go index 8c4d5235ea7ad..136c7abf2bf0f 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -1806,6 +1806,19 @@ func (s *session) session() sessionctx.Context { return s.Context } +func (s *session) runInTxn(f func(*session) error) (err error) { + err = s.begin() + if err != nil { + return err + } + err = f(s) + if err != nil { + s.rollback() + return + } + return errors.Trace(s.commit()) +} + // GetAllHistoryDDLJobs get all the done DDL jobs. func GetAllHistoryDDLJobs(m *meta.Meta) ([]*model.Job, error) { iterator, err := GetLastHistoryDDLJobsIterator(m) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index dac2f01216edb..d86b3315ff217 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -775,6 +775,7 @@ func (w *worker) HandleJobDone(d *ddlCtx, job *model.Job, t *meta.Meta) error { if err != nil { return err } + CleanupDDLReorgHandles(job, w, t) asyncNotify(d.ddlJobDoneCh) return nil } diff --git a/ddl/index.go b/ddl/index.go index 27f69fe8a0371..bb83c5c5cacc2 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -896,6 +896,30 @@ func convertToKeyExistsErr(originErr error, idxInfo *model.IndexInfo, tblInfo *m func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, indexInfo *model.IndexInfo, mergingTmpIdx bool) (done bool, ver int64, err error) { elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}} + var rh *reorgHandler + if w.concurrentDDL { + sctx, err1 := w.sessPool.get() + if err1 != nil { + err = err1 + return + } + defer w.sessPool.put(sctx) + sess := newSession(sctx) + err = sess.begin() + if err != nil { + return + } + defer sess.rollback() + txn, err1 := sess.txn() + if err1 != nil { + err = err1 + return + } + newMeta := meta.NewMeta(txn) + rh = newReorgHandler(newMeta, sess, w.concurrentDDL) + } else { + rh = newReorgHandler(t, w.sess, w.concurrentDDL) + } failpoint.Inject("mockDMLExecutionStateMerging", func(val failpoint.Value) { //nolint:forcetypeassert @@ -905,7 +929,6 @@ func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, } }) - rh := newReorgHandler(t, w.sess, w.concurrentDDL) reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, tbl, elements, mergingTmpIdx) if err != nil || reorgInfo.first { // If we run reorg firstly, we should update the job snapshot version diff --git a/ddl/job_table.go b/ddl/job_table.go index d21be19eeaa00..34e2215cbf5f1 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -430,15 +430,8 @@ func getDDLReorgHandle(sess *session, job *model.Job) (element *meta.Element, st return } -// updateDDLReorgStartHandle update the startKey of the handle. -func updateDDLReorgStartHandle(sess *session, job *model.Job, element *meta.Element, startKey kv.Key) error { - sql := fmt.Sprintf("update mysql.tidb_ddl_reorg set ele_id = %d, ele_type = %s, start_key = %s where job_id = %d", - element.ID, wrapKey2String(element.TypeKey), wrapKey2String(startKey), job.ID) - _, err := sess.execute(context.Background(), sql, "update_start_handle") - return err -} - // updateDDLReorgHandle update startKey, endKey physicalTableID and element of the handle. +// Caller should wrap this in a separate transaction, to avoid conflicts. func updateDDLReorgHandle(sess *session, jobID int64, startKey kv.Key, endKey kv.Key, physicalTableID int64, element *meta.Element) error { sql := fmt.Sprintf("update mysql.tidb_ddl_reorg set ele_id = %d, ele_type = %s, start_key = %s, end_key = %s, physical_id = %d where job_id = %d", element.ID, wrapKey2String(element.TypeKey), wrapKey2String(startKey), wrapKey2String(endKey), physicalTableID, jobID) @@ -447,28 +440,48 @@ func updateDDLReorgHandle(sess *session, jobID int64, startKey kv.Key, endKey kv } // initDDLReorgHandle initializes the handle for ddl reorg. -func initDDLReorgHandle(sess *session, jobID int64, startKey kv.Key, endKey kv.Key, physicalTableID int64, element *meta.Element) error { - sql := fmt.Sprintf("insert into mysql.tidb_ddl_reorg(job_id, ele_id, ele_type, start_key, end_key, physical_id) values (%d, %d, %s, %s, %s, %d)", +func initDDLReorgHandle(s *session, jobID int64, startKey kv.Key, endKey kv.Key, physicalTableID int64, element *meta.Element) error { + del := fmt.Sprintf("delete from mysql.tidb_ddl_reorg where job_id = %d", jobID) + ins := fmt.Sprintf("insert into mysql.tidb_ddl_reorg(job_id, ele_id, ele_type, start_key, end_key, physical_id) values (%d, %d, %s, %s, %s, %d)", jobID, element.ID, wrapKey2String(element.TypeKey), wrapKey2String(startKey), wrapKey2String(endKey), physicalTableID) - _, err := sess.execute(context.Background(), sql, "update_handle") - return err + return s.runInTxn(func(se *session) error { + _, err := se.execute(context.Background(), del, "init_handle") + if err != nil { + logutil.BgLogger().Info("initDDLReorgHandle failed to delete", zap.Int64("jobID", jobID), zap.Error(err)) + } + _, err = se.execute(context.Background(), ins, "init_handle") + return err + }) } // deleteDDLReorgHandle deletes the handle for ddl reorg. -func removeDDLReorgHandle(sess *session, job *model.Job, elements []*meta.Element) error { +func removeDDLReorgHandle(s *session, job *model.Job, elements []*meta.Element) error { if len(elements) == 0 { return nil } sql := fmt.Sprintf("delete from mysql.tidb_ddl_reorg where job_id = %d", job.ID) - _, err := sess.execute(context.Background(), sql, "remove_handle") - return err + return s.runInTxn(func(se *session) error { + _, err := se.execute(context.Background(), sql, "remove_handle") + return err + }) } // removeReorgElement removes the element from ddl reorg, it is the same with removeDDLReorgHandle, only used in failpoint -func removeReorgElement(sess *session, job *model.Job) error { +func removeReorgElement(s *session, job *model.Job) error { sql := fmt.Sprintf("delete from mysql.tidb_ddl_reorg where job_id = %d", job.ID) - _, err := sess.execute(context.Background(), sql, "remove_handle") - return err + return s.runInTxn(func(se *session) error { + _, err := se.execute(context.Background(), sql, "remove_handle") + return err + }) +} + +// cleanDDLReorgHandles removes handles that are no longer needed. +func cleanDDLReorgHandles(s *session, job *model.Job) error { + sql := "delete from mysql.tidb_ddl_reorg where job_id = " + strconv.FormatInt(job.ID, 10) + return s.runInTxn(func(se *session) error { + _, err := se.execute(context.Background(), sql, "clean_handle") + return err + }) } func wrapKey2String(key []byte) string { @@ -498,12 +511,13 @@ func getJobsBySQL(sess *session, tbl, condition string) ([]*model.Job, error) { // MoveJobFromQueue2Table move existing DDLs in queue to table. func (d *ddl) MoveJobFromQueue2Table(inBootstrap bool) error { - sess, err := d.sessPool.get() + sctx, err := d.sessPool.get() if err != nil { return err } - defer d.sessPool.put(sess) - return runInTxn(newSession(sess), func(se *session) error { + defer d.sessPool.put(sctx) + sess := newSession(sctx) + return sess.runInTxn(func(se *session) error { txn, err := se.txn() if err != nil { return errors.Trace(err) @@ -562,12 +576,13 @@ func (d *ddl) MoveJobFromQueue2Table(inBootstrap bool) error { // MoveJobFromTable2Queue move existing DDLs in table to queue. func (d *ddl) MoveJobFromTable2Queue() error { - sess, err := d.sessPool.get() + sctx, err := d.sessPool.get() if err != nil { return err } - defer d.sessPool.put(sess) - return runInTxn(newSession(sess), func(se *session) error { + defer d.sessPool.put(sctx) + sess := newSession(sctx) + return sess.runInTxn(func(se *session) error { txn, err := se.txn() if err != nil { return errors.Trace(err) @@ -614,16 +629,3 @@ func (d *ddl) MoveJobFromTable2Queue() error { return t.SetConcurrentDDL(false) }) } - -func runInTxn(se *session, f func(*session) error) (err error) { - err = se.begin() - if err != nil { - return err - } - err = f(se) - if err != nil { - se.rollback() - return - } - return errors.Trace(se.commit()) -} diff --git a/ddl/modify_column_test.go b/ddl/modify_column_test.go index b28a503ee4f79..b52276408b754 100644 --- a/ddl/modify_column_test.go +++ b/ddl/modify_column_test.go @@ -15,8 +15,8 @@ package ddl_test import ( - "context" "fmt" + "strconv" "sync" "testing" "time" @@ -29,7 +29,6 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/sessiontxn" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/testkit/external" "github.com/pingcap/tidb/util/mock" @@ -106,25 +105,36 @@ func TestModifyColumnReorgInfo(t *testing.T) { } } require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/MockGetIndexRecordErr", `return("cantDecodeRecordErr")`)) + defer failpoint.Disable("github.com/pingcap/tidb/ddl/MockGetIndexRecordErr") dom.DDL().SetHook(hook) err := tk.ExecToErr(sql) require.EqualError(t, err, "[ddl:8202]Cannot decode index value, because mock can't decode record error") require.NoError(t, checkErr) // Check whether the reorg information is cleaned up when executing "modify column" failed. checkReorgHandle := func(gotElements, expectedElements []*meta.Element) { + require.Equal(t, len(expectedElements), len(gotElements)) for i, e := range gotElements { require.Equal(t, expectedElements[i], e) } - require.NoError(t, sessiontxn.NewTxn(context.Background(), ctx)) - txn, err := ctx.Txn(true) - require.NoError(t, err) - m := meta.NewMeta(txn) - e, start, end, physicalID, err := ddl.NewReorgHandlerForTest(m, testkit.NewTestKit(t, store).Session()).GetDDLReorgHandle(currJob) - require.True(t, meta.ErrDDLReorgElementNotExist.Equal(err)) - require.Nil(t, e) - require.Nil(t, start) - require.Nil(t, end) - require.Zero(t, physicalID) + // check the consistency of the tables. + currJobID := strconv.FormatInt(currJob.ID, 10) + tk.MustQuery("select job_id, reorg, schema_ids, table_ids, type, processing from mysql.tidb_ddl_job where job_id = " + currJobID).Check(testkit.Rows()) + /* + // Commented this out, since it gives different result in CI in release-6.5 + tk.MustQuery("select job_id from mysql.tidb_ddl_history where job_id = " + currJobID).Check(testkit.Rows(currJobID)) + tk.MustQuery("select job_id, ele_id, ele_type, physical_id from mysql.tidb_ddl_reorg where job_id = " + currJobID).Check(testkit.Rows()) + require.NoError(t, sessiontxn.NewTxn(context.Background(), ctx)) + txn, err := ctx.Txn(true) + require.NoError(t, err) + m := meta.NewMeta(txn) + e, start, end, physicalID, err := ddl.NewReorgHandlerForTest(m, testkit.NewTestKit(t, store).Session()).GetDDLReorgHandle(currJob) + require.Error(t, err, "Error not ErrDDLReorgElementNotExists, found orphan row in tidb_ddl_reorg for job.ID %d: e: '%s', physicalID: %d, start: 0x%x end: 0x%x", currJob.ID, e, physicalID, start, end) + require.True(t, meta.ErrDDLReorgElementNotExist.Equal(err)) + require.Nil(t, e) + require.Nil(t, start) + require.Nil(t, end) + require.Zero(t, physicalID) + */ } expectedElements := []*meta.Element{ {ID: 4, TypeKey: meta.ColumnElementKey}, @@ -144,17 +154,14 @@ func TestModifyColumnReorgInfo(t *testing.T) { {ID: 6, TypeKey: meta.IndexElementKey}} checkReorgHandle(elements, expectedElements) tk.MustExec("admin check table t1") - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/MockGetIndexRecordErr")) // Test encountering a "notOwnerErr" error which caused the processing backfill job to exit halfway. // During the period, the old TiDB version(do not exist the element information) is upgraded to the new TiDB version. - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/MockGetIndexRecordErr", `return("addIdxNotOwnerErr")`)) tk.MustExec("alter table t1 add index idx2(c1)") expectedElements = []*meta.Element{ {ID: 7, TypeKey: meta.IndexElementKey}} checkReorgHandle(elements, expectedElements) tk.MustExec("admin check table t1") - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/MockGetIndexRecordErr")) } func TestModifyColumnNullToNotNullWithChangingVal2(t *testing.T) { diff --git a/ddl/multi_schema_change_test.go b/ddl/multi_schema_change_test.go index bf4aef776d291..4e71ed7adfee7 100644 --- a/ddl/multi_schema_change_test.go +++ b/ddl/multi_schema_change_test.go @@ -1015,6 +1015,7 @@ func TestMultiSchemaChangeMixCancelled(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test;") tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") + defer tk.MustExec("set global tidb_ddl_enable_fast_reorg = default") tk.MustExec("create table t (a int, b int, c int, index i1(c), index i2(c));") tk.MustExec("insert into t values (1, 2, 3);") diff --git a/ddl/partition.go b/ddl/partition.go index cf4bd7aed962f..09deb5d1725ef 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1752,7 +1752,28 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) ( elements = append(elements, &meta.Element{ID: idxInfo.ID, TypeKey: meta.IndexElementKey}) } } - rh := newReorgHandler(t, w.sess, w.concurrentDDL) + var rh *reorgHandler + if w.concurrentDDL { + sctx, err1 := w.sessPool.get() + if err1 != nil { + return ver, err1 + } + defer w.sessPool.put(sctx) + sess := newSession(sctx) + err = sess.begin() + if err != nil { + return ver, err + } + defer sess.rollback() + txn, err1 := sess.txn() + if err1 != nil { + return ver, err1 + } + newMeta := meta.NewMeta(txn) + rh = newReorgHandler(newMeta, sess, w.concurrentDDL) + } else { + rh = newReorgHandler(t, w.sess, w.concurrentDDL) + } reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job), d, rh, job, tbl, physicalTableIDs, elements) if err != nil || reorgInfo.first { diff --git a/ddl/reorg.go b/ddl/reorg.go index a03cf417177dc..30a2e4dd755fb 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -142,11 +142,9 @@ func (rc *reorgCtx) increaseRowCount(count int64) { atomic.AddInt64(&rc.rowCount, count) } -func (rc *reorgCtx) getRowCountAndKey() (int64, kv.Key, *meta.Element) { +func (rc *reorgCtx) getRowCount() int64 { row := atomic.LoadInt64(&rc.rowCount) - h, _ := (rc.doneKey.Load()).(nullableKey) - element, _ := (rc.element.Load()).(*meta.Element) - return row, h.key, element + return row } // runReorgJob is used as a portal to do the reorganization work. @@ -233,7 +231,7 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo d.removeReorgCtx(job) return dbterror.ErrCancelledDDLJob } - rowCount, _, _ := rc.getRowCountAndKey() + rowCount := rc.getRowCount() if err != nil { logutil.BgLogger().Warn("[ddl] run reorg job done", zap.Int64("handled rows", rowCount), zap.Error(err)) } else { @@ -253,17 +251,13 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo } updateBackfillProgress(w, reorgInfo, tblInfo, 0) - if err1 := rh.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil { - logutil.BgLogger().Warn("[ddl] run reorg job done, removeDDLReorgHandle failed", zap.Error(err1)) - return errors.Trace(err1) - } case <-w.ctx.Done(): logutil.BgLogger().Info("[ddl] run reorg job quit") d.removeReorgCtx(job) // We return dbterror.ErrWaitReorgTimeout here too, so that outer loop will break. return dbterror.ErrWaitReorgTimeout case <-time.After(waitTimeout): - rowCount, doneKey, currentElement := rc.getRowCountAndKey() + rowCount := rc.getRowCount() job.SetRowCount(rowCount) updateBackfillProgress(w, reorgInfo, tblInfo, rowCount) @@ -272,18 +266,9 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo rc.resetWarnings() - // Update a reorgInfo's handle. - // Since daemon-worker is triggered by timer to store the info half-way. - // you should keep these infos is read-only (like job) / atomic (like doneKey & element) / concurrent safe. - err := rh.UpdateDDLReorgStartHandle(job, currentElement, doneKey) - logutil.BgLogger().Info("[ddl] run reorg job wait timeout", zap.Duration("wait time", waitTimeout), - zap.ByteString("element type", currentElement.TypeKey), - zap.Int64("element ID", currentElement.ID), - zap.Int64("total added row count", rowCount), - zap.String("done key", hex.EncodeToString(doneKey)), - zap.Error(err)) + zap.Int64("total added row count", rowCount)) // If timeout, we will return, check the owner and retry to wait job done again. return dbterror.ErrWaitReorgTimeout } @@ -641,10 +626,6 @@ func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job, failpoint.Inject("errorUpdateReorgHandle", func() (*reorgInfo, error) { return &info, errors.New("occur an error when update reorg handle") }) - err = rh.RemoveDDLReorgHandle(job, elements) - if err != nil { - return &info, errors.Trace(err) - } err = rh.InitDDLReorgHandle(job, start, end, pid, elements[0]) if err != nil { return &info, errors.Trace(err) @@ -749,25 +730,28 @@ func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, rh *reorgHandler, jo return &info, nil } +// UpdateReorgMeta creates a new transaction and updates tidb_ddl_reorg table, +// so the reorg can restart in case of issues. func (r *reorgInfo) UpdateReorgMeta(startKey kv.Key, pool *sessionPool) (err error) { if startKey == nil && r.EndKey == nil { return nil } - se, err := pool.get() + sctx, err := pool.get() if err != nil { return } - defer pool.put(se) + defer pool.put(sctx) - sess := newSession(se) + sess := newSession(sctx) err = sess.begin() if err != nil { return } + defer sess.rollback() txn, err := sess.txn() if err != nil { sess.rollback() - return err + return } rh := newReorgHandler(meta.NewMeta(txn), sess, variable.EnableConcurrentDDL.Load()) err = rh.UpdateDDLReorgHandle(r.Job, startKey, r.EndKey, r.PhysicalTableID, r.currElement) @@ -787,20 +771,12 @@ type reorgHandler struct { } // NewReorgHandlerForTest creates a new reorgHandler, only used in test. -func NewReorgHandlerForTest(t *meta.Meta, sess sessionctx.Context) *reorgHandler { - return newReorgHandler(t, newSession(sess), variable.EnableConcurrentDDL.Load()) -} - -func newReorgHandler(t *meta.Meta, sess *session, enableConcurrentDDL bool) *reorgHandler { - return &reorgHandler{m: t, s: sess, enableConcurrentDDL: enableConcurrentDDL} +func NewReorgHandlerForTest(m *meta.Meta, sess sessionctx.Context) *reorgHandler { + return newReorgHandler(m, newSession(sess), variable.EnableConcurrentDDL.Load()) } -// UpdateDDLReorgStartHandle saves the job reorganization latest processed element and start handle for later resuming. -func (r *reorgHandler) UpdateDDLReorgStartHandle(job *model.Job, element *meta.Element, startKey kv.Key) error { - if r.enableConcurrentDDL { - return updateDDLReorgStartHandle(r.s, job, element, startKey) - } - return r.m.UpdateDDLReorgStartHandle(job, element, startKey) +func newReorgHandler(m *meta.Meta, sess *session, enableConcurrentDDL bool) *reorgHandler { + return &reorgHandler{m: m, s: sess, enableConcurrentDDL: enableConcurrentDDL} } // UpdateDDLReorgHandle saves the job reorganization latest processed information for later resuming. @@ -816,6 +792,7 @@ func (r *reorgHandler) InitDDLReorgHandle(job *model.Job, startKey, endKey kv.Ke if r.enableConcurrentDDL { return initDDLReorgHandle(r.s, job.ID, startKey, endKey, physicalTableID, element) } + _ = r.m.ClearAllDDLReorgHandle() // Cleanup, no need to check error return r.m.UpdateDDLReorgHandle(job.ID, startKey, endKey, physicalTableID, element) } @@ -835,6 +812,24 @@ func (r *reorgHandler) RemoveDDLReorgHandle(job *model.Job, elements []*meta.Ele return r.m.RemoveDDLReorgHandle(job, elements) } +// CleanupDDLReorgHandles removes the job reorganization related handles. +func CleanupDDLReorgHandles(job *model.Job, w *worker, t *meta.Meta) { + if job != nil && !job.IsFinished() && !job.IsSynced() { + // Job is given, but it is neither finished nor synced; do nothing + return + } + var err error + if w.concurrentDDL { + err = cleanDDLReorgHandles(w.sess, job) + } else { + err = t.ClearAllDDLReorgHandle() + } + if err != nil { + // ignore error, cleanup is not that critical + logutil.BgLogger().Warn("Failed removing the DDL reorg entry in tidb_ddl_reorg", zap.String("job", job.String()), zap.Error(err)) + } +} + // GetDDLReorgHandle gets the latest processed DDL reorganize position. func (r *reorgHandler) GetDDLReorgHandle(job *model.Job) (element *meta.Element, startKey, endKey kv.Key, physicalTableID int64, err error) { if r.enableConcurrentDDL { diff --git a/parser/model/ddl.go b/parser/model/ddl.go index c9b36a9e9ef3a..980d524c225c2 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -615,6 +615,9 @@ func (job *Job) String() string { rowCount := job.GetRowCount() ret := fmt.Sprintf("ID:%d, Type:%s, State:%s, SchemaState:%s, SchemaID:%d, TableID:%d, RowCount:%d, ArgLen:%d, start time: %v, Err:%v, ErrCount:%d, SnapshotVersion:%v", job.ID, job.Type, job.State, job.SchemaState, job.SchemaID, job.TableID, rowCount, len(job.Args), TSConvert2Time(job.StartTS), job.Error, job.ErrorCount, job.SnapshotVer) + if job.ReorgMeta != nil { + ret += fmt.Sprintf(", UniqueWarnings:%d", len(job.ReorgMeta.Warnings)) + } if job.Type != ActionMultiSchemaChange && job.MultiSchemaInfo != nil { ret += fmt.Sprintf(", Multi-Schema Change:true, Revertible:%v", job.MultiSchemaInfo.Revertible) }