Skip to content

Commit 99a2df5

Browse files
authored
ddl: add COMMENTS column to DDL jobs and enhance job reorg meta handling (#57392) (#57595)
ref #57229
1 parent 6404c67 commit 99a2df5

23 files changed

+335
-177
lines changed

br/tests/br_partition_add_index/run.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ run_br restore db --db $DB -s "local://$TEST_DIR/$DB" --pd $PD_ADDR
5353

5454
run_sql "ALTER TABLE $DB.t0 ADD INDEX idx(data);"
5555

56-
result=$(run_sql "ADMIN SHOW DDL JOBS 1 WHERE job_type LIKE '%ingest%';")
56+
result=$(run_sql "ADMIN SHOW DDL JOBS 1 WHERE comments LIKE '%ingest%';")
5757

5858
run_sql "ADMIN SHOW DDL JOBS 1;"
5959

br/tests/br_pitr_failpoint/run.sh

+3-3
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ run_br --pd $PD_ADDR log start --task-name integration_test -s "local://$TEST_DI
4747
# wait until the index creation is running
4848
retry_cnt=0
4949
while true; do
50-
run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'running' AND SCHEMA_STATE = 'write reorganization' AND JOB_TYPE = 'add index /* ingest */';"
50+
run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'running' AND SCHEMA_STATE = 'write reorganization' AND JOB_TYPE = 'add index';"
5151
if grep -Fq "1. row" $res_file; then
5252
break
5353
fi
@@ -71,7 +71,7 @@ touch $hint_sig_file_public
7171
# wait until the index creation is done
7272
retry_cnt=0
7373
while true; do
74-
run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'done' AND SCHEMA_STATE = 'public' AND JOB_TYPE = 'add index /* ingest */';"
74+
run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'done' AND SCHEMA_STATE = 'public' AND JOB_TYPE = 'add index';"
7575
if grep -Fq "1. row" $res_file; then
7676
break
7777
fi
@@ -98,7 +98,7 @@ wait $sql_pid
9898
# wait until the index creation is done
9999
retry_cnt=0
100100
while true; do
101-
run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'synced' AND SCHEMA_STATE = 'public' AND JOB_TYPE = 'add index /* ingest */';"
101+
run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'synced' AND SCHEMA_STATE = 'public' AND JOB_TYPE = 'add index';"
102102
if grep -Fq "1. row" $res_file; then
103103
break
104104
fi

pkg/ddl/column_modify_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,7 @@ func TestModifyColumnReorgCheckpoint(t *testing.T) {
622622
tk.MustExec("use test")
623623
tk2 := testkit.NewTestKit(t, store)
624624
tk2.MustExec("use test")
625-
tk.MustExec("set global tidb_ddl_reorg_worker_cnt = 1;")
625+
tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1;")
626626
tk.MustExec("create table t (a int primary key, b bigint);")
627627
rowCnt := 10
628628
for i := 0; i < rowCnt; i++ {

pkg/ddl/executor.go

+97-79
Original file line numberDiff line numberDiff line change
@@ -2005,20 +2005,14 @@ func (e *executor) multiSchemaChange(ctx sessionctx.Context, ti ast.Ident, info
20052005
Type: model.ActionMultiSchemaChange,
20062006
BinlogInfo: &model.HistoryInfo{},
20072007
MultiSchemaInfo: info,
2008-
ReorgMeta: nil,
20092008
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
20102009
InvolvingSchemaInfo: involvingSchemaInfo,
20112010
SQLMode: ctx.GetSessionVars().SQLMode,
20122011
}
2013-
if containsDistTaskSubJob(subJobs) {
2014-
job.ReorgMeta, err = newReorgMetaFromVariables(job, ctx)
2015-
if err != nil {
2016-
return err
2017-
}
2018-
} else {
2019-
job.ReorgMeta = NewDDLReorgMeta(ctx)
2012+
err = initJobReorgMetaFromVariables(job, ctx)
2013+
if err != nil {
2014+
return errors.Trace(err)
20202015
}
2021-
20222016
err = checkMultiSchemaInfo(info, t)
20232017
if err != nil {
20242018
return errors.Trace(err)
@@ -2027,16 +2021,6 @@ func (e *executor) multiSchemaChange(ctx sessionctx.Context, ti ast.Ident, info
20272021
return e.DoDDLJob(ctx, job)
20282022
}
20292023

2030-
func containsDistTaskSubJob(subJobs []*model.SubJob) bool {
2031-
for _, sub := range subJobs {
2032-
if sub.Type == model.ActionAddIndex ||
2033-
sub.Type == model.ActionAddPrimaryKey {
2034-
return true
2035-
}
2036-
}
2037-
return false
2038-
}
2039-
20402024
func (e *executor) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int64, tp autoid.AllocatorType, force bool) error {
20412025
schema, t, err := e.getSchemaAndTableByIdent(ident)
20422026
if err != nil {
@@ -2451,10 +2435,13 @@ func (e *executor) AlterTablePartitioning(ctx sessionctx.Context, ident ast.Iden
24512435
TableName: t.Meta().Name.L,
24522436
Type: model.ActionAlterTablePartitioning,
24532437
BinlogInfo: &model.HistoryInfo{},
2454-
ReorgMeta: NewDDLReorgMeta(ctx),
24552438
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
24562439
SQLMode: ctx.GetSessionVars().SQLMode,
24572440
}
2441+
err = initJobReorgMetaFromVariables(job, ctx)
2442+
if err != nil {
2443+
return err
2444+
}
24582445

24592446
args := &model.TablePartitionArgs{
24602447
PartNames: partNames,
@@ -2517,10 +2504,13 @@ func (e *executor) ReorganizePartitions(ctx sessionctx.Context, ident ast.Ident,
25172504
TableName: t.Meta().Name.L,
25182505
Type: model.ActionReorganizePartition,
25192506
BinlogInfo: &model.HistoryInfo{},
2520-
ReorgMeta: NewDDLReorgMeta(ctx),
25212507
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
25222508
SQLMode: ctx.GetSessionVars().SQLMode,
25232509
}
2510+
err = initJobReorgMetaFromVariables(job, ctx)
2511+
if err != nil {
2512+
return errors.Trace(err)
2513+
}
25242514
args := &model.TablePartitionArgs{
25252515
PartNames: partNames,
25262516
PartInfo: partInfo,
@@ -2583,10 +2573,13 @@ func (e *executor) RemovePartitioning(ctx sessionctx.Context, ident ast.Ident, s
25832573
TableName: meta.Name.L,
25842574
Type: model.ActionRemovePartitioning,
25852575
BinlogInfo: &model.HistoryInfo{},
2586-
ReorgMeta: NewDDLReorgMeta(ctx),
25872576
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
25882577
SQLMode: ctx.GetSessionVars().SQLMode,
25892578
}
2579+
err = initJobReorgMetaFromVariables(job, ctx)
2580+
if err != nil {
2581+
return errors.Trace(err)
2582+
}
25902583
args := &model.TablePartitionArgs{
25912584
PartNames: partNames,
25922585
PartInfo: partInfo,
@@ -3385,10 +3378,13 @@ func (e *executor) RenameColumn(ctx sessionctx.Context, ident ast.Ident, spec *a
33853378
TableName: tbl.Meta().Name.L,
33863379
Type: model.ActionModifyColumn,
33873380
BinlogInfo: &model.HistoryInfo{},
3388-
ReorgMeta: NewDDLReorgMeta(ctx),
33893381
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
33903382
SQLMode: ctx.GetSessionVars().SQLMode,
33913383
}
3384+
err = initJobReorgMetaFromVariables(job, ctx)
3385+
if err != nil {
3386+
return err
3387+
}
33923388

33933389
args := &model.ModifyColumnArgs{
33943390
Column: newCol,
@@ -4645,11 +4641,10 @@ func (e *executor) CreatePrimaryKey(ctx sessionctx.Context, ti ast.Ident, indexN
46454641
OpType: model.OpAddIndex,
46464642
}
46474643

4648-
reorgMeta, err := newReorgMetaFromVariables(job, ctx)
4644+
err = initJobReorgMetaFromVariables(job, ctx)
46494645
if err != nil {
46504646
return err
46514647
}
4652-
job.ReorgMeta = reorgMeta
46534648

46544649
err = e.doDDLJob2(ctx, job, args)
46554650
return errors.Trace(err)
@@ -4758,10 +4753,7 @@ func (e *executor) createVectorIndex(ctx sessionctx.Context, ti ast.Ident, index
47584753
return errors.Trace(err)
47594754
}
47604755

4761-
job, err := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)
4762-
if err != nil {
4763-
return errors.Trace(err)
4764-
}
4756+
job := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)
47654757
job.Version = model.GetJobVerInUse()
47664758
job.Type = model.ActionAddVectorIndex
47674759
indexPartSpecifications[0].Expr = nil
@@ -4788,32 +4780,20 @@ func (e *executor) createVectorIndex(ctx sessionctx.Context, ti ast.Ident, index
47884780
return errors.Trace(err)
47894781
}
47904782

4791-
func buildAddIndexJobWithoutTypeAndArgs(ctx sessionctx.Context, schema *model.DBInfo, t table.Table) (*model.Job, error) {
4792-
tzName, tzOffset := ddlutil.GetTimeZone(ctx)
4783+
func buildAddIndexJobWithoutTypeAndArgs(ctx sessionctx.Context, schema *model.DBInfo, t table.Table) *model.Job {
47934784
charset, collate := ctx.GetSessionVars().GetCharsetInfo()
47944785
job := &model.Job{
47954786
SchemaID: schema.ID,
47964787
TableID: t.Meta().ID,
47974788
SchemaName: schema.Name.L,
47984789
TableName: t.Meta().Name.L,
47994790
BinlogInfo: &model.HistoryInfo{},
4800-
ReorgMeta: &model.DDLReorgMeta{
4801-
SQLMode: ctx.GetSessionVars().SQLMode,
4802-
Warnings: make(map[errors.ErrorID]*terror.Error),
4803-
WarningsCount: make(map[errors.ErrorID]int64),
4804-
Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset},
4805-
},
4806-
Priority: ctx.GetSessionVars().DDLReorgPriority,
4807-
Charset: charset,
4808-
Collate: collate,
4809-
SQLMode: ctx.GetSessionVars().SQLMode,
4791+
Priority: ctx.GetSessionVars().DDLReorgPriority,
4792+
Charset: charset,
4793+
Collate: collate,
4794+
SQLMode: ctx.GetSessionVars().SQLMode,
48104795
}
4811-
reorgMeta, err := newReorgMetaFromVariables(job, ctx)
4812-
if err != nil {
4813-
return nil, errors.Trace(err)
4814-
}
4815-
job.ReorgMeta = reorgMeta
4816-
return job, nil
4796+
return job
48174797
}
48184798

48194799
func (e *executor) CreateIndex(ctx sessionctx.Context, stmt *ast.CreateIndexStmt) error {
@@ -4907,15 +4887,17 @@ func (e *executor) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast
49074887
// global is set to 'false' is just there to be backwards compatible,
49084888
// to avoid unmarshal issues, it is now part of indexOption.
49094889
global := false
4910-
job, err := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)
4911-
if err != nil {
4912-
return errors.Trace(err)
4913-
}
4890+
job := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)
49144891

49154892
job.Version = model.GetJobVerInUse()
49164893
job.Type = model.ActionAddIndex
49174894
job.CDCWriteSource = ctx.GetSessionVars().CDCWriteSource
49184895

4896+
err = initJobReorgMetaFromVariables(job, ctx)
4897+
if err != nil {
4898+
return errors.Trace(err)
4899+
}
4900+
49194901
args := &model.ModifyIndexArgs{
49204902
IndexArgs: []*model.IndexArg{{
49214903
Unique: unique,
@@ -4937,44 +4919,80 @@ func (e *executor) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast
49374919
return errors.Trace(err)
49384920
}
49394921

4940-
func newReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) (*model.DDLReorgMeta, error) {
4941-
reorgMeta := NewDDLReorgMeta(sctx)
4942-
reorgMeta.IsDistReorg = variable.EnableDistTask.Load()
4943-
reorgMeta.IsFastReorg = variable.EnableFastReorg.Load()
4944-
reorgMeta.TargetScope = variable.ServiceScope.Load()
4945-
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgWorkerCount); ok {
4946-
reorgMeta.Concurrency = variable.TidbOptInt(sv, 0)
4922+
func initJobReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) error {
4923+
m := NewDDLReorgMeta(sctx)
4924+
setReorgParam := func() {
4925+
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgWorkerCount); ok {
4926+
m.Concurrency = variable.TidbOptInt(sv, 0)
4927+
}
4928+
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgBatchSize); ok {
4929+
m.BatchSize = variable.TidbOptInt(sv, 0)
4930+
}
49474931
}
4948-
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgBatchSize); ok {
4949-
reorgMeta.BatchSize = variable.TidbOptInt(sv, 0)
4932+
setDistTaskParam := func() error {
4933+
m.IsDistReorg = variable.EnableDistTask.Load()
4934+
m.IsFastReorg = variable.EnableFastReorg.Load()
4935+
m.TargetScope = variable.ServiceScope.Load()
4936+
if hasSysDB(job) {
4937+
if m.IsDistReorg {
4938+
logutil.DDLLogger().Info("cannot use distributed task execution on system DB",
4939+
zap.Stringer("job", job))
4940+
}
4941+
m.IsDistReorg = false
4942+
m.IsFastReorg = false
4943+
failpoint.Inject("reorgMetaRecordFastReorgDisabled", func(_ failpoint.Value) {
4944+
LastReorgMetaFastReorgDisabled = true
4945+
})
4946+
}
4947+
if m.IsDistReorg && !m.IsFastReorg {
4948+
return dbterror.ErrUnsupportedDistTask
4949+
}
4950+
return nil
49504951
}
49514952

4952-
if reorgMeta.IsDistReorg && !reorgMeta.IsFastReorg {
4953-
return nil, dbterror.ErrUnsupportedDistTask
4954-
}
4955-
if hasSysDB(job) {
4956-
if reorgMeta.IsDistReorg {
4957-
logutil.DDLLogger().Info("cannot use distributed task execution on system DB",
4958-
zap.Stringer("job", job))
4953+
switch job.Type {
4954+
case model.ActionAddIndex, model.ActionAddPrimaryKey:
4955+
setReorgParam()
4956+
err := setDistTaskParam()
4957+
if err != nil {
4958+
return err
49594959
}
4960-
reorgMeta.IsDistReorg = false
4961-
reorgMeta.IsFastReorg = false
4962-
failpoint.Inject("reorgMetaRecordFastReorgDisabled", func(_ failpoint.Value) {
4963-
LastReorgMetaFastReorgDisabled = true
4964-
})
4960+
case model.ActionReorganizePartition,
4961+
model.ActionRemovePartitioning,
4962+
model.ActionAlterTablePartitioning,
4963+
model.ActionModifyColumn:
4964+
setReorgParam()
4965+
case model.ActionMultiSchemaChange:
4966+
for _, sub := range job.MultiSchemaInfo.SubJobs {
4967+
switch sub.Type {
4968+
case model.ActionAddIndex, model.ActionAddPrimaryKey:
4969+
setReorgParam()
4970+
err := setDistTaskParam()
4971+
if err != nil {
4972+
return err
4973+
}
4974+
case model.ActionReorganizePartition,
4975+
model.ActionRemovePartitioning,
4976+
model.ActionAlterTablePartitioning,
4977+
model.ActionModifyColumn:
4978+
setReorgParam()
4979+
}
4980+
}
4981+
default:
4982+
return nil
49654983
}
4966-
4984+
job.ReorgMeta = m
49674985
logutil.DDLLogger().Info("initialize reorg meta",
49684986
zap.String("jobSchema", job.SchemaName),
49694987
zap.String("jobTable", job.TableName),
49704988
zap.Stringer("jobType", job.Type),
4971-
zap.Bool("enableDistTask", reorgMeta.IsDistReorg),
4972-
zap.Bool("enableFastReorg", reorgMeta.IsFastReorg),
4973-
zap.String("targetScope", reorgMeta.TargetScope),
4974-
zap.Int("concurrency", reorgMeta.Concurrency),
4975-
zap.Int("batchSize", reorgMeta.BatchSize),
4989+
zap.Bool("enableDistTask", m.IsDistReorg),
4990+
zap.Bool("enableFastReorg", m.IsFastReorg),
4991+
zap.String("targetScope", m.TargetScope),
4992+
zap.Int("concurrency", m.Concurrency),
4993+
zap.Int("batchSize", m.BatchSize),
49764994
)
4977-
return reorgMeta, nil
4995+
return nil
49784996
}
49794997

49804998
// LastReorgMetaFastReorgDisabled is used for test.

pkg/ddl/index.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1274,7 +1274,7 @@ func pickBackfillType(job *model.Job) (model.ReorgType, error) {
12741274
func loadCloudStorageURI(w *worker, job *model.Job) {
12751275
jc := w.jobContext(job.ID, job.ReorgMeta)
12761276
jc.cloudStorageURI = variable.CloudStorageURI.Load()
1277-
job.ReorgMeta.UseCloudStorage = len(jc.cloudStorageURI) > 0
1277+
job.ReorgMeta.UseCloudStorage = len(jc.cloudStorageURI) > 0 && job.ReorgMeta.IsDistReorg
12781278
}
12791279

12801280
func doReorgWorkForCreateIndexMultiSchema(w *worker, jobCtx *jobContext, job *model.Job,

pkg/ddl/ingest/integration_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func TestAddIndexIngestGeneratedColumns(t *testing.T) {
4545
require.Len(t, rows, n)
4646
for i := 0; i < n; i++ {
4747
//nolint: forcetypeassert
48-
jobTp := rows[i][3].(string)
48+
jobTp := rows[i][12].(string)
4949
require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
5050
}
5151
}
@@ -100,7 +100,7 @@ func TestIngestError(t *testing.T) {
100100
tk.MustExec("admin check table t;")
101101
rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
102102
//nolint: forcetypeassert
103-
jobTp := rows[0][3].(string)
103+
jobTp := rows[0][12].(string)
104104
require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
105105

106106
tk.MustExec("drop table t;")
@@ -116,7 +116,7 @@ func TestIngestError(t *testing.T) {
116116
tk.MustExec("admin check table t;")
117117
rows = tk.MustQuery("admin show ddl jobs 1;").Rows()
118118
//nolint: forcetypeassert
119-
jobTp = rows[0][3].(string)
119+
jobTp = rows[0][12].(string)
120120
require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
121121
}
122122

pkg/ddl/modify_column.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -975,11 +975,14 @@ func GetModifiableColumnJob(
975975
TableName: t.Meta().Name.L,
976976
Type: model.ActionModifyColumn,
977977
BinlogInfo: &model.HistoryInfo{},
978-
ReorgMeta: NewDDLReorgMeta(sctx),
979978
CtxVars: []any{needChangeColData},
980979
CDCWriteSource: sctx.GetSessionVars().CDCWriteSource,
981980
SQLMode: sctx.GetSessionVars().SQLMode,
982981
}
982+
err = initJobReorgMetaFromVariables(job, sctx)
983+
if err != nil {
984+
return nil, errors.Trace(err)
985+
}
983986

984987
args := &model.ModifyColumnArgs{
985988
Column: newCol.ColumnInfo,

0 commit comments

Comments
 (0)