Skip to content

Commit

Permalink
ddl: args v2 for Alter/Remove Partitioning and Add/Drop/Reorganize Pa…
Browse files Browse the repository at this point in the history
…rtition (#56163)

ref #53930
  • Loading branch information
D3Hunter authored Sep 20, 2024
1 parent 92d45d5 commit 2651b77
Show file tree
Hide file tree
Showing 14 changed files with 368 additions and 111 deletions.
16 changes: 12 additions & 4 deletions br/pkg/stream/rewrite_meta_rawkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,10 +672,10 @@ var (
dropSchemaJob *model.Job
dropTable0Job = &model.Job{Version: model.JobVersion1, Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`["",[72,73,74],[""]]`)}
dropTable1Job = &model.Job{Version: model.JobVersion1, Type: model.ActionDropTable, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`["",[],[""]]`)}
dropTable0Partition1Job = &model.Job{Version: model.JobVersion1, Type: model.ActionDropTablePartition, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)}
reorganizeTable0Partition1Job = &model.Job{Version: model.JobVersion1, Type: model.ActionReorganizePartition, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)}
removeTable0Partition1Job = &model.Job{Version: model.JobVersion1, Type: model.ActionRemovePartitioning, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)}
alterTable0Partition1Job = &model.Job{Version: model.JobVersion1, Type: model.ActionAlterTablePartitioning, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[[73]]`)}
dropTable0Partition1Job *model.Job
reorganizeTable0Partition1Job *model.Job
removeTable0Partition1Job *model.Job
alterTable0Partition1Job *model.Job
rollBackTable0IndexJob = &model.Job{Version: model.JobVersion1, Type: model.ActionAddIndex, State: model.JobStateRollbackDone, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[2,false,[72,73,74]]`)}
rollBackTable1IndexJob = &model.Job{Version: model.JobVersion1, Type: model.ActionAddIndex, State: model.JobStateRollbackDone, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable1OldID, RawArgs: json.RawMessage(`[2,false,[]]`)}
addTable0IndexJob = &model.Job{Version: model.JobVersion1, Type: model.ActionAddIndex, State: model.JobStateSynced, SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID, RawArgs: json.RawMessage(`[2,false,[72,73,74]]`)}
Expand Down Expand Up @@ -735,6 +735,14 @@ func genFinishedJob(job *model.Job, args model.FinishedJobArgs) *model.Job {
func init() {
dropSchemaJob = genFinishedJob(&model.Job{Version: model.GetJobVerInUse(), Type: model.ActionDropSchema,
SchemaID: mDDLJobDBOldID}, &model.DropSchemaArgs{AllDroppedTableIDs: []int64{71, 72, 73, 74, 75}})
alterTable0Partition1Job = genFinishedJob(&model.Job{Version: model.GetJobVerInUse(), Type: model.ActionAlterTablePartitioning,
SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID}, &model.TablePartitionArgs{OldPhysicalTblIDs: []int64{73}})
removeTable0Partition1Job = genFinishedJob(&model.Job{Version: model.GetJobVerInUse(), Type: model.ActionRemovePartitioning,
SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID}, &model.TablePartitionArgs{OldPhysicalTblIDs: []int64{73}})
reorganizeTable0Partition1Job = genFinishedJob(&model.Job{Version: model.GetJobVerInUse(), Type: model.ActionReorganizePartition,
SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID}, &model.TablePartitionArgs{OldPhysicalTblIDs: []int64{73}})
dropTable0Partition1Job = genFinishedJob(&model.Job{Version: model.GetJobVerInUse(), Type: model.ActionDropTablePartition,
SchemaID: mDDLJobDBOldID, TableID: mDDLJobTable0OldID}, &model.TablePartitionArgs{OldPhysicalTblIDs: []int64{73}})
}

type mockInsertDeleteRange struct {
Expand Down
13 changes: 9 additions & 4 deletions pkg/ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,14 +328,19 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap
// always delete the table range, even when it's a partitioned table where
// it may contain global index regions.
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, []int64{tableID}, ea, "truncate table: table ID"))
case model.ActionDropTablePartition, model.ActionTruncateTablePartition,
model.ActionReorganizePartition, model.ActionRemovePartitioning,
model.ActionAlterTablePartitioning:
case model.ActionDropTablePartition, model.ActionReorganizePartition,
model.ActionRemovePartitioning, model.ActionAlterTablePartitioning:
args, err := model.GetFinishedTablePartitionArgs(job)
if err != nil {
return errors.Trace(err)
}
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, args.OldPhysicalTblIDs, ea, "reorganize/drop partition: physical table ID(s)"))
case model.ActionTruncateTablePartition:
var physicalTableIDs []int64
if err := job.DecodeArgs(&physicalTableIDs); err != nil {
return errors.Trace(err)
}
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, physicalTableIDs, ea, "reorganize/drop partition: physical table ID(s)"))
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, physicalTableIDs, ea, "truncate partition: physical table ID(s)"))
// ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled.
case model.ActionAddIndex, model.ActionAddPrimaryKey:
allIndexIDs := make([]int64, 1)
Expand Down
38 changes: 28 additions & 10 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2327,16 +2327,19 @@ func (e *executor) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, s
}

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: schema.ID,
TableID: meta.ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
Type: model.ActionAddTablePartition,
BinlogInfo: &model.HistoryInfo{},
Args: []any{partInfo},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
args := &model.TablePartitionArgs{
PartInfo: partInfo,
}

