Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix issue of multi-schema change with foreign key #40042

Merged
merged 10 commits into from
Dec 21, 2022
12 changes: 10 additions & 2 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1980,7 +1980,7 @@ func addIndexForForeignKey(ctx sessionctx.Context, tbInfo *model.TableInfo) erro
if handleCol != nil && len(fk.Cols) == 1 && handleCol.Name.L == fk.Cols[0].L {
continue
}
if model.FindIndexByColumns(tbInfo, fk.Cols...) != nil {
if model.FindIndexByColumns(tbInfo, tbInfo.Indices, fk.Cols...) != nil {
continue
}
idxName := fk.Name
Expand Down Expand Up @@ -3264,6 +3264,14 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt *ast
return dbterror.ErrOptOnCacheTable.GenWithStackByArgs("Alter Table")
}
}
// set name for anonymous foreign key.
maxForeignKeyID := tb.Meta().MaxForeignKeyID
for _, spec := range validSpecs {
if spec.Tp == ast.AlterTableAddConstraint && spec.Constraint.Tp == ast.ConstraintForeignKey && spec.Constraint.Name == "" {
maxForeignKeyID++
spec.Constraint.Name = fmt.Sprintf("fk_%d", maxForeignKeyID)
}
}

if len(validSpecs) > 1 {
sctx.GetSessionVars().StmtCtx.MultiSchemaInfo = model.NewMultiSchemaInfo()
Expand Down Expand Up @@ -6570,7 +6578,7 @@ func (d *ddl) CreateForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName mode
if err != nil {
return err
}
if model.FindIndexByColumns(t.Meta(), fkInfo.Cols...) == nil {
if model.FindIndexByColumns(t.Meta(), t.Meta().Indices, fkInfo.Cols...) == nil {
// Need to auto create index for fk cols
if ctx.GetSessionVars().StmtCtx.MultiSchemaInfo == nil {
ctx.GetSessionVars().StmtCtx.MultiSchemaInfo = model.NewMultiSchemaInfo()
Expand Down
29 changes: 29 additions & 0 deletions ddl/fktest/foreign_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1562,3 +1562,32 @@ func getLatestSchemaDiff(t *testing.T, tk *testkit.TestKit) *model.SchemaDiff {
require.NoError(t, err)
return diff
}

func TestTestMultiSchemaAddForeignKey(t *testing.T) {
store, _ := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@foreign_key_checks=1;")
tk.MustExec("use test")
tk.MustExec("create table t1 (id int key);")
tk.MustExec("create table t2 (a int, b int);")
tk.MustExec("alter table t2 add foreign key (a) references t1(id), add foreign key (b) references t1(id)")
tk.MustExec("alter table t2 add column c int, add column d int")
tk.MustExec("alter table t2 add foreign key (c) references t1(id), add foreign key (d) references t1(id), add index(c), add index(d)")
tk.MustExec("drop table t2")
tk.MustExec("create table t2 (a int, b int, index idx1(a), index idx2(b));")
tk.MustGetErrMsg("alter table t2 drop index idx1, drop index idx2, add foreign key (a) references t1(id), add foreign key (b) references t1(id)",
"[ddl:1553]Cannot drop index 'idx1': needed in a foreign key constraint")
tk.MustExec("alter table t2 drop index idx1, drop index idx2")
tk.MustExec("alter table t2 add foreign key (a) references t1(id), add foreign key (b) references t1(id)")
tk.MustQuery("show create table t2").Check(testkit.Rows("t2 CREATE TABLE `t2` (\n" +
" `a` int(11) DEFAULT NULL,\n" +
" `b` int(11) DEFAULT NULL,\n" +
" KEY `fk_1` (`a`),\n" +
" KEY `fk_2` (`b`),\n" +
" CONSTRAINT `fk_1` FOREIGN KEY (`a`) REFERENCES `test`.`t1` (`id`),\n" +
" CONSTRAINT `fk_2` FOREIGN KEY (`b`) REFERENCES `test`.`t1` (`id`)\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
tk.MustExec("drop table t2")
tk.MustExec("create table t2 (a int, b int, index idx0(a,b), index idx1(a), index idx2(b));")
tk.MustExec("alter table t2 drop index idx1, add foreign key (a) references t1(id), add foreign key (b) references t1(id)")
}
4 changes: 2 additions & 2 deletions ddl/foreign_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func checkTableForeignKey(referTblInfo, tblInfo *model.TableInfo, fkInfo *model.
}
}
// check refer columns should have index.
if model.FindIndexByColumns(referTblInfo, fkInfo.RefCols...) == nil {
if model.FindIndexByColumns(referTblInfo, referTblInfo.Indices, fkInfo.RefCols...) == nil {
return infoschema.ErrForeignKeyNoIndexInParent.GenWithStackByArgs(fkInfo.Name, fkInfo.RefTable)
}
return nil
Expand Down Expand Up @@ -660,7 +660,7 @@ func checkAddForeignKeyValidInOwner(d *ddlCtx, t *meta.Meta, schema string, tbIn
return nil
}
}
if model.FindIndexByColumns(tbInfo, fk.Cols...) == nil {
if model.FindIndexByColumns(tbInfo, tbInfo.Indices, fk.Cols...) == nil {
return errors.Errorf("Failed to add the foreign key constraint. Missing index for '%s' foreign key columns in the table '%s'", fk.Name, tbInfo.Name)
}
return nil
Expand Down
36 changes: 35 additions & 1 deletion ddl/multi_schema_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,10 @@ func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *model.Job) (err error
case model.ActionRebaseAutoID, model.ActionModifyTableComment, model.ActionModifyTableCharsetAndCollate:
case model.ActionAddForeignKey:
fkInfo := job.Args[0].(*model.FKInfo)
info.ForeignKeys = append(info.ForeignKeys, fkInfo.Name)
info.AddForeignKeys = append(info.AddForeignKeys, model.AddForeignKeyInfo{
Name: fkInfo.Name,
Cols: fkInfo.Cols,
})
default:
return dbterror.ErrRunMultiSchemaChanges.FastGenByArgs(job.Type.String())
}
Expand Down Expand Up @@ -323,6 +326,32 @@ func checkOperateSameColAndIdx(info *model.MultiSchemaInfo) error {
return checkIndexes(info.AlterIndexes, true)
}

func checkOperateDropIndexUseByForeignKey(info *model.MultiSchemaInfo, t table.Table) error {
var remainIndexes, droppingIndexes []*model.IndexInfo
tbInfo := t.Meta()
for _, idx := range tbInfo.Indices {
dropping := false
for _, name := range info.DropIndexes {
if name.L == idx.Name.L {
dropping = true
break
}
}
if dropping {
droppingIndexes = append(droppingIndexes, idx)
} else {
remainIndexes = append(remainIndexes, idx)
}
}

for _, fk := range info.AddForeignKeys {
if droppingIdx := model.FindIndexByColumns(tbInfo, droppingIndexes, fk.Cols...); droppingIdx != nil && model.FindIndexByColumns(tbInfo, remainIndexes, fk.Cols...) == nil {
return dbterror.ErrDropIndexNeededInForeignKey.GenWithStackByArgs(droppingIdx.Name)
}
}
return nil
}

func checkMultiSchemaInfo(info *model.MultiSchemaInfo, t table.Table) error {
err := checkOperateSameColAndIdx(info)
if err != nil {
Expand All @@ -334,6 +363,11 @@ func checkMultiSchemaInfo(info *model.MultiSchemaInfo, t table.Table) error {
return err
}

err = checkOperateDropIndexUseByForeignKey(info, t)
if err != nil {
return err
}

return checkAddColumnTooManyColumns(len(t.Cols()) + len(info.AddColumns) - len(info.DropColumns))
}

Expand Down
9 changes: 8 additions & 1 deletion parser/model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,12 +316,19 @@ type MultiSchemaInfo struct {
AddIndexes []CIStr `json:"-"`
DropIndexes []CIStr `json:"-"`
AlterIndexes []CIStr `json:"-"`
ForeignKeys []CIStr `json:"-"`

AddForeignKeys []AddForeignKeyInfo `json:"-"`

RelativeColumns []CIStr `json:"-"`
PositionColumns []CIStr `json:"-"`
}

// AddForeignKeyInfo contains foreign key information.
type AddForeignKeyInfo struct {
Name CIStr
Cols []CIStr
}

// NewMultiSchemaInfo new a MultiSchemaInfo.
func NewMultiSchemaInfo() *MultiSchemaInfo {
return &MultiSchemaInfo{
Expand Down
4 changes: 2 additions & 2 deletions parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,8 @@ func FindFKInfoByName(fks []*FKInfo, name string) *FKInfo {
}

// FindIndexByColumns find IndexInfo in indices which is cover the specified columns.
func FindIndexByColumns(tbInfo *TableInfo, cols ...CIStr) *IndexInfo {
for _, index := range tbInfo.Indices {
func FindIndexByColumns(tbInfo *TableInfo, indices []*IndexInfo, cols ...CIStr) *IndexInfo {
for _, index := range indices {
if IsIndexPrefixCovered(tbInfo, index, cols...) {
return index
}
Expand Down
4 changes: 2 additions & 2 deletions planner/core/foreign_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func buildFKCheck(ctx sessionctx.Context, tbl table.Table, cols []model.CIStr, f
}
}

referTbIdxInfo := model.FindIndexByColumns(tblInfo, cols...)
referTbIdxInfo := model.FindIndexByColumns(tblInfo, tblInfo.Indices, cols...)
if referTbIdxInfo == nil {
return nil, failedErr
}
Expand Down Expand Up @@ -460,7 +460,7 @@ func buildFKCascade(ctx sessionctx.Context, tp FKCascadeType, referredFK *model.
return fkCascade, nil
}
}
indexForFK := model.FindIndexByColumns(childTable.Meta(), fk.Cols...)
indexForFK := model.FindIndexByColumns(childTable.Meta(), childTable.Meta().Indices, fk.Cols...)
if indexForFK == nil {
return nil, errors.Errorf("Missing index for '%s' foreign key columns in the table '%s'", fk.Name, childTable.Meta().Name)
}
Expand Down