Skip to content

Commit

Permalink
ddl: args v2 for add/drop/alter CheckConstraint (#56010)
Browse files Browse the repository at this point in the history
ref #53930
  • Loading branch information
joechenrh authored Sep 22, 2024
1 parent 2eb4dc8 commit 2761fe0
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 21 deletions.
25 changes: 11 additions & 14 deletions pkg/ddl/constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,14 @@ func checkAddCheckConstraint(t *meta.Meta, job *model.Job) (*model.DBInfo, *mode
if err != nil {
return nil, nil, nil, nil, errors.Trace(err)
}
constraintInfo1 := &model.ConstraintInfo{}
err = job.DecodeArgs(constraintInfo1)

args, err := model.GetAddCheckConstraintArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, nil, nil, errors.Trace(err)
}
constraintInfo1 := args.Constraint

// do the double-check with constraint existence.
constraintInfo2 := tblInfo.FindConstraintInfoByName(constraintInfo1.Name.L)
if constraintInfo2 != nil {
Expand Down Expand Up @@ -199,18 +201,17 @@ func checkDropCheckConstraint(t *meta.Meta, job *model.Job) (*model.TableInfo, *
return nil, nil, errors.Trace(err)
}

var constrName pmodel.CIStr
err = job.DecodeArgs(&constrName)
args, err := model.GetCheckConstraintArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, errors.Trace(err)
}

// double check with constraint existence.
constraintInfo := tblInfo.FindConstraintInfoByName(constrName.L)
constraintInfo := tblInfo.FindConstraintInfoByName(args.ConstraintName.L)
if constraintInfo == nil {
job.State = model.JobStateCancelled
return nil, nil, dbterror.ErrConstraintNotFound.GenWithStackByArgs(constrName)
return nil, nil, dbterror.ErrConstraintNotFound.GenWithStackByArgs(args.ConstraintName)
}
return tblInfo, constraintInfo, nil
}
Expand Down Expand Up @@ -281,22 +282,18 @@ func checkAlterCheckConstraint(t *meta.Meta, job *model.Job) (*model.DBInfo, *mo
return nil, nil, nil, false, errors.Trace(err)
}

var (
enforced bool
constrName pmodel.CIStr
)
err = job.DecodeArgs(&constrName, &enforced)
args, err := model.GetCheckConstraintArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, nil, false, errors.Trace(err)
}
// do the double check with constraint existence.
constraintInfo := tblInfo.FindConstraintInfoByName(constrName.L)
constraintInfo := tblInfo.FindConstraintInfoByName(args.ConstraintName.L)
if constraintInfo == nil {
job.State = model.JobStateCancelled
return nil, nil, nil, false, dbterror.ErrConstraintNotFound.GenWithStackByArgs(constrName)
return nil, nil, nil, false, dbterror.ErrConstraintNotFound.GenWithStackByArgs(args.ConstraintName)
}
return dbInfo, tblInfo, constraintInfo, enforced, nil
return dbInfo, tblInfo, constraintInfo, args.Enforced, nil
}

func allocateConstraintID(tblInfo *model.TableInfo) int64 {
Expand Down
22 changes: 16 additions & 6 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6186,19 +6186,22 @@ func (e *executor) CreateCheckConstraint(ctx sessionctx.Context, ti ast.Ident, c
return err
}
job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: schema.ID,
TableID: tblInfo.ID,
SchemaName: schema.Name.L,
TableName: tblInfo.Name.L,
Type: model.ActionAddCheckConstraint,
BinlogInfo: &model.HistoryInfo{},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
Args: []any{constraintInfo},
Priority: ctx.GetSessionVars().DDLReorgPriority,
SQLMode: ctx.GetSessionVars().SQLMode,
}

err = e.DoDDLJob(ctx, job)
args := &model.AddCheckConstraintArgs{
Constraint: constraintInfo,
}
err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand All @@ -6220,18 +6223,21 @@ func (e *executor) DropCheckConstraint(ctx sessionctx.Context, ti ast.Ident, con
}

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: schema.ID,
TableID: tblInfo.ID,
SchemaName: schema.Name.L,
TableName: tblInfo.Name.L,
Type: model.ActionDropCheckConstraint,
BinlogInfo: &model.HistoryInfo{},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
Args: []any{constrName},
SQLMode: ctx.GetSessionVars().SQLMode,
}

err = e.DoDDLJob(ctx, job)
args := &model.CheckConstraintArgs{
ConstraintName: constrName,
}
err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand All @@ -6253,18 +6259,22 @@ func (e *executor) AlterCheckConstraint(ctx sessionctx.Context, ti ast.Ident, co
}

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: schema.ID,
TableID: tblInfo.ID,
SchemaName: schema.Name.L,
TableName: tblInfo.Name.L,
Type: model.ActionAlterCheckConstraint,
BinlogInfo: &model.HistoryInfo{},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
Args: []any{constrName, enforced},
SQLMode: ctx.GetSessionVars().SQLMode,
}