if spec.Tp == ast.AlterTableAddLastPartition && spec.Partition != nil {
query, ok := ctx.Value(sessionctx.QueryString).(string)
Expand All @@ -2352,7 +2355,7 @@ func (e *executor) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, s
ctx.SetValue(sessionctx.QueryString, newQuery)
}
}
err = e.DoDDLJob(ctx, job)
err = e.doDDLJob2(ctx, job, args)
if dbterror.ErrSameNamePartition.Equal(err) && spec.IfNotExists {
ctx.GetSessionVars().StmtCtx.AppendNote(err)
return nil
Expand Down Expand Up @@ -2479,20 +2482,24 @@ func (e *executor) AlterTablePartitioning(ctx sessionctx.Context, ident ast.Iden
newPartInfo.DDLType = piOld.Type

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: schema.ID,
TableID: meta.ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
Type: model.ActionAlterTablePartitioning,
BinlogInfo: &model.HistoryInfo{},
Args: []any{partNames, newPartInfo},
ReorgMeta: NewDDLReorgMeta(ctx),
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}

args := &model.TablePartitionArgs{
PartNames: partNames,
PartInfo: newPartInfo,
}
// No preSplitAndScatter here, it will be done by the worker in onReorganizePartition instead.
err = e.DoDDLJob(ctx, job)
err = e.doDDLJob2(ctx, job, args)
if err == nil {
ctx.GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackError("The statistics of new partitions will be outdated after reorganizing partitions. Please use 'ANALYZE TABLE' statement if you want to update it now"))
}
Expand Down Expand Up @@ -2541,20 +2548,24 @@ func (e *executor) ReorganizePartitions(ctx sessionctx.Context, ident ast.Ident,
}

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: schema.ID,
TableID: meta.ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
Type: model.ActionReorganizePartition,
BinlogInfo: &model.HistoryInfo{},
Args: []any{partNames, partInfo},
ReorgMeta: NewDDLReorgMeta(ctx),
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
args := &model.TablePartitionArgs{
PartNames: partNames,
PartInfo: partInfo,
}

// No preSplitAndScatter here, it will be done by the worker in onReorganizePartition instead.
err = e.DoDDLJob(ctx, job)
err = e.doDDLJob2(ctx, job, args)
failpoint.InjectCall("afterReorganizePartition")
if err == nil {
ctx.GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackError("The statistics of related partitions will be outdated after reorganizing partitions. Please use 'ANALYZE TABLE' statement if you want to update it now"))
Expand Down Expand Up @@ -2603,20 +2614,24 @@ func (e *executor) RemovePartitioning(ctx sessionctx.Context, ident ast.Ident, s
}

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: schema.ID,
TableID: meta.ID,
SchemaName: schema.Name.L,
TableName: meta.Name.L,
Type: model.ActionRemovePartitioning,
BinlogInfo: &model.HistoryInfo{},
Args: []any{partNames, partInfo},
ReorgMeta: NewDDLReorgMeta(ctx),
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
args := &model.TablePartitionArgs{
PartNames: partNames,
PartInfo: partInfo,
}

