Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: add COMMENTS column to DDL jobs and enhance job reorg meta handling (#57392) #57595

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/tests/br_partition_add_index/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ run_br restore db --db $DB -s "local://$TEST_DIR/$DB" --pd $PD_ADDR

run_sql "ALTER TABLE $DB.t0 ADD INDEX idx(data);"

result=$(run_sql "ADMIN SHOW DDL JOBS 1 WHERE job_type LIKE '%ingest%';")
result=$(run_sql "ADMIN SHOW DDL JOBS 1 WHERE comments LIKE '%ingest%';")

run_sql "ADMIN SHOW DDL JOBS 1;"

Expand Down
6 changes: 3 additions & 3 deletions br/tests/br_pitr_failpoint/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ run_br --pd $PD_ADDR log start --task-name integration_test -s "local://$TEST_DI
# wait until the index creation is running
retry_cnt=0
while true; do
run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'running' AND SCHEMA_STATE = 'write reorganization' AND JOB_TYPE = 'add index /* ingest */';"
run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'running' AND SCHEMA_STATE = 'write reorganization' AND JOB_TYPE = 'add index';"
if grep -Fq "1. row" $res_file; then
break
fi
Expand All @@ -71,7 +71,7 @@ touch $hint_sig_file_public
# wait until the index creation is done
retry_cnt=0
while true; do
run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'done' AND SCHEMA_STATE = 'public' AND JOB_TYPE = 'add index /* ingest */';"
run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'done' AND SCHEMA_STATE = 'public' AND JOB_TYPE = 'add index';"
if grep -Fq "1. row" $res_file; then
break
fi
Expand All @@ -98,7 +98,7 @@ wait $sql_pid
# wait until the index creation is done
retry_cnt=0
while true; do
run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'synced' AND SCHEMA_STATE = 'public' AND JOB_TYPE = 'add index /* ingest */';"
run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'synced' AND SCHEMA_STATE = 'public' AND JOB_TYPE = 'add index';"
if grep -Fq "1. row" $res_file; then
break
fi
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/column_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ func TestModifyColumnReorgCheckpoint(t *testing.T) {
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk.MustExec("set global tidb_ddl_reorg_worker_cnt = 1;")
tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1;")
tk.MustExec("create table t (a int primary key, b bigint);")
rowCnt := 10
for i := 0; i < rowCnt; i++ {
Expand Down
176 changes: 97 additions & 79 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2005,20 +2005,14 @@ func (e *executor) multiSchemaChange(ctx sessionctx.Context, ti ast.Ident, info
Type: model.ActionMultiSchemaChange,
BinlogInfo: &model.HistoryInfo{},
MultiSchemaInfo: info,
ReorgMeta: nil,
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
InvolvingSchemaInfo: involvingSchemaInfo,
SQLMode: ctx.GetSessionVars().SQLMode,
}
if containsDistTaskSubJob(subJobs) {
job.ReorgMeta, err = newReorgMetaFromVariables(job, ctx)
if err != nil {
return err
}
} else {
job.ReorgMeta = NewDDLReorgMeta(ctx)
err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return errors.Trace(err)
}

err = checkMultiSchemaInfo(info, t)
if err != nil {
return errors.Trace(err)
Expand All @@ -2027,16 +2021,6 @@ func (e *executor) multiSchemaChange(ctx sessionctx.Context, ti ast.Ident, info
return e.DoDDLJob(ctx, job)
}

func containsDistTaskSubJob(subJobs []*model.SubJob) bool {
for _, sub := range subJobs {
if sub.Type == model.ActionAddIndex ||
sub.Type == model.ActionAddPrimaryKey {
return true
}
}
return false
}

func (e *executor) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int64, tp autoid.AllocatorType, force bool) error {
schema, t, err := e.getSchemaAndTableByIdent(ident)
if err != nil {
Expand Down Expand Up @@ -2451,10 +2435,13 @@ func (e *executor) AlterTablePartitioning(ctx sessionctx.Context, ident ast.Iden
TableName: t.Meta().Name.L,
Type: model.ActionAlterTablePartitioning,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: NewDDLReorgMeta(ctx),
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return err
}

args := &model.TablePartitionArgs{
PartNames: partNames,
Expand Down Expand Up @@ -2517,10 +2504,13 @@ func (e *executor) ReorganizePartitions(ctx sessionctx.Context, ident ast.Ident,
TableName: t.Meta().Name.L,
Type: model.ActionReorganizePartition,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: NewDDLReorgMeta(ctx),
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return errors.Trace(err)
}
args := &model.TablePartitionArgs{
PartNames: partNames,
PartInfo: partInfo,
Expand Down Expand Up @@ -2583,10 +2573,13 @@ func (e *executor) RemovePartitioning(ctx sessionctx.Context, ident ast.Ident, s
TableName: meta.Name.L,
Type: model.ActionRemovePartitioning,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: NewDDLReorgMeta(ctx),
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return errors.Trace(err)
}
args := &model.TablePartitionArgs{
PartNames: partNames,
PartInfo: partInfo,
Expand Down Expand Up @@ -3385,10 +3378,13 @@ func (e *executor) RenameColumn(ctx sessionctx.Context, ident ast.Ident, spec *a
TableName: tbl.Meta().Name.L,
Type: model.ActionModifyColumn,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: NewDDLReorgMeta(ctx),
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return err
}

args := &model.ModifyColumnArgs{
Column: newCol,
Expand Down Expand Up @@ -4645,11 +4641,10 @@ func (e *executor) CreatePrimaryKey(ctx sessionctx.Context, ti ast.Ident, indexN
OpType: model.OpAddIndex,
}

reorgMeta, err := newReorgMetaFromVariables(job, ctx)
err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return err
}
job.ReorgMeta = reorgMeta

err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
Expand Down Expand Up @@ -4758,10 +4753,7 @@ func (e *executor) createVectorIndex(ctx sessionctx.Context, ti ast.Ident, index
return errors.Trace(err)
}

job, err := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)
if err != nil {
return errors.Trace(err)
}
job := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)
job.Version = model.GetJobVerInUse()
job.Type = model.ActionAddVectorIndex
indexPartSpecifications[0].Expr = nil
Expand All @@ -4788,32 +4780,20 @@ func (e *executor) createVectorIndex(ctx sessionctx.Context, ti ast.Ident, index
return errors.Trace(err)
}

func buildAddIndexJobWithoutTypeAndArgs(ctx sessionctx.Context, schema *model.DBInfo, t table.Table) (*model.Job, error) {
tzName, tzOffset := ddlutil.GetTimeZone(ctx)
func buildAddIndexJobWithoutTypeAndArgs(ctx sessionctx.Context, schema *model.DBInfo, t table.Table) *model.Job {
charset, collate := ctx.GetSessionVars().GetCharsetInfo()
job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: &model.DDLReorgMeta{
SQLMode: ctx.GetSessionVars().SQLMode,
Warnings: make(map[errors.ErrorID]*terror.Error),
WarningsCount: make(map[errors.ErrorID]int64),
Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset},
},
Priority: ctx.GetSessionVars().DDLReorgPriority,
Charset: charset,
Collate: collate,
SQLMode: ctx.GetSessionVars().SQLMode,
Priority: ctx.GetSessionVars().DDLReorgPriority,
Charset: charset,
Collate: collate,
SQLMode: ctx.GetSessionVars().SQLMode,
}
reorgMeta, err := newReorgMetaFromVariables(job, ctx)
if err != nil {
return nil, errors.Trace(err)
}
job.ReorgMeta = reorgMeta
return job, nil
return job
}

func (e *executor) CreateIndex(ctx sessionctx.Context, stmt *ast.CreateIndexStmt) error {
Expand Down Expand Up @@ -4907,15 +4887,17 @@ func (e *executor) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast
// global is set to 'false' is just there to be backwards compatible,
// to avoid unmarshal issues, it is now part of indexOption.
global := false
job, err := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)
if err != nil {
return errors.Trace(err)
}
job := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)

job.Version = model.GetJobVerInUse()
job.Type = model.ActionAddIndex
job.CDCWriteSource = ctx.GetSessionVars().CDCWriteSource

err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return errors.Trace(err)
}

args := &model.ModifyIndexArgs{
IndexArgs: []*model.IndexArg{{
Unique: unique,
Expand All @@ -4937,44 +4919,80 @@ func (e *executor) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast
return errors.Trace(err)
}

func newReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) (*model.DDLReorgMeta, error) {
reorgMeta := NewDDLReorgMeta(sctx)
reorgMeta.IsDistReorg = variable.EnableDistTask.Load()
reorgMeta.IsFastReorg = variable.EnableFastReorg.Load()
reorgMeta.TargetScope = variable.ServiceScope.Load()
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgWorkerCount); ok {
reorgMeta.Concurrency = variable.TidbOptInt(sv, 0)
func initJobReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) error {
m := NewDDLReorgMeta(sctx)
setReorgParam := func() {
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgWorkerCount); ok {
m.Concurrency = variable.TidbOptInt(sv, 0)
}
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgBatchSize); ok {
m.BatchSize = variable.TidbOptInt(sv, 0)
}
}
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgBatchSize); ok {
reorgMeta.BatchSize = variable.TidbOptInt(sv, 0)
setDistTaskParam := func() error {
m.IsDistReorg = variable.EnableDistTask.Load()
m.IsFastReorg = variable.EnableFastReorg.Load()
m.TargetScope = variable.ServiceScope.Load()
if hasSysDB(job) {
if m.IsDistReorg {
logutil.DDLLogger().Info("cannot use distributed task execution on system DB",
zap.Stringer("job", job))
}
m.IsDistReorg = false
m.IsFastReorg = false
failpoint.Inject("reorgMetaRecordFastReorgDisabled", func(_ failpoint.Value) {
LastReorgMetaFastReorgDisabled = true
})
}
if m.IsDistReorg && !m.IsFastReorg {
return dbterror.ErrUnsupportedDistTask
}
return nil
}

if reorgMeta.IsDistReorg && !reorgMeta.IsFastReorg {
return nil, dbterror.ErrUnsupportedDistTask
}
if hasSysDB(job) {
if reorgMeta.IsDistReorg {
logutil.DDLLogger().Info("cannot use distributed task execution on system DB",
zap.Stringer("job", job))
switch job.Type {
case model.ActionAddIndex, model.ActionAddPrimaryKey:
setReorgParam()
err := setDistTaskParam()
if err != nil {
return err
}
reorgMeta.IsDistReorg = false
reorgMeta.IsFastReorg = false
failpoint.Inject("reorgMetaRecordFastReorgDisabled", func(_ failpoint.Value) {
LastReorgMetaFastReorgDisabled = true
})
case model.ActionReorganizePartition,
model.ActionRemovePartitioning,
model.ActionAlterTablePartitioning,
model.ActionModifyColumn:
setReorgParam()
case model.ActionMultiSchemaChange:
for _, sub := range job.MultiSchemaInfo.SubJobs {
switch sub.Type {
case model.ActionAddIndex, model.ActionAddPrimaryKey:
setReorgParam()
err := setDistTaskParam()
if err != nil {
return err
}
case model.ActionReorganizePartition,
model.ActionRemovePartitioning,
model.ActionAlterTablePartitioning,
model.ActionModifyColumn:
setReorgParam()
}
}
default:
return nil
}

job.ReorgMeta = m
logutil.DDLLogger().Info("initialize reorg meta",
zap.String("jobSchema", job.SchemaName),
zap.String("jobTable", job.TableName),
zap.Stringer("jobType", job.Type),
zap.Bool("enableDistTask", reorgMeta.IsDistReorg),
zap.Bool("enableFastReorg", reorgMeta.IsFastReorg),
zap.String("targetScope", reorgMeta.TargetScope),
zap.Int("concurrency", reorgMeta.Concurrency),
zap.Int("batchSize", reorgMeta.BatchSize),
zap.Bool("enableDistTask", m.IsDistReorg),
zap.Bool("enableFastReorg", m.IsFastReorg),
zap.String("targetScope", m.TargetScope),
zap.Int("concurrency", m.Concurrency),
zap.Int("batchSize", m.BatchSize),
)
return reorgMeta, nil
return nil
}

// LastReorgMetaFastReorgDisabled is used for test.
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1274,7 +1274,7 @@ func pickBackfillType(job *model.Job) (model.ReorgType, error) {
func loadCloudStorageURI(w *worker, job *model.Job) {
jc := w.jobContext(job.ID, job.ReorgMeta)
jc.cloudStorageURI = variable.CloudStorageURI.Load()
job.ReorgMeta.UseCloudStorage = len(jc.cloudStorageURI) > 0
job.ReorgMeta.UseCloudStorage = len(jc.cloudStorageURI) > 0 && job.ReorgMeta.IsDistReorg
}

func doReorgWorkForCreateIndexMultiSchema(w *worker, jobCtx *jobContext, job *model.Job,
Expand Down
6 changes: 3 additions & 3 deletions pkg/ddl/ingest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestAddIndexIngestGeneratedColumns(t *testing.T) {
require.Len(t, rows, n)
for i := 0; i < n; i++ {
//nolint: forcetypeassert
jobTp := rows[i][3].(string)
jobTp := rows[i][12].(string)
require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
}
}
Expand Down Expand Up @@ -100,7 +100,7 @@ func TestIngestError(t *testing.T) {
tk.MustExec("admin check table t;")
rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
//nolint: forcetypeassert
jobTp := rows[0][3].(string)
jobTp := rows[0][12].(string)
require.True(t, strings.Contains(jobTp, "ingest"), jobTp)

tk.MustExec("drop table t;")
Expand All @@ -116,7 +116,7 @@ func TestIngestError(t *testing.T) {
tk.MustExec("admin check table t;")
rows = tk.MustQuery("admin show ddl jobs 1;").Rows()
//nolint: forcetypeassert
jobTp = rows[0][3].(string)
jobTp = rows[0][12].(string)
require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/ddl/modify_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,11 +975,14 @@ func GetModifiableColumnJob(
TableName: t.Meta().Name.L,
Type: model.ActionModifyColumn,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: NewDDLReorgMeta(sctx),
CtxVars: []any{needChangeColData},
CDCWriteSource: sctx.GetSessionVars().CDCWriteSource,
SQLMode: sctx.GetSessionVars().SQLMode,
}
err = initJobReorgMetaFromVariables(job, sctx)
if err != nil {
return nil, errors.Trace(err)
}

args := &model.ModifyColumnArgs{
Column: newCol.ColumnInfo,
Expand Down
Loading