err = e.DoDDLJob(ctx, job)
args := &model.CheckConstraintArgs{
ConstraintName: constrName,
Enforced: enforced,
}
err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/model/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ go_test(
],
embed = [":model"],
flaky = True,
shard_count = 37,
shard_count = 39,
deps = [
"//pkg/parser/charset",
"//pkg/parser/model",
Expand Down
62 changes: 62 additions & 0 deletions pkg/meta/model/job_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,20 @@ func UpdateRenameTableArgs(job *Job) error {
return nil
}

// CheckConstraintArgs is the arguments for both AlterCheckConstraint and DropCheckConstraint job.
type CheckConstraintArgs struct {
ConstraintName pmodel.CIStr `json:"constraint_name,omitempty"`
Enforced bool `json:"enforced,omitempty"`
}

func (a *CheckConstraintArgs) fillJob(job *Job) {
if job.Version == JobVersion1 {
job.Args = []any{a.ConstraintName, a.Enforced}
return
}
job.Args = []any{a}
}

// ResourceGroupArgs is the arguments for resource group job.
type ResourceGroupArgs struct {
// for DropResourceGroup we only use it to store the name, other fields are invalid.
Expand Down Expand Up @@ -717,3 +731,51 @@ func GetRenameTablesArgs(job *Job) (*RenameTablesArgs, error) {
}
return getOrDecodeArgsV2[*RenameTablesArgs](job)
}

// GetCheckConstraintArgs gets the AlterCheckConstraint args.
func GetCheckConstraintArgs(job *Job) (*CheckConstraintArgs, error) {
if job.Version == JobVersion1 {
var (
constraintName pmodel.CIStr
enforced bool
)
err := job.DecodeArgs(&constraintName, &enforced)
if err != nil {
return nil, errors.Trace(err)
}
return &CheckConstraintArgs{
ConstraintName: constraintName,
Enforced: enforced,
}, nil
}

return getOrDecodeArgsV2[*CheckConstraintArgs](job)
}

// AddCheckConstraintArgs is the arguemnt for add check constraint
type AddCheckConstraintArgs struct {
Constraint *ConstraintInfo `json:"constraint_info"`
}

func (a *AddCheckConstraintArgs) fillJob(job *Job) {
if job.Version == JobVersion1 {
job.Args = []any{a.Constraint}
return
}
job.Args = []any{a}
}

// GetAddCheckConstraintArgs gets the AddCheckConstraint args.
func GetAddCheckConstraintArgs(job *Job) (*AddCheckConstraintArgs, error) {
if job.Version == JobVersion1 {
var constraintInfo ConstraintInfo
err := job.DecodeArgs(&constraintInfo)
if err != nil {
return nil, errors.Trace(err)
}
return &AddCheckConstraintArgs{
Constraint: &constraintInfo,
}, nil
}
return getOrDecodeArgsV2[*AddCheckConstraintArgs](job)
}
38 changes: 38 additions & 0 deletions pkg/meta/model/job_args_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,3 +440,41 @@ func TestDropColumnArgs(t *testing.T) {
require.Equal(t, inArgs, args)
}
}

func TestAddCheckConstraintArgs(t *testing.T) {
Constraint :=
&ConstraintInfo{
Name: model.NewCIStr("t3_c1"),
Table: model.NewCIStr("t3"),
ExprString: "id<10",
State: StateDeleteOnly,
}
inArgs := &AddCheckConstraintArgs{
Constraint: Constraint,
}
for _, v := range []JobVersion{JobVersion1, JobVersion2} {
j2 := &Job{}
require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionAddCheckConstraint)))
args, err := GetAddCheckConstraintArgs(j2)
require.NoError(t, err)
require.Equal(t, "t3_c1", args.Constraint.Name.O)
require.Equal(t, "t3", args.Constraint.Table.O)
require.Equal(t, "id<10", args.Constraint.ExprString)
require.Equal(t, StateDeleteOnly, args.Constraint.State)
}
}

func TestCheckConstraintArgs(t *testing.T) {
inArgs := &CheckConstraintArgs{
ConstraintName: model.NewCIStr("c1"),
Enforced: true,
}
for _, v := range []JobVersion{JobVersion1, JobVersion2} {
j2 := &Job{}
require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionDropCheckConstraint)))
args, err := GetCheckConstraintArgs(j2)
require.NoError(t, err)
require.Equal(t, "c1", args.ConstraintName.O)
require.True(t, args.Enforced)
}
}

0 comments on commit 2761fe0

Please sign in to comment.