// No preSplitAndScatter here, it will be done by the worker in onReorganizePartition instead.
err = e.DoDDLJob(ctx, job)
err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand Down Expand Up @@ -2904,19 +2919,22 @@ func (e *executor) DropTablePartition(ctx sessionctx.Context, ident ast.Ident, s
}

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: schema.ID,
TableID: meta.ID,
SchemaName: schema.Name.L,
SchemaState: model.StatePublic,
TableName: meta.Name.L,
Type: model.ActionDropTablePartition,
BinlogInfo: &model.HistoryInfo{},
Args: []any{partNames},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
args := &model.TablePartitionArgs{
PartNames: partNames,
}

err = e.DoDDLJob(ctx, job)
err = e.doDDLJob2(ctx, job, args)
if err != nil {
if dbterror.ErrDropPartitionNonExistent.Equal(err) && spec.IfExists {
ctx.GetSessionVars().StmtCtx.AppendNote(err)
Expand Down
30 changes: 11 additions & 19 deletions pkg/ddl/job_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,20 +503,17 @@ func getRequiredGIDCount(jobWs []*JobWrapper) int {
case model.ActionCreateSchema, model.ActionCreateResourceGroup:
count++
case model.ActionAlterTablePartitioning:
pInfo := jobW.Args[1].(*model.PartitionInfo)
args := jobW.JobArgs.(*model.TablePartitionArgs)
// A new table ID would be needed for
// the global table, which cannot be the same as the current table id,
// since this table id will be removed in the final state when removing
// all the data with this table id.
count += 1 + len(pInfo.Definitions)
count += 1 + len(args.PartInfo.Definitions)
case model.ActionTruncateTablePartition:
count += len(jobW.Args[0].([]int64))
case model.ActionAddTablePartition:
pInfo := jobW.Args[0].(*model.PartitionInfo)
count += len(pInfo.Definitions)
case model.ActionReorganizePartition, model.ActionRemovePartitioning:
pInfo := jobW.Args[1].(*model.PartitionInfo)
count += len(pInfo.Definitions)
case model.ActionAddTablePartition, model.ActionReorganizePartition, model.ActionRemovePartitioning:
args := jobW.JobArgs.(*model.TablePartitionArgs)
count += len(args.PartInfo.Definitions)
case model.ActionTruncateTable:
count += 1 + len(jobW.JobArgs.(*model.TruncateTableArgs).OldPartitionIDs)
}
Expand Down Expand Up @@ -561,9 +558,9 @@ func assignGIDsForJobs(jobWs []*JobWrapper, ids []int64) {
}
case model.ActionAlterTablePartitioning:
if !jobW.IDAllocated {
pInfo := jobW.Args[1].(*model.PartitionInfo)
alloc.assignIDsForPartitionInfo(pInfo)
pInfo.NewTableID = alloc.next()
args := jobW.JobArgs.(*model.TablePartitionArgs)
alloc.assignIDsForPartitionInfo(args.PartInfo)
args.PartInfo.NewTableID = alloc.next()
}
case model.ActionTruncateTablePartition:
if !jobW.IDAllocated {
Expand All @@ -573,20 +570,15 @@ func assignGIDsForJobs(jobWs []*JobWrapper, ids []int64) {
}
jobW.Args[1] = newIDs
}
case model.ActionAddTablePartition:
if !jobW.IDAllocated {
pInfo := jobW.Args[0].(*model.PartitionInfo)
alloc.assignIDsForPartitionInfo(pInfo)
}
case model.ActionReorganizePartition:
case model.ActionAddTablePartition, model.ActionReorganizePartition:
if !jobW.IDAllocated {
pInfo := jobW.Args[1].(*model.PartitionInfo)
pInfo := jobW.JobArgs.(*model.TablePartitionArgs).PartInfo
alloc.assignIDsForPartitionInfo(pInfo)
}
case model.ActionRemovePartitioning:
// a special partition is used in this case, and we will use the ID
// of the partition as the new table ID.
pInfo := jobW.Args[1].(*model.PartitionInfo)
pInfo := jobW.JobArgs.(*model.TablePartitionArgs).PartInfo
if !jobW.IDAllocated {
alloc.assignIDsForPartitionInfo(pInfo)
}
Expand Down
Loading

0 comments on commit 2651b77

Please sign in to comment.