diff --git a/ddl/column_modify_test.go b/ddl/column_modify_test.go new file mode 100644 index 0000000000000..d07e4ac25385f --- /dev/null +++ b/ddl/column_modify_test.go @@ -0,0 +1,1291 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl_test + +import ( + "context" + "fmt" + "math/rand" + "strconv" + "strings" + "testing" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/ddl" + testddlutil "github.com/pingcap/tidb/ddl/testutil" + "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/errno" + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/table/tables" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/testkit/external" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/admin" + "github.com/pingcap/tidb/util/mock" + "github.com/stretchr/testify/require" +) + +const columnModifyLease = 600 * time.Millisecond + +func TestAddAndDropColumn(t *testing.T) { + store, clean := testkit.CreateMockStoreWithSchemaLease(t, columnModifyLease) + defer clean() + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t2 (c1 int, c2 int, c3 int)") + tk.MustExec("set @@tidb_disable_txn_auto_retry = 0") + + // ========== + // ADD COLUMN + // ========== + + done := make(chan error, 1) + + num := defaultBatchSize + 10 + // add some rows + batchInsert(tk, "t2", 0, num) + + testddlutil.SessionExecInGoroutine(store, "test", "alter table t2 add column c4 int default -1", done) + + ticker := time.NewTicker(columnModifyLease / 2) + defer ticker.Stop() + step := 10 +AddLoop: + for { + select { + case err := <-done: + if err == nil { + break AddLoop + } + require.NoError(t, err) + case <-ticker.C: + // delete some rows, and add some data + for i := num; i < num+step; i++ { + n := rand.Intn(num) + tk.MustExec("begin") + tk.MustExec("delete from t2 where c1 = ?", n) + tk.MustExec("commit") + + // Make sure the insert and show statements use the same infoSchema. + tk.MustExec("begin") + err := tk.ExecToErr("insert into t2 values (?, ?, ?)", i, i, i) + if err != nil { + // If the insert failed, the column count must be 4 now (c4 has been added).
+ values := tk.MustQuery("show columns from t2").Rows() + require.Len(t, values, 4) + } + tk.MustExec("commit") + } + num += step + } + } + + // add data, here c4 must exist + for i := num; i < num+step; i++ { + tk.MustExec("insert into t2 values (?, ?, ?, ?)", i, i, i, i) + } + + rows := tk.MustQuery("select count(c4) from t2").Rows() + require.Len(t, rows, 1) + require.Len(t, rows[0], 1) + count, err := strconv.ParseInt(rows[0][0].(string), 10, 64) + require.NoError(t, err) + require.Greater(t, count, int64(0)) + + tk.MustQuery("select count(c4) from t2 where c4 = -1").Check([][]interface{}{ + {fmt.Sprintf("%v", count-int64(step))}, + }) + + for i := num; i < num+step; i++ { + tk.MustQuery("select c4 from t2 where c4 = ?", i).Check([][]interface{}{ + {fmt.Sprintf("%v", i)}, + }) + } + + tbl := external.GetTableByName(t, tk, "test", "t2") + i := 0 + j := 0 + require.NoError(t, tk.Session().NewTxn(context.Background())) + defer func() { + if txn, err := tk.Session().Txn(true); err == nil { + require.NoError(t, txn.Rollback()) + } + }() + + err = tables.IterRecords(tbl, tk.Session(), tbl.Cols(), + func(_ kv.Handle, data []types.Datum, cols []*table.Column) (bool, error) { + i++ + // c4 must be -1 or > 0 + v, err := data[3].ToInt64(tk.Session().GetSessionVars().StmtCtx) + require.NoError(t, err) + if v == -1 { + j++ + } else { + require.Greater(t, v, int64(0)) + } + return true, nil + }) + require.NoError(t, err) + require.Equal(t, int(count), i) + require.LessOrEqual(t, i, num+step) + require.Equal(t, int(count)-step, j) + + // for modifying columns after adding columns + tk.MustExec("alter table t2 modify c4 int default 11") + for i := num + step; i < num+step+10; i++ { + tk.MustExec("insert into t2 values (?, ?, ?, ?)", i, i, i, i) + } + tk.MustQuery("select count(c4) from t2 where c4 = -1").Check([][]interface{}{ + {fmt.Sprintf("%v", count-int64(step))}, + }) + + // add timestamp type column + tk.MustExec("create table test_on_update_c (c1 int, c2 timestamp);") + defer tk.MustExec("drop table test_on_update_c;") + tk.MustExec("alter table test_on_update_c add column c3 timestamp null default '2017-02-11' on update current_timestamp;") + is := domain.GetDomain(tk.Session()).InfoSchema() + tbl, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("test_on_update_c")) + require.NoError(t, err) + tblInfo := tbl.Meta() + colC := tblInfo.Columns[2] + require.Equal(t, mysql.TypeTimestamp, colC.Tp) + require.False(t, mysql.HasNotNullFlag(colC.Flag)) + // add datetime type column + tk.MustExec("create table test_on_update_d (c1 int, c2 datetime);") + tk.MustExec("alter table test_on_update_d add column c3 datetime on update current_timestamp;") + is = domain.GetDomain(tk.Session()).InfoSchema() + tbl, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("test_on_update_d")) + require.NoError(t, err) + tblInfo = tbl.Meta() + colC = tblInfo.Columns[2] + require.Equal(t, mysql.TypeDatetime, colC.Tp) + require.False(t, mysql.HasNotNullFlag(colC.Flag)) + + // add year type column + tk.MustExec("create table test_on_update_e (c1 int);") + defer tk.MustExec("drop table test_on_update_e;") + tk.MustExec("insert into test_on_update_e (c1) values (0);") + tk.MustExec("alter table test_on_update_e add column c2 year not null;") + tk.MustQuery("select c2 from test_on_update_e").Check(testkit.Rows("0")) + + // test add unsupported constraint + tk.MustExec("create table t_add_unsupported_constraint (a int);") + err = tk.ExecToErr("ALTER TABLE t_add_unsupported_constraint ADD id int 
AUTO_INCREMENT;") + require.EqualError(t, err, "[ddl:8200]unsupported add column 'id' constraint AUTO_INCREMENT when altering 'test.t_add_unsupported_constraint'") + err = tk.ExecToErr("ALTER TABLE t_add_unsupported_constraint ADD id int KEY;") + require.EqualError(t, err, "[ddl:8200]unsupported add column 'id' constraint PRIMARY KEY when altering 'test.t_add_unsupported_constraint'") + err = tk.ExecToErr("ALTER TABLE t_add_unsupported_constraint ADD id int UNIQUE;") + require.EqualError(t, err, "[ddl:8200]unsupported add column 'id' constraint UNIQUE KEY when altering 'test.t_add_unsupported_constraint'") + + // =========== + // DROP COLUMN + // =========== + + done = make(chan error, 1) + tk.MustExec("delete from t2") + + num = 100 + // add some rows + for i := 0; i < num; i++ { + tk.MustExec("insert into t2 values (?, ?, ?, ?)", i, i, i, i) + } + + // get c4 column id + testddlutil.SessionExecInGoroutine(store, "test", "alter table t2 drop column c4", done) + + ticker = time.NewTicker(columnModifyLease / 2) + defer ticker.Stop() + step = 10 +DropLoop: + for { + select { + case err := <-done: + if err == nil { + break DropLoop + } + require.NoError(t, err) + case <-ticker.C: + // delete some rows, and add some data + for i := num; i < num+step; i++ { + // Make sure that statement of insert and show use the same infoSchema. + tk.MustExec("begin") + err := tk.ExecToErr("insert into t2 values (?, ?, ?)", i, i, i) + if err != nil { + // If executing is failed, the column number must be 4 now. + values := tk.MustQuery("show columns from t2").Rows() + require.Len(t, values, 4) + } + tk.MustExec("commit") + } + num += step + } + } + + // add data, here c4 must not exist + for i := num; i < num+step; i++ { + tk.MustExec("insert into t2 values (?, ?, ?)", i, i, i) + } + + rows = tk.MustQuery("select count(*) from t2").Rows() + require.Len(t, rows, 1) + require.Len(t, rows[0], 1) + count, err = strconv.ParseInt(rows[0][0].(string), 10, 64) + require.NoError(t, err) + require.Greater(t, count, int64(0)) +} + +// TestDropColumn is for inserting value with a to-be-dropped column when do drop column. +// Column info from schema in build-insert-plan should be public only, +// otherwise they will not be consisted with Table.Col(), then the server will panic. +func TestDropColumn(t *testing.T) { + store, clean := testkit.CreateMockStoreWithSchemaLease(t, columnModifyLease) + defer clean() + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + num := 25 + multiDDL := make([]string, 0, num) + sql := "create table t2 (c1 int, c2 int, c3 int, " + for i := 4; i < 4+num; i++ { + multiDDL = append(multiDDL, fmt.Sprintf("alter table t2 drop column c%d", i)) + + if i != 3+num { + sql += fmt.Sprintf("c%d int, ", i) + } else { + sql += fmt.Sprintf("c%d int)", i) + } + } + tk.MustExec(sql) + dmlDone := make(chan error, num) + ddlDone := make(chan error, num) + + testddlutil.ExecMultiSQLInGoroutine(store, "test", multiDDL, ddlDone) + for i := 0; i < num; i++ { + testddlutil.ExecMultiSQLInGoroutine(store, "test", []string{"insert into t2 set c1 = 1, c2 = 1, c3 = 1, c4 = 1"}, dmlDone) + } + for i := 0; i < num; i++ { + err := <-ddlDone + require.NoError(t, err) + } + + // Test for drop partition table column. 
+ tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1 (a int,b int) partition by hash(a) partitions 4;") + err := tk.ExecToErr("alter table t1 drop column a") + // TODO: refine the error message to compatible with MySQL + require.EqualError(t, err, "[planner:1054]Unknown column 'a' in 'expression'") +} + +func TestChangeColumn(t *testing.T) { + store, clean := testkit.CreateMockStoreWithSchemaLease(t, columnModifyLease) + defer clean() + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + tk.MustExec("create table t3 (a int default '0', b varchar(10), d int not null default '0')") + tk.MustExec("insert into t3 set b = 'a'") + tk.MustQuery("select a from t3").Check(testkit.Rows("0")) + tk.MustExec("alter table t3 change a aa bigint") + tk.MustExec("insert into t3 set b = 'b'") + tk.MustQuery("select aa from t3").Check(testkit.Rows("0", "")) + // for no default flag + tk.MustExec("alter table t3 change d dd bigint not null") + is := domain.GetDomain(tk.Session()).InfoSchema() + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t3")) + require.NoError(t, err) + tblInfo := tbl.Meta() + colD := tblInfo.Columns[2] + require.True(t, mysql.HasNoDefaultValueFlag(colD.Flag)) + // for the following definitions: 'not null', 'null', 'default value' and 'comment' + tk.MustExec("alter table t3 change b b varchar(20) null default 'c' comment 'my comment'") + is = domain.GetDomain(tk.Session()).InfoSchema() + tbl, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("t3")) + require.NoError(t, err) + tblInfo = tbl.Meta() + colB := tblInfo.Columns[1] + require.Equal(t, "my comment", colB.Comment) + require.False(t, mysql.HasNotNullFlag(colB.Flag)) + tk.MustExec("insert into t3 set aa = 3, dd = 5") + tk.MustQuery("select b from t3").Check(testkit.Rows("a", "b", "c")) + // for timestamp + tk.MustExec("alter table t3 add column c timestamp not null") + tk.MustExec("alter table t3 change c c timestamp null default '2017-02-11' comment 'col c comment' on update current_timestamp") + is = domain.GetDomain(tk.Session()).InfoSchema() + tbl, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("t3")) + require.NoError(t, err) + tblInfo = tbl.Meta() + colC := tblInfo.Columns[3] + require.Equal(t, "col c comment", colC.Comment) + require.False(t, mysql.HasNotNullFlag(colC.Flag)) + // for enum + tk.MustExec("alter table t3 add column en enum('a', 'b', 'c') not null default 'a'") + // https://github.com/pingcap/tidb/issues/23488 + // if there is a prefix index on the varchar column, then we can change it to text + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (k char(10), v int, INDEX(k(7)));") + tk.MustExec("alter table t change column k k tinytext") + is = domain.GetDomain(tk.Session()).InfoSchema() + tbl, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + + // for failing tests + sql := "alter table t3 change aa a bigint default ''" + tk.MustGetErrCode(sql, errno.ErrInvalidDefault) + sql = "alter table t3 change a testx.t3.aa bigint" + tk.MustGetErrCode(sql, errno.ErrWrongDBName) + sql = "alter table t3 change t.a aa bigint" + tk.MustGetErrCode(sql, errno.ErrWrongTableName) + tk.MustExec("create table t4 (c1 int, c2 int, c3 int default 1, index (c1));") + tk.MustExec("insert into t4(c2) values (null);") + err = tk.ExecToErr("alter table t4 change c1 a1 int not null;") + require.EqualError(t, err, "[ddl:1265]Data truncated for column 'a1' at row 1") + sql = "alter table t4 change c2 a bigint 
not null;" + tk.MustGetErrCode(sql, mysql.WarnDataTruncated) + sql = "alter table t3 modify en enum('a', 'z', 'b', 'c') not null default 'a'" + tk.MustExec(sql) + // Rename to an existing column. + tk.MustExec("alter table t3 add column a bigint") + sql = "alter table t3 change aa a bigint" + tk.MustGetErrCode(sql, errno.ErrDupFieldName) + // https://github.com/pingcap/tidb/issues/23488 + tk.MustExec("drop table if exists t5") + tk.MustExec("create table t5 (k char(10) primary key, v int)") + sql = "alter table t5 change column k k tinytext;" + tk.MustGetErrCode(sql, mysql.ErrBlobKeyWithoutLength) + tk.MustExec("drop table t5") + tk.MustExec("drop table if exists t5") + tk.MustExec("create table t5 (k char(10), v int, INDEX(k))") + sql = "alter table t5 change column k k tinytext;" + tk.MustGetErrCode(sql, mysql.ErrBlobKeyWithoutLength) + tk.MustExec("drop table t5") + tk.MustExec("drop table t3") +} + +func TestRenameColumn(t *testing.T) { + store, clean := testkit.CreateMockStoreWithSchemaLease(t, columnModifyLease) + defer clean() + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + assertColNames := func(tableName string, colNames ...string) { + cols := external.GetTableByName(t, tk, "test", tableName).Cols() + require.Equal(t, len(colNames), len(cols)) + for i := range cols { + require.Equal(t, strings.ToLower(colNames[i]), cols[i].Name.L) + } + } + + tk.MustExec("create table test_rename_column (id int not null primary key auto_increment, col1 int)") + tk.MustExec("alter table test_rename_column rename column col1 to col1") + assertColNames("test_rename_column", "id", "col1") + tk.MustExec("alter table test_rename_column rename column col1 to col2") + assertColNames("test_rename_column", "id", "col2") + + // Test renaming non-exist columns. + tk.MustGetErrCode("alter table test_rename_column rename column non_exist_col to col3", errno.ErrBadField) + + // Test renaming to an exist column. + tk.MustGetErrCode("alter table test_rename_column rename column col2 to id", errno.ErrDupFieldName) + + // Test renaming the column with foreign key. + tk.MustExec("drop table test_rename_column") + tk.MustExec("create table test_rename_column_base (base int)") + tk.MustExec("create table test_rename_column (col int, foreign key (col) references test_rename_column_base(base))") + + tk.MustGetErrCode("alter table test_rename_column rename column col to col1", errno.ErrFKIncompatibleColumns) + + tk.MustExec("drop table test_rename_column_base") + + // Test renaming generated columns. + tk.MustExec("drop table test_rename_column") + tk.MustExec("create table test_rename_column (id int, col1 int generated always as (id + 1))") + + tk.MustExec("alter table test_rename_column rename column col1 to col2") + assertColNames("test_rename_column", "id", "col2") + tk.MustExec("alter table test_rename_column rename column col2 to col1") + assertColNames("test_rename_column", "id", "col1") + tk.MustGetErrCode("alter table test_rename_column rename column id to id1", errno.ErrDependentByGeneratedColumn) + + // Test renaming view columns. 
+ tk.MustExec("drop table test_rename_column") + tk.MustExec("create table test_rename_column (id int, col1 int)") + tk.MustExec("create view test_rename_column_view as select * from test_rename_column") + + tk.MustExec("alter table test_rename_column rename column col1 to col2") + tk.MustGetErrCode("select * from test_rename_column_view", errno.ErrViewInvalid) + + tk.MustExec("drop view test_rename_column_view") + tk.MustExec("drop table test_rename_column") +} + +// TestCancelDropColumn tests cancel ddl job which type is drop column. +func TestCancelDropColumn(t *testing.T) { + store, dom, clean := testkit.CreateMockStoreAndDomainWithSchemaLease(t, columnModifyLease) + defer clean() + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + tk.MustExec("create table test_drop_column(c1 int, c2 int)") + defer tk.MustExec("drop table test_drop_column;") + testCases := []struct { + needAddColumn bool + jobState model.JobState + JobSchemaState model.SchemaState + cancelSucc bool + }{ + {true, model.JobStateNone, model.StateNone, true}, + {false, model.JobStateRunning, model.StateWriteOnly, false}, + {true, model.JobStateRunning, model.StateDeleteOnly, false}, + {true, model.JobStateRunning, model.StateDeleteReorganization, false}, + } + var checkErr error + hook := &ddl.TestDDLCallback{Do: dom} + var jobID int64 + testCase := &testCases[0] + hook.OnJobRunBeforeExported = func(job *model.Job) { + if job.Type == model.ActionDropColumn && job.State == testCase.jobState && job.SchemaState == testCase.JobSchemaState { + jobIDs := []int64{job.ID} + jobID = job.ID + hookCtx := mock.NewContext() + hookCtx.Store = store + err := hookCtx.NewTxn(context.TODO()) + if err != nil { + checkErr = errors.Trace(err) + return + } + txn, err := hookCtx.Txn(true) + if err != nil { + checkErr = errors.Trace(err) + return + } + errs, err := admin.CancelJobs(txn, jobIDs) + if err != nil { + checkErr = errors.Trace(err) + return + } + if errs[0] != nil { + checkErr = errors.Trace(errs[0]) + return + } + checkErr = txn.Commit(context.Background()) + } + } + + originalHook := dom.DDL().GetHook() + dom.DDL().SetHook(hook) + for i := range testCases { + testCase = &testCases[i] + if testCase.needAddColumn { + tk.MustExec("alter table test_drop_column add column c3 int") + tk.MustExec("alter table test_drop_column add index idx_c3(c3)") + } + + err := tk.ExecToErr("alter table test_drop_column drop column c3") + var col1 *table.Column + var idx1 table.Index + tbl := external.GetTableByName(t, tk, "test", "test_drop_column") + for _, col := range tbl.Cols() { + if strings.EqualFold(col.Name.L, "c3") { + col1 = col + break + } + } + for _, idx := range tbl.Indices() { + if strings.EqualFold(idx.Meta().Name.L, "idx_c3") { + idx1 = idx + break + } + } + if testCase.cancelSucc { + require.NoError(t, checkErr) + require.NotNil(t, col1) + require.Equal(t, "c3", col1.Name.L) + require.NotNil(t, idx1) + require.Equal(t, "idx_c3", idx1.Meta().Name.L) + require.EqualError(t, err, "[ddl:8214]Cancelled DDL job") + } else { + require.Nil(t, col1) + require.Nil(t, col1) + require.NoError(t, err) + require.EqualError(t, checkErr, admin.ErrCannotCancelDDLJob.GenWithStackByArgs(jobID).Error()) + } + } + dom.DDL().SetHook(originalHook) + tk.MustExec("alter table test_drop_column add column c3 int") + tk.MustExec("alter table test_drop_column drop column c3") +} + +// TestCancelDropColumns tests cancel ddl job which type is drop multi-columns. 
+func TestCancelDropColumns(t *testing.T) { + store, dom, clean := testkit.CreateMockStoreAndDomainWithSchemaLease(t, columnModifyLease) + defer clean() + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + tk.MustExec("create table test_drop_column(c1 int, c2 int)") + defer tk.MustExec("drop table test_drop_column;") + testCases := []struct { + needAddColumn bool + jobState model.JobState + JobSchemaState model.SchemaState + cancelSucc bool + }{ + {true, model.JobStateNone, model.StateNone, true}, + {false, model.JobStateRunning, model.StateWriteOnly, false}, + {true, model.JobStateRunning, model.StateDeleteOnly, false}, + {true, model.JobStateRunning, model.StateDeleteReorganization, false}, + } + var checkErr error + hook := &ddl.TestDDLCallback{Do: dom} + var jobID int64 + testCase := &testCases[0] + hook.OnJobRunBeforeExported = func(job *model.Job) { + if job.Type == model.ActionDropColumns && job.State == testCase.jobState && job.SchemaState == testCase.JobSchemaState { + jobIDs := []int64{job.ID} + jobID = job.ID + hookCtx := mock.NewContext() + hookCtx.Store = store + err := hookCtx.NewTxn(context.TODO()) + if err != nil { + checkErr = errors.Trace(err) + return + } + txn, err := hookCtx.Txn(true) + if err != nil { + checkErr = errors.Trace(err) + return + } + errs, err := admin.CancelJobs(txn, jobIDs) + if err != nil { + checkErr = errors.Trace(err) + return + } + if errs[0] != nil { + checkErr = errors.Trace(errs[0]) + return + } + checkErr = txn.Commit(context.Background()) + } + } + + originalHook := dom.DDL().GetHook() + dom.DDL().SetHook(hook) + for i := range testCases { + testCase = &testCases[i] + if testCase.needAddColumn { + tk.MustExec("alter table test_drop_column add column c3 int, add column c4 int") + tk.MustExec("alter table test_drop_column add index idx_c3(c3)") + } + err := tk.ExecToErr("alter table test_drop_column drop column c3, drop column c4") + tbl := external.GetTableByName(t, tk, "test", "test_drop_column") + col3 := table.FindCol(tbl.Cols(), "c3") + col4 := table.FindCol(tbl.Cols(), "c4") + var idx3 table.Index + for _, idx := range tbl.Indices() { + if strings.EqualFold(idx.Meta().Name.L, "idx_c3") { + idx3 = idx + break + } + } + if testCase.cancelSucc { + require.NoError(t, checkErr) + require.NotNil(t, col3) + require.NotNil(t, col4) + require.NotNil(t, idx3) + require.Equal(t, "c3", col3.Name.L) + require.Equal(t, "c4", col4.Name.L) + require.Equal(t, "idx_c3", idx3.Meta().Name.L) + require.EqualError(t, err, "[ddl:8214]Cancelled DDL job") + } else { + require.Nil(t, col3) + require.Nil(t, col4) + require.Nil(t, idx3) + require.NoError(t, err) + require.EqualError(t, checkErr, admin.ErrCannotCancelDDLJob.GenWithStackByArgs(jobID).Error()) + } + } + dom.DDL().SetHook(originalHook) + tk.MustExec("alter table test_drop_column add column c3 int, add column c4 int") + tk.MustExec("alter table test_drop_column drop column c3, drop column c4") +} + +func TestVirtualColumnDDL(t *testing.T) { + store, clean := testkit.CreateMockStoreWithSchemaLease(t, columnModifyLease) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec(`create global temporary table test_gv_ddl(a int, b int as (a+8) virtual, c int as (b + 2) stored) on commit delete rows;`) + is := tk.Session().(sessionctx.Context).GetInfoSchema().(infoschema.InfoSchema) + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("test_gv_ddl")) + require.NoError(t, err) + testCases := []struct { + generatedExprString string + 
generatedStored bool + }{ + {"", false}, + {"`a` + 8", false}, + {"`b` + 2", true}, + } + for i, column := range tbl.Meta().Columns { + require.Equal(t, testCases[i].generatedExprString, column.GeneratedExprString) + require.Equal(t, testCases[i].generatedStored, column.GeneratedStored) + } + result := tk.MustQuery(`DESC test_gv_ddl`) + result.Check(testkit.Rows(`a int(11) YES  <nil> `, `b int(11) YES  <nil> VIRTUAL GENERATED`, `c int(11) YES  <nil> STORED GENERATED`)) + tk.MustExec("begin;") + tk.MustExec("insert into test_gv_ddl values (1, default, default)") + tk.MustQuery("select * from test_gv_ddl").Check(testkit.Rows("1 9 11")) + tk.MustExec("commit") + + // for local temporary table + tk.MustExec(`create temporary table test_local_gv_ddl(a int, b int as (a+8) virtual, c int as (b + 2) stored);`) + is = tk.Session().(sessionctx.Context).GetInfoSchema().(infoschema.InfoSchema) + tbl, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("test_local_gv_ddl")) + require.NoError(t, err) + for i, column := range tbl.Meta().Columns { + require.Equal(t, testCases[i].generatedExprString, column.GeneratedExprString) + require.Equal(t, testCases[i].generatedStored, column.GeneratedStored) + } + result = tk.MustQuery(`DESC test_local_gv_ddl`) + result.Check(testkit.Rows(`a int(11) YES  <nil> `, `b int(11) YES  <nil> VIRTUAL GENERATED`, `c int(11) YES  <nil> STORED GENERATED`)) + tk.MustExec("begin;") + tk.MustExec("insert into test_local_gv_ddl values (1, default, default)") + tk.MustQuery("select * from test_local_gv_ddl").Check(testkit.Rows("1 9 11")) + tk.MustExec("commit") + tk.MustQuery("select * from test_local_gv_ddl").Check(testkit.Rows("1 9 11")) +} + +func TestGeneratedColumnDDL(t *testing.T) { + store, clean := testkit.CreateMockStoreWithSchemaLease(t, columnModifyLease) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + // Check create table with virtual and stored generated columns. + tk.MustExec(`CREATE TABLE test_gv_ddl(a int, b int as (a+8) virtual, c int as (b + 2) stored)`) + + // Check desc table with virtual and stored generated columns. + result := tk.MustQuery(`DESC test_gv_ddl`) + result.Check(testkit.Rows(`a int(11) YES  <nil> `, `b int(11) YES  <nil> VIRTUAL GENERATED`, `c int(11) YES  <nil> STORED GENERATED`)) + + // Check show create table with virtual and stored generated columns. + result = tk.MustQuery(`show create table test_gv_ddl`) + result.Check(testkit.Rows( + "test_gv_ddl CREATE TABLE `test_gv_ddl` (\n `a` int(11) DEFAULT NULL,\n `b` int(11) GENERATED ALWAYS AS (`a` + 8) VIRTUAL,\n `c` int(11) GENERATED ALWAYS AS (`b` + 2) STORED\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin", + )) + + // Check generated expression with blanks. + tk.MustExec("create table table_with_gen_col_blanks (a int, b char(20) as (cast( \r\n\t a \r\n\tas char)), c int as (a+100))") + result = tk.MustQuery(`show create table table_with_gen_col_blanks`) + result.Check(testkit.Rows("table_with_gen_col_blanks CREATE TABLE `table_with_gen_col_blanks` (\n" + + " `a` int(11) DEFAULT NULL,\n" + + " `b` char(20) GENERATED ALWAYS AS (cast(`a` as char)) VIRTUAL,\n" + + " `c` int(11) GENERATED ALWAYS AS (`a` + 100) VIRTUAL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + + // Check generated expression with charset latin1 ("latin1" != mysql.DefaultCharset).
+ tk.MustExec("create table table_with_gen_col_latin1 (a int, b char(20) as (cast( \r\n\t a \r\n\tas char charset latin1)), c int as (a+100))") + result = tk.MustQuery(`show create table table_with_gen_col_latin1`) + result.Check(testkit.Rows("table_with_gen_col_latin1 CREATE TABLE `table_with_gen_col_latin1` (\n" + + " `a` int(11) DEFAULT NULL,\n" + + " `b` char(20) GENERATED ALWAYS AS (cast(`a` as char charset latin1)) VIRTUAL,\n" + + " `c` int(11) GENERATED ALWAYS AS (`a` + 100) VIRTUAL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + + // Check generated expression with string (issue 9457). + tk.MustExec("create table table_with_gen_col_string (first_name varchar(10), last_name varchar(10), full_name varchar(255) AS (CONCAT(first_name,' ',last_name)))") + result = tk.MustQuery(`show create table table_with_gen_col_string`) + result.Check(testkit.Rows("table_with_gen_col_string CREATE TABLE `table_with_gen_col_string` (\n" + + " `first_name` varchar(10) DEFAULT NULL,\n" + + " `last_name` varchar(10) DEFAULT NULL,\n" + + " `full_name` varchar(255) GENERATED ALWAYS AS (concat(`first_name`, _utf8mb4' ', `last_name`)) VIRTUAL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + + tk.MustExec("alter table table_with_gen_col_string modify column full_name varchar(255) GENERATED ALWAYS AS (CONCAT(last_name,' ' ,first_name) ) VIRTUAL") + result = tk.MustQuery(`show create table table_with_gen_col_string`) + result.Check(testkit.Rows("table_with_gen_col_string CREATE TABLE `table_with_gen_col_string` (\n" + + " `first_name` varchar(10) DEFAULT NULL,\n" + + " `last_name` varchar(10) DEFAULT NULL,\n" + + " `full_name` varchar(255) GENERATED ALWAYS AS (concat(`last_name`, _utf8mb4' ', `first_name`)) VIRTUAL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + + // Test incorrect parameter count. + tk.MustGetErrCode("create table test_gv_incorrect_pc(a double, b int as (lower(a, 2)))", errno.ErrWrongParamcountToNativeFct) + tk.MustGetErrCode("create table test_gv_incorrect_pc(a double, b int as (lower(a, 2)) stored)", errno.ErrWrongParamcountToNativeFct) + + genExprTests := []struct { + stmt string + err int + }{ + // Drop/rename columns dependent by other column. + {`alter table test_gv_ddl drop column a`, errno.ErrDependentByGeneratedColumn}, + {`alter table test_gv_ddl change column a anew int`, errno.ErrBadField}, + + // Modify/change stored status of generated columns. + {`alter table test_gv_ddl modify column b bigint`, errno.ErrUnsupportedOnGeneratedColumn}, + {`alter table test_gv_ddl change column c cnew bigint as (a+100)`, errno.ErrUnsupportedOnGeneratedColumn}, + + // Modify/change generated columns breaking prior. + {`alter table test_gv_ddl modify column b int as (c+100)`, errno.ErrGeneratedColumnNonPrior}, + {`alter table test_gv_ddl change column b bnew int as (c+100)`, errno.ErrGeneratedColumnNonPrior}, + + // Refer not exist columns in generation expression. + {`create table test_gv_ddl_bad (a int, b int as (c+8))`, errno.ErrBadField}, + + // Refer generated columns non prior. + {`create table test_gv_ddl_bad (a int, b int as (c+1), c int as (a+1))`, errno.ErrGeneratedColumnNonPrior}, + + // Virtual generated columns cannot be primary key. 
{`create table test_gv_ddl_bad (a int, b int, c int as (a+b) primary key)`, errno.ErrUnsupportedOnGeneratedColumn}, + {`create table test_gv_ddl_bad (a int, b int, c int as (a+b), primary key(c))`, errno.ErrUnsupportedOnGeneratedColumn}, + {`create table test_gv_ddl_bad (a int, b int, c int as (a+b), primary key(a, c))`, errno.ErrUnsupportedOnGeneratedColumn}, + + // Add stored generated column through alter table. + {`alter table test_gv_ddl add column d int as (b+2) stored`, errno.ErrUnsupportedOnGeneratedColumn}, + {`alter table test_gv_ddl modify column b int as (a + 8) stored`, errno.ErrUnsupportedOnGeneratedColumn}, + + // Add generated column with incorrect parameter count. + {`alter table test_gv_ddl add column z int as (lower(a, 2))`, errno.ErrWrongParamcountToNativeFct}, + {`alter table test_gv_ddl add column z int as (lower(a, 2)) stored`, errno.ErrWrongParamcountToNativeFct}, + + // Modify generated column with incorrect parameter count. + {`alter table test_gv_ddl modify column b int as (lower(a, 2))`, errno.ErrWrongParamcountToNativeFct}, + {`alter table test_gv_ddl change column b b int as (lower(a, 2))`, errno.ErrWrongParamcountToNativeFct}, + } + for _, tt := range genExprTests { + tk.MustGetErrCode(tt.stmt, tt.err) + } + + // Check alter table modify/change generated column. + modStoredColErrMsg := "[ddl:3106]'modifying a stored column' is not supported for generated columns." + tk.MustGetErrMsg(`alter table test_gv_ddl modify column c bigint as (b+200) stored`, modStoredColErrMsg) + + result = tk.MustQuery(`DESC test_gv_ddl`) + result.Check(testkit.Rows(`a int(11) YES  <nil> `, `b int(11) YES  <nil> VIRTUAL GENERATED`, `c int(11) YES  <nil> STORED GENERATED`)) + + tk.MustExec(`alter table test_gv_ddl change column b b bigint as (a+100) virtual`) + result = tk.MustQuery(`DESC test_gv_ddl`) + result.Check(testkit.Rows(`a int(11) YES  <nil> `, `b bigint(20) YES  <nil> VIRTUAL GENERATED`, `c int(11) YES  <nil> STORED GENERATED`)) + + tk.MustExec(`alter table test_gv_ddl change column c cnew bigint`) + result = tk.MustQuery(`DESC test_gv_ddl`) + result.Check(testkit.Rows(`a int(11) YES  <nil> `, `b bigint(20) YES  <nil> VIRTUAL GENERATED`, `cnew bigint(20) YES  <nil> `)) + + // Test generated column `\\`.
+ tk.MustExec("drop table if exists t") + tk.MustExec("CREATE TABLE t(c0 TEXT AS ('\\\\'));") + tk.MustExec("insert into t values ()") + tk.MustQuery("select * from t").Check(testkit.Rows("\\")) + tk.MustExec("drop table if exists t") + tk.MustExec("CREATE TABLE t(c0 TEXT AS ('a\\\\b\\\\c\\\\'))") + tk.MustExec("insert into t values ()") + tk.MustQuery("select * from t").Check(testkit.Rows("a\\b\\c\\")) +} + +func TestColumnModifyingDefinition(t *testing.T) { + store, clean := testkit.CreateMockStoreWithSchemaLease(t, columnModifyLease) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table test2 (c1 int, c2 int, c3 int default 1, index (c1));") + tk.MustExec("alter table test2 change c2 a int not null;") + is := domain.GetDomain(tk.Session()).InfoSchema() + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("test2")) + require.NoError(t, err) + var c2 *table.Column + for _, col := range tbl.Cols() { + if col.Name.L == "a" { + c2 = col + } + } + require.True(t, mysql.HasNotNullFlag(c2.Flag)) + + tk.MustExec("drop table if exists test2;") + tk.MustExec("create table test2 (c1 int, c2 int, c3 int default 1, index (c1));") + tk.MustExec("insert into test2(c2) values (null);") + tk.MustGetErrMsg("alter table test2 change c2 a int not null", "[ddl:1265]Data truncated for column 'a' at row 1") + tk.MustGetErrCode("alter table test2 change c1 a1 bigint not null;", mysql.WarnDataTruncated) +} + +func TestTransactionWithWriteOnlyColumn(t *testing.T) { + store, dom, clean := testkit.CreateMockStoreAndDomainWithSchemaLease(t, columnModifyLease) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1 (a int key);") + + transactions := [][]string{ + { + "begin", + "insert into t1 set a=1", + "update t1 set a=2 where a=1", + "commit", + }, + } + + hook := &ddl.TestDDLCallback{Do: dom} + var checkErr error + hook.OnJobRunBeforeExported = func(job *model.Job) { + if checkErr != nil { + return + } + switch job.SchemaState { + case model.StateWriteOnly: + default: + return + } + // do transaction. + for _, transaction := range transactions { + for _, sql := range transaction { + if _, checkErr = tk.Exec(sql); checkErr != nil { + checkErr = errors.Errorf("err: %s, sql: %s, job schema state: %s", checkErr.Error(), sql, job.SchemaState) + return + } + } + } + } + dom.DDL().SetHook(hook) + done := make(chan error, 1) + // test transaction on add column. + go backgroundExec(store, "alter table t1 add column c int not null", done) + err := <-done + require.NoError(t, err) + require.NoError(t, checkErr) + tk.MustQuery("select a from t1").Check(testkit.Rows("2")) + tk.MustExec("delete from t1") + + // test transaction on drop column. 
+ go backgroundExec(store, "alter table t1 drop column c", done) + err = <-done + require.NoError(t, err) + require.NoError(t, checkErr) + tk.MustQuery("select a from t1").Check(testkit.Rows("2")) +} + +func TestColumnCheck(t *testing.T) { + store, clean := testkit.CreateMockStoreWithSchemaLease(t, columnModifyLease) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists column_check") + tk.MustExec("create table column_check (pk int primary key, a int check (a > 1))") + defer tk.MustExec("drop table if exists column_check") + require.Equal(t, uint16(1), tk.Session().GetSessionVars().StmtCtx.WarningCount()) + tk.MustQuery("show warnings").Check(testkit.RowsWithSep("|", "Warning|8231|CONSTRAINT CHECK is not supported")) +} + +func TestModifyGeneratedColumn(t *testing.T) { + store, clean := testkit.CreateMockStoreWithSchemaLease(t, columnModifyLease) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + modIdxColErrMsg := "[ddl:3106]'modifying an indexed column' is not supported for generated columns." + modStoredColErrMsg := "[ddl:3106]'modifying a stored column' is not supported for generated columns." + + // Modify column with single-col-index. + tk.MustExec("drop table if exists t1;") + tk.MustExec("create table t1 (a int, b int as (a+1), index idx(b));") + tk.MustExec("insert into t1 set a=1;") + tk.MustGetErrMsg("alter table t1 modify column b int as (a+2);", modIdxColErrMsg) + tk.MustExec("drop index idx on t1;") + tk.MustExec("alter table t1 modify b int as (a+2);") + tk.MustQuery("select * from t1").Check(testkit.Rows("1 3")) + + // Modify column with multi-col-index. + tk.MustExec("drop table t1;") + tk.MustExec("create table t1 (a int, b int as (a+1), index idx(a, b));") + tk.MustExec("insert into t1 set a=1;") + tk.MustGetErrMsg("alter table t1 modify column b int as (a+2);", modIdxColErrMsg) + tk.MustExec("drop index idx on t1;") + tk.MustExec("alter table t1 modify b int as (a+2);") + tk.MustQuery("select * from t1").Check(testkit.Rows("1 3")) + + // Modify column with stored status to a different expression. + tk.MustExec("drop table t1;") + tk.MustExec("create table t1 (a int, b int as (a+1) stored);") + tk.MustExec("insert into t1 set a=1;") + tk.MustGetErrMsg("alter table t1 modify column b int as (a+2) stored;", modStoredColErrMsg) + + // Modify column with stored status to the same expression. + tk.MustExec("drop table t1;") + tk.MustExec("create table t1 (a int, b int as (a+1) stored);") + tk.MustExec("insert into t1 set a=1;") + tk.MustExec("alter table t1 modify column b bigint as (a+1) stored;") + tk.MustExec("alter table t1 modify column b bigint as (a + 1) stored;") + tk.MustQuery("select * from t1").Check(testkit.Rows("1 2")) + + // Modify column with index to the same expression. + tk.MustExec("drop table t1;") + tk.MustExec("create table t1 (a int, b int as (a+1), index idx(b));") + tk.MustExec("insert into t1 set a=1;") + tk.MustExec("alter table t1 modify column b bigint as (a+1);") + tk.MustExec("alter table t1 modify column b bigint as (a + 1);") + tk.MustQuery("select * from t1").Check(testkit.Rows("1 2")) + + // Modify column from non-generated to stored generated. + tk.MustExec("drop table t1;") + tk.MustExec("create table t1 (a int, b int);") + tk.MustGetErrMsg("alter table t1 modify column b bigint as (a+1) stored;", modStoredColErrMsg) + + // Modify column from stored generated to non-generated. 
+ tk.MustExec("drop table t1;") + tk.MustExec("create table t1 (a int, b int as (a+1) stored);") + tk.MustExec("insert into t1 set a=1;") + tk.MustExec("alter table t1 modify column b int;") + tk.MustQuery("select * from t1").Check(testkit.Rows("1 2")) +} + +func TestCheckColumnDefaultValue(t *testing.T) { + store, clean := testkit.CreateMockStoreWithSchemaLease(t, columnModifyLease) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists text_default_text;") + tk.MustGetErrCode("create table text_default_text(c1 text not null default '');", errno.ErrBlobCantHaveDefault) + tk.MustGetErrCode("create table text_default_text(c1 text not null default 'scds');", errno.ErrBlobCantHaveDefault) + + tk.MustExec("drop table if exists text_default_json;") + tk.MustGetErrCode("create table text_default_json(c1 json not null default '');", errno.ErrBlobCantHaveDefault) + tk.MustGetErrCode("create table text_default_json(c1 json not null default 'dfew555');", errno.ErrBlobCantHaveDefault) + + tk.MustExec("drop table if exists text_default_blob;") + tk.MustGetErrCode("create table text_default_blob(c1 blob not null default '');", errno.ErrBlobCantHaveDefault) + tk.MustGetErrCode("create table text_default_blob(c1 blob not null default 'scds54');", errno.ErrBlobCantHaveDefault) + + tk.MustExec("set sql_mode='';") + tk.MustExec("create table text_default_text(c1 text not null default '');") + tk.MustQuery(`show create table text_default_text`).Check(testkit.RowsWithSep("|", + "text_default_text CREATE TABLE `text_default_text` (\n"+ + " `c1` text NOT NULL\n"+ + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin", + )) + is := domain.GetDomain(tk.Session()).InfoSchema() + tblInfo, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("text_default_text")) + require.NoError(t, err) + require.Empty(t, tblInfo.Meta().Columns[0].DefaultValue) + + tk.MustExec("create table text_default_blob(c1 blob not null default '');") + tk.MustQuery(`show create table text_default_blob`).Check(testkit.RowsWithSep("|", + "text_default_blob CREATE TABLE `text_default_blob` (\n"+ + " `c1` blob NOT NULL\n"+ + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin", + )) + is = domain.GetDomain(tk.Session()).InfoSchema() + tblInfo, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("text_default_blob")) + require.NoError(t, err) + require.Empty(t, tblInfo.Meta().Columns[0].DefaultValue) + + tk.MustExec("create table text_default_json(c1 json not null default '');") + tk.MustQuery(`show create table text_default_json`).Check(testkit.RowsWithSep("|", + "text_default_json CREATE TABLE `text_default_json` (\n"+ + " `c1` json NOT NULL DEFAULT 'null'\n"+ + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin", + )) + is = domain.GetDomain(tk.Session()).InfoSchema() + tblInfo, err = is.TableByName(model.NewCIStr("test"), model.NewCIStr("text_default_json")) + require.NoError(t, err) + require.Equal(t, "null", tblInfo.Meta().Columns[0].DefaultValue) +} + +func TestCheckConvertToCharacter(t *testing.T) { + store, clean := testkit.CreateMockStoreWithSchemaLease(t, columnModifyLease) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a varchar(10) charset binary);") + is := domain.GetDomain(tk.Session()).InfoSchema() + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tk.MustGetErrCode("alter table t modify column a varchar(10) charset 
utf8 collate utf8_bin", errno.ErrUnsupportedDDLOperation) + tk.MustGetErrCode("alter table t modify column a varchar(10) charset utf8mb4 collate utf8mb4_bin", errno.ErrUnsupportedDDLOperation) + tk.MustGetErrCode("alter table t modify column a varchar(10) charset latin1 collate latin1_bin", errno.ErrUnsupportedDDLOperation) + require.Equal(t, "binary", tbl.Cols()[0].Charset) +} + +func TestAddMultiColumnsIndex(t *testing.T) { + store, clean := testkit.CreateMockStoreWithSchemaLease(t, columnModifyLease) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("drop database if exists tidb;") + tk.MustExec("create database tidb;") + tk.MustExec("use tidb;") + tk.MustExec("create table tidb.test (a int auto_increment primary key, b int);") + tk.MustExec("insert tidb.test values (1, 1);") + tk.MustExec("update tidb.test set b = b + 1 where a = 1;") + tk.MustExec("insert into tidb.test values (2, 2);") + // Test that the b value is nil. + tk.MustExec("insert into tidb.test (a) values (3);") + tk.MustExec("insert into tidb.test values (4, 4);") + // Test that the b value is nil again. + tk.MustExec("insert into tidb.test (a) values (5);") + tk.MustExec("insert tidb.test values (6, 6);") + tk.MustExec("alter table tidb.test add index idx1 (a, b);") + tk.MustExec("admin check table test") +} + +// For issue #31735. +func TestAddGeneratedColumnAndInsert(t *testing.T) { + store, dom, clean := testkit.CreateMockStoreAndDomainWithSchemaLease(t, columnModifyLease) + defer clean() + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t1 (a int, unique kye(a))") + tk.MustExec("insert into t1 value (1), (10)") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + hook := &ddl.TestDDLCallback{Do: dom} + ctx := mock.NewContext() + ctx.Store = store + times := 0 + var checkErr error + hook.OnJobUpdatedExported = func(job *model.Job) { + if checkErr != nil { + return + } + switch job.SchemaState { + case model.StateDeleteOnly: + _, checkErr = tk1.Exec("insert into t1 values (1) on duplicate key update a=a+1") + if checkErr == nil { + _, checkErr = tk1.Exec("replace into t1 values (2)") + } + case model.StateWriteOnly: + _, checkErr = tk1.Exec("insert into t1 values (2) on duplicate key update a=a+1") + if checkErr == nil { + _, checkErr = tk1.Exec("replace into t1 values (3)") + } + case model.StateWriteReorganization: + if checkErr == nil && job.SchemaState == model.StateWriteReorganization && times == 0 { + _, checkErr = tk1.Exec("insert into t1 values (3) on duplicate key update a=a+1") + if checkErr == nil { + _, checkErr = tk1.Exec("replace into t1 values (4)") + } + times++ + } + } + } + d.SetHook(hook) + + tk.MustExec("alter table t1 add column gc int as ((a+1))") + tk.MustQuery("select * from t1 order by a").Check(testkit.Rows("4 5", "10 11")) + require.NoError(t, checkErr) +} + +func TestColumnTypeChangeGenUniqueChangingName(t *testing.T) { + store, dom, clean := testkit.CreateMockStoreAndDomainWithSchemaLease(t, columnModifyLease) + defer clean() + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + hook := &ddl.TestDDLCallback{} + var checkErr error + assertChangingColName := "_col$_c2_0" + assertChangingIdxName := "_idx$_idx_0" + hook.OnJobUpdatedExported = func(job *model.Job) { + if job.SchemaState == model.StateDeleteOnly && job.Type == model.ActionModifyColumn { + var ( + newCol *model.ColumnInfo + oldColName *model.CIStr + modifyColumnTp byte + updatedAutoRandomBits uint64 + changingCol 
*model.ColumnInfo + changingIdxs []*model.IndexInfo + ) + pos := &ast.ColumnPosition{} + err := job.DecodeArgs(&newCol, &oldColName, pos, &modifyColumnTp, &updatedAutoRandomBits, &changingCol, &changingIdxs) + if err != nil { + checkErr = err + return + } + if changingCol.Name.L != assertChangingColName { + checkErr = errors.New("changing column name is incorrect") + } else if changingIdxs[0].Name.L != assertChangingIdxName { + checkErr = errors.New("changing index name is incorrect") + } + } + } + d := dom.DDL() + d.SetHook(hook) + + tk.MustExec("create table if not exists t(c1 varchar(256), c2 bigint, `_col$_c2` varchar(10), unique _idx$_idx(c1), unique idx(c2));") + tk.MustExec("alter table test.t change column c2 cC2 tinyint after `_col$_c2`") + require.NoError(t, checkErr) + + tbl := external.GetTableByName(t, tk, "test", "t") + require.Len(t, tbl.Meta().Columns, 3) + require.Equal(t, "c1", tbl.Meta().Columns[0].Name.O) + require.Equal(t, 0, tbl.Meta().Columns[0].Offset) + require.Equal(t, "_col$_c2", tbl.Meta().Columns[1].Name.O) + require.Equal(t, 1, tbl.Meta().Columns[1].Offset) + require.Equal(t, "cC2", tbl.Meta().Columns[2].Name.O) + require.Equal(t, 2, tbl.Meta().Columns[2].Offset) + + require.Len(t, tbl.Meta().Indices, 2) + require.Equal(t, "_idx$_idx", tbl.Meta().Indices[0].Name.O) + require.Equal(t, "idx", tbl.Meta().Indices[1].Name.O) + + require.Len(t, tbl.Meta().Indices[0].Columns, 1) + require.Equal(t, "c1", tbl.Meta().Indices[0].Columns[0].Name.O) + require.Equal(t, 0, tbl.Meta().Indices[0].Columns[0].Offset) + + require.Len(t, tbl.Meta().Indices[1].Columns, 1) + require.Equal(t, "cC2", tbl.Meta().Indices[1].Columns[0].Name.O) + require.Equal(t, 2, tbl.Meta().Indices[1].Columns[0].Offset) + + assertChangingColName1 := "_col$__col$_c1_1" + assertChangingColName2 := "_col$__col$__col$_c1_0_1" + query1 := "alter table t modify column _col$_c1 tinyint" + query2 := "alter table t modify column _col$__col$_c1_0 tinyint" + hook.OnJobUpdatedExported = func(job *model.Job) { + if (job.Query == query1 || job.Query == query2) && job.SchemaState == model.StateDeleteOnly && job.Type == model.ActionModifyColumn { + var ( + newCol *model.ColumnInfo + oldColName *model.CIStr + modifyColumnTp byte + updatedAutoRandomBits uint64 + changingCol *model.ColumnInfo + changingIdxs []*model.IndexInfo + ) + pos := &ast.ColumnPosition{} + err := job.DecodeArgs(&newCol, &oldColName, pos, &modifyColumnTp, &updatedAutoRandomBits, &changingCol, &changingIdxs) + if err != nil { + checkErr = err + return + } + if job.Query == query1 && changingCol.Name.L != assertChangingColName1 { + checkErr = errors.New("changing column name is incorrect") + } + if job.Query == query2 && changingCol.Name.L != assertChangingColName2 { + checkErr = errors.New("changing column name is incorrect") + } + } + } + d.SetHook(hook) + + tk.MustExec("drop table if exists t") + tk.MustExec("create table if not exists t(c1 bigint, _col$_c1 bigint, _col$__col$_c1_0 bigint, _col$__col$__col$_c1_0_0 bigint)") + tk.MustExec("alter table t modify column c1 tinyint") + tk.MustExec("alter table t modify column _col$_c1 tinyint") + require.NoError(t, checkErr) + tk.MustExec("alter table t modify column _col$__col$_c1_0 tinyint") + require.NoError(t, checkErr) + tk.MustExec("alter table t change column _col$__col$__col$_c1_0_0 _col$__col$__col$_c1_0_0 tinyint") + + tbl = external.GetTableByName(t, tk, "test", "t") + require.Len(t, tbl.Meta().Columns, 4) + require.Equal(t, "c1", tbl.Meta().Columns[0].Name.O) + require.Equal(t, 
mysql.TypeTiny, tbl.Meta().Columns[0].Tp) + require.Equal(t, 0, tbl.Meta().Columns[0].Offset) + require.Equal(t, "_col$_c1", tbl.Meta().Columns[1].Name.O) + require.Equal(t, mysql.TypeTiny, tbl.Meta().Columns[1].Tp) + require.Equal(t, 1, tbl.Meta().Columns[1].Offset) + require.Equal(t, "_col$__col$_c1_0", tbl.Meta().Columns[2].Name.O) + require.Equal(t, mysql.TypeTiny, tbl.Meta().Columns[2].Tp) + require.Equal(t, 2, tbl.Meta().Columns[2].Offset) + require.Equal(t, "_col$__col$__col$_c1_0_0", tbl.Meta().Columns[3].Name.O) + require.Equal(t, mysql.TypeTiny, tbl.Meta().Columns[3].Tp) + require.Equal(t, 3, tbl.Meta().Columns[3].Offset) + + tk.MustExec("drop table if exists t") +} + +func TestWriteReorgForColumnTypeChangeOnAmendTxn(t *testing.T) { + store, dom, clean := testkit.CreateMockStoreAndDomainWithSchemaLease(t, columnModifyLease) + defer clean() + + tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_amend_pessimistic_txn = ON") + defer tk.MustExec("set global tidb_enable_amend_pessimistic_txn = OFF") + + d := dom.DDL() + testInsertOnModifyColumn := func(sql string, startColState, commitColState model.SchemaState, retStrs []string, retErr error) { + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1 (c1 int, c2 int, c3 int, unique key(c1))") + tk.MustExec("insert into t1 values (20, 20, 20);") + + var checkErr error + tk1 := testkit.NewTestKit(t, store) + defer func() { + if tk1.Session() != nil { + tk1.Session().Close() + } + }() + hook := &ddl.TestDDLCallback{Do: dom} + times := 0 + hook.OnJobUpdatedExported = func(job *model.Job) { + if job.Type != model.ActionModifyColumn || checkErr != nil || + (job.SchemaState != startColState && job.SchemaState != commitColState) { + return + } + + if job.SchemaState == startColState { + tk1.MustExec("use test") + tk1.MustExec("begin pessimistic;") + tk1.MustExec("insert into t1 values(101, 102, 103)") + return + } + if times == 0 { + _, checkErr = tk1.Exec("commit;") + } + times++ + } + d.SetHook(hook) + + tk.MustExec(sql) + if retErr == nil { + require.NoError(t, checkErr) + } else { + require.Error(t, checkErr) + require.Contains(t, checkErr.Error(), retErr.Error()) + } + tk.MustQuery("select * from t1").Check(testkit.Rows(retStrs...)) + tk.MustExec("admin check table t1") + } + + // Test a column type change that requires reorganizing data. + ddlStatement := "alter table t1 change column c2 cc smallint;" + testInsertOnModifyColumn(ddlStatement, model.StateNone, model.StateWriteReorganization, []string{"20 20 20"}, domain.ErrInfoSchemaChanged) + testInsertOnModifyColumn(ddlStatement, model.StateDeleteOnly, model.StateWriteReorganization, []string{"20 20 20"}, domain.ErrInfoSchemaChanged) + testInsertOnModifyColumn(ddlStatement, model.StateWriteOnly, model.StateWriteReorganization, []string{"20 20 20"}, domain.ErrInfoSchemaChanged) + testInsertOnModifyColumn(ddlStatement, model.StateNone, model.StatePublic, []string{"20 20 20"}, domain.ErrInfoSchemaChanged) + testInsertOnModifyColumn(ddlStatement, model.StateDeleteOnly, model.StatePublic, []string{"20 20 20"}, domain.ErrInfoSchemaChanged) + testInsertOnModifyColumn(ddlStatement, model.StateWriteOnly, model.StatePublic, []string{"20 20 20"}, domain.ErrInfoSchemaChanged) + + // Test a column type change that does not require reorganizing data. Such a job has only two states: none and public.
+ ddlStatement = "alter table t1 change column c2 cc bigint;" + testInsertOnModifyColumn(ddlStatement, model.StateNone, model.StateWriteReorganization, []string{"20 20 20"}, nil) + testInsertOnModifyColumn(ddlStatement, model.StateWriteOnly, model.StateWriteReorganization, []string{"20 20 20"}, nil) + testInsertOnModifyColumn(ddlStatement, model.StateNone, model.StatePublic, []string{"20 20 20", "101 102 103"}, nil) + testInsertOnModifyColumn(ddlStatement, model.StateWriteOnly, model.StatePublic, []string{"20 20 20"}, nil) +} diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index b181555f6eb04..8d889bd349591 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -2615,6 +2615,7 @@ func testPartitionDropIndex(c *C, store kv.Storage, lease time.Duration, idxName } tk.MustExec(addIdxSQL) +<<<<<<< HEAD ctx := tk.Se.(sessionctx.Context) is := domain.GetDomain(ctx).InfoSchema() t, err := is.TableByName(model.NewCIStr("test_db"), model.NewCIStr("partition_drop_idx")) @@ -2630,6 +2631,9 @@ func testPartitionDropIndex(c *C, store kv.Storage, lease time.Duration, idxName c.Assert(idx1, NotNil) testutil.SessionExecInGoroutine(c, store, dropIdxSQL, done) +======= + testutil.ExecMultiSQLInGoroutine(store, "test", []string{dropIdxSQL}, done) +>>>>>>> 48efcf68e... ddl: fix duplicate elementID allocation to make sure gc work for partition table (#33726) ticker := time.NewTicker(lease / 2) defer ticker.Stop() LOOP: @@ -2651,6 +2655,7 @@ LOOP: num += step } } +<<<<<<< HEAD is = domain.GetDomain(ctx).InfoSchema() t, err = is.TableByName(model.NewCIStr("test_db"), model.NewCIStr("partition_drop_idx")) @@ -2668,6 +2673,8 @@ LOOP: c.Assert(idxn, IsNil) idx := tables.NewIndex(pid, t.Meta(), idx1.Meta()) checkDelRangeDone(c, ctx, idx) +======= +>>>>>>> 48efcf68e... ddl: fix duplicate elementID allocation to make sure gc work for partition table (#33726) tk.MustExec("drop table partition_drop_idx;") } @@ -2705,14 +2712,17 @@ func testPartitionCancelAddIndex(c *C, store kv.Storage, d ddl.DDL, lease time.D } var checkErr error - var c3IdxInfo *model.IndexInfo hook := &ddl.TestDDLCallback{} originBatchSize := tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size") // Set batch size to lower try to slow down add-index reorganization, This if for hook to cancel this ddl job. tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 32") ctx := tk.Se.(sessionctx.Context) defer tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_reorg_batch_size = %v", originBatchSize.Rows()[0][0])) +<<<<<<< HEAD hook.OnJobUpdatedExported, c3IdxInfo, checkErr = backgroundExecOnJobUpdatedExported(c, store, ctx, hook, idxName) +======= + hook.OnJobUpdatedExported, _, checkErr = backgroundExecOnJobUpdatedExportedT(t, tk, store, hook, idxName) +>>>>>>> 48efcf68e... ddl: fix duplicate elementID allocation to make sure gc work for partition table (#33726) originHook := d.GetHook() defer d.(ddl.DDLForTest).SetHook(originHook) d.(ddl.DDLForTest).SetHook(hook) @@ -2746,6 +2756,7 @@ LOOP: times++ } } +<<<<<<< HEAD t := testGetTableByName(c, ctx, "test_db", "t1") // Only one partition id test is taken here. @@ -2757,6 +2768,8 @@ LOOP: idx := tables.NewIndex(pid, t.Meta(), c3IdxInfo) checkDelRangeDone(c, ctx, idx) +======= +>>>>>>> 48efcf68e... 
ddl: fix duplicate elementID allocation to make sure gc work for partition table (#33726) tk.MustExec("drop table t1") } diff --git a/ddl/delete_range.go b/ddl/delete_range.go index 57612bcd67d3d..1f130f0f8a77d 100644 --- a/ddl/delete_range.go +++ b/ddl/delete_range.go @@ -16,6 +16,7 @@ package ddl import ( "context" "encoding/hex" + "fmt" "math" "strings" "sync" @@ -244,14 +245,27 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error { return nil } +// elementIDAlloc allocates element IDs that are unique within one DDL job. +// (job ID, element ID) is the primary key of mysql.gc_delete_range, so every +// delete-range record inserted for a job must carry a distinct element ID. +type elementIDAlloc struct { + id int64 +} + +func (ea *elementIDAlloc) alloc() int64 { + ea.id++ + return ea.id +} + // insertJobIntoDeleteRangeTable parses the job into delete-range arguments, // and inserts a new record into gc_delete_range table. The primary key is -// job ID, so we ignore key conflict error. +// (job ID, element ID), so we ignore key conflict error. func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, job *model.Job) error { now, err := getNowTSO(sctx) if err != nil { return errors.Trace(err) } + var ea elementIDAlloc s := sctx.(sqlexec.SQLExecutor) switch job.Type { @@ -265,7 +276,7 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, if batchEnd > i+batchInsertDeleteRangeSize { batchEnd = i + batchInsertDeleteRangeSize } - if err := doBatchInsert(ctx, s, job.ID, tableIDs[i:batchEnd], now); err != nil { + if err := doBatchInsert(ctx, s, job.ID, tableIDs[i:batchEnd], now, &ea); err != nil { return errors.Trace(err) } } @@ -281,7 +292,7 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, for _, pid := range physicalTableIDs { startKey = tablecodec.EncodeTablePrefix(pid) endKey := tablecodec.EncodeTablePrefix(pid + 1) - if err := doInsert(ctx, s, job.ID, pid, startKey, endKey, now); err != nil { + if err := doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("partition ID is %d", pid)); err != nil { return errors.Trace(err) } } @@ -289,7 +300,7 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, } startKey = tablecodec.EncodeTablePrefix(tableID) endKey := tablecodec.EncodeTablePrefix(tableID + 1) - return doInsert(ctx, s, job.ID, tableID, startKey, endKey, now) + return doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("table ID is %d", tableID)) case model.ActionDropTablePartition, model.ActionTruncateTablePartition: var physicalTableIDs []int64 if err := job.DecodeArgs(&physicalTableIDs); err != nil { @@ -298,7 +309,7 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, for _, physicalTableID := range physicalTableIDs { startKey := tablecodec.EncodeTablePrefix(physicalTableID) endKey := tablecodec.EncodeTablePrefix(physicalTableID + 1) - if err := doInsert(ctx, s, job.ID, physicalTableID, startKey, endKey, now); err != nil { + if err := doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("partition table ID is %d", physicalTableID)); err != nil { return errors.Trace(err) } } @@ -314,14 +325,14 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, for _, pid := range partitionIDs { startKey := tablecodec.EncodeTableIndexPrefix(pid, indexID) endKey := tablecodec.EncodeTableIndexPrefix(pid, indexID+1) - if err := doInsert(ctx, s, job.ID, indexID, startKey, endKey, now); err != nil { + if err := doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil { return errors.Trace(err) } } } else {
     startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
     endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
-    return doInsert(ctx, s, job.ID, indexID, startKey, endKey, now)
+    return doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("table ID is %d", tableID))
 }
 case model.ActionDropIndex, model.ActionDropPrimaryKey:
     tableID := job.TableID
@@ -335,15 +346,36 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
     for _, pid := range partitionIDs {
         startKey := tablecodec.EncodeTableIndexPrefix(pid, indexID)
         endKey := tablecodec.EncodeTableIndexPrefix(pid, indexID+1)
-        if err := doInsert(ctx, s, job.ID, indexID, startKey, endKey, now); err != nil {
+        if err := doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil {
             return errors.Trace(err)
         }
     }
 } else {
     startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
     endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
-    return doInsert(ctx, s, job.ID, indexID, startKey, endKey, now)
+    return doInsert(ctx, s, job.ID, ea.alloc(), startKey, endKey, now, fmt.Sprintf("index ID is %d", indexID))
+    }
+case model.ActionDropIndexes:
+    var indexIDs []int64
+    var partitionIDs []int64
+    if err := job.DecodeArgs(&[]model.CIStr{}, &[]bool{}, &indexIDs, &partitionIDs); err != nil {
+        return errors.Trace(err)
+    }
+    // Remove data in TiKV.
+    if len(indexIDs) == 0 {
+        return nil
+    }
+    if len(partitionIDs) == 0 {
+        return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, &ea)
+    }
+    for _, pID := range partitionIDs {
+        if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pID, indexIDs, now, &ea); err != nil {
+            return errors.Trace(err)
+        }
+    }
 case model.ActionDropColumn:
     var colName model.CIStr
     var indexIDs []int64
@@ -354,12 +386,12 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
     if len(indexIDs) > 0 {
         if len(partitionIDs) > 0 {
             for _, pid := range partitionIDs {
-                if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now); err != nil {
+                if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now, &ea); err != nil {
                     return errors.Trace(err)
                 }
             }
         } else {
-            return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now)
+            return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, &ea)
         }
     }
 case model.ActionDropColumns:
@@ -373,12 +405,12 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
     if len(indexIDs) > 0 {
         if len(partitionIDs) > 0 {
             for _, pid := range partitionIDs {
-                if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now); err != nil {
+                if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now, &ea); err != nil {
                     return errors.Trace(err)
                 }
             }
         } else {
-            return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now)
+            return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, &ea)
         }
     }
 case model.ActionModifyColumn:
@@ -391,10 +423,10 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
         return nil
     }
     if len(partitionIDs) == 0 {
-        return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now)
+        return doBatchDeleteIndiceRange(ctx, s, job.ID, job.TableID, indexIDs, now, &ea)
     }
     for _, pid := range partitionIDs {
-        if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now); err != nil {
+        if err := doBatchDeleteIndiceRange(ctx, s, job.ID, pid, indexIDs, now, &ea); err != nil {
             return errors.Trace(err)
         }
     }
@@ -402,8 +434,8 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
     return nil
 }
 
-func doBatchDeleteIndiceRange(ctx context.Context, s sqlexec.SQLExecutor, jobID, tableID int64, indexIDs []int64, ts uint64) error {
-    logutil.BgLogger().Info("[ddl] batch insert into delete-range indices", zap.Int64("jobID", jobID), zap.Int64s("elementIDs", indexIDs))
+func doBatchDeleteIndiceRange(ctx context.Context, s sqlexec.SQLExecutor, jobID, tableID int64, indexIDs []int64, ts uint64, ea *elementIDAlloc) error {
+    logutil.BgLogger().Info("[ddl] batch insert into delete-range indices", zap.Int64("jobID", jobID), zap.Int64("tableID", tableID), zap.Int64s("indexIDs", indexIDs))
     paramsList := make([]interface{}, 0, len(indexIDs)*5)
     var buf strings.Builder
     buf.WriteString(insertDeleteRangeSQLPrefix)
@@ -416,14 +448,14 @@ func doBatchDeleteIndiceRange(ctx context.Context, s sqlexec.SQLExecutor, jobID,
         if i != len(indexIDs)-1 {
             buf.WriteString(",")
         }
-        paramsList = append(paramsList, jobID, indexID, startKeyEncoded, endKeyEncoded, ts)
+        paramsList = append(paramsList, jobID, ea.alloc(), startKeyEncoded, endKeyEncoded, ts)
     }
     _, err := s.ExecuteInternal(ctx, buf.String(), paramsList...)
return errors.Trace(err) } -func doInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, elementID int64, startKey, endKey kv.Key, ts uint64) error { - logutil.BgLogger().Info("[ddl] insert into delete-range table", zap.Int64("jobID", jobID), zap.Int64("elementID", elementID)) +func doInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID, elementID int64, startKey, endKey kv.Key, ts uint64, comment string) error { + logutil.BgLogger().Info("[ddl] insert into delete-range table", zap.Int64("jobID", jobID), zap.Int64("elementID", elementID), zap.String("comment", comment)) startKeyEncoded := hex.EncodeToString(startKey) endKeyEncoded := hex.EncodeToString(endKey) // set session disk full opt @@ -435,8 +467,8 @@ func doInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, elementID return errors.Trace(err) } -func doBatchInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, tableIDs []int64, ts uint64) error { - logutil.BgLogger().Info("[ddl] batch insert into delete-range table", zap.Int64("jobID", jobID), zap.Int64s("elementIDs", tableIDs)) +func doBatchInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, tableIDs []int64, ts uint64, ea *elementIDAlloc) error { + logutil.BgLogger().Info("[ddl] batch insert into delete-range table", zap.Int64("jobID", jobID), zap.Int64s("tableIDs", tableIDs)) var buf strings.Builder buf.WriteString(insertDeleteRangeSQLPrefix) paramsList := make([]interface{}, 0, len(tableIDs)*5) @@ -449,7 +481,7 @@ func doBatchInsert(ctx context.Context, s sqlexec.SQLExecutor, jobID int64, tabl if i != len(tableIDs)-1 { buf.WriteString(",") } - paramsList = append(paramsList, jobID, tableID, startKeyEncoded, endKeyEncoded, ts) + paramsList = append(paramsList, jobID, ea.alloc(), startKeyEncoded, endKeyEncoded, ts) } // set session disk full opt s.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull) diff --git a/ddl/index_modify_test.go b/ddl/index_modify_test.go new file mode 100644 index 0000000000000..5072c036ba745 --- /dev/null +++ b/ddl/index_modify_test.go @@ -0,0 +1,1368 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package ddl_test + +import ( + "context" + "fmt" + "math" + "math/rand" + "strconv" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/ddl" + testddlutil "github.com/pingcap/tidb/ddl/testutil" + "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/table/tables" + "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/testkit/external" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/admin" + "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/dbterror" + "github.com/pingcap/tidb/util/mock" + "github.com/stretchr/testify/require" +) + +const indexModifyLease = 600 * time.Millisecond + +func TestAddPrimaryKey1(t *testing.T) { + testAddIndex(t, testPlain, "create table test_add_index (c1 bigint, c2 bigint, c3 bigint, unique key(c1))", "primary") +} + +func TestAddPrimaryKey2(t *testing.T) { + testAddIndex(t, testPartition, + `create table test_add_index (c1 bigint, c2 bigint, c3 bigint, key(c1)) + partition by range (c3) ( + partition p0 values less than (3440), + partition p1 values less than (61440), + partition p2 values less than (122880), + partition p3 values less than (204800), + partition p4 values less than maxvalue)`, "primary") +} + +func TestAddPrimaryKey3(t *testing.T) { + testAddIndex(t, testPartition, + `create table test_add_index (c1 bigint, c2 bigint, c3 bigint, key(c1)) + partition by hash (c3) partitions 4;`, "primary") +} + +func TestAddPrimaryKey4(t *testing.T) { + testAddIndex(t, testPartition, + `create table test_add_index (c1 bigint, c2 bigint, c3 bigint, key(c1)) + partition by range columns (c3) ( + partition p0 values less than (3440), + partition p1 values less than (61440), + partition p2 values less than (122880), + partition p3 values less than (204800), + partition p4 values less than maxvalue)`, "primary") +} + +func TestAddIndex1(t *testing.T) { + testAddIndex(t, testPlain, + "create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1))", "") +} + +func TestAddIndex1WithShardRowID(t *testing.T) { + testAddIndex(t, testPartition|testShardRowID, + "create table test_add_index (c1 bigint, c2 bigint, c3 bigint) SHARD_ROW_ID_BITS = 4 pre_split_regions = 4;", "") +} + +func TestAddIndex2(t *testing.T) { + testAddIndex(t, testPartition, + `create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1)) + partition by range (c1) ( + partition p0 values less than (3440), + partition p1 values less than (61440), + partition p2 values less than (122880), + partition p3 values less than (204800), + partition p4 values less than maxvalue)`, "") +} + +func TestAddIndex2WithShardRowID(t *testing.T) { + testAddIndex(t, testPartition|testShardRowID, + `create table test_add_index (c1 bigint, c2 bigint, c3 bigint) + SHARD_ROW_ID_BITS = 4 pre_split_regions = 4 + partition by range (c1) ( + partition p0 values less than (3440), + partition p1 values less than (61440), + partition p2 values less than (122880), + partition p3 values less than (204800), + partition p4 values less than maxvalue)`, "") +} + +func TestAddIndex3(t *testing.T) { + testAddIndex(t, testPartition, + `create table 
test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1)) + partition by hash (c1) partitions 4;`, "") +} + +func TestAddIndex3WithShardRowID(t *testing.T) { + testAddIndex(t, testPartition|testShardRowID, + `create table test_add_index (c1 bigint, c2 bigint, c3 bigint) + SHARD_ROW_ID_BITS = 4 pre_split_regions = 4 + partition by hash (c1) partitions 4;`, "") +} + +func TestAddIndex4(t *testing.T) { + testAddIndex(t, testPartition, + `create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1)) + partition by range columns (c1) ( + partition p0 values less than (3440), + partition p1 values less than (61440), + partition p2 values less than (122880), + partition p3 values less than (204800), + partition p4 values less than maxvalue)`, "") +} + +func TestAddIndex4WithShardRowID(t *testing.T) { + testAddIndex(t, testPartition|testShardRowID, + `create table test_add_index (c1 bigint, c2 bigint, c3 bigint) + SHARD_ROW_ID_BITS = 4 pre_split_regions = 4 + partition by range columns (c1) ( + partition p0 values less than (3440), + partition p1 values less than (61440), + partition p2 values less than (122880), + partition p3 values less than (204800), + partition p4 values less than maxvalue)`, "") +} + +func TestAddIndex5(t *testing.T) { + testAddIndex(t, testClusteredIndex, + `create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c2, c3))`, "") +} + +type testAddIndexType uint8 + +const ( + testPlain testAddIndexType = 1 + testPartition testAddIndexType = 1 << 1 + testClusteredIndex testAddIndexType = 1 << 2 + testShardRowID testAddIndexType = 1 << 3 +) + +func testAddIndex(t *testing.T, tp testAddIndexType, createTableSQL, idxTp string) { + store, clean := testkit.CreateMockStoreWithSchemaLease(t, indexModifyLease) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + isTestPartition := (testPartition & tp) > 0 + isTestShardRowID := (testShardRowID & tp) > 0 + if isTestShardRowID { + atomic.StoreUint32(&ddl.EnableSplitTableRegion, 1) + tk.MustExec("set global tidb_scatter_region = 1") + defer func() { + atomic.StoreUint32(&ddl.EnableSplitTableRegion, 0) + tk.MustExec("set global tidb_scatter_region = 0") + }() + } + if isTestPartition { + tk.MustExec("set @@session.tidb_enable_table_partition = '1';") + } else if (testClusteredIndex & tp) > 0 { + tk.Session().GetSessionVars().EnableClusteredIndex = variable.ClusteredIndexDefModeOn + } + tk.MustExec("drop table if exists test_add_index") + tk.MustExec(createTableSQL) + + done := make(chan error, 1) + start := -10 + num := defaultBatchSize + // first add some rows + batchInsert(tk, "test_add_index", start, num) + + // Add some discrete rows. + maxBatch := 20 + batchCnt := 100 + otherKeys := make([]int, 0, batchCnt*maxBatch) + // Make sure there are no duplicate keys. 
+    base := defaultBatchSize * 20
+    for i := 1; i < batchCnt; i++ {
+        if isTestShardRowID {
+            base = i % 4 << 61
+        }
+        n := base + i*defaultBatchSize + i
+        for j := 0; j < rand.Intn(maxBatch); j++ {
+            n += j
+            sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", n, n, n)
+            tk.MustExec(sql)
+            otherKeys = append(otherKeys, n)
+        }
+    }
+    // Insert a row near math.MaxInt64 so the backfill encounters it in the
+    // middle of the reorganization.
+    v := math.MaxInt64 - defaultBatchSize/2
+    tk.MustExec(fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", v, v, v))
+    otherKeys = append(otherKeys, v)
+
+    addIdxSQL := fmt.Sprintf("alter table test_add_index add %s key c3_index(c3)", idxTp)
+    testddlutil.SessionExecInGoroutine(store, "test", addIdxSQL, done)
+
+    deletedKeys := make(map[int]struct{})
+
+    ticker := time.NewTicker(indexModifyLease / 2)
+    defer ticker.Stop()
+LOOP:
+    for {
+        select {
+        case err := <-done:
+            if err == nil {
+                break LOOP
+            }
+            require.NoError(t, err)
+        case <-ticker.C:
+            // When server performance is particularly poor, the add-index
+            // operation may never catch up with the concurrent writes, so
+            // cap the number of rows inserted.
+            if num > defaultBatchSize*10 {
+                break
+            }
+            step := 5
+            // delete some rows, and add some data
+            for i := num; i < num+step; i++ {
+                n := rand.Intn(num)
+                deletedKeys[n] = struct{}{}
+                sql := fmt.Sprintf("delete from test_add_index where c1 = %d", n)
+                tk.MustExec(sql)
+                sql = fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", i, i, i)
+                tk.MustExec(sql)
+            }
+            num += step
+        }
+    }
+
+    if isTestShardRowID {
+        rows := tk.MustQuery("show table test_add_index regions").Rows()
+        require.GreaterOrEqual(t, len(rows), 16)
+        tk.MustExec("admin check table test_add_index")
+        return
+    }
+
+    // get existing keys
+    keys := make([]int, 0, num)
+    for i := start; i < num; i++ {
+        if _, ok := deletedKeys[i]; ok {
+            continue
+        }
+        keys = append(keys, i)
+    }
+    keys = append(keys, otherKeys...)
+
+    // test index key
+    expectedRows := make([][]interface{}, 0, len(keys))
+    for _, key := range keys {
+        expectedRows = append(expectedRows, []interface{}{fmt.Sprintf("%v", key)})
+    }
+    tk.MustQuery(fmt.Sprintf("select c1 from test_add_index where c3 >= %d order by c1", start)).Check(expectedRows)
+    tk.MustExec("admin check table test_add_index")
+    if isTestPartition {
+        return
+    }
+
+    // TODO: Support explain in the future.
+    // rows := tk.MustQuery("explain select c1 from test_add_index where c3 >= 100").Rows()
+    // ay := dumpRows(c, rows)
+    // require.Contains(t, fmt.Sprintf("%v", ay), "c3_index")
+
+    // get all row handles
+    require.NoError(t, tk.Session().NewTxn(context.Background()))
+    tbl := external.GetTableByName(t, tk, "test", "test_add_index")
+    handles := kv.NewHandleMap()
+    err := tables.IterRecords(tbl, tk.Session(), tbl.Cols(),
+        func(h kv.Handle, data []types.Datum, cols []*table.Column) (bool, error) {
+            handles.Set(h, struct{}{})
+            return true, nil
+        })
+    require.NoError(t, err)
+
+    // check entries in the index
+    var nidx table.Index
+    idxName := "c3_index"
+    if len(idxTp) != 0 {
+        idxName = "primary"
+    }
+    for _, tidx := range tbl.Indices() {
+        if tidx.Meta().Name.L == idxName {
+            nidx = tidx
+            break
+        }
+    }
+    // Make sure there is an index named c3_index (or primary, when a primary key was added).
+ require.NotNil(t, nidx) + require.Greater(t, nidx.Meta().ID, int64(0)) + txn, err := tk.Session().Txn(true) + require.NoError(t, err) + require.NoError(t, txn.Rollback()) + + require.NoError(t, tk.Session().NewTxn(context.Background())) + tk.MustExec("admin check table test_add_index") + tk.MustExec("drop table test_add_index") +} + +func TestAddIndexForGeneratedColumn(t *testing.T) { + store, clean := testkit.CreateMockStoreWithSchemaLease(t, indexModifyLease) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(y year NOT NULL DEFAULT '2155')") + for i := 0; i < 50; i++ { + tk.MustExec("insert into t values (?)", i) + } + tk.MustExec("insert into t values()") + tk.MustExec("ALTER TABLE t ADD COLUMN y1 year as (y + 2)") + tk.MustExec("ALTER TABLE t ADD INDEX idx_y(y1)") + + tbl := external.GetTableByName(t, tk, "test", "t") + for _, idx := range tbl.Indices() { + require.False(t, strings.EqualFold(idx.Meta().Name.L, "idx_c2")) + } + // NOTE: this test case contains a bug, it should be uncommented after the bug is fixed. + // TODO: Fix bug https://github.com/pingcap/tidb/issues/12181 + // tk.MustExec("delete from t where y = 2155") + // tk.MustExec("alter table t add index idx_y(y1)") + // tk.MustExec("alter table t drop index idx_y") + + // Fix issue 9311. + tk.MustExec("drop table if exists gcai_table") + tk.MustExec("create table gcai_table (id int primary key);") + tk.MustExec("insert into gcai_table values(1);") + tk.MustExec("ALTER TABLE gcai_table ADD COLUMN d date DEFAULT '9999-12-31';") + tk.MustExec("ALTER TABLE gcai_table ADD COLUMN d1 date as (DATE_SUB(d, INTERVAL 31 DAY));") + tk.MustExec("ALTER TABLE gcai_table ADD INDEX idx(d1);") + tk.MustQuery("select * from gcai_table").Check(testkit.Rows("1 9999-12-31 9999-11-30")) + tk.MustQuery("select d1 from gcai_table use index(idx)").Check(testkit.Rows("9999-11-30")) + tk.MustExec("admin check table gcai_table") + // The column is PKIsHandle in generated column expression. + tk.MustExec("ALTER TABLE gcai_table ADD COLUMN id1 int as (id+5);") + tk.MustExec("ALTER TABLE gcai_table ADD INDEX idx1(id1);") + tk.MustQuery("select * from gcai_table").Check(testkit.Rows("1 9999-12-31 9999-11-30 6")) + tk.MustQuery("select id1 from gcai_table use index(idx1)").Check(testkit.Rows("6")) + tk.MustExec("admin check table gcai_table") +} + +// TestAddPrimaryKeyRollback1 is used to test scenarios that will roll back when a duplicate primary key is encountered. +func TestAddPrimaryKeyRollback1(t *testing.T) { + idxName := "PRIMARY" + addIdxSQL := "alter table t1 add primary key c3_index (c3);" + errMsg := "[kv:1062]Duplicate entry '" + strconv.Itoa(defaultBatchSize*2-10) + "' for key 'PRIMARY'" + testAddIndexRollback(t, idxName, addIdxSQL, errMsg, false) +} + +// TestAddPrimaryKeyRollback2 is used to test scenarios that will roll back when a null primary key is encountered. 
+func TestAddPrimaryKeyRollback2(t *testing.T) {
+    idxName := "PRIMARY"
+    addIdxSQL := "alter table t1 add primary key c3_index (c3);"
+    errMsg := "[ddl:1138]Invalid use of NULL value"
+    testAddIndexRollback(t, idxName, addIdxSQL, errMsg, true)
+}
+
+func TestAddUniqueIndexRollback(t *testing.T) {
+    idxName := "c3_index"
+    addIdxSQL := "create unique index c3_index on t1 (c3)"
+    errMsg := "[kv:1062]Duplicate entry '" + strconv.Itoa(defaultBatchSize*2-10) + "' for key 'c3_index'"
+    testAddIndexRollback(t, idxName, addIdxSQL, errMsg, false)
+}
+
+func testAddIndexRollback(t *testing.T, idxName, addIdxSQL, errMsg string, hasNullValsInKey bool) {
+    store, clean := testkit.CreateMockStoreWithSchemaLease(t, indexModifyLease)
+    defer clean()
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("use test")
+    tk.MustExec("create table t1 (c1 int, c2 int, c3 int, unique key(c1))")
+    // defaultBatchSize is equal to ddl.defaultBatchSize
+    base := defaultBatchSize * 2
+    count := base
+    // add some rows
+    batchInsert(tk, "t1", 0, count)
+    // add some null rows
+    if hasNullValsInKey {
+        for i := count - 10; i < count; i++ {
+            tk.MustExec("insert into t1 values (?, ?, null)", i+10, i)
+        }
+    } else {
+        // add some duplicate rows
+        for i := count - 10; i < count; i++ {
+            tk.MustExec("insert into t1 values (?, ?, ?)", i+10, i, i)
+        }
+    }
+
+    done := make(chan error, 1)
+    go backgroundExec(store, addIdxSQL, done)
+
+    times := 0
+    ticker := time.NewTicker(indexModifyLease / 2)
+    defer ticker.Stop()
+LOOP:
+    for {
+        select {
+        case err := <-done:
+            require.EqualError(t, err, errMsg)
+            break LOOP
+        case <-ticker.C:
+            if times >= 10 {
+                break
+            }
+            step := 5
+            // delete some rows, and add some data
+            for i := count; i < count+step; i++ {
+                n := rand.Intn(count)
+                // The duplicated rows are (2048, 2038, 2038) and (2038, 2038, 2038).
+                // Don't delete rows where c1 is 2048 or 2038; otherwise the entry
+                // value in the duplicate-entry error message would change.
+                if n == defaultBatchSize*2-10 || n == defaultBatchSize*2 {
+                    continue
+                }
+                tk.MustExec("delete from t1 where c1 = ?", n)
+                tk.MustExec("insert into t1 values (?, ?, ?)", i+10, i, i)
+            }
+            count += step
+            times++
+        }
+    }
+
+    tbl := external.GetTableByName(t, tk, "test", "t1")
+    for _, tidx := range tbl.Indices() {
+        require.False(t, strings.EqualFold(tidx.Meta().Name.L, idxName))
+    }
+
+    // delete duplicated/null rows, then add index
+    for i := base - 10; i < base; i++ {
+        tk.MustExec("delete from t1 where c1 = ?", i+10)
+    }
+    tk.MustExec(addIdxSQL)
+    tk.MustExec("drop table t1")
+}
+
+func TestAddIndexWithSplitTable(t *testing.T) {
+    createSQL := "CREATE TABLE test_add_index(a bigint PRIMARY KEY AUTO_RANDOM(4), b varchar(255), c bigint)"
+    stSQL := fmt.Sprintf("SPLIT TABLE test_add_index BETWEEN (%d) AND (%d) REGIONS 16;", math.MinInt64, math.MaxInt64)
+    testAddIndexWithSplitTable(t, createSQL, stSQL)
+}
+
+func TestAddIndexWithShardRowID(t *testing.T) {
+    createSQL := "create table test_add_index(a bigint, b bigint, c bigint) SHARD_ROW_ID_BITS = 4 pre_split_regions = 4;"
+    testAddIndexWithSplitTable(t, createSQL, "")
+}
+
+func testAddIndexWithSplitTable(t *testing.T, createSQL, splitTableSQL string) {
+    store, clean := testkit.CreateMockStoreWithSchemaLease(t, indexModifyLease)
+    defer clean()
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("use test")
+    hasAutoRandomField := len(splitTableSQL) > 0
+    if !hasAutoRandomField {
+        atomic.StoreUint32(&ddl.EnableSplitTableRegion, 1)
+        tk.MustExec("set global tidb_scatter_region = 1")
+        defer func() {
+            atomic.StoreUint32(&ddl.EnableSplitTableRegion, 0)
+            tk.MustExec("set global tidb_scatter_region = 0")
+        }()
+    }
+    tk.MustExec(createSQL)
+
+    batchInsertRows := func(tk *testkit.TestKit, needVal bool, tbl string, start, end int) error {
+        dml := fmt.Sprintf("insert into %s values", tbl)
+        for i := start; i < end; i++ {
+            if needVal {
+                dml += fmt.Sprintf("(%d, %d, %d)", i, i, i)
+            } else {
+                dml += "()"
+            }
+            if i != end-1 {
+                dml += ","
+            }
+        }
+        _, err := tk.Exec(dml)
+        return err
+    }
+
+    done := make(chan error, 1)
+    start := -20
+    num := defaultBatchSize
+    // Add some discrete rows.
+    goCnt := 10
+    errCh := make(chan error, goCnt)
+    for i := 0; i < goCnt; i++ {
+        base := (i % 8) << 60
+        go func(b int, eCh chan error) {
+            tk1 := testkit.NewTestKit(t, store)
+            tk1.MustExec("use test")
+            eCh <- batchInsertRows(tk1, !hasAutoRandomField, "test_add_index", base+start, base+num)
+        }(base, errCh)
+    }
+    for i := 0; i < goCnt; i++ {
+        err := <-errCh
+        require.NoError(t, err)
+    }
+
+    if hasAutoRandomField {
+        tk.MustQuery(splitTableSQL).Check(testkit.Rows("15 1"))
+    }
+    tk.MustQuery("select @@session.tidb_wait_split_region_finish").Check(testkit.Rows("1"))
+    rows := tk.MustQuery("show table test_add_index regions").Rows()
+    require.Len(t, rows, 16)
+    addIdxSQL := "alter table test_add_index add index idx(a)"
+    testddlutil.SessionExecInGoroutine(store, "test", addIdxSQL, done)
+
+    ticker := time.NewTicker(indexModifyLease / 5)
+    defer ticker.Stop()
+    num = 0
+LOOP:
+    for {
+        select {
+        case err := <-done:
+            if err == nil {
+                break LOOP
+            }
+            require.NoError(t, err)
+        case <-ticker.C:
+            // When server performance is particularly poor, the add-index
+            // operation may never catch up with the concurrent writes, so
+            // cap the number of rows inserted.
+            if num >= 1000 {
+                break
+            }
+            step := 20
+            // delete, insert and update some data
+            for i := num; i < num+step; i++ {
+                sql := fmt.Sprintf("delete from test_add_index where a = %d", i+1)
+                tk.MustExec(sql)
+                if hasAutoRandomField {
+                    sql = "insert into test_add_index values ()"
+                } else {
+                    sql = fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", i, i, i)
+                }
+                tk.MustExec(sql)
+                sql = fmt.Sprintf("update test_add_index set b = %d", i*10)
+                tk.MustExec(sql)
+            }
+            num += step
+        }
+    }
+
+    tk.MustExec("admin check table test_add_index")
+}
+
+func TestAddAnonymousIndex(t *testing.T) {
+    store, clean := testkit.CreateMockStoreWithSchemaLease(t, indexModifyLease)
+    defer clean()
+
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("use test")
+    tk.MustExec("create table t_anonymous_index (c1 int, c2 int, C3 int)")
+    tk.MustExec("alter table t_anonymous_index add index (c1, c2)")
+    // dropping an index without specifying a name is an error
+    err := tk.ExecToErr("alter table t_anonymous_index drop index")
+    require.Error(t, err)
+    // The index name is c1 when adding index (c1, c2).
+    tk.MustExec("alter table t_anonymous_index drop index c1")
+    tbl := external.GetTableByName(t, tk, "test", "t_anonymous_index")
+    require.Len(t, tbl.Indices(), 0)
+    // add more indexes whose first column is c1
+    tk.MustExec("alter table t_anonymous_index add index (c1)")
+    err = tk.ExecToErr("alter table t_anonymous_index add index c1 (c2)")
+    require.Error(t, err)
+    tbl = external.GetTableByName(t, tk, "test", "t_anonymous_index")
+    require.Len(t, tbl.Indices(), 1)
+    require.Equal(t, "c1", tbl.Indices()[0].Meta().Name.L)
+    // MySQL would only report a warning for this.
+    tk.MustExec("alter table t_anonymous_index add index c1_3 (c1)")
+    tk.MustExec("alter table t_anonymous_index add index (c1, c2, C3)")
+    // MySQL would only report a warning for this.
+ tk.MustExec("alter table t_anonymous_index add index (c1)") + tbl = external.GetTableByName(t, tk, "test", "t_anonymous_index") + require.Len(t, tbl.Indices(), 4) + tk.MustExec("alter table t_anonymous_index drop index c1") + tk.MustExec("alter table t_anonymous_index drop index c1_2") + tk.MustExec("alter table t_anonymous_index drop index c1_3") + tk.MustExec("alter table t_anonymous_index drop index c1_4") + // for case-insensitive + tk.MustExec("alter table t_anonymous_index add index (C3)") + tk.MustExec("alter table t_anonymous_index drop index c3") + tk.MustExec("alter table t_anonymous_index add index c3 (C3)") + tk.MustExec("alter table t_anonymous_index drop index C3") + // for anonymous index with column name `primary` + tk.MustExec("create table t_primary (`primary` int, b int, key (`primary`))") + tbl = external.GetTableByName(t, tk, "test", "t_primary") + require.Equal(t, "primary_2", tbl.Indices()[0].Meta().Name.L) + tk.MustExec("alter table t_primary add index (`primary`);") + tbl = external.GetTableByName(t, tk, "test", "t_primary") + require.Equal(t, "primary_2", tbl.Indices()[0].Meta().Name.L) + require.Equal(t, "primary_3", tbl.Indices()[1].Meta().Name.L) + tk.MustExec("alter table t_primary add primary key(b);") + tbl = external.GetTableByName(t, tk, "test", "t_primary") + require.Equal(t, "primary_2", tbl.Indices()[0].Meta().Name.L) + require.Equal(t, "primary_3", tbl.Indices()[1].Meta().Name.L) + require.Equal(t, "primary", tbl.Indices()[2].Meta().Name.L) + tk.MustExec("create table t_primary_2 (`primary` int, key primary_2 (`primary`), key (`primary`))") + tbl = external.GetTableByName(t, tk, "test", "t_primary_2") + require.Equal(t, "primary_2", tbl.Indices()[0].Meta().Name.L) + require.Equal(t, "primary_3", tbl.Indices()[1].Meta().Name.L) + tk.MustExec("create table t_primary_3 (`primary_2` int, key(`primary_2`), `primary` int, key(`primary`));") + tbl = external.GetTableByName(t, tk, "test", "t_primary_3") + require.Equal(t, "primary_2", tbl.Indices()[0].Meta().Name.L) + require.Equal(t, "primary_3", tbl.Indices()[1].Meta().Name.L) +} + +func TestAddIndexWithPK(t *testing.T) { + store, clean := testkit.CreateMockStoreWithSchemaLease(t, indexModifyLease) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + tests := []struct { + name string + mode variable.ClusteredIndexDefMode + }{ + { + "ClusteredIndexDefModeIntOnly", + variable.ClusteredIndexDefModeIntOnly, + }, + { + "ClusteredIndexDefModeOn", + variable.ClusteredIndexDefModeOn, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + tk.Session().GetSessionVars().EnableClusteredIndex = test.mode + tk.MustExec("drop table if exists test_add_index_with_pk") + tk.MustExec("create table test_add_index_with_pk(a int not null, b int not null default '0', primary key(a))") + tk.MustExec("insert into test_add_index_with_pk values(1, 2)") + tk.MustExec("alter table test_add_index_with_pk add index idx (a)") + tk.MustQuery("select a from test_add_index_with_pk").Check(testkit.Rows("1")) + tk.MustExec("insert into test_add_index_with_pk values(2, 2)") + tk.MustExec("alter table test_add_index_with_pk add index idx1 (a, b)") + tk.MustQuery("select * from test_add_index_with_pk").Check(testkit.Rows("1 2", "2 2")) + tk.MustExec("drop table if exists test_add_index_with_pk1") + tk.MustExec("create table test_add_index_with_pk1(a int not null, b int not null default '0', c int, d int, primary key(c))") + tk.MustExec("insert into test_add_index_with_pk1 values(1, 1, 1, 
1)") + tk.MustExec("alter table test_add_index_with_pk1 add index idx (c)") + tk.MustExec("insert into test_add_index_with_pk1 values(2, 2, 2, 2)") + tk.MustQuery("select * from test_add_index_with_pk1").Check(testkit.Rows("1 1 1 1", "2 2 2 2")) + tk.MustExec("drop table if exists test_add_index_with_pk2") + tk.MustExec("create table test_add_index_with_pk2(a int not null, b int not null default '0', c int unsigned, d int, primary key(c))") + tk.MustExec("insert into test_add_index_with_pk2 values(1, 1, 1, 1)") + tk.MustExec("alter table test_add_index_with_pk2 add index idx (c)") + tk.MustExec("insert into test_add_index_with_pk2 values(2, 2, 2, 2)") + tk.MustQuery("select * from test_add_index_with_pk2").Check(testkit.Rows("1 1 1 1", "2 2 2 2")) + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a int, b int, c int, primary key(a, b));") + tk.MustExec("insert into t values (1, 2, 3);") + tk.MustExec("create index idx on t (a, b);") + }) + } +} + +func TestCancelAddPrimaryKey(t *testing.T) { + store, dom, clean := testkit.CreateMockStoreAndDomainWithSchemaLease(t, indexModifyLease) + defer clean() + idxName := "primary" + addIdxSQL := "alter table t1 add primary key idx_c2 (c2);" + testCancelAddIndex(t, store, dom, idxName, addIdxSQL) + + // Check the column's flag when the "add primary key" failed. + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + require.NoError(t, tk.Session().NewTxn(context.Background())) + tbl := external.GetTableByName(t, tk, "test", "t1") + col1Flag := tbl.Cols()[1].Flag + require.True(t, !mysql.HasNotNullFlag(col1Flag) && !mysql.HasPreventNullInsertFlag(col1Flag) && mysql.HasUnsignedFlag(col1Flag)) +} + +func TestCancelAddIndex(t *testing.T) { + store, dom, clean := testkit.CreateMockStoreAndDomainWithSchemaLease(t, indexModifyLease) + defer clean() + idxName := "c3_index" + addIdxSQL := "create unique index c3_index on t1 (c3)" + testCancelAddIndex(t, store, dom, idxName, addIdxSQL) +} + +func testCancelAddIndex(t *testing.T, store kv.Storage, dom *domain.Domain, idxName, addIdxSQL string) { + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t1 (c1 int, c2 int unsigned, c3 int, unique key(c1))") + + d := dom.DDL() + + // defaultBatchSize is equal to ddl.defaultBatchSize + count := defaultBatchSize * 32 + start := 0 + for i := start; i < count; i += defaultBatchSize { + batchInsert(tk, "t1", i, i+defaultBatchSize) + } + + hook := &ddl.TestDDLCallback{Do: dom} + originBatchSize := tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size") + // Set batch size to lower try to slow down add-index reorganization, This if for hook to cancel this ddl job. + tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 32") + defer tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_reorg_batch_size = %v", originBatchSize.Rows()[0][0])) + // let hook.OnJobUpdatedExported has chance to cancel the job. + // the hook.OnJobUpdatedExported is called when the job is updated, runReorgJob will wait ddl.ReorgWaitTimeout, then return the ddl.runDDLJob. + // After that ddl call d.hook.OnJobUpdated(job), so that we can canceled the job in this test case. 
+    var checkErr error
+    hook.OnJobUpdatedExported, _, checkErr = backgroundExecOnJobUpdatedExported(t, tk, store, hook, idxName)
+    originalHook := d.GetHook()
+    jobIDExt := wrapJobIDExtCallback(hook)
+    d.SetHook(jobIDExt)
+    done := make(chan error, 1)
+    go backgroundExec(store, addIdxSQL, done)
+
+    times := 0
+    ticker := time.NewTicker(indexModifyLease / 2)
+    defer ticker.Stop()
+LOOP:
+    for {
+        select {
+        case err := <-done:
+            require.NoError(t, checkErr)
+            require.EqualError(t, err, "[ddl:8214]Cancelled DDL job")
+            break LOOP
+        case <-ticker.C:
+            if times >= 10 {
+                break
+            }
+            step := 5
+            // delete some rows, and add some data
+            for i := count; i < count+step; i++ {
+                n := rand.Intn(count)
+                tk.MustExec("delete from t1 where c1 = ?", n)
+                tk.MustExec("insert into t1 values (?, ?, ?)", i+10, i, i)
+            }
+            count += step
+            times++
+        }
+    }
+    d.SetHook(originalHook)
+}
+
+func backgroundExecOnJobUpdatedExported(t *testing.T, tk *testkit.TestKit, store kv.Storage, hook *ddl.TestDDLCallback, idxName string) (func(*model.Job), *model.IndexInfo, error) {
+    var checkErr error
+    first := true
+    c3IdxInfo := &model.IndexInfo{}
+    hook.OnJobUpdatedExported = func(job *model.Job) {
+        addIndexNotFirstReorg := (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
+            job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0
+        // If the action is adding an index and the state is write reorganization,
+        // we want to test cancelling the job while the indexes are being backfilled.
+        // Once the job satisfies addIndexNotFirstReorg, the worker has started to backfill indexes.
+        if !addIndexNotFirstReorg {
+            // Get the index's meta.
+            if c3IdxInfo.ID != 0 {
+                return
+            }
+            tbl := external.GetTableByName(t, tk, "test", "t1")
+            for _, index := range tbl.Indices() {
+                if !tables.IsIndexWritable(index) {
+                    continue
+                }
+                if index.Meta().Name.L == idxName {
+                    *c3IdxInfo = *index.Meta()
+                }
+            }
+            return
+        }
+        // The first time the job satisfies addIndexNotFirstReorg, the worker
+        // hasn't finished a batch of backfilling yet, so skip that update.
+        if first {
+            first = false
+            return
+        }
+        if checkErr != nil {
+            return
+        }
+        hookCtx := mock.NewContext()
+        hookCtx.Store = store
+        err := hookCtx.NewTxn(context.Background())
+        if err != nil {
+            checkErr = errors.Trace(err)
+            return
+        }
+        jobIDs := []int64{job.ID}
+        txn, err := hookCtx.Txn(true)
+        if err != nil {
+            checkErr = errors.Trace(err)
+            return
+        }
+        errs, err := admin.CancelJobs(txn, jobIDs)
+        if err != nil {
+            checkErr = errors.Trace(err)
+            return
+        }
+        // It only tests cancelling one DDL job.
+        if errs[0] != nil {
+            checkErr = errors.Trace(errs[0])
+            return
+        }
+        txn, err = hookCtx.Txn(true)
+        if err != nil {
+            checkErr = errors.Trace(err)
+            return
+        }
+        err = txn.Commit(context.Background())
+        if err != nil {
+            checkErr = errors.Trace(err)
+        }
+    }
+    return hook.OnJobUpdatedExported, c3IdxInfo, checkErr
+}
+
+// TestCancelAddIndex1 tests canceling a DDL job before the add-index worker starts.
+func TestCancelAddIndex1(t *testing.T) { + store, dom, clean := testkit.CreateMockStoreAndDomainWithSchemaLease(t, indexModifyLease) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(c1 int, c2 int)") + for i := 0; i < 50; i++ { + tk.MustExec("insert into t values (?, ?)", i, i) + } + + var checkErr error + hook := &ddl.TestDDLCallback{Do: dom} + hook.OnJobRunBeforeExported = func(job *model.Job) { + if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization && job.SnapshotVer == 0 { + jobIDs := []int64{job.ID} + hookCtx := mock.NewContext() + hookCtx.Store = store + err := hookCtx.NewTxn(context.Background()) + if err != nil { + checkErr = errors.Trace(err) + return + } + txn, err := hookCtx.Txn(true) + if err != nil { + checkErr = errors.Trace(err) + return + } + errs, err := admin.CancelJobs(txn, jobIDs) + if err != nil { + checkErr = errors.Trace(err) + return + } + + if errs[0] != nil { + checkErr = errors.Trace(errs[0]) + return + } + + checkErr = txn.Commit(context.Background()) + } + } + originalHook := dom.DDL().GetHook() + dom.DDL().SetHook(hook) + err := tk.ExecToErr("alter table t add index idx_c2(c2)") + require.NoError(t, checkErr) + require.EqualError(t, err, "[ddl:8214]Cancelled DDL job") + + dom.DDL().SetHook(originalHook) + tbl := external.GetTableByName(t, tk, "test", "t") + for _, idx := range tbl.Indices() { + require.False(t, strings.EqualFold(idx.Meta().Name.L, "idx_c2")) + } + tk.MustExec("alter table t add index idx_c2(c2)") + tk.MustExec("alter table t drop index idx_c2") +} + +func TestAddGlobalIndex(t *testing.T) { + defer config.RestoreFunc()() + config.UpdateGlobal(func(conf *config.Config) { + conf.EnableGlobalIndex = true + }) + store, clean := testkit.CreateMockStoreWithSchemaLease(t, indexModifyLease) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table test_t1 (a int, b int) partition by range (b)" + + " (partition p0 values less than (10), " + + " partition p1 values less than (maxvalue));") + tk.MustExec("insert test_t1 values (1, 1)") + tk.MustExec("alter table test_t1 add unique index p_a (a);") + tk.MustExec("insert test_t1 values (2, 11)") + tbl := external.GetTableByName(t, tk, "test", "test_t1") + tblInfo := tbl.Meta() + indexInfo := tblInfo.FindIndexByName("p_a") + require.NotNil(t, indexInfo) + require.True(t, indexInfo.Global) + + require.NoError(t, tk.Session().NewTxn(context.Background())) + txn, err := tk.Session().Txn(true) + require.NoError(t, err) + + // check row 1 + pid := tblInfo.Partition.Definitions[0].ID + idxVals := []types.Datum{types.NewDatum(1)} + rowVals := []types.Datum{types.NewDatum(1), types.NewDatum(1)} + checkGlobalIndexRow(t, tk.Session(), tblInfo, indexInfo, pid, idxVals, rowVals) + + // check row 2 + pid = tblInfo.Partition.Definitions[1].ID + idxVals = []types.Datum{types.NewDatum(2)} + rowVals = []types.Datum{types.NewDatum(2), types.NewDatum(11)} + checkGlobalIndexRow(t, tk.Session(), tblInfo, indexInfo, pid, idxVals, rowVals) + require.NoError(t, txn.Commit(context.Background())) + + // Test add global Primary Key index + tk.MustExec("create table test_t2 (a int, b int) partition by range (b)" + + " (partition p0 values less than (10), " + + " partition p1 values less than (maxvalue));") + tk.MustExec("insert test_t2 values (1, 1)") + tk.MustExec("alter table test_t2 add primary key (a) nonclustered;") + tk.MustExec("insert test_t2 
values (2, 11)")
+    tbl = external.GetTableByName(t, tk, "test", "test_t2")
+    tblInfo = tbl.Meta()
+    indexInfo = tblInfo.FindIndexByName("primary")
+    require.NotNil(t, indexInfo)
+    require.True(t, indexInfo.Global)
+
+    require.NoError(t, tk.Session().NewTxn(context.Background()))
+    txn, err = tk.Session().Txn(true)
+    require.NoError(t, err)
+
+    // check row 1
+    pid = tblInfo.Partition.Definitions[0].ID
+    idxVals = []types.Datum{types.NewDatum(1)}
+    rowVals = []types.Datum{types.NewDatum(1), types.NewDatum(1)}
+    checkGlobalIndexRow(t, tk.Session(), tblInfo, indexInfo, pid, idxVals, rowVals)
+
+    // check row 2
+    pid = tblInfo.Partition.Definitions[1].ID
+    idxVals = []types.Datum{types.NewDatum(2)}
+    rowVals = []types.Datum{types.NewDatum(2), types.NewDatum(11)}
+    checkGlobalIndexRow(t, tk.Session(), tblInfo, indexInfo, pid, idxVals, rowVals)
+
+    require.NoError(t, txn.Commit(context.Background()))
+}
+
+// checkGlobalIndexRow reads one record from the global index and checks it. Only int handles are supported.
+func checkGlobalIndexRow(
+    t *testing.T,
+    ctx sessionctx.Context,
+    tblInfo *model.TableInfo,
+    indexInfo *model.IndexInfo,
+    pid int64,
+    idxVals []types.Datum,
+    rowVals []types.Datum,
+) {
+    require.NoError(t, ctx.NewTxn(context.Background()))
+    txn, err := ctx.Txn(true)
+    require.NoError(t, err)
+    sc := ctx.GetSessionVars().StmtCtx
+
+    tblColMap := make(map[int64]*types.FieldType, len(tblInfo.Columns))
+    for _, col := range tblInfo.Columns {
+        tblColMap[col.ID] = &col.FieldType
+    }
+
+    // Check that no local index entry exists.
+    localPrefix := tablecodec.EncodeTableIndexPrefix(pid, indexInfo.ID)
+    it, err := txn.Iter(localPrefix, nil)
+    require.NoError(t, err)
+    // no local index entry.
+    require.False(t, it.Valid() && it.Key().HasPrefix(localPrefix))
+    it.Close()
+
+    // Check the global index entry.
+    encodedValue, err := codec.EncodeKey(sc, nil, idxVals...)
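+    // Assumed KV layout for a global index entry, matching the decode below:
+    // the key is scoped by the table ID rather than a partition ID, and the
+    // value carries the index column values, then the row handle, then the
+    // ID of the partition that actually holds the row.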
+ require.NoError(t, err) + key := tablecodec.EncodeIndexSeekKey(tblInfo.ID, indexInfo.ID, encodedValue) + require.NoError(t, err) + value, err := txn.Get(context.Background(), key) + require.NoError(t, err) + idxColInfos := tables.BuildRowcodecColInfoForIndexColumns(indexInfo, tblInfo) + colVals, err := tablecodec.DecodeIndexKV(key, value, len(indexInfo.Columns), tablecodec.HandleDefault, idxColInfos) + require.NoError(t, err) + require.Len(t, colVals, len(idxVals)+2) + for i, val := range idxVals { + _, d, err := codec.DecodeOne(colVals[i]) + require.NoError(t, err) + require.Equal(t, val, d) + } + _, d, err := codec.DecodeOne(colVals[len(idxVals)+1]) // pid + require.NoError(t, err) + require.Equal(t, pid, d.GetInt64()) + + _, d, err = codec.DecodeOne(colVals[len(idxVals)]) // handle + require.NoError(t, err) + h := kv.IntHandle(d.GetInt64()) + rowKey := tablecodec.EncodeRowKey(pid, h.Encoded()) + rowValue, err := txn.Get(context.Background(), rowKey) + require.NoError(t, err) + rowValueDatums, err := tablecodec.DecodeRowToDatumMap(rowValue, tblColMap, time.UTC) + require.NoError(t, err) + require.NotNil(t, rowValueDatums) + for i, val := range rowVals { + require.Equal(t, val, rowValueDatums[tblInfo.Columns[i].ID]) + } +} + +func TestDropIndexes(t *testing.T) { + store, dom, clean := testkit.CreateMockStoreAndDomainWithSchemaLease(t, indexModifyLease) + defer clean() + // drop multiple indexes + createSQL := "create table test_drop_indexes (id int, c1 int, c2 int, primary key(id), key i1(c1), key i2(c2));" + dropIdxSQL := "alter table test_drop_indexes drop index i1, drop index i2;" + idxNames := []string{"i1", "i2"} + testDropIndexes(t, store, createSQL, dropIdxSQL, idxNames) + + createSQL = "create table test_drop_indexes (id int, c1 int, c2 int, primary key(id) nonclustered, unique key i1(c1), key i2(c2));" + dropIdxSQL = "alter table test_drop_indexes drop primary key, drop index i1;" + idxNames = []string{"primary", "i1"} + testDropIndexes(t, store, createSQL, dropIdxSQL, idxNames) + + createSQL = "create table test_drop_indexes (uuid varchar(32), c1 int, c2 int, primary key(uuid), unique key i1(c1), key i2(c2));" + dropIdxSQL = "alter table test_drop_indexes drop primary key, drop index i1, drop index i2;" + idxNames = []string{"primary", "i1", "i2"} + testDropIndexes(t, store, createSQL, dropIdxSQL, idxNames) + + testDropIndexesIfExists(t, store) + testDropIndexesFromPartitionedTable(t, store) + testCancelDropIndexes(t, store, dom.DDL()) +} + +func testDropIndexes(t *testing.T, store kv.Storage, createSQL, dropIdxSQL string, idxNames []string) { + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists test_drop_indexes") + tk.MustExec(createSQL) + done := make(chan error, 1) + + num := 100 + // add some rows + for i := 0; i < num; i++ { + tk.MustExec("insert into test_drop_indexes values (?, ?, ?)", i, i, i) + } + idxIDs := make([]int64, 0, 3) + for _, idxName := range idxNames { + idxIDs = append(idxIDs, external.GetIndexID(t, tk, "test", "test_drop_indexes", idxName)) + } + testddlutil.SessionExecInGoroutine(store, "test", dropIdxSQL, done) + + ticker := time.NewTicker(indexModifyLease / 2) + defer ticker.Stop() +LOOP: + for { + select { + case err := <-done: + if err == nil { + break LOOP + } + require.NoError(t, err) + case <-ticker.C: + step := 5 + // delete some rows, and add some data + for i := num; i < num+step; i++ { + n := rand.Intn(num) + tk.MustExec("update test_drop_indexes set c2 = 1 where c1 = ?", n) + tk.MustExec("insert 
into test_drop_indexes values (?, ?, ?)", i, i, i)
+            }
+            num += step
+        }
+    }
+}
+
+func testDropIndexesIfExists(t *testing.T, store kv.Storage) {
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("use test;")
+    tk.MustExec("drop table if exists test_drop_indexes_if_exists;")
+    tk.MustExec("create table test_drop_indexes_if_exists (id int, c1 int, c2 int, primary key(id), key i1(c1), key i2(c2));")
+
+    // Drop different indexes.
+    tk.MustGetErrMsg(
+        "alter table test_drop_indexes_if_exists drop index i1, drop index i3;",
+        "[ddl:1091]index i3 doesn't exist",
+    )
+    tk.MustExec("alter table test_drop_indexes_if_exists drop index i1, drop index if exists i3;")
+    tk.MustQuery("show warnings;").Check(testkit.RowsWithSep("|", "Warning|1091|index i3 doesn't exist"))
+
+    // Verify the impact of deletion order when dropping duplicate indexes.
+    tk.MustGetErrMsg(
+        "alter table test_drop_indexes_if_exists drop index i2, drop index i2;",
+        "[ddl:1091]index i2 doesn't exist",
+    )
+    tk.MustGetErrMsg(
+        "alter table test_drop_indexes_if_exists drop index if exists i2, drop index i2;",
+        "[ddl:1091]index i2 doesn't exist",
+    )
+    tk.MustExec("alter table test_drop_indexes_if_exists drop index i2, drop index if exists i2;")
+    tk.MustQuery("show warnings;").Check(testkit.RowsWithSep("|", "Warning|1091|index i2 doesn't exist"))
+}
+
+func testDropIndexesFromPartitionedTable(t *testing.T, store kv.Storage) {
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("use test;")
+    tk.MustExec("drop table if exists test_drop_indexes_from_partitioned_table;")
+    tk.MustExec(`
+        create table test_drop_indexes_from_partitioned_table (id int, c1 int, c2 int, primary key(id), key i1(c1), key i2(c2))
+        partition by range(id) (partition p0 values less than (6), partition p1 values less than maxvalue);
+    `)
+    for i := 0; i < 20; i++ {
+        tk.MustExec("insert into test_drop_indexes_from_partitioned_table values (?, ?, ?)", i, i, i)
+    }
+    tk.MustExec("alter table test_drop_indexes_from_partitioned_table drop index i1, drop index if exists i2;")
+    tk.MustExec("alter table test_drop_indexes_from_partitioned_table add index i1(c1)")
+    tk.MustExec("alter table test_drop_indexes_from_partitioned_table drop index i1, drop index if exists i1;")
+    tk.MustExec("alter table test_drop_indexes_from_partitioned_table drop column c1, drop column c2;")
+    tk.MustExec("alter table test_drop_indexes_from_partitioned_table add column c1 int")
+    tk.MustExec("alter table test_drop_indexes_from_partitioned_table drop column c1, drop column if exists c1;")
+}
+
+func testCancelDropIndexes(t *testing.T, store kv.Storage, d ddl.DDL) {
+    indexesName := []string{"idx_c1", "idx_c2"}
+    addIdxesSQL := "alter table t add index idx_c1 (c1);alter table t add index idx_c2 (c2);"
+    dropIdxesSQL := "alter table t drop index idx_c1;alter table t drop index idx_c2;"
+
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("use test")
+    tk.MustExec("drop table if exists t")
+    tk.MustExec("create table t(c1 int, c2 int)")
+    defer tk.MustExec("drop table t;")
+    for i := 0; i < 5; i++ {
+        tk.MustExec("insert into t values (?, ?)", i, i)
+    }
+    testCases := []struct {
+        needAddIndex   bool
+        jobState       model.JobState
+        JobSchemaState model.SchemaState
+        cancelSucc     bool
+    }{
+        // model.JobStateNone means the job is canceled before it first runs.
+        // If the cancellation succeeds, the indexes were never dropped, so set
+        // needAddIndex to false in the next test case; otherwise set it to true.
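+        // The four cases below (assumed semantics): cancelling before the job
+        // first runs succeeds; cancelling during write-only, delete-only, or
+        // delete-reorganization fails, because once a drop-index job is
+        // running its schema changes can no longer be rolled back.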
+ {true, model.JobStateNone, model.StateNone, true}, + {false, model.JobStateRunning, model.StateWriteOnly, false}, + {true, model.JobStateRunning, model.StateDeleteOnly, false}, + {true, model.JobStateRunning, model.StateDeleteReorganization, false}, + } + var checkErr error + hook := &ddl.TestDDLCallback{} + var jobID int64 + testCase := &testCases[0] + hook.OnJobRunBeforeExported = func(job *model.Job) { + if (job.Type == model.ActionDropIndex || job.Type == model.ActionDropPrimaryKey) && + job.State == testCase.jobState && job.SchemaState == testCase.JobSchemaState { + jobID = job.ID + jobIDs := []int64{job.ID} + hookCtx := mock.NewContext() + hookCtx.Store = store + err := hookCtx.NewTxn(context.TODO()) + if err != nil { + checkErr = errors.Trace(err) + return + } + txn, err := hookCtx.Txn(true) + if err != nil { + checkErr = errors.Trace(err) + return + } + + errs, err := admin.CancelJobs(txn, jobIDs) + if err != nil { + checkErr = errors.Trace(err) + return + } + if errs[0] != nil { + checkErr = errors.Trace(errs[0]) + return + } + checkErr = txn.Commit(context.Background()) + } + } + originalHook := d.GetHook() + d.SetHook(hook) + for i := range testCases { + testCase = &testCases[i] + if testCase.needAddIndex { + tk.MustExec(addIdxesSQL) + } + err := tk.ExecToErr(dropIdxesSQL) + tbl := external.GetTableByName(t, tk, "test", "t") + + var indexInfos []*model.IndexInfo + for _, idxName := range indexesName { + indexInfo := tbl.Meta().FindIndexByName(idxName) + if indexInfo != nil { + indexInfos = append(indexInfos, indexInfo) + } + } + + if testCase.cancelSucc { + require.NoError(t, checkErr) + require.EqualError(t, err, "[ddl:8214]Cancelled DDL job") + require.NotNil(t, indexInfos) + require.Equal(t, model.StatePublic, indexInfos[0].State) + } else { + require.NoError(t, err) + require.EqualError(t, checkErr, admin.ErrCannotCancelDDLJob.GenWithStackByArgs(jobID).Error()) + require.Nil(t, indexInfos) + } + } + d.SetHook(originalHook) + tk.MustExec(addIdxesSQL) + tk.MustExec(dropIdxesSQL) +} + +func TestDropPrimaryKey(t *testing.T) { + store, clean := testkit.CreateMockStoreWithSchemaLease(t, indexModifyLease) + defer clean() + idxName := "primary" + createSQL := "create table test_drop_index (c1 int, c2 int, c3 int, unique key(c1), primary key(c3) nonclustered)" + dropIdxSQL := "alter table test_drop_index drop primary key;" + testDropIndex(t, store, createSQL, dropIdxSQL, idxName) +} + +func TestDropIndex(t *testing.T) { + store, clean := testkit.CreateMockStoreWithSchemaLease(t, indexModifyLease) + defer clean() + idxName := "c3_index" + createSQL := "create table test_drop_index (c1 int, c2 int, c3 int, unique key(c1), key c3_index(c3))" + dropIdxSQL := "alter table test_drop_index drop index c3_index;" + testDropIndex(t, store, createSQL, dropIdxSQL, idxName) +} + +func testDropIndex(t *testing.T, store kv.Storage, createSQL, dropIdxSQL, idxName string) { + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists test_drop_index") + tk.MustExec(createSQL) + done := make(chan error, 1) + tk.MustExec("delete from test_drop_index") + + num := 100 + // add some rows + for i := 0; i < num; i++ { + tk.MustExec("insert into test_drop_index values (?, ?, ?)", i, i, i) + } + testddlutil.SessionExecInGoroutine(store, "test", dropIdxSQL, done) + + ticker := time.NewTicker(indexModifyLease / 2) + defer ticker.Stop() +LOOP: + for { + select { + case err := <-done: + if err == nil { + break LOOP + } + require.NoError(t, err) + case <-ticker.C: + step := 
5 + // delete some rows, and add some data + for i := num; i < num+step; i++ { + n := rand.Intn(num) + tk.MustExec("update test_drop_index set c2 = 1 where c1 = ?", n) + tk.MustExec("insert into test_drop_index values (?, ?, ?)", i, i, i) + } + num += step + } + } + + rows := tk.MustQuery("explain select c1 from test_drop_index where c3 >= 0") + require.NotContains(t, fmt.Sprintf("%v", rows), idxName) + + tk.MustExec("drop table test_drop_index") +} + +func TestAddMultiColumnsIndexClusterIndex(t *testing.T) { + store, clean := testkit.CreateMockStoreWithSchemaLease(t, indexModifyLease) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("drop database if exists test_add_multi_col_index_clustered;") + tk.MustExec("create database test_add_multi_col_index_clustered;") + tk.MustExec("use test_add_multi_col_index_clustered;") + + tk.Session().GetSessionVars().EnableClusteredIndex = variable.ClusteredIndexDefModeOn + tk.MustExec("create table t (a int, b varchar(10), c int, primary key (a, b));") + tk.MustExec("insert into t values (1, '1', 1), (2, '2', NULL), (3, '3', 3);") + tk.MustExec("create index idx on t (a, c);") + + tk.MustExec("admin check index t idx;") + tk.MustExec("admin check table t;") + + tk.MustExec("insert into t values (5, '5', 5), (6, '6', NULL);") + + tk.MustExec("admin check index t idx;") + tk.MustExec("admin check table t;") +} + +func TestAddIndexWithDupCols(t *testing.T) { + store, clean := testkit.CreateMockStoreWithSchemaLease(t, indexModifyLease) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + err1 := infoschema.ErrColumnExists.GenWithStackByArgs("b") + err2 := infoschema.ErrColumnExists.GenWithStackByArgs("B") + + tk.MustExec("create table test_add_index_with_dup (a int, b int)") + err := tk.ExecToErr("create index c on test_add_index_with_dup(b, a, b)") + require.ErrorIs(t, err, errors.Cause(err1)) + err = tk.ExecToErr("create index c on test_add_index_with_dup(b, a, B)") + require.ErrorIs(t, err, errors.Cause(err2)) + err = tk.ExecToErr("alter table test_add_index_with_dup add index c (b, a, b)") + require.ErrorIs(t, err, errors.Cause(err1)) + err = tk.ExecToErr("alter table test_add_index_with_dup add index c (b, a, B)") + require.ErrorIs(t, err, errors.Cause(err2)) + + tk.MustExec("drop table test_add_index_with_dup") +} + +func TestAnonymousIndex(t *testing.T) { + store, clean := testkit.CreateMockStoreWithSchemaLease(t, indexModifyLease) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("DROP TABLE IF EXISTS t") + tk.MustExec("create table t(bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb int, b int)") + tk.MustExec("alter table t add index bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb(b)") + tk.MustExec("alter table t add index (bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb)") + rows := tk.MustQuery("show index from t where key_name='bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb'").Rows() + require.Len(t, rows, 1) + rows = tk.MustQuery("show index from t where key_name='bbbbbbbbbbbbbbbbbbbbbbbbbbbbbb_2'").Rows() + require.Len(t, rows, 1) +} + +func TestAddIndexWithDupIndex(t *testing.T) { + store, clean := testkit.CreateMockStoreWithSchemaLease(t, indexModifyLease) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + err1 := dbterror.ErrDupKeyName.GenWithStack("index already exist %s", "idx") + err2 := dbterror.ErrDupKeyName.GenWithStack("index already exist %s; 
"+ + "a background job is trying to add the same index, "+ + "please check by `ADMIN SHOW DDL JOBS`", "idx") + + // When there is already an duplicate index, show error message. + tk.MustExec("create table test_add_index_with_dup (a int, key idx (a))") + err := tk.ExecToErr("alter table test_add_index_with_dup add index idx (a)") + require.ErrorIs(t, err, errors.Cause(err1)) + + // When there is another session adding duplicate index with state other than + // StatePublic, show explicit error message. + tbl := external.GetTableByName(t, tk, "test", "test_add_index_with_dup") + indexInfo := tbl.Meta().FindIndexByName("idx") + indexInfo.State = model.StateNone + err = tk.ExecToErr("alter table test_add_index_with_dup add index idx (a)") + require.ErrorIs(t, err, errors.Cause(err2)) +} diff --git a/ddl/main_test.go b/ddl/main_test.go new file mode 100644 index 0000000000000..c944d92799902 --- /dev/null +++ b/ddl/main_test.go @@ -0,0 +1,89 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl_test + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/domain/infosync" + "github.com/pingcap/tidb/meta/autoid" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/util/testbridge" + "github.com/tikv/client-go/v2/tikv" + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + testbridge.SetupForCommonTest() + tikv.EnableFailpoints() + + domain.SchemaOutOfDateRetryInterval.Store(50 * time.Millisecond) + domain.SchemaOutOfDateRetryTimes.Store(50) + + autoid.SetStep(5000) + ddl.ReorgWaitTimeout = 30 * time.Millisecond + ddl.SetBatchInsertDeleteRangeSize(2) + + config.UpdateGlobal(func(conf *config.Config) { + // Test for table lock. + conf.EnableTableLock = true + conf.Log.SlowThreshold = 10000 + conf.TiKVClient.AsyncCommit.SafeWindow = 0 + conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0 + conf.Experimental.AllowsExpressionIndex = true + }) + + _, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, true) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "ddl: infosync.GlobalInfoSyncerInit: %v\n", err) + os.Exit(1) + } + + opts := []goleak.Option{ + goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), + goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + } + + goleak.VerifyTestMain(m, opts...) 
+}
+
+func wrapJobIDExtCallback(oldCallback ddl.Callback) *testDDLJobIDCallback {
+	return &testDDLJobIDCallback{
+		Callback: oldCallback,
+		jobID:    0,
+	}
+}
+
+type testDDLJobIDCallback struct {
+	ddl.Callback
+	jobID int64
+}
+
+func (t *testDDLJobIDCallback) OnJobUpdated(job *model.Job) {
+	if t.jobID == 0 {
+		t.jobID = job.ID
+	}
+	if t.Callback != nil {
+		t.Callback.OnJobUpdated(job)
+	}
+}
diff --git a/ddl/sanity_check.go b/ddl/sanity_check.go
new file mode 100644
index 0000000000000..528370eeee157
--- /dev/null
+++ b/ddl/sanity_check.go
@@ -0,0 +1,223 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ddl
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"strings"
+
+	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/parser"
+	"github.com/pingcap/tidb/parser/ast"
+	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/sessionctx"
+	"github.com/pingcap/tidb/util/sqlexec"
+)
+
+func checkRangeCntByTableIDs(physicalTableIDs []int64, cnt int64) {
+	if len(physicalTableIDs) > 0 {
+		if len(physicalTableIDs) != int(cnt) {
+			panic("should not happen: " + fmt.Sprintf("expect count: %d, real count: %d", len(physicalTableIDs), cnt))
+		}
+	} else if cnt != 1 {
+		panic("should not happen: " + fmt.Sprintf("expect count: %d, real count: %d", 1, cnt))
+	}
+}
+
+func checkRangeCntByTableIDsAndIndexIDs(partitionTableIDs []int64, indexIDs []int64, cnt int64) {
+	if len(indexIDs) == 0 {
+		return
+	}
+	expectedCnt := len(indexIDs)
+	if len(partitionTableIDs) > 0 {
+		expectedCnt *= len(partitionTableIDs)
+	}
+	if expectedCnt != int(cnt) {
+		panic("should not happen: " + fmt.Sprintf("expect count: %d, real count: %d", expectedCnt, cnt))
+	}
+}
+
+func (d *ddl) checkDeleteRangeCnt(job *model.Job) {
+	sctx, _ := d.sessPool.get()
+	s, _ := sctx.(sqlexec.SQLExecutor)
+	defer func() {
+		d.sessPool.put(sctx)
+	}()
+
+	query := `select sum(cnt) from
+	(select count(1) cnt from mysql.gc_delete_range where job_id = %? union all
+	select count(1) cnt from mysql.gc_delete_range_done where job_id = %?) as gdr;`
+	rs, err := s.ExecuteInternal(context.TODO(), query, job.ID, job.ID)
+	if err != nil {
+		if strings.Contains(err.Error(), "Not Supported") {
+			return
+		}
+		panic(err)
+	}
+	defer func() {
+		_ = rs.Close()
+	}()
+	req := rs.NewChunk(nil)
+	err = rs.Next(context.TODO(), req)
+	if err != nil {
+		panic("should not happen, err: " + err.Error())
+	}
+	cnt, _ := req.GetRow(0).GetMyDecimal(0).ToInt()
+
+	switch job.Type {
+	case model.ActionDropSchema:
+		var tableIDs []int64
+		if err := job.DecodeArgs(&tableIDs); err != nil {
+			panic("should not happen")
+		}
+		if len(tableIDs) != int(cnt) {
+			panic("should not happen: " + fmt.Sprintf("expect count: %d, real count: %d", len(tableIDs), cnt))
+		}
+	case model.ActionDropTable, model.ActionTruncateTable:
+		var startKey kv.Key
+		var physicalTableIDs []int64
+		var ruleIDs []string
+		if err := job.DecodeArgs(&startKey, &physicalTableIDs, &ruleIDs); err != nil {
+			panic("should not happen")
+		}
+		checkRangeCntByTableIDs(physicalTableIDs, cnt)
+	case model.ActionDropTablePartition, model.ActionTruncateTablePartition:
+		var physicalTableIDs []int64
+		if err := job.DecodeArgs(&physicalTableIDs); err != nil {
+			panic("should not happen")
+		}
+		if len(physicalTableIDs) != int(cnt) {
+			panic("should not happen: " + fmt.Sprintf("expect count: %d, real count: %d", len(physicalTableIDs), cnt))
+		}
+	case model.ActionAddIndex, model.ActionAddPrimaryKey:
+		var indexID int64
+		var partitionIDs []int64
+		if err := job.DecodeArgs(&indexID, &partitionIDs); err != nil {
+			panic("should not happen")
+		}
+		checkRangeCntByTableIDs(partitionIDs, cnt)
+	case model.ActionDropIndex, model.ActionDropPrimaryKey:
+		var indexName interface{}
+		var indexID int64
+		var partitionIDs []int64
+		if err := job.DecodeArgs(&indexName, &indexID, &partitionIDs); err != nil {
+			panic("should not happen")
+		}
+		checkRangeCntByTableIDsAndIndexIDs(partitionIDs, []int64{indexID}, cnt)
+	case model.ActionDropIndexes:
+		var indexIDs []int64
+		var partitionIDs []int64
+		if err := job.DecodeArgs(&[]model.CIStr{}, &[]bool{}, &indexIDs, &partitionIDs); err != nil {
+			panic("should not happen")
+		}
+		checkRangeCntByTableIDsAndIndexIDs(partitionIDs, indexIDs, cnt)
+	case model.ActionDropColumn:
+		var colName model.CIStr
+		var indexIDs []int64
+		var partitionIDs []int64
+		if err := job.DecodeArgs(&colName, &indexIDs, &partitionIDs); err != nil {
+			panic("should not happen")
+		}
+		checkRangeCntByTableIDsAndIndexIDs(partitionIDs, indexIDs, cnt)
+	case model.ActionDropColumns:
+		var colNames []model.CIStr
+		var ifExists []bool
+		var indexIDs []int64
+		var partitionIDs []int64
+		if err := job.DecodeArgs(&colNames, &ifExists, &indexIDs, &partitionIDs); err != nil {
+			panic("should not happen")
+		}
+		checkRangeCntByTableIDsAndIndexIDs(partitionIDs, indexIDs, cnt)
+	case model.ActionModifyColumn:
+		var indexIDs []int64
+		var partitionIDs []int64
+		if err := job.DecodeArgs(&indexIDs, &partitionIDs); err != nil {
+			panic("should not happen")
+		}
+		checkRangeCntByTableIDsAndIndexIDs(partitionIDs, indexIDs, cnt)
+	}
+}
+
+// checkHistoryJobInTest does some sanity checks to make sure everything is correct after a DDL job completes.
+// It only runs in the test environment, so it panics directly on failure.
+// These checks may be controlled by configuration in the future.
+func (d *ddl) checkHistoryJobInTest(ctx sessionctx.Context, historyJob *model.Job) {
+	if !(flag.Lookup("test.v") != nil || flag.Lookup("check.v") != nil) {
+		return
+	}
+
+	// Check delete range.
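+	// Only DDL job types that enqueue delete-range tasks on completion are
+	// checked here; jobNeedGC is the predicate the DDL worker uses to decide
+	// whether a finished job inserts rows into mysql.gc_delete_range.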
+	if jobNeedGC(historyJob) {
+		d.checkDeleteRangeCnt(historyJob)
+	}
+
+	// Check binlog.
+	if historyJob.BinlogInfo.FinishedTS == 0 {
+		panic(fmt.Sprintf("job ID %d, BinlogInfo.FinishedTS is 0", historyJob.ID))
+	}
+
+	// Check DDL query.
+	switch historyJob.Type {
+	case model.ActionUpdateTiFlashReplicaStatus, model.ActionUnlockTable:
+		if historyJob.Query != "" {
+			panic(fmt.Sprintf("job ID %d, type %s, query %s", historyJob.ID, historyJob.Type.String(), historyJob.Query))
+		}
+		return
+	default:
+		if historyJob.Query == "skip" {
+			// Skip the check if the test explicitly sets the query.
+			return
+		}
+	}
+	p := parser.New()
+	p.SetSQLMode(ctx.GetSessionVars().SQLMode)
+	p.SetParserConfig(ctx.GetSessionVars().BuildParserConfig())
+	stmt, _, err := p.ParseSQL(historyJob.Query)
+	if err != nil {
+		panic(fmt.Sprintf("job ID %d, parse ddl job failed, query %s, err %s", historyJob.ID, historyJob.Query, err.Error()))
+	}
+	if len(stmt) != 1 && historyJob.Type != model.ActionCreateTables {
+		panic(fmt.Sprintf("job ID %d, parse ddl job failed, query %s", historyJob.ID, historyJob.Query))
+	}
+	for _, st := range stmt {
+		switch historyJob.Type {
+		case model.ActionCreatePlacementPolicy:
+			if _, ok := st.(*ast.CreatePlacementPolicyStmt); !ok {
+				panic(fmt.Sprintf("job ID %d, parse ddl job failed, query %s", historyJob.ID, historyJob.Query))
+			}
+		case model.ActionCreateTable:
+			if _, ok := st.(*ast.CreateTableStmt); !ok {
+				panic(fmt.Sprintf("job ID %d, parse ddl job failed, query %s", historyJob.ID, historyJob.Query))
+			}
+		case model.ActionCreateSchema:
+			if _, ok := st.(*ast.CreateDatabaseStmt); !ok {
+				panic(fmt.Sprintf("job ID %d, parse ddl job failed, query %s", historyJob.ID, historyJob.Query))
+			}
+		case model.ActionCreateTables:
+			_, isCreateTable := st.(*ast.CreateTableStmt)
+			_, isCreateSeq := st.(*ast.CreateSequenceStmt)
+			_, isCreateView := st.(*ast.CreateViewStmt)
+			if !isCreateTable && !isCreateSeq && !isCreateView {
+				panic(fmt.Sprintf("job ID %d, parse ddl job failed, query %s", historyJob.ID, historyJob.Query))
+			}
+		default:
+			if _, ok := st.(ast.DDLNode); !ok {
+				panic(fmt.Sprintf("job ID %d, parse ddl job failed, query %s", historyJob.ID, historyJob.Query))
+			}
+		}
+	}
+}
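For reference, the delete-range accounting enforced by checkRangeCntByTableIDsAndIndexIDs reduces to a simple counting rule: one pending range per dropped index, multiplied by the number of physical partitions when the table is partitioned, and a single range per index otherwise. The following standalone sketch illustrates that rule; the IDs are hypothetical values chosen purely for illustration, not taken from the PR:

package main

import "fmt"

// expectedDeleteRanges mirrors the counting rule in
// checkRangeCntByTableIDsAndIndexIDs: each index contributes one delete
// range per physical partition, or a single range for an unpartitioned table.
func expectedDeleteRanges(partitionIDs, indexIDs []int64) int {
	if len(indexIDs) == 0 {
		return 0
	}
	cnt := len(indexIDs)
	if len(partitionIDs) > 0 {
		cnt *= len(partitionIDs)
	}
	return cnt
}

func main() {
	partitions := []int64{101, 102, 103} // hypothetical partition IDs
	indexes := []int64{1, 2}             // hypothetical index IDs

	fmt.Println(expectedDeleteRanges(partitions, indexes)) // 6
	fmt.Println(expectedDeleteRanges(nil, indexes))        // 2
}

Under this rule, dropping two indexes on a table with three partitions should leave exactly six rows for the job across mysql.gc_delete_range and mysql.gc_delete_range_done, which is what checkDeleteRangeCnt verifies.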