diff --git a/pkg/ddl/job_scheduler.go b/pkg/ddl/job_scheduler.go
index b884cc2263f1d..88a72234f6851 100644
--- a/pkg/ddl/job_scheduler.go
+++ b/pkg/ddl/job_scheduler.go
@@ -15,13 +15,11 @@
 package ddl
 
 import (
-	"bytes"
 	"context"
 	"encoding/hex"
 	"encoding/json"
 	"fmt"
 	"runtime"
-	"slices"
 	"strconv"
 	"strings"
 	"sync/atomic"
@@ -658,102 +656,6 @@ func (d *ddl) getTableByTxn(r autoid.Requirement, schemaID, tableID int64) (*mod
 	return dbInfo, tbl, err
 }
 
-const (
-	addDDLJobSQL    = "insert into mysql.tidb_ddl_job(job_id, reorg, schema_ids, table_ids, job_meta, type, processing) values"
-	updateDDLJobSQL = "update mysql.tidb_ddl_job set job_meta = %s where job_id = %d"
-)
-
-func insertDDLJobs2Table(ctx context.Context, se *sess.Session, jobWs ...*JobWrapper) error {
-	failpoint.Inject("mockAddBatchDDLJobsErr", func(val failpoint.Value) {
-		if val.(bool) {
-			failpoint.Return(errors.Errorf("mockAddBatchDDLJobsErr"))
-		}
-	})
-	if len(jobWs) == 0 {
-		return nil
-	}
-	var sql bytes.Buffer
-	sql.WriteString(addDDLJobSQL)
-	for i, jobW := range jobWs {
-		// TODO remove this check when all job type pass args in this way.
-		if jobW.JobArgs != nil {
-			jobW.FillArgs(jobW.JobArgs)
-		}
-		injectModifyJobArgFailPoint(jobWs)
-		b, err := jobW.Encode(true)
-		if err != nil {
-			return err
-		}
-		if i != 0 {
-			sql.WriteString(",")
-		}
-		fmt.Fprintf(&sql, "(%d, %t, %s, %s, %s, %d, %t)", jobW.ID, jobW.MayNeedReorg(),
-			strconv.Quote(job2SchemaIDs(jobW)), strconv.Quote(job2TableIDs(jobW)),
-			util.WrapKey2String(b), jobW.Type, jobW.Started())
-	}
-	se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
-	_, err := se.Execute(ctx, sql.String(), "insert_job")
-	logutil.DDLLogger().Debug("add job to mysql.tidb_ddl_job table", zap.String("sql", sql.String()))
-	return errors.Trace(err)
-}
-
-func makeStringForIDs(ids []int64) string {
-	set := make(map[int64]struct{}, len(ids))
-	for _, id := range ids {
-		set[id] = struct{}{}
-	}
-
-	s := make([]string, 0, len(set))
-	for id := range set {
-		s = append(s, strconv.FormatInt(id, 10))
-	}
-	slices.Sort(s)
-	return strings.Join(s, ",")
-}
-
-func job2SchemaIDs(jobW *JobWrapper) string {
-	switch jobW.Type {
-	case model.ActionRenameTables:
-		var ids []int64
-		arg := jobW.JobArgs.(*model.RenameTablesArgs)
-		ids = make([]int64, 0, len(arg.RenameTableInfos)*2)
-		for _, info := range arg.RenameTableInfos {
-			ids = append(ids, info.OldSchemaID, info.NewSchemaID)
-		}
-		return makeStringForIDs(ids)
-	case model.ActionRenameTable:
-		oldSchemaID := jobW.JobArgs.(*model.RenameTableArgs).OldSchemaID
-		ids := []int64{oldSchemaID, jobW.SchemaID}
-		return makeStringForIDs(ids)
-	case model.ActionExchangeTablePartition:
-		args := jobW.JobArgs.(*model.ExchangeTablePartitionArgs)
-		return makeStringForIDs([]int64{jobW.SchemaID, args.PTSchemaID})
-	default:
-		return strconv.FormatInt(jobW.SchemaID, 10)
-	}
-}
-
-func job2TableIDs(jobW *JobWrapper) string {
-	switch jobW.Type {
-	case model.ActionRenameTables:
-		var ids []int64
-		arg := jobW.JobArgs.(*model.RenameTablesArgs)
-		ids = make([]int64, 0, len(arg.RenameTableInfos))
-		for _, info := range arg.RenameTableInfos {
-			ids = append(ids, info.TableID)
-		}
-		return makeStringForIDs(ids)
-	case model.ActionExchangeTablePartition:
-		args := jobW.JobArgs.(*model.ExchangeTablePartitionArgs)
-		return makeStringForIDs([]int64{jobW.TableID, args.PTTableID})
-	case model.ActionTruncateTable:
-		newTableID := jobW.JobArgs.(*model.TruncateTableArgs).NewTableID
-		return strconv.FormatInt(jobW.TableID, 10) + "," + strconv.FormatInt(newTableID, 10)
-	default:
-		return strconv.FormatInt(jobW.TableID, 10)
-	}
-}
-
 func updateDDLJob2Table(
 	ctx context.Context,
 	se *sess.Session,
@@ -764,7 +666,7 @@ func updateDDLJob2Table(
 	if err != nil {
 		return err
 	}
-	sql := fmt.Sprintf(updateDDLJobSQL, util.WrapKey2String(b), job.ID)
+	sql := fmt.Sprintf("update mysql.tidb_ddl_job set job_meta = %s where job_id = %d", util.WrapKey2String(b), job.ID)
 	_, err = se.Execute(ctx, sql, "update_job")
 	return errors.Trace(err)
 }
diff --git a/pkg/ddl/job_submitter.go b/pkg/ddl/job_submitter.go
index 4074b63ce6a14..772b9c167b86a 100644
--- a/pkg/ddl/job_submitter.go
+++ b/pkg/ddl/job_submitter.go
@@ -15,9 +15,12 @@
 package ddl
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"math"
+	"slices"
+	"strconv"
 	"strings"
 	"time"
 
@@ -677,6 +680,101 @@ func lockGlobalIDKey(ctx context.Context, ddlSe *sess.Session, txn kv.Transactio
 	return forUpdateTs, err
 }
 
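+// insertDDLJobs2Table encodes each job and writes the whole batch to the
+// mysql.tidb_ddl_job table with a single multi-row INSERT statement.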
+func insertDDLJobs2Table(ctx context.Context, se *sess.Session, jobWs ...*JobWrapper) error {
+	failpoint.Inject("mockAddBatchDDLJobsErr", func(val failpoint.Value) {
+		if val.(bool) {
+			failpoint.Return(errors.Errorf("mockAddBatchDDLJobsErr"))
+		}
+	})
+	if len(jobWs) == 0 {
+		return nil
+	}
+	var sql bytes.Buffer
+	sql.WriteString("insert into mysql.tidb_ddl_job(job_id, reorg, schema_ids, table_ids, job_meta, type, processing) values")
+	for i, jobW := range jobWs {
+		// TODO remove this check when all job type pass args in this way.
+		if jobW.JobArgs != nil {
+			jobW.FillArgs(jobW.JobArgs)
+		}
+		injectModifyJobArgFailPoint(jobWs)
+		b, err := jobW.Encode(true)
+		if err != nil {
+			return err
+		}
+		if i != 0 {
+			sql.WriteString(",")
+		}
+		fmt.Fprintf(&sql, "(%d, %t, %s, %s, %s, %d, %t)", jobW.ID, jobW.MayNeedReorg(),
+			strconv.Quote(job2SchemaIDs(jobW)), strconv.Quote(job2TableIDs(jobW)),
+			ddlutil.WrapKey2String(b), jobW.Type, jobW.Started())
+	}
+	se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
+	_, err := se.Execute(ctx, sql.String(), "insert_job")
+	logutil.DDLLogger().Debug("add job to mysql.tidb_ddl_job table", zap.String("sql", sql.String()))
+	return errors.Trace(err)
+}
+
+func makeStringForIDs(ids []int64) string {
+	set := make(map[int64]struct{}, len(ids))
+	for _, id := range ids {
+		set[id] = struct{}{}
+	}
+
+	s := make([]string, 0, len(set))
+	for id := range set {
+		s = append(s, strconv.FormatInt(id, 10))
+	}
+	slices.Sort(s)
+	return strings.Join(s, ",")
+}
+
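+// job2SchemaIDs returns the schema IDs involved in the job as a comma-separated
+// list; multi-schema actions (rename, exchange-partition) include all of them.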
+func job2SchemaIDs(jobW *JobWrapper) string {
+	switch jobW.Type {
+	case model.ActionRenameTables:
+		var ids []int64
+		arg := jobW.JobArgs.(*model.RenameTablesArgs)
+		ids = make([]int64, 0, len(arg.RenameTableInfos)*2)
+		for _, info := range arg.RenameTableInfos {
+			ids = append(ids, info.OldSchemaID, info.NewSchemaID)
+		}
+		return makeStringForIDs(ids)
+	case model.ActionRenameTable:
+		oldSchemaID := jobW.JobArgs.(*model.RenameTableArgs).OldSchemaID
+		ids := []int64{oldSchemaID, jobW.SchemaID}
+		return makeStringForIDs(ids)
+	case model.ActionExchangeTablePartition:
+		args := jobW.JobArgs.(*model.ExchangeTablePartitionArgs)
+		return makeStringForIDs([]int64{jobW.SchemaID, args.PTSchemaID})
+	default:
+		return strconv.FormatInt(jobW.SchemaID, 10)
+	}
+}
+
+func job2TableIDs(jobW *JobWrapper) string {
+	switch jobW.Type {
+	case model.ActionRenameTables:
+		var ids []int64
+		arg := jobW.JobArgs.(*model.RenameTablesArgs)
+		ids = make([]int64, 0, len(arg.RenameTableInfos))
+		for _, info := range arg.RenameTableInfos {
+			ids = append(ids, info.TableID)
+		}
+		return makeStringForIDs(ids)
+	case model.ActionExchangeTablePartition:
+		args := jobW.JobArgs.(*model.ExchangeTablePartitionArgs)
+		return makeStringForIDs([]int64{jobW.TableID, args.PTTableID})
+	case model.ActionTruncateTable:
+		newTableID := jobW.JobArgs.(*model.TruncateTableArgs).NewTableID
+		return strconv.FormatInt(jobW.TableID, 10) + "," + strconv.FormatInt(newTableID, 10)
+	default:
+		return strconv.FormatInt(jobW.TableID, 10)
+	}
+}
+
 // TODO this failpoint is only checking how job scheduler handle
 // corrupted job args, we should test it there by UT, not here.
 func injectModifyJobArgFailPoint(jobWs []*JobWrapper) {