Skip to content

Commit

Permalink
ddl: args v2 for create/alter/drop resource group (#56153)
Browse files Browse the repository at this point in the history
ref #53930
  • Loading branch information
D3Hunter authored Sep 19, 2024
1 parent 68fdf55 commit c13eb90
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 22 deletions.
15 changes: 9 additions & 6 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5784,17 +5784,18 @@ func (e *executor) AddResourceGroup(ctx sessionctx.Context, stmt *ast.CreateReso
logutil.DDLLogger().Debug("create resource group", zap.String("name", groupName.O), zap.Stringer("resource group settings", groupInfo.ResourceGroupSettings))

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaName: groupName.L,
Type: model.ActionCreateResourceGroup,
BinlogInfo: &model.HistoryInfo{},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
Args: []any{groupInfo, false},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
ResourceGroup: groupInfo.Name.L,
}},
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = e.DoDDLJob(ctx, job)
args := &model.ResourceGroupArgs{RGInfo: groupInfo}
err = e.doDDLJob2(ctx, job, args)
return err
}

Expand Down Expand Up @@ -5828,18 +5829,19 @@ func (e *executor) DropResourceGroup(ctx sessionctx.Context, stmt *ast.DropResou
}

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: group.ID,
SchemaName: group.Name.L,
Type: model.ActionDropResourceGroup,
BinlogInfo: &model.HistoryInfo{},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
Args: []any{groupName},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
ResourceGroup: groupName.L,
}},
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = e.DoDDLJob(ctx, job)
args := &model.ResourceGroupArgs{RGInfo: &model.ResourceGroupInfo{Name: groupName}}
err = e.doDDLJob2(ctx, job, args)
return err
}

Expand Down Expand Up @@ -5869,18 +5871,19 @@ func (e *executor) AlterResourceGroup(ctx sessionctx.Context, stmt *ast.AlterRes
logutil.DDLLogger().Debug("alter resource group", zap.String("name", groupName.L), zap.Stringer("new resource group settings", newGroupInfo.ResourceGroupSettings))

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: newGroupInfo.ID,
SchemaName: newGroupInfo.Name.L,
Type: model.ActionAlterResourceGroup,
BinlogInfo: &model.HistoryInfo{},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
Args: []any{newGroupInfo},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
ResourceGroup: newGroupInfo.Name.L,
}},
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = e.DoDDLJob(ctx, job)
args := &model.ResourceGroupArgs{RGInfo: newGroupInfo}
err = e.doDDLJob2(ctx, job, args)
return err
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/job_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,8 +556,8 @@ func assignGIDsForJobs(jobWs []*JobWrapper, ids []int64) {
jobW.SchemaID = dbInfo.ID
case model.ActionCreateResourceGroup:
if !jobW.IDAllocated {
rgInfo := jobW.Args[0].(*model.ResourceGroupInfo)
rgInfo.ID = alloc.next()
args := jobW.JobArgs.(*model.ResourceGroupArgs)
args.RGInfo.ID = alloc.next()
}
case model.ActionAlterTablePartitioning:
if !jobW.IDAllocated {
Expand Down
20 changes: 11 additions & 9 deletions pkg/ddl/job_submitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,15 @@ func TestCombinedIDAllocation(t *testing.T) {
return j
}

genRGroupJob := func() *model.Job {
genRGroupJob := func(idAllocated bool) *ddl.JobWrapper {
info := &model.ResourceGroupInfo{}
return &model.Job{
Version: model.JobVersion1,
job := &model.Job{
Version: model.GetJobVerInUse(),
Type: model.ActionCreateResourceGroup,
Args: []any{info},
}
return ddl.NewJobWrapperWithArgs(job, &model.ResourceGroupArgs{
RGInfo: info,
}, idAllocated)
}

genAlterTblPartitioningJob := func(partCnt int) *model.Job {
Expand Down Expand Up @@ -284,11 +286,11 @@ func TestCombinedIDAllocation(t *testing.T) {
requiredIDCount: 1,
},
{
jobW: ddl.NewJobWrapper(genRGroupJob(), false),
jobW: genRGroupJob(false),
requiredIDCount: 2,
},
{
jobW: ddl.NewJobWrapper(genRGroupJob(), true),
jobW: genRGroupJob(true),
requiredIDCount: 1,
},
{
Expand Down Expand Up @@ -428,9 +430,9 @@ func TestCombinedIDAllocation(t *testing.T) {
uniqueIDs[args.DBInfo.ID] = struct{}{}
require.Equal(t, j.SchemaID, args.DBInfo.ID)
case model.ActionCreateResourceGroup:
info := &model.ResourceGroupInfo{}
require.NoError(t, j.DecodeArgs(info))
checkID(info.ID)
args, err := model.GetResourceGroupArgs(j)
require.NoError(t, err)
checkID(args.RGInfo.ID)
case model.ActionAlterTablePartitioning:
var partNames []string
info := &model.PartitionInfo{}
Expand Down
10 changes: 6 additions & 4 deletions pkg/ddl/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ const (
)

func onCreateResourceGroup(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) {
groupInfo := &model.ResourceGroupInfo{}
if err := job.DecodeArgs(groupInfo); err != nil {
args, err := model.GetResourceGroupArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
groupInfo := args.RGInfo
groupInfo.State = model.StateNone

// check if resource group value is valid and convert to proto format.
Expand Down Expand Up @@ -93,11 +94,12 @@ func onCreateResourceGroup(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ve
}

func onAlterResourceGroup(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) {
alterGroupInfo := &model.ResourceGroupInfo{}
if err := job.DecodeArgs(alterGroupInfo); err != nil {
args, err := model.GetResourceGroupArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
alterGroupInfo := args.RGInfo
// check if resource group value is valid and convert to proto format.
protoGroup, err := resourcegroup.NewGroupFromOptions(alterGroupInfo.Name.L, alterGroupInfo.ResourceGroupSettings)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/model/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ go_test(
],
embed = [":model"],
flaky = True,
shard_count = 32,
shard_count = 33,
deps = [
"//pkg/parser/charset",
"//pkg/parser/model",
Expand Down
42 changes: 42 additions & 0 deletions pkg/meta/model/job_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,3 +425,45 @@ func UpdateRenameTableArgs(job *Job) error {
}
return nil
}

// ResourceGroupArgs is the arguments for resource group job.
type ResourceGroupArgs struct {
// for DropResourceGroup we only use it to store the name, other fields are invalid.
RGInfo *ResourceGroupInfo `json:"rg_info,omitempty"`
}

func (a *ResourceGroupArgs) fillJob(job *Job) {
if job.Version == JobVersion1 {
if job.Type == ActionCreateResourceGroup {
// what's the second parameter for? we keep it for compatibility.
job.Args = []any{a.RGInfo, false}
} else if job.Type == ActionAlterResourceGroup {
job.Args = []any{a.RGInfo}
} else if job.Type == ActionDropResourceGroup {
// it's not used anywhere.
job.Args = []any{a.RGInfo.Name}
}
return
}
job.Args = []any{a}
}

// GetResourceGroupArgs gets the resource group args.
func GetResourceGroupArgs(job *Job) (*ResourceGroupArgs, error) {
if job.Version == JobVersion1 {
rgInfo := ResourceGroupInfo{}
if job.Type == ActionCreateResourceGroup || job.Type == ActionAlterResourceGroup {
if err := job.DecodeArgs(&rgInfo); err != nil {
return nil, errors.Trace(err)
}
} else if job.Type == ActionDropResourceGroup {
var rgName pmodel.CIStr
if err := job.DecodeArgs(&rgName); err != nil {
return nil, errors.Trace(err)
}
rgInfo.Name = rgName
}
return &ResourceGroupArgs{RGInfo: &rgInfo}, nil
}
return getOrDecodeArgsV2[*ResourceGroupArgs](job)
}
19 changes: 19 additions & 0 deletions pkg/meta/model/job_args_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,22 @@ func TestUpdateRenameTableArgs(t *testing.T) {
}, args)
}
}

func TestResourceGroupArgs(t *testing.T) {
inArgs := &ResourceGroupArgs{
RGInfo: &ResourceGroupInfo{ID: 100, Name: model.NewCIStr("rg_name")},
}
for _, tp := range []ActionType{ActionCreateResourceGroup, ActionAlterResourceGroup, ActionDropResourceGroup} {
for _, v := range []JobVersion{JobVersion1, JobVersion2} {
j2 := &Job{}
require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, tp)))
args, err := GetResourceGroupArgs(j2)
require.NoError(t, err)
if tp == ActionDropResourceGroup {
require.EqualValues(t, inArgs.RGInfo.Name, args.RGInfo.Name)
} else {
require.EqualValues(t, inArgs, args)
}
}
}
}

0 comments on commit c13eb90

Please sign in to comment.