ddl: backport of (#38738) tidb_ddl_reorg commit conflicts (#41115)
ref #24427, ref #38669, close #41118
mjonss authored Apr 11, 2023
1 parent e3f3aa3 commit cee7565
Showing 13 changed files with 251 additions and 103 deletions.
10 changes: 6 additions & 4 deletions ddl/backfilling.go
@@ -67,7 +67,7 @@ const (
// Backfilling is time consuming, to accelerate this process, TiDB has built some sub
// workers to do this in the DDL owner node.
//
- // DDL owner thread
+ // DDL owner thread (also see comments before runReorgJob func)
// ^
// | (reorgCtx.doneCh)
// |
@@ -453,9 +453,10 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount
err = dc.isReorgRunnable(reorgInfo.Job)
}

+ // Update the reorg handle that has been processed.
+ err1 := reorgInfo.UpdateReorgMeta(nextKey, scheduler.sessPool)
+
if err != nil {
- // Update the reorg handle that has been processed.
- err1 := reorgInfo.UpdateReorgMeta(nextKey, scheduler.sessPool)
metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblError).Observe(elapsedTime.Seconds())
logutil.BgLogger().Warn("[ddl] backfill worker handle batch tasks failed",
zap.ByteString("element type", reorgInfo.currElement.TypeKey),
@@ -480,7 +481,8 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount
zap.String("start key", hex.EncodeToString(startKey)),
zap.String("next key", hex.EncodeToString(nextKey)),
zap.Int64("batch added count", taskAddedCount),
zap.String("take time", elapsedTime.String()))
zap.String("take time", elapsedTime.String()),
zap.NamedError("updateHandleError", err1))
return nil
}

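For context on the hunk above: the reorg-handle update (UpdateReorgMeta) now runs before the error check, so batch progress is persisted on both the success and failure paths, and any error from that update (err1) rides along in the warning log as a named field. A minimal standalone sketch of the zap.NamedError logging pattern — illustrative only, with made-up error values, not code from this commit:

```go
package main

import (
	"errors"

	"go.uber.org/zap"
)

func main() {
	logger, _ := zap.NewDevelopment()
	defer func() { _ = logger.Sync() }()

	// Stand-ins for the two independent failures the backfill path can hit.
	batchErr := errors.New("backfill batch failed")
	updateErr := errors.New("update reorg handle failed")

	// zap.NamedError attaches the secondary error under its own key, so a
	// single structured log line reports both failures.
	logger.Warn("backfill worker handle batch tasks failed",
		zap.Error(batchErr),
		zap.NamedError("updateHandleError", updateErr))
}
```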
34 changes: 29 additions & 5 deletions ddl/column.go
@@ -811,7 +811,30 @@ func doReorgWorkForModifyColumnMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, j
func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table,
oldCol, changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) (done bool, ver int64, err error) {
job.ReorgMeta.ReorgTp = model.ReorgTypeTxn
- rh := newReorgHandler(t, w.sess, w.concurrentDDL)
+ var rh *reorgHandler
+ if w.concurrentDDL {
+ sctx, err1 := w.sessPool.get()
+ if err1 != nil {
+ err = errors.Trace(err1)
+ return
+ }
+ defer w.sessPool.put(sctx)
+ sess := newSession(sctx)
+ err = sess.begin()
+ if err != nil {
+ return
+ }
+ defer sess.rollback()
+ txn, err1 := sess.txn()
+ if err1 != nil {
+ err = errors.Trace(err1)
+ return
+ }
+ newMeta := meta.NewMeta(txn)
+ rh = newReorgHandler(newMeta, newSession(sctx), w.concurrentDDL)
+ } else {
+ rh = newReorgHandler(t, w.sess, w.concurrentDDL)
+ }
reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, tbl, BuildElements(changingCol, changingIdxs), false)
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
@@ -1273,8 +1296,8 @@ func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, ra
if err != nil {
return w.reformatErrors(err)
}
- if w.sessCtx.GetSessionVars().StmtCtx.GetWarnings() != nil && len(w.sessCtx.GetSessionVars().StmtCtx.GetWarnings()) != 0 {
- warn := w.sessCtx.GetSessionVars().StmtCtx.GetWarnings()
+ warn := w.sessCtx.GetSessionVars().StmtCtx.GetWarnings()
+ if len(warn) != 0 {
//nolint:forcetypeassert
recordWarning = errors.Cause(w.reformatErrors(warn[0].Err)).(*terror.Error)
}
@@ -1358,8 +1381,9 @@ func (w *updateColumnWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t
taskCtx.nextKey = nextKey
taskCtx.done = taskDone

- warningsMap := make(map[errors.ErrorID]*terror.Error, len(rowRecords))
- warningsCountMap := make(map[errors.ErrorID]int64, len(rowRecords))
+ // Optimize for few warnings!
+ warningsMap := make(map[errors.ErrorID]*terror.Error, 2)
+ warningsCountMap := make(map[errors.ErrorID]int64, 2)
for _, rowRecord := range rowRecords {
taskCtx.scanCount++

37 changes: 37 additions & 0 deletions ddl/column_type_change_test.go
@@ -2426,3 +2426,40 @@ func TestColumnTypeChangeTimestampToInt(t *testing.T) {
tk.MustExec("alter table t add index idx1(id, c1);")
tk.MustExec("admin check table t")
}
+
+ func TestFixDDLTxnWillConflictWithReorgTxnNotConcurrent(t *testing.T) {
+ store := testkit.CreateMockStore(t)
+ tk0 := testkit.NewTestKit(t, store)
+ tk0.MustExec("set @@global.tidb_enable_metadata_lock=0")
+ defer tk0.MustExec("set @@global.tidb_enable_metadata_lock = default")
+ tk0.MustExec("set @@global.tidb_enable_concurrent_ddl = off")
+ defer tk0.MustExec("set @@global.tidb_enable_concurrent_ddl = default")
+ tk := testkit.NewTestKit(t, store)
+ tk.MustExec("use test")
+
+ tk.MustExec("create table t (a int)")
+ tk.MustExec("set global tidb_ddl_enable_fast_reorg = OFF")
+ defer tk.MustExec("set global tidb_ddl_enable_fast_reorg = default")
+ tk.MustExec("alter table t add index(a)")
+ tk.MustExec("set @@sql_mode=''")
+ tk.MustExec("insert into t values(128),(129)")
+ tk.MustExec("alter table t modify column a tinyint")
+
+ tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1690 2 warnings with this error code, first warning: constant 128 overflows tinyint"))
+ }
+
+ func TestFixDDLTxnWillConflictWithReorgTxn(t *testing.T) {
+ store := testkit.CreateMockStore(t)
+ tk := testkit.NewTestKit(t, store)
+ tk.MustExec("use test")
+
+ tk.MustExec("create table t (a int)")
+ tk.MustExec("set global tidb_ddl_enable_fast_reorg = OFF")
+ defer tk.MustExec("set global tidb_ddl_enable_fast_reorg = default")
+ tk.MustExec("alter table t add index(a)")
+ tk.MustExec("set @@sql_mode=''")
+ tk.MustExec("insert into t values(128),(129)")
+ tk.MustExec("alter table t modify column a tinyint")
+
+ tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1690 2 warnings with this error code, first warning: constant 128 overflows tinyint"))
+ }
19 changes: 19 additions & 0 deletions ddl/db_partition_test.go
@@ -4537,6 +4537,25 @@ func TestPartitionTableWithAnsiQuotes(t *testing.T) {
` PARTITION "pMax" VALUES LESS THAN (MAXVALUE,MAXVALUE))`))
}

+ func TestAlterModifyPartitionColTruncateWarning(t *testing.T) {
+ t.Skip("waiting for supporting Modify Partition Column again")
+ store := testkit.CreateMockStore(t)
+ tk := testkit.NewTestKit(t, store)
+ schemaName := "truncWarn"
+ tk.MustExec("create database " + schemaName)
+ tk.MustExec("use " + schemaName)
+ tk.MustExec(`set sql_mode = default`)
+ tk.MustExec(`create table t (a varchar(255)) partition by range columns (a) (partition p1 values less than ("0"), partition p2 values less than ("zzzz"))`)
+ tk.MustExec(`insert into t values ("123456"),(" 654321")`)
+ tk.MustContainErrMsg(`alter table t modify a varchar(5)`, "[types:1265]Data truncated for column 'a', value is '")
+ tk.MustExec(`set sql_mode = ''`)
+ tk.MustExec(`alter table t modify a varchar(5)`)
+ // Fix the duplicate warning, see https://github.com/pingcap/tidb/issues/38699
+ tk.MustQuery(`show warnings`).Check(testkit.Rows(""+
+ "Warning 1265 Data truncated for column 'a', value is ' 654321'",
+ "Warning 1265 Data truncated for column 'a', value is ' 654321'"))
+ }
+
func TestIssue40135Ver2(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
13 changes: 13 additions & 0 deletions ddl/ddl.go
@@ -1806,6 +1806,19 @@ func (s *session) session() sessionctx.Context {
return s.Context
}

+ func (s *session) runInTxn(f func(*session) error) (err error) {
+ err = s.begin()
+ if err != nil {
+ return err
+ }
+ err = f(s)
+ if err != nil {
+ s.rollback()
+ return
+ }
+ return errors.Trace(s.commit())
+ }
+
// GetAllHistoryDDLJobs get all the done DDL jobs.
func GetAllHistoryDDLJobs(m *meta.Meta) ([]*model.Job, error) {
iterator, err := GetLastHistoryDDLJobsIterator(m)
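The session.runInTxn helper added above (replacing the free function of the same name removed from ddl/job_table.go further down) follows the usual begin / rollback-on-error / commit shape. For illustration only, the same contract expressed against database/sql, which stands in for TiDB's internal session type:

```go
package sketch

import "database/sql"

// runInTxn begins a transaction, runs f, rolls back if f fails, and commits
// otherwise — mirroring the contract of the session method in this commit.
func runInTxn(db *sql.DB, f func(*sql.Tx) error) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	if err := f(tx); err != nil {
		_ = tx.Rollback() // best-effort rollback; the error from f wins
		return err
	}
	return tx.Commit()
}
```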
1 change: 1 addition & 0 deletions ddl/ddl_worker.go
@@ -775,6 +775,7 @@ func (w *worker) HandleJobDone(d *ddlCtx, job *model.Job, t *meta.Meta) error {
if err != nil {
return err
}
+ CleanupDDLReorgHandles(job, w, t)
asyncNotify(d.ddlJobDoneCh)
return nil
}
25 changes: 24 additions & 1 deletion ddl/index.go
@@ -896,6 +896,30 @@ func convertToKeyExistsErr(originErr error, idxInfo *model.IndexInfo, tblInfo *m
func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, indexInfo *model.IndexInfo, mergingTmpIdx bool) (done bool, ver int64, err error) {
elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}}
+ var rh *reorgHandler
+ if w.concurrentDDL {
+ sctx, err1 := w.sessPool.get()
+ if err1 != nil {
+ err = err1
+ return
+ }
+ defer w.sessPool.put(sctx)
+ sess := newSession(sctx)
+ err = sess.begin()
+ if err != nil {
+ return
+ }
+ defer sess.rollback()
+ txn, err1 := sess.txn()
+ if err1 != nil {
+ err = err1
+ return
+ }
+ newMeta := meta.NewMeta(txn)
+ rh = newReorgHandler(newMeta, sess, w.concurrentDDL)
+ } else {
+ rh = newReorgHandler(t, w.sess, w.concurrentDDL)
+ }

failpoint.Inject("mockDMLExecutionStateMerging", func(val failpoint.Value) {
//nolint:forcetypeassert
@@ -905,7 +929,6 @@ func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
}
})

- rh := newReorgHandler(t, w.sess, w.concurrentDDL)
reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, tbl, elements, mergingTmpIdx)
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
76 changes: 39 additions & 37 deletions ddl/job_table.go
@@ -430,15 +430,8 @@ func getDDLReorgHandle(sess *session, job *model.Job) (element *meta.Element, st
return
}

- // updateDDLReorgStartHandle update the startKey of the handle.
- func updateDDLReorgStartHandle(sess *session, job *model.Job, element *meta.Element, startKey kv.Key) error {
- sql := fmt.Sprintf("update mysql.tidb_ddl_reorg set ele_id = %d, ele_type = %s, start_key = %s where job_id = %d",
- element.ID, wrapKey2String(element.TypeKey), wrapKey2String(startKey), job.ID)
- _, err := sess.execute(context.Background(), sql, "update_start_handle")
- return err
- }
-
// updateDDLReorgHandle update startKey, endKey physicalTableID and element of the handle.
+ // Caller should wrap this in a separate transaction, to avoid conflicts.
func updateDDLReorgHandle(sess *session, jobID int64, startKey kv.Key, endKey kv.Key, physicalTableID int64, element *meta.Element) error {
sql := fmt.Sprintf("update mysql.tidb_ddl_reorg set ele_id = %d, ele_type = %s, start_key = %s, end_key = %s, physical_id = %d where job_id = %d",
element.ID, wrapKey2String(element.TypeKey), wrapKey2String(startKey), wrapKey2String(endKey), physicalTableID, jobID)
@@ -447,28 +440,48 @@ func updateDDLReorgHandle(sess *session, jobID int64, startKey kv.Key, endKey kv
}

// initDDLReorgHandle initializes the handle for ddl reorg.
- func initDDLReorgHandle(sess *session, jobID int64, startKey kv.Key, endKey kv.Key, physicalTableID int64, element *meta.Element) error {
- sql := fmt.Sprintf("insert into mysql.tidb_ddl_reorg(job_id, ele_id, ele_type, start_key, end_key, physical_id) values (%d, %d, %s, %s, %s, %d)",
+ func initDDLReorgHandle(s *session, jobID int64, startKey kv.Key, endKey kv.Key, physicalTableID int64, element *meta.Element) error {
+ del := fmt.Sprintf("delete from mysql.tidb_ddl_reorg where job_id = %d", jobID)
+ ins := fmt.Sprintf("insert into mysql.tidb_ddl_reorg(job_id, ele_id, ele_type, start_key, end_key, physical_id) values (%d, %d, %s, %s, %s, %d)",
jobID, element.ID, wrapKey2String(element.TypeKey), wrapKey2String(startKey), wrapKey2String(endKey), physicalTableID)
- _, err := sess.execute(context.Background(), sql, "update_handle")
- return err
+ return s.runInTxn(func(se *session) error {
+ _, err := se.execute(context.Background(), del, "init_handle")
+ if err != nil {
+ logutil.BgLogger().Info("initDDLReorgHandle failed to delete", zap.Int64("jobID", jobID), zap.Error(err))
+ }
+ _, err = se.execute(context.Background(), ins, "init_handle")
+ return err
+ })
}

// deleteDDLReorgHandle deletes the handle for ddl reorg.
- func removeDDLReorgHandle(sess *session, job *model.Job, elements []*meta.Element) error {
+ func removeDDLReorgHandle(s *session, job *model.Job, elements []*meta.Element) error {
if len(elements) == 0 {
return nil
}
sql := fmt.Sprintf("delete from mysql.tidb_ddl_reorg where job_id = %d", job.ID)
- _, err := sess.execute(context.Background(), sql, "remove_handle")
- return err
+ return s.runInTxn(func(se *session) error {
+ _, err := se.execute(context.Background(), sql, "remove_handle")
+ return err
+ })
}

// removeReorgElement removes the element from ddl reorg, it is the same with removeDDLReorgHandle, only used in failpoint
- func removeReorgElement(sess *session, job *model.Job) error {
+ func removeReorgElement(s *session, job *model.Job) error {
sql := fmt.Sprintf("delete from mysql.tidb_ddl_reorg where job_id = %d", job.ID)
- _, err := sess.execute(context.Background(), sql, "remove_handle")
- return err
+ return s.runInTxn(func(se *session) error {
+ _, err := se.execute(context.Background(), sql, "remove_handle")
+ return err
+ })
}
+
+ // cleanDDLReorgHandles removes handles that are no longer needed.
+ func cleanDDLReorgHandles(s *session, job *model.Job) error {
+ sql := "delete from mysql.tidb_ddl_reorg where job_id = " + strconv.FormatInt(job.ID, 10)
+ return s.runInTxn(func(se *session) error {
+ _, err := se.execute(context.Background(), sql, "clean_handle")
+ return err
+ })
+ }

func wrapKey2String(key []byte) string {
@@ -498,12 +511,13 @@ func getJobsBySQL(sess *session, tbl, condition string) ([]*model.Job, error) {

// MoveJobFromQueue2Table move existing DDLs in queue to table.
func (d *ddl) MoveJobFromQueue2Table(inBootstrap bool) error {
- sess, err := d.sessPool.get()
+ sctx, err := d.sessPool.get()
if err != nil {
return err
}
- defer d.sessPool.put(sess)
- return runInTxn(newSession(sess), func(se *session) error {
+ defer d.sessPool.put(sctx)
+ sess := newSession(sctx)
+ return sess.runInTxn(func(se *session) error {
txn, err := se.txn()
if err != nil {
return errors.Trace(err)
@@ -562,12 +576,13 @@ func (d *ddl) MoveJobFromQueue2Table(inBootstrap bool) error {

// MoveJobFromTable2Queue move existing DDLs in table to queue.
func (d *ddl) MoveJobFromTable2Queue() error {
- sess, err := d.sessPool.get()
+ sctx, err := d.sessPool.get()
if err != nil {
return err
}
- defer d.sessPool.put(sess)
- return runInTxn(newSession(sess), func(se *session) error {
+ defer d.sessPool.put(sctx)
+ sess := newSession(sctx)
+ return sess.runInTxn(func(se *session) error {
txn, err := se.txn()
if err != nil {
return errors.Trace(err)
@@ -614,16 +629,3 @@ func (d *ddl) MoveJobFromTable2Queue() error {
return t.SetConcurrentDDL(false)
})
}
-
- func runInTxn(se *session, f func(*session) error) (err error) {
- err = se.begin()
- if err != nil {
- return err
- }
- err = f(se)
- if err != nil {
- se.rollback()
- return
- }
- return errors.Trace(se.commit())
- }
(The remaining 5 of the 13 changed files are not shown here.)
