From 49659fbed1fbf7ad34bfb915c0aba1d0260301a0 Mon Sep 17 00:00:00 2001 From: wjhuang2016 Date: Wed, 7 Dec 2022 16:00:55 +0800 Subject: [PATCH 1/3] done Signed-off-by: wjhuang2016 --- ddl/column_modify_test.go | 75 -- ddl/db_test.go | 247 ------ ddl/index_merge_tmp_test.go | 10 - ddl/table_modify_test.go | 13 - domain/schema_validator.go | 26 +- domain/schema_validator_test.go | 10 +- expression/integration_test.go | 19 - session/BUILD.bazel | 12 - session/schema_amender.go | 713 ------------------ session/schema_amender_test.go | 477 ------------ session/schema_test.go | 11 - session/session.go | 7 - sessionctx/variable/session.go | 4 - sessionctx/variable/sysvar.go | 8 - sessionctx/variable/tidb_vars.go | 4 - sessionctx/variable/varsutil_test.go | 3 - store/driver/txn/txn_driver.go | 2 - .../pessimistictest/pessimistic_test.go | 635 ---------------- 18 files changed, 12 insertions(+), 2264 deletions(-) delete mode 100644 session/schema_amender.go delete mode 100644 session/schema_amender_test.go diff --git a/ddl/column_modify_test.go b/ddl/column_modify_test.go index 2055f9df9fa9c..1b22ddd74f14e 100644 --- a/ddl/column_modify_test.go +++ b/ddl/column_modify_test.go @@ -1030,78 +1030,3 @@ func TestColumnTypeChangeGenUniqueChangingName(t *testing.T) { tk.MustExec("drop table if exists t") } - -func TestWriteReorgForColumnTypeChangeOnAmendTxn(t *testing.T) { - store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, columnModifyLease) - - tk := testkit.NewTestKit(t, store) - tk.MustExec("set global tidb_enable_metadata_lock=0") - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0") - tk.MustExec("set global tidb_enable_amend_pessimistic_txn = ON") - defer tk.MustExec("set global tidb_enable_amend_pessimistic_txn = OFF") - - d := dom.DDL() - testInsertOnModifyColumn := func(sql string, startColState, commitColState model.SchemaState, retStrs []string, retErr error) { - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("drop table if exists t1") - tk.MustExec("create table t1 (c1 int, c2 int, c3 int, unique key(c1))") - tk.MustExec("insert into t1 values (20, 20, 20);") - - var checkErr error - tk1 := testkit.NewTestKit(t, store) - defer func() { - if tk1.Session() != nil { - tk1.Session().Close() - } - }() - hook := &ddl.TestDDLCallback{Do: dom} - times := 0 - hook.OnJobRunBeforeExported = func(job *model.Job) { - if job.Type != model.ActionModifyColumn || checkErr != nil || job.SchemaState != startColState { - return - } - - tk1.MustExec("use test") - tk1.MustExec("begin pessimistic;") - tk1.MustExec("insert into t1 values(101, 102, 103)") - } - onJobUpdatedExportedFunc := func(job *model.Job) { - if job.Type != model.ActionModifyColumn || checkErr != nil || job.SchemaState != commitColState { - return - } - if times == 0 { - _, checkErr = tk1.Exec("commit;") - } - times++ - } - hook.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) - d.SetHook(hook) - - tk.MustExec(sql) - if retErr == nil { - require.NoError(t, checkErr) - } else { - require.Error(t, checkErr) - require.Contains(t, checkErr.Error(), retErr.Error()) - } - tk.MustQuery("select * from t1").Check(testkit.Rows(retStrs...)) - tk.MustExec("admin check table t1") - } - - // Testing it needs reorg data. - ddlStatement := "alter table t1 change column c2 cc smallint;" - testInsertOnModifyColumn(ddlStatement, model.StateNone, model.StateWriteReorganization, []string{"20 20 20"}, domain.ErrInfoSchemaChanged) - testInsertOnModifyColumn(ddlStatement, model.StateDeleteOnly, model.StateWriteReorganization, []string{"20 20 20"}, domain.ErrInfoSchemaChanged) - testInsertOnModifyColumn(ddlStatement, model.StateWriteOnly, model.StateWriteReorganization, []string{"20 20 20"}, domain.ErrInfoSchemaChanged) - testInsertOnModifyColumn(ddlStatement, model.StateNone, model.StatePublic, []string{"20 20 20"}, domain.ErrInfoSchemaChanged) - testInsertOnModifyColumn(ddlStatement, model.StateDeleteOnly, model.StatePublic, []string{"20 20 20"}, domain.ErrInfoSchemaChanged) - testInsertOnModifyColumn(ddlStatement, model.StateWriteOnly, model.StatePublic, []string{"20 20 20"}, domain.ErrInfoSchemaChanged) - - // Testing it needs not reorg data. This case only have two states: none, public. - ddlStatement = "alter table t1 change column c2 cc bigint;" - testInsertOnModifyColumn(ddlStatement, model.StateNone, model.StateWriteReorganization, []string{"20 20 20"}, nil) - testInsertOnModifyColumn(ddlStatement, model.StateWriteOnly, model.StateWriteReorganization, []string{"20 20 20"}, nil) - testInsertOnModifyColumn(ddlStatement, model.StateNone, model.StatePublic, []string{"20 20 20", "101 102 103"}, nil) - testInsertOnModifyColumn(ddlStatement, model.StateWriteOnly, model.StatePublic, []string{"20 20 20"}, nil) -} diff --git a/ddl/db_test.go b/ddl/db_test.go index a061af75baf26..492769c727955 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -40,7 +40,6 @@ import ( "github.com/pingcap/tidb/parser/terror" parsertypes "github.com/pingcap/tidb/parser/types" "github.com/pingcap/tidb/planner/core" - "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/sessiontxn" "github.com/pingcap/tidb/store/mockstore" @@ -54,7 +53,6 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" - "golang.org/x/exp/slices" ) const ( @@ -980,202 +978,6 @@ func TestDDLJobErrorCount(t *testing.T) { require.True(t, kv.ErrEntryTooLarge.Equal(historyJob.Error)) } -func TestCommitTxnWithIndexChange(t *testing.T) { - store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease) - // Prepare work. - tk := testkit.NewTestKit(t, store) - tk.MustExec("set global tidb_enable_metadata_lock=0") - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") - tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") - tk.MustExec("use test") - tk.MustExec("create table t1 (c1 int primary key, c2 int, c3 int, index ok2(c2))") - tk.MustExec("insert t1 values (1, 10, 100), (2, 20, 200)") - tk.MustExec("alter table t1 add index k2(c2)") - tk.MustExec("alter table t1 drop index k2") - tk.MustExec("alter table t1 add index k2(c2)") - tk.MustExec("alter table t1 drop index k2") - tk2 := testkit.NewTestKit(t, store) - tk2.MustExec("use test") - - // tkSQLs are the sql statements for the pessimistic transaction. - // tk2DDL are the ddl statements executed before the pessimistic transaction. - // idxDDL is the DDL statement executed between pessimistic transaction begin and commit. - // failCommit means the pessimistic transaction commit should fail not. - type caseUnit struct { - tkSQLs []string - tk2DDL []string - idxDDL string - checkSQLs []string - rowsExps [][]string - failCommit bool - stateEnd model.SchemaState - } - - cases := []caseUnit{ - // Test secondary index - {[]string{"insert into t1 values(3, 30, 300)", - "insert into t2 values(11, 11, 11)"}, - []string{"alter table t1 add index k2(c2)", - "alter table t1 drop index k2", - "alter table t1 add index kk2(c2, c1)", - "alter table t1 add index k2(c2)", - "alter table t1 drop index k2"}, - "alter table t1 add index k2(c2)", - []string{"select c3, c2 from t1 use index(k2) where c2 = 20", - "select c3, c2 from t1 use index(k2) where c2 = 10", - "select * from t1", - "select * from t2 where c1 = 11"}, - [][]string{{"200 20"}, - {"100 10"}, - {"1 10 100", "2 20 200", "3 30 300"}, - {"11 11 11"}}, - false, - model.StateNone}, - // Test secondary index - {[]string{"insert into t2 values(5, 50, 500)", - "insert into t2 values(11, 11, 11)", - "delete from t2 where c2 = 11", - "update t2 set c2 = 110 where c1 = 11"}, - // "update t2 set c1 = 10 where c3 = 100"}, - []string{"alter table t1 add index k2(c2)", - "alter table t1 drop index k2", - "alter table t1 add index kk2(c2, c1)", - "alter table t1 add index k2(c2)", - "alter table t1 drop index k2"}, - "alter table t1 add index k2(c2)", - []string{"select c3, c2 from t1 use index(k2) where c2 = 20", - "select c3, c2 from t1 use index(k2) where c2 = 10", - "select * from t1", - "select * from t2 where c1 = 11", - "select * from t2 where c3 = 100"}, - [][]string{{"200 20"}, - {"100 10"}, - {"1 10 100", "2 20 200"}, - {}, - {"1 10 100"}}, - false, - model.StateNone}, - // Test unique index - {[]string{"insert into t1 values(3, 30, 300)", - "insert into t1 values(4, 40, 400)", - "insert into t2 values(11, 11, 11)", - "insert into t2 values(12, 12, 11)"}, - []string{"alter table t1 add unique index uk3(c3)", - "alter table t1 drop index uk3", - "alter table t2 add unique index ukc1c3(c1, c3)", - "alter table t2 add unique index ukc3(c3)", - "alter table t2 drop index ukc1c3", - "alter table t2 drop index ukc3", - "alter table t2 add index kc3(c3)"}, - "alter table t1 add unique index uk3(c3)", - []string{"select c3, c2 from t1 use index(uk3) where c3 = 200", - "select c3, c2 from t1 use index(uk3) where c3 = 300", - "select c3, c2 from t1 use index(uk3) where c3 = 400", - "select * from t1", - "select * from t2"}, - [][]string{{"200 20"}, - {"300 30"}, - {"400 40"}, - {"1 10 100", "2 20 200", "3 30 300", "4 40 400"}, - {"1 10 100", "2 20 200", "11 11 11", "12 12 11"}}, - false, model.StateNone}, - // Test unique index fail to commit, this case needs the new index could be inserted - {[]string{"insert into t1 values(3, 30, 300)", - "insert into t1 values(4, 40, 300)", - "insert into t2 values(11, 11, 11)", - "insert into t2 values(12, 11, 12)"}, - //[]string{"alter table t1 add unique index uk3(c3)", "alter table t1 drop index uk3"}, - []string{}, - "alter table t1 add unique index uk3(c3)", - []string{"select c3, c2 from t1 use index(uk3) where c3 = 200", - "select c3, c2 from t1 use index(uk3) where c3 = 300", - "select c3, c2 from t1 where c1 = 4", - "select * from t1", - "select * from t2"}, - [][]string{{"200 20"}, - {}, - {}, - {"1 10 100", "2 20 200"}, - {"1 10 100", "2 20 200"}}, - true, - model.StateWriteOnly}, - } - tk.MustQuery("select * from t1;").Check(testkit.Rows("1 10 100", "2 20 200")) - - // Test add index state change - do := dom.DDL() - startStates := []model.SchemaState{model.StateNone, model.StateDeleteOnly} - for _, startState := range startStates { - endStatMap := session.ConstOpAddIndex[startState] - var endStates []model.SchemaState - for st := range endStatMap { - endStates = append(endStates, st) - } - slices.Sort(endStates) - for _, endState := range endStates { - for _, curCase := range cases { - if endState < curCase.stateEnd { - break - } - tk2.MustExec("drop table if exists t1") - tk2.MustExec("drop table if exists t2") - tk2.MustExec("create table t1 (c1 int primary key, c2 int, c3 int, index ok2(c2))") - tk2.MustExec("create table t2 (c1 int primary key, c2 int, c3 int, index ok2(c2))") - tk2.MustExec("insert t1 values (1, 10, 100), (2, 20, 200)") - tk2.MustExec("insert t2 values (1, 10, 100), (2, 20, 200)") - tk2.MustQuery("select * from t1;").Check(testkit.Rows("1 10 100", "2 20 200")) - tk.MustQuery("select * from t1;").Check(testkit.Rows("1 10 100", "2 20 200")) - tk.MustQuery("select * from t2;").Check(testkit.Rows("1 10 100", "2 20 200")) - - for _, DDLSQL := range curCase.tk2DDL { - tk2.MustExec(DDLSQL) - } - hook := &ddl.TestDDLCallback{Do: dom} - prepared := false - committed := false - hook.OnJobRunBeforeExported = func(job *model.Job) { - if job.SchemaState == startState { - if !prepared { - tk.MustExec("begin pessimistic") - for _, tkSQL := range curCase.tkSQLs { - tk.MustExec(tkSQL) - } - prepared = true - } - } - } - onJobUpdatedExportedFunc := func(job *model.Job) { - if job.SchemaState == endState { - if !committed { - if curCase.failCommit { - err := tk.ExecToErr("commit") - require.Error(t, err) - } else { - tk.MustExec("commit") - } - } - committed = true - } - } - hook.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) - originalCallback := do.GetHook() - do.SetHook(hook) - tk2.MustExec(curCase.idxDDL) - do.SetHook(originalCallback) - tk2.MustExec("admin check table t1") - for i, checkSQL := range curCase.checkSQLs { - if len(curCase.rowsExps[i]) > 0 { - tk2.MustQuery(checkSQL).Check(testkit.Rows(curCase.rowsExps[i]...)) - } else { - tk2.MustQuery(checkSQL).Check(nil) - } - } - } - } - } - tk.MustExec("admin check table t1") -} - // TestAddIndexFailOnCaseWhenCanExit is used to close #19325. func TestAddIndexFailOnCaseWhenCanExit(t *testing.T) { require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/MockCaseWhenParseFailure", `return(true)`)) @@ -1379,55 +1181,6 @@ func TestTxnSavepointWithDDL(t *testing.T) { tk.MustExec("admin check table t1, t2") } -func TestAmendTxnSavepointWithDDL(t *testing.T) { - store, _ := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease) - tk := testkit.NewTestKit(t, store) - tk2 := testkit.NewTestKit(t, store) - tk.MustExec("use test;") - tk.MustExec("set global tidb_enable_metadata_lock=0") - tk2.MustExec("use test;") - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") - tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") - - prepareFn := func() { - tk.MustExec("drop table if exists t1, t2") - tk.MustExec("create table t1 (c1 int primary key, c2 int)") - tk.MustExec("create table t2 (c1 int primary key, c2 int)") - } - - prepareFn() - tk.MustExec("truncate table t1") - tk.MustExec("begin pessimistic") - tk.MustExec("savepoint s1") - tk.MustExec("insert t1 values (1, 11)") - tk.MustExec("savepoint s2") - tk.MustExec("insert t2 values (1, 11)") - tk.MustExec("rollback to s2") - tk2.MustExec("alter table t1 add index idx2(c2)") - tk2.MustExec("alter table t2 add index idx2(c2)") - tk.MustExec("commit") - tk.MustQuery("select * from t1").Check(testkit.Rows("1 11")) - tk.MustQuery("select * from t2").Check(testkit.Rows()) - tk.MustExec("admin check table t1, t2") - - prepareFn() - tk.MustExec("truncate table t1") - tk.MustExec("begin pessimistic") - tk.MustExec("savepoint s1") - tk.MustExec("insert t1 values (1, 11)") - tk.MustExec("savepoint s2") - tk.MustExec("insert t2 values (1, 11)") - tk.MustExec("savepoint s3") - tk.MustExec("insert t2 values (2, 22)") - tk.MustExec("rollback to s3") - tk2.MustExec("alter table t1 add index idx2(c2)") - tk2.MustExec("alter table t2 add index idx2(c2)") - tk.MustExec("commit") - tk.MustQuery("select * from t1").Check(testkit.Rows("1 11")) - tk.MustQuery("select * from t2").Check(testkit.Rows("1 11")) - tk.MustExec("admin check table t1, t2") -} - func TestSnapshotVersion(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease) diff --git a/ddl/index_merge_tmp_test.go b/ddl/index_merge_tmp_test.go index 389339ac15ad4..ff974385c7ad5 100644 --- a/ddl/index_merge_tmp_test.go +++ b/ddl/index_merge_tmp_test.go @@ -250,16 +250,6 @@ func findIdxInfo(dom *domain.Domain, dbName, tbName, idxName string) *model.Inde return tbl.Meta().FindIndexByName(idxName) } -func TestPessimisticAmendIncompatibleWithFastReorg(t *testing.T) { - store := testkit.CreateMockStore(t) - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 1;") - - tk.MustGetErrMsg("set @@tidb_enable_amend_pessimistic_txn = 1;", - "amend pessimistic transactions is not compatible with tidb_ddl_enable_fast_reorg") -} - // TestCreateUniqueIndexKeyExist this case will test below things: // Create one unique index idx((a*b+1)); // insert (0, 6) and delete it; diff --git a/ddl/table_modify_test.go b/ddl/table_modify_test.go index c042d266ac9e2..590fea8ad973d 100644 --- a/ddl/table_modify_test.go +++ b/ddl/table_modify_test.go @@ -117,7 +117,6 @@ func TestLockTableReadOnly(t *testing.T) { tk1 := testkit.NewTestKit(t, store) tk2 := testkit.NewTestKit(t, store) tk1.MustExec("use test") - tk1.MustExec("set global tidb_enable_metadata_lock=0") tk2.MustExec("use test") tk1.MustExec("drop table if exists t1,t2") defer func() { @@ -162,18 +161,6 @@ func TestLockTableReadOnly(t *testing.T) { require.True(t, terror.ErrorEqual(tk2.ExecToErr("lock tables t1 write local"), infoschema.ErrTableLocked)) tk1.MustExec("admin cleanup table lock t1") tk2.MustExec("insert into t1 set a=1, b=2") - - tk1.MustExec("set global tidb_ddl_enable_fast_reorg = 0") - tk1.MustExec("set tidb_enable_amend_pessimistic_txn = 1") - tk1.MustExec("begin pessimistic") - tk1.MustQuery("select * from t1 where a = 1").Check(testkit.Rows("1 2")) - tk2.MustExec("update t1 set b = 3") - tk2.MustExec("alter table t1 read only") - tk2.MustQuery("select * from t1 where a = 1").Check(testkit.Rows("1 3")) - tk1.MustQuery("select * from t1 where a = 1").Check(testkit.Rows("1 2")) - tk1.MustExec("update t1 set b = 4") - require.True(t, terror.ErrorEqual(tk1.ExecToErr("commit"), domain.ErrInfoSchemaChanged)) - tk2.MustExec("alter table t1 read write") } // TestConcurrentLockTables test concurrent lock/unlock tables. diff --git a/domain/schema_validator.go b/domain/schema_validator.go index 6f028fdca2e70..e43c3ef7fbbea 100644 --- a/domain/schema_validator.go +++ b/domain/schema_validator.go @@ -15,6 +15,7 @@ package domain import ( + "golang.org/x/exp/slices" "sync" "time" @@ -26,7 +27,6 @@ import ( "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/txnkv/transaction" "go.uber.org/zap" - "golang.org/x/exp/slices" ) type checkResult int @@ -165,19 +165,18 @@ func (s *schemaValidator) Update(leaseGrantTS uint64, oldVer, currVer int64, cha // isRelatedTablesChanged returns the result whether relatedTableIDs is changed // from usedVer to the latest schema version. // NOTE, this function should be called under lock! -func (s *schemaValidator) isRelatedTablesChanged(currVer int64, tableIDs []int64) (transaction.RelatedSchemaChange, bool) { - res := transaction.RelatedSchemaChange{} +func (s *schemaValidator) isRelatedTablesChanged(currVer int64, tableIDs []int64) bool { if len(s.deltaSchemaInfos) == 0 { metrics.LoadSchemaCounter.WithLabelValues(metrics.SchemaValidatorCacheEmpty).Inc() logutil.BgLogger().Info("schema change history is empty", zap.Int64("currVer", currVer)) - return res, true + return true } newerDeltas := s.findNewerDeltas(currVer) if len(newerDeltas) == len(s.deltaSchemaInfos) { metrics.LoadSchemaCounter.WithLabelValues(metrics.SchemaValidatorCacheMiss).Inc() logutil.BgLogger().Info("the schema version is much older than the latest version", zap.Int64("currVer", currVer), zap.Int64("latestSchemaVer", s.latestSchemaVer), zap.Reflect("deltas", newerDeltas)) - return res, true + return true } changedTblMap := make(map[int64]uint64) @@ -198,22 +197,15 @@ func (s *schemaValidator) isRelatedTablesChanged(currVer int64, tableIDs []int64 } if len(changedTblMap) > 0 { tblIds := make([]int64, 0, len(changedTblMap)) - actionTypes := make([]uint64, 0, len(changedTblMap)) for id := range changedTblMap { tblIds = append(tblIds, id) } slices.Sort(tblIds) - for _, tblID := range tblIds { - actionTypes = append(actionTypes, changedTblMap[tblID]) - } - res.PhyTblIDS = tblIds - res.ActionTypes = actionTypes - res.Amendable = true logutil.BgLogger().Info("schema of tables in the transaction are changed", zap.Int64s("conflicted table IDs", tblIds), zap.Int64("transaction schema", currVer), zap.Int64s("schema versions that changed the tables", changedSchemaVers)) - return res, true + return true } - return res, false + return false } func (s *schemaValidator) findNewerDeltas(currVer int64) []deltaSchemaInfo { @@ -251,12 +243,8 @@ func (s *schemaValidator) Check(txnTS uint64, schemaVer int64, relatedPhysicalTa // When disabling MDL -> enabling MDL, the old transaction's needCheckSchema is true, we need to check it. // When enabling MDL -> disabling MDL, the old transaction's needCheckSchema is false, so still need to check it, and variable EnableMDL is false now. if needCheckSchema || !variable.EnableMDL.Load() { - relatedChanges, changed := s.isRelatedTablesChanged(schemaVer, relatedPhysicalTableIDs) + changed := s.isRelatedTablesChanged(schemaVer, relatedPhysicalTableIDs) if changed { - if relatedChanges.Amendable { - relatedChanges.LatestInfoSchema = s.latestInfoSchema - return &relatedChanges, ResultFail - } return nil, ResultFail } } diff --git a/domain/schema_validator_test.go b/domain/schema_validator_test.go index a18fbcb4a435a..fc62a89b2df05 100644 --- a/domain/schema_validator_test.go +++ b/domain/schema_validator_test.go @@ -61,7 +61,7 @@ func subTestSchemaValidatorGeneral(t *testing.T) { // Stop the validator, validator's items value is nil. validator.Stop() require.False(t, validator.IsStarted()) - _, isTablesChanged := validator.isRelatedTablesChanged(item.schemaVer, []int64{10}) + isTablesChanged := validator.isRelatedTablesChanged(item.schemaVer, []int64{10}) require.True(t, isTablesChanged) _, valid = validator.Check(item.leaseGrantTS, item.schemaVer, []int64{10}, true) require.Equal(t, ResultUnknown, valid) @@ -91,12 +91,12 @@ func subTestSchemaValidatorGeneral(t *testing.T) { validator.Update(ts, currVer, newItem.schemaVer, &transaction.RelatedSchemaChange{PhyTblIDS: []int64{1, 2, 3}, ActionTypes: []uint64{1, 2, 3}}) // Make sure the updated table IDs don't be covered with the same schema version. validator.Update(ts, newItem.schemaVer, newItem.schemaVer, nil) - _, isTablesChanged = validator.isRelatedTablesChanged(currVer, nil) + isTablesChanged = validator.isRelatedTablesChanged(currVer, nil) require.False(t, isTablesChanged) - _, isTablesChanged = validator.isRelatedTablesChanged(currVer, []int64{2}) + isTablesChanged = validator.isRelatedTablesChanged(currVer, []int64{2}) require.Truef(t, isTablesChanged, "currVer %d, newItem %v", currVer, newItem) // The current schema version is older than the oldest schema version. - _, isTablesChanged = validator.isRelatedTablesChanged(-1, nil) + isTablesChanged = validator.isRelatedTablesChanged(-1, nil) require.Truef(t, isTablesChanged, "currVer %d, newItem %v", currVer, newItem) // All schema versions is expired. @@ -214,7 +214,7 @@ func subTestEnqueueActionType(t *testing.T) { // Check the flag set by schema diff, note tableID = 3 has been set flag 0x3 in schema version 9, and flag 0x4 // in schema version 10, so the resActions for tableID = 3 should be 0x3 & 0x4 = 0x7. - relatedChanges, isTablesChanged := validator.isRelatedTablesChanged(5, []int64{1, 2, 3, 4}) + isTablesChanged := validator.isRelatedTablesChanged(5, []int64{1, 2, 3, 4}) require.True(t, isTablesChanged) require.Equal(t, []int64{1, 2, 3, 4}, relatedChanges.PhyTblIDS) require.Equal(t, []uint64{15, 2, 7, 4}, relatedChanges.ActionTypes) diff --git a/expression/integration_test.go b/expression/integration_test.go index 55c8f389a5df3..6b843fa36de08 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -4967,25 +4967,6 @@ func TestIssue18525(t *testing.T) { tk.MustQuery("select INTERVAL( ( CONVERT( -11752 USING utf8 ) ), 6558853612195285496, `col1`) from t1").Check(testkit.Rows("0", "0", "0")) } -func TestSchemaDMLNotChange(t *testing.T) { - store := testkit.CreateMockStore(t) - - tk := testkit.NewTestKit(t, store) - tk2 := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("set global tidb_enable_metadata_lock=0") - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") - tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") - tk2.MustExec("use test") - tk.MustExec("drop table if exists t") - tk.MustExec("create table t (id int primary key, c_json json);") - tk.MustExec("insert into t values (1, '{\"k\": 1}');") - tk.MustExec("begin") - tk.MustExec("update t set c_json = '{\"k\": 2}' where id = 1;") - tk2.MustExec("alter table t rename column c_json to cc_json;") - tk.MustExec("commit") -} - func TestIssue18850(t *testing.T) { store := testkit.CreateMockStore(t) diff --git a/session/BUILD.bazel b/session/BUILD.bazel index d0d68a9142035..b74e190c837db 100644 --- a/session/BUILD.bazel +++ b/session/BUILD.bazel @@ -6,7 +6,6 @@ go_library( "advisory_locks.go", "bootstrap.go", "nontransactional.go", - "schema_amender.go", "session.go", "tidb.go", "txn.go", @@ -78,7 +77,6 @@ go_library( "//util/mathutil", "//util/memory", "//util/parser", - "//util/rowcodec", "//util/sem", "//util/sli", "//util/sqlexec", @@ -98,7 +96,6 @@ go_library( "@com_github_tikv_client_go_v2//kv", "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_client_go_v2//tikv", - "@com_github_tikv_client_go_v2//txnkv/transaction", "@com_github_tikv_client_go_v2//util", "@io_etcd_go_etcd_client_v3//concurrency", "@org_uber_go_zap//:zap", @@ -116,7 +113,6 @@ go_test( "index_usage_sync_lease_test.go", "main_test.go", "nontransactional_test.go", - "schema_amender_test.go", "schema_test.go", "session_test.go", "tidb_test.go", @@ -136,20 +132,16 @@ go_test( "//expression", "//kv", "//meta", - "//meta/autoid", "//parser/ast", "//parser/auth", "//parser/model", - "//parser/mysql", "//parser/terror", "//planner/core", "//server", "//sessionctx", "//sessionctx/variable", - "//sessiontxn", "//statistics", "//store/mockstore", - "//table", "//tablecodec", "//telemetry", "//testkit", @@ -163,17 +155,13 @@ go_test( "//util/chunk", "//util/collate", "//util/logutil", - "//util/rowcodec", "//util/sqlexec", "@com_github_pingcap_failpoint//:failpoint", - "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_pingcap_log//:log", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//testutils", "@com_github_tikv_client_go_v2//tikv", - "@com_github_tikv_client_go_v2//txnkv/transaction", "@com_github_tikv_client_go_v2//util", - "@org_golang_x_exp//slices", "@org_uber_go_atomic//:atomic", "@org_uber_go_goleak//:goleak", "@org_uber_go_zap//:zap", diff --git a/session/schema_amender.go b/session/schema_amender.go deleted file mode 100644 index caaec7994ae3a..0000000000000 --- a/session/schema_amender.go +++ /dev/null @@ -1,713 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package session - -import ( - "bytes" - "context" - "encoding/hex" - "fmt" - "reflect" - - "github.com/pingcap/errors" - "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/tidb/executor" - "github.com/pingcap/tidb/expression" - "github.com/pingcap/tidb/infoschema" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/parser/mysql" - "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/table" - "github.com/pingcap/tidb/table/tables" - "github.com/pingcap/tidb/tablecodec" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/chunk" - "github.com/pingcap/tidb/util/logutil" - "github.com/pingcap/tidb/util/rowcodec" - "github.com/tikv/client-go/v2/tikv" - "github.com/tikv/client-go/v2/txnkv/transaction" - "go.uber.org/zap" -) - -const amendableType = nonMemAmendType | memBufAmendType -const nonMemAmendType = (1 << model.ActionAddColumn) | (1 << model.ActionDropColumn) | (1 << model.ActionDropIndex) -const memBufAmendType = uint64(1< 0 -} - -func needCollectIndexOps(actionType uint64) bool { - return actionType&(1< idxInfoAtStart.Meta().State { - amendOpType = ConstOpAddIndex[idxInfoAtStart.Meta().State][idxInfoAtCommit.Meta().State] - } - if amendOpType != AmendNone { - opInfo := &amendOperationAddIndexInfo{} - opInfo.AmendOpType = amendOpType - opInfo.tblInfoAtStart = tblAtStart - opInfo.tblInfoAtCommit = tblAtCommit - opInfo.indexInfoAtStart = idxInfoAtStart - opInfo.indexInfoAtCommit = idxInfoAtCommit - for _, idxCol := range idxInfoAtCommit.Meta().Columns { - colID := tblAtCommit.Meta().Columns[idxCol.Offset].ID - oldColInfo := findColByID(tblAtStart, colID) - // TODO: now index column MUST be found in old table columns, generated column is not supported. - if oldColInfo == nil || oldColInfo.IsGenerated() || oldColInfo.Hidden { - return nil, errors.Trace(errors.Errorf("amend index column=%v id=%v is not found or generated in table=%v", - idxCol.Name, colID, tblAtCommit.Meta().Name.String())) - } - opInfo.relatedOldIdxCols = append(opInfo.relatedOldIdxCols, oldColInfo) - } - opInfo.schemaAndDecoder = newSchemaAndDecoder(sctx, tblAtStart.Meta()) - fieldTypes := make([]*types.FieldType, 0, len(tblAtStart.Meta().Columns)) - for _, col := range tblAtStart.Meta().Columns { - fieldTypes = append(fieldTypes, &(col.FieldType)) - } - opInfo.chk = chunk.NewChunkWithCapacity(fieldTypes, 4) - addNewIndexOp := &amendOperationAddIndex{ - info: opInfo, - insertedNewIndexKeys: make(map[string]struct{}), - deletedOldIndexKeys: make(map[string]struct{}), - } - res = append(res, addNewIndexOp) - } - } - return res, nil -} - -// collectTblAmendOps collects amend operations for each table using the schema diff between startTS and commitTS. -func (a *amendCollector) collectTblAmendOps(sctx sessionctx.Context, phyTblID int64, - tblInfoAtStart, tblInfoAtCommit table.Table, actionType uint64) error { - if _, ok := a.tblAmendOpMap[phyTblID]; !ok { - a.tblAmendOpMap[phyTblID] = make([]amendOp, 0, 4) - } - if needCollectModifyColOps(actionType) { - _, err := a.collectModifyColAmendOps(tblInfoAtStart, tblInfoAtCommit) - if err != nil { - return err - } - } - if needCollectIndexOps(actionType) { - // TODO: currently only "add index" is considered. - ops, err := a.collectIndexAmendOps(sctx, tblInfoAtStart, tblInfoAtCommit) - if err != nil { - return err - } - a.tblAmendOpMap[phyTblID] = append(a.tblAmendOpMap[phyTblID], ops...) - } - return nil -} - -// mayGenDelIndexRowKeyOp returns if the row key op could generate Op_Del index key mutations. -func mayGenDelIndexRowKeyOp(keyOp kvrpcpb.Op) bool { - return keyOp == kvrpcpb.Op_Del || keyOp == kvrpcpb.Op_Put -} - -// mayGenPutIndexRowKeyOp returns if the row key op could generate Op_Put/Op_Insert index key mutations. -func mayGenPutIndexRowKeyOp(keyOp kvrpcpb.Op) bool { - return keyOp == kvrpcpb.Op_Put || keyOp == kvrpcpb.Op_Insert -} - -// amendOp is an amend operation for a specific schema change, new mutations will be generated using input ones. -type amendOp interface { - genMutations(ctx context.Context, sctx sessionctx.Context, commitMutations transaction.CommitterMutations, kvMap *rowKvMap, - resultMutations *transaction.PlainMutations) error -} - -// amendOperationAddIndex represents one amend operation related to a specific add index change. -type amendOperationAddIndexInfo struct { - AmendOpType int - tblInfoAtStart table.Table - tblInfoAtCommit table.Table - indexInfoAtStart table.Index - indexInfoAtCommit table.Index - relatedOldIdxCols []*table.Column - - schemaAndDecoder *schemaAndDecoder - chk *chunk.Chunk -} - -// amendOperationAddIndex represents the add operation will be performed on new key values for add index amend. -type amendOperationAddIndex struct { - info *amendOperationAddIndexInfo - - // insertedNewIndexKeys is used to check duplicates for new index generated by unique key. - insertedNewIndexKeys map[string]struct{} - // deletedOldIndexKeys is used to check duplicates for deleted old index keys. - deletedOldIndexKeys map[string]struct{} -} - -func (a *amendOperationAddIndexInfo) String() string { - var colStr string - colStr += "[" - for _, colInfo := range a.relatedOldIdxCols { - colStr += fmt.Sprintf(" %s ", colInfo.Name) - } - colStr += "]" - res := fmt.Sprintf("AmenedOpType=%d phyTblID=%d idxID=%d columns=%v", a.AmendOpType, a.indexInfoAtCommit.Meta().ID, - a.indexInfoAtCommit.Meta().ID, colStr) - return res -} - -func (a *amendOperationAddIndex) genMutations(ctx context.Context, sctx sessionctx.Context, commitMutations transaction.CommitterMutations, - kvMap *rowKvMap, resAddMutations *transaction.PlainMutations) error { - // There should be no duplicate keys in deletedOldIndexKeys and insertedNewIndexKeys. - deletedMutations := transaction.NewPlainMutations(32) - insertedMutations := transaction.NewPlainMutations(32) - for i, key := range commitMutations.GetKeys() { - if tablecodec.IsIndexKey(key) || tablecodec.DecodeTableID(key) != a.info.tblInfoAtCommit.Meta().ID { - continue - } - var newIdxMutation *transaction.PlainMutation - var oldIdxMutation *transaction.PlainMutation - var err error - keyOp := commitMutations.GetOp(i) - if addIndexNeedRemoveOp(a.info.AmendOpType) { - if mayGenDelIndexRowKeyOp(keyOp) { - oldIdxMutation, err = a.genOldIdxKey(ctx, sctx, key, kvMap.oldRowKvMap) - if err != nil { - return err - } - } - } - if addIndexNeedAddOp(a.info.AmendOpType) { - if mayGenPutIndexRowKeyOp(keyOp) { - newIdxMutation, err = a.genNewIdxKey(ctx, sctx, key, kvMap.newRowKvMap) - if err != nil { - return err - } - } - } - skipMerge := false - if a.info.AmendOpType == AmendNeedAddDeleteAndInsert { - // If the old index key is the same with new index key, then the index related row value - // is not changed in this row, we don't need to add or remove index keys for this row. - if oldIdxMutation != nil && newIdxMutation != nil { - if bytes.Equal(oldIdxMutation.Key, newIdxMutation.Key) { - skipMerge = true - } - } - } - if !skipMerge { - if oldIdxMutation != nil { - deletedMutations.AppendMutation(*oldIdxMutation) - } - if newIdxMutation != nil { - insertedMutations.AppendMutation(*newIdxMutation) - } - } - } - // For unique index, there may be conflicts on the same unique index key from different rows.Consider a update statement, - // "Op_Del" on row_key = 3, row_val = 4, the "Op_Del" unique_key_4 -> nil will be generated. - // "Op_Put" on row_key = 0, row_val = 4, the "Op_Insert" unique_key_4 -> 0 will be generated. - // The "Op_Insert" should cover the "Op_Del" otherwise the new put row value will not have a correspond index value. - if a.info.indexInfoAtCommit.Meta().Unique { - for i := 0; i < len(deletedMutations.GetKeys()); i++ { - key := deletedMutations.GetKeys()[i] - if _, ok := a.insertedNewIndexKeys[string(key)]; !ok { - resAddMutations.Push(deletedMutations.GetOps()[i], key, deletedMutations.GetValues()[i], deletedMutations.IsPessimisticLock(i), - deletedMutations.IsAssertExists(i), deletedMutations.IsAssertNotExist(i), deletedMutations.NeedConstraintCheckInPrewrite(i)) - } - } - for i := 0; i < len(insertedMutations.GetKeys()); i++ { - key := insertedMutations.GetKeys()[i] - destKeyOp := kvrpcpb.Op_Insert - if _, ok := a.deletedOldIndexKeys[string(key)]; ok { - destKeyOp = kvrpcpb.Op_Put - } - resAddMutations.Push(destKeyOp, key, insertedMutations.GetValues()[i], insertedMutations.IsPessimisticLock(i), - insertedMutations.IsAssertExists(i), insertedMutations.IsAssertNotExist(i), insertedMutations.NeedConstraintCheckInPrewrite(i)) - } - } else { - resAddMutations.MergeMutations(deletedMutations) - resAddMutations.MergeMutations(insertedMutations) - } - return nil -} - -func getCommonHandleDatum(tbl table.Table, row chunk.Row) []types.Datum { - if !tbl.Meta().IsCommonHandle { - return nil - } - datumBuf := make([]types.Datum, 0, 4) - for _, col := range tbl.Cols() { - if mysql.HasPriKeyFlag(col.GetFlag()) { - datumBuf = append(datumBuf, row.GetDatum(col.Offset, &col.FieldType)) - } - } - return datumBuf -} - -func (a *amendOperationAddIndexInfo) genIndexKeyValue(ctx context.Context, sctx sessionctx.Context, kvMap map[string][]byte, - key []byte, kvHandle kv.Handle, keyOnly bool) ([]byte, []byte, error) { - chk := a.chk - chk.Reset() - val, ok := kvMap[string(key)] - if !ok { - // The Op_Put may not exist in old value kv map. - if keyOnly { - return nil, nil, nil - } - return nil, nil, errors.Errorf("key=%v is not found in new row kv map", kv.Key(key).String()) - } - err := executor.DecodeRowValToChunk(sctx, a.schemaAndDecoder.schema, a.tblInfoAtStart.Meta(), kvHandle, val, chk, a.schemaAndDecoder.decoder) - if err != nil { - logutil.Logger(ctx).Warn("amend decode value to chunk failed", zap.Error(err)) - return nil, nil, errors.Trace(err) - } - idxVals := make([]types.Datum, 0, len(a.indexInfoAtCommit.Meta().Columns)) - for _, oldCol := range a.relatedOldIdxCols { - idxVals = append(idxVals, chk.GetRow(0).GetDatum(oldCol.Offset, &oldCol.FieldType)) - } - - rsData := tables.TryGetHandleRestoredDataWrapper(a.tblInfoAtCommit.Meta(), getCommonHandleDatum(a.tblInfoAtCommit, chk.GetRow(0)), nil, a.indexInfoAtCommit.Meta()) - - // Generate index key buf. - newIdxKey, distinct, err := tablecodec.GenIndexKey(sctx.GetSessionVars().StmtCtx, - a.tblInfoAtCommit.Meta(), a.indexInfoAtCommit.Meta(), a.tblInfoAtCommit.Meta().ID, idxVals, kvHandle, nil) - if err != nil { - logutil.Logger(ctx).Warn("amend generate index key failed", zap.Error(err)) - return nil, nil, errors.Trace(err) - } - if keyOnly { - return newIdxKey, []byte{}, nil - } - - // Generate index value buf. - needRsData := tables.NeedRestoredData(a.indexInfoAtCommit.Meta().Columns, a.tblInfoAtCommit.Meta().Columns) - newIdxVal, err := tablecodec.GenIndexValuePortal(sctx.GetSessionVars().StmtCtx, a.tblInfoAtCommit.Meta(), a.indexInfoAtCommit.Meta(), needRsData, distinct, false, idxVals, kvHandle, 0, rsData) - if err != nil { - logutil.Logger(ctx).Warn("amend generate index values failed", zap.Error(err)) - return nil, nil, errors.Trace(err) - } - return newIdxKey, newIdxVal, nil -} - -func (a *amendOperationAddIndex) genNewIdxKey(ctx context.Context, sctx sessionctx.Context, key []byte, - kvMap map[string][]byte) (*transaction.PlainMutation, error) { - kvHandle, err := tablecodec.DecodeRowKey(key) - if err != nil { - logutil.Logger(ctx).Error("decode key error", zap.String("key", hex.EncodeToString(key)), zap.Error(err)) - return nil, errors.Trace(err) - } - - newIdxKey, newIdxValue, err := a.info.genIndexKeyValue(ctx, sctx, kvMap, key, kvHandle, false) - if err != nil { - return nil, errors.Trace(err) - } - newIndexOp := kvrpcpb.Op_Put - isPessimisticLock := false - if _, ok := a.insertedNewIndexKeys[string(newIdxKey)]; ok { - return nil, errors.Trace(errors.Errorf("amend process key same key=%v found for index=%v in table=%v", - newIdxKey, a.info.indexInfoAtCommit.Meta().Name, a.info.tblInfoAtCommit.Meta().Name)) - } - if a.info.indexInfoAtCommit.Meta().Unique { - newIndexOp = kvrpcpb.Op_Insert - isPessimisticLock = true - } - a.insertedNewIndexKeys[string(newIdxKey)] = struct{}{} - var flags transaction.CommitterMutationFlags - if isPessimisticLock { - flags |= transaction.MutationFlagIsPessimisticLock - } - newMutation := &transaction.PlainMutation{KeyOp: newIndexOp, Key: newIdxKey, Value: newIdxValue, Flags: flags} - return newMutation, nil -} - -func (a *amendOperationAddIndex) genOldIdxKey(ctx context.Context, sctx sessionctx.Context, key []byte, - oldValKvMap map[string][]byte) (*transaction.PlainMutation, error) { - kvHandle, err := tablecodec.DecodeRowKey(key) - if err != nil { - logutil.Logger(ctx).Error("decode key error", zap.String("key", hex.EncodeToString(key)), zap.Error(err)) - return nil, errors.Trace(err) - } - // Generated delete index key value. - newIdxKey, emptyVal, err := a.info.genIndexKeyValue(ctx, sctx, oldValKvMap, key, kvHandle, true) - if err != nil { - return nil, errors.Trace(err) - } - // For Op_Put the key may not exist in old key value map. - if len(newIdxKey) > 0 { - isPessimisticLock := false - if _, ok := a.deletedOldIndexKeys[string(newIdxKey)]; ok { - return nil, errors.Trace(errors.Errorf("amend process key same key=%v found for index=%v in table=%v", - newIdxKey, a.info.indexInfoAtCommit.Meta().Name, a.info.tblInfoAtCommit.Meta().Name)) - } - if a.info.indexInfoAtCommit.Meta().Unique { - isPessimisticLock = true - } - a.deletedOldIndexKeys[string(newIdxKey)] = struct{}{} - var flags transaction.CommitterMutationFlags - if isPessimisticLock { - flags |= transaction.MutationFlagIsPessimisticLock - } - return &transaction.PlainMutation{KeyOp: kvrpcpb.Op_Del, Key: newIdxKey, Value: emptyVal, Flags: flags}, nil - } - return nil, nil -} - -// SchemaAmender is used to amend pessimistic transactions for schema change. -type SchemaAmender struct { - sess *session -} - -// NewSchemaAmenderForTikvTxn creates a schema amender for tikvTxn type. -func NewSchemaAmenderForTikvTxn(sess *session) *SchemaAmender { - amender := &SchemaAmender{sess: sess} - return amender -} - -func (s *SchemaAmender) getAmendableKeys(commitMutations transaction.CommitterMutations, info *amendCollector) ([]kv.Key, []kv.Key) { - addKeys := make([]kv.Key, 0, len(commitMutations.GetKeys())) - removeKeys := make([]kv.Key, 0, len(commitMutations.GetKeys())) - for i, byteKey := range commitMutations.GetKeys() { - if tablecodec.IsIndexKey(byteKey) || !info.keyHasAmendOp(byteKey) { - continue - } - keyOp := commitMutations.GetOp(i) - switch keyOp { - case kvrpcpb.Op_Put: - addKeys = append(addKeys, byteKey) - removeKeys = append(removeKeys, byteKey) - case kvrpcpb.Op_Insert: - addKeys = append(addKeys, byteKey) - case kvrpcpb.Op_Del: - removeKeys = append(removeKeys, byteKey) - } - } - return addKeys, removeKeys -} - -type rowKvMap struct { - oldRowKvMap map[string][]byte - newRowKvMap map[string][]byte -} - -func (s *SchemaAmender) prepareKvMap(ctx context.Context, commitMutations transaction.CommitterMutations, info *amendCollector) (*rowKvMap, error) { - // Get keys need to be considered for the amend operation, currently only row keys. - addKeys, removeKeys := s.getAmendableKeys(commitMutations, info) - - // BatchGet the new key values, the Op_Put and Op_Insert type keys in memory buffer. - txn, err := s.sess.Txn(true) - if err != nil { - return nil, errors.Trace(err) - } - newValKvMap, err := txn.BatchGet(ctx, addKeys) - if err != nil { - logutil.Logger(ctx).Warn("amend failed to batch get kv new keys", zap.Error(err)) - return nil, errors.Trace(err) - } - if len(newValKvMap) != len(addKeys) { - logutil.Logger(ctx).Error("amend failed to batch get results invalid", - zap.Int("addKeys len", len(addKeys)), zap.Int("newValKvMap", len(newValKvMap))) - return nil, errors.Errorf("add keys has %v values but result kvMap has %v", len(addKeys), len(newValKvMap)) - } - // BatchGet the old key values, the Op_Del and Op_Put types keys in storage using forUpdateTS, the Op_put type is for - // row update using the same row key, it may not exist. - snapshot := s.sess.GetStore().GetSnapshot(kv.Version{Ver: s.sess.sessionVars.TxnCtx.GetForUpdateTS()}) - oldValKvMap, err := snapshot.BatchGet(ctx, removeKeys) - if err != nil { - logutil.Logger(ctx).Warn("amend failed to batch get kv old keys", zap.Error(err)) - return nil, errors.Trace(err) - } - - res := &rowKvMap{ - oldRowKvMap: oldValKvMap, - newRowKvMap: newValKvMap, - } - return res, nil -} - -func (s *SchemaAmender) checkDupKeys(ctx context.Context, mutations transaction.CommitterMutations) error { - // Check if there are duplicate key entries. - checkMap := make(map[string]kvrpcpb.Op) - for i := 0; i < mutations.Len(); i++ { - key := mutations.GetKey(i) - keyOp := mutations.GetOp(i) - keyVal := mutations.GetValue(i) - if foundOp, ok := checkMap[string(key)]; ok { - logutil.Logger(ctx).Error("duplicate key found in amend result mutations", - zap.Stringer("key", kv.Key(key)), - zap.Stringer("foundKeyOp", foundOp), - zap.Stringer("thisKeyOp", keyOp), - zap.Stringer("thisKeyValue", kv.Key(keyVal))) - return errors.Trace(errors.Errorf("duplicate key=%s is found in mutations", kv.Key(key).String())) - } - checkMap[string(key)] = keyOp - } - return nil -} - -// genAllAmendMutations generates CommitterMutations for all tables and related amend operations. -func (s *SchemaAmender) genAllAmendMutations(ctx context.Context, commitMutations transaction.CommitterMutations, - info *amendCollector) (*transaction.PlainMutations, error) { - rowKvMap, err := s.prepareKvMap(ctx, commitMutations, info) - if err != nil { - return nil, err - } - // Do generate add/remove mutations processing each key. - resultNewMutations := transaction.NewPlainMutations(32) - for _, amendOps := range info.tblAmendOpMap { - for _, curOp := range amendOps { - err := curOp.genMutations(ctx, s.sess, commitMutations, rowKvMap, &resultNewMutations) - if err != nil { - return nil, err - } - } - } - err = s.checkDupKeys(ctx, &resultNewMutations) - if err != nil { - return nil, err - } - return &resultNewMutations, nil -} - -// AmendTxn does check and generate amend mutations based on input infoSchema and mutations, mutations need to prewrite -// are returned, the input commitMutations will not be changed. -func (s *SchemaAmender) AmendTxn(ctx context.Context, startInfoSchema tikv.SchemaVer, change *transaction.RelatedSchemaChange, - commitMutations transaction.CommitterMutations) (transaction.CommitterMutations, error) { - // Get info schema meta - infoSchemaAtStart := startInfoSchema.(infoschema.InfoSchema) - infoSchemaAtCheck := change.LatestInfoSchema.(infoschema.InfoSchema) - - // Collect amend operations for each table by physical table ID. - var needAmendMem bool - amendCollector := newAmendCollector() - for i, tblID := range change.PhyTblIDS { - actionType := change.ActionTypes[i] - // Check amendable flags, return if not supported flags exist. - if actionType&(^amendableType) != 0 { - logutil.Logger(ctx).Info("amend action type not supported for txn", zap.Int64("tblID", tblID), zap.Uint64("actionType", actionType)) - return nil, errors.Trace(table.ErrUnsupportedOp) - } - // Partition table is not supported now. - tblInfoAtStart, ok := infoSchemaAtStart.TableByID(tblID) - if !ok { - return nil, errors.Trace(errors.Errorf("tableID=%d is not found in infoSchema", tblID)) - } - if tblInfoAtStart.Meta().Partition != nil { - logutil.Logger(ctx).Info("Amend for partition table is not supported", - zap.String("tableName", tblInfoAtStart.Meta().Name.String()), zap.Int64("tableID", tblID)) - return nil, errors.Trace(table.ErrUnsupportedOp) - } - tblInfoAtCommit, ok := infoSchemaAtCheck.TableByID(tblID) - if !ok { - return nil, errors.Trace(errors.Errorf("tableID=%d is not found in infoSchema", tblID)) - } - if actionType&(memBufAmendType) != 0 { - needAmendMem = true - err := amendCollector.collectTblAmendOps(s.sess, tblID, tblInfoAtStart, tblInfoAtCommit, actionType) - if err != nil { - return nil, err - } - } - } - // After amend operations collect, generate related new mutations based on input commitMutations - if needAmendMem { - return s.genAllAmendMutations(ctx, commitMutations, amendCollector) - } - return nil, nil -} - -func newSchemaAndDecoder(ctx sessionctx.Context, tbl *model.TableInfo) *schemaAndDecoder { - schema := expression.NewSchema(make([]*expression.Column, 0, len(tbl.Columns))...) - for _, col := range tbl.Columns { - colExpr := &expression.Column{ - RetType: &col.FieldType, - ID: col.ID, - } - if col.IsGenerated() && !col.GeneratedStored { - // This will not be used since generated column is rejected in collectIndexAmendOps. - colExpr.VirtualExpr = &expression.Constant{} - } - schema.Append(colExpr) - } - return &schemaAndDecoder{schema, executor.NewRowDecoder(ctx, schema, tbl)} -} diff --git a/session/schema_amender_test.go b/session/schema_amender_test.go deleted file mode 100644 index 565b4861a9400..0000000000000 --- a/session/schema_amender_test.go +++ /dev/null @@ -1,477 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package session - -import ( - "bytes" - "context" - "fmt" - "strconv" - "testing" - - "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/tidb/domain" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/meta/autoid" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/parser/mysql" - "github.com/pingcap/tidb/planner/core" - "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/sessiontxn" - "github.com/pingcap/tidb/store/mockstore" - "github.com/pingcap/tidb/table" - "github.com/pingcap/tidb/tablecodec" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/logutil" - "github.com/pingcap/tidb/util/rowcodec" - "github.com/stretchr/testify/require" - "github.com/tikv/client-go/v2/txnkv/transaction" - "go.uber.org/zap" - "golang.org/x/exp/slices" -) - -func initTblColIdxID(metaInfo *model.TableInfo) { - for i, col := range metaInfo.Columns { - col.ID = int64(i + 1) - } - for i, idx := range metaInfo.Indices { - idx.ID = int64(i + 1) - if idx.Name.L == "f_g" { - idx.Unique = true - } else { - idx.Unique = false - } - } - metaInfo.ID = 1 - metaInfo.State = model.StatePublic -} - -func mutationsEqual(res *transaction.PlainMutations, expected *transaction.PlainMutations, t *testing.T) { - require.Len(t, res.GetKeys(), len(expected.GetKeys())) - for i := 0; i < len(res.GetKeys()); i++ { - foundIdx := -1 - for j := 0; j < len(expected.GetKeys()); j++ { - if bytes.Equal(res.GetKeys()[i], expected.GetKeys()[j]) { - foundIdx = j - break - } - } - require.GreaterOrEqual(t, foundIdx, 0) - require.Equal(t, expected.GetOps()[foundIdx], res.GetOps()[i]) - require.Equal(t, expected.IsPessimisticLock(foundIdx), res.IsPessimisticLock(i)) - require.Equal(t, expected.GetKeys()[foundIdx], res.GetKeys()[i]) - require.Equal(t, expected.GetValues()[foundIdx], res.GetValues()[i]) - } -} - -type data struct { - ops []kvrpcpb.Op - keys [][]byte - values [][]byte - rowValue [][]types.Datum -} - -// Generate exist old data and new data in transaction to be amended. Also generate the expected amend mutations -// according to the old and new data and the full generated expected mutations. -func prepareTestData( - se *session, - mutations *transaction.PlainMutations, - oldTblInfo table.Table, - newTblInfo table.Table, - expectedAmendOps []amendOp, - t *testing.T, -) (*data, transaction.PlainMutations) { - var err error - // Generated test data. - colIds := make([]int64, len(oldTblInfo.Meta().Columns)) - basicRowValue := make([]types.Datum, len(oldTblInfo.Meta().Columns)) - for i, col := range oldTblInfo.Meta().Columns { - colIds[i] = oldTblInfo.Meta().Columns[col.Offset].ID - if col.FieldType.GetType() == mysql.TypeLong { - basicRowValue[i] = types.NewIntDatum(int64(col.Offset)) - } else { - basicRowValue[i] = types.NewStringDatum(strconv.Itoa(col.Offset)) - } - } - KeyOps := []kvrpcpb.Op{kvrpcpb.Op_Put, kvrpcpb.Op_Del, kvrpcpb.Op_Lock, kvrpcpb.Op_Insert, kvrpcpb.Op_Put, - kvrpcpb.Op_Del, kvrpcpb.Op_Insert, kvrpcpb.Op_Lock} - numberOfRows := len(KeyOps) - oldRowValues := make([][]types.Datum, numberOfRows) - newRowValues := make([][]types.Datum, numberOfRows) - rd := rowcodec.Encoder{Enable: true} - oldData := &data{} - expectedMutations := transaction.NewPlainMutations(8) - oldRowKvMap := make(map[string][]types.Datum) - newRowKvMap := make(map[string][]types.Datum) - - // colIdx: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9. - // column: a, b, c, d, e, c_str, d_str, e_str, f, g. - // Generate old data. - for i := 0; i < numberOfRows; i++ { - keyOp := KeyOps[i] - thisRowValue := make([]types.Datum, len(basicRowValue)) - copy(thisRowValue, basicRowValue) - thisRowValue[0] = types.NewIntDatum(int64(i + 1)) - thisRowValue[4] = types.NewIntDatum(int64(i + 1 + 4)) - // f_g has a unique index. - thisRowValue[8] = types.NewIntDatum(int64(i + 1 + 8)) - - // Save old data, they will be put into db first. - rowKey := tablecodec.EncodeRowKeyWithHandle(oldTblInfo.Meta().ID, kv.IntHandle(int64(i+1))) - var rowValue []byte - rowValue, err = rd.Encode(se.sessionVars.StmtCtx, colIds, thisRowValue, nil) - require.NoError(t, err) - if keyOp == kvrpcpb.Op_Del || keyOp == kvrpcpb.Op_Put || keyOp == kvrpcpb.Op_Lock { - // Skip the last Op_put, it has no old row value. - if i == 4 { - oldRowValues[i] = nil - continue - } - oldData.keys = append(oldData.keys, rowKey) - oldData.values = append(oldData.values, rowValue) - oldData.ops = append(oldData.ops, keyOp) - oldData.rowValue = append(oldData.rowValue, thisRowValue) - if keyOp == kvrpcpb.Op_Del { - mutations.Push(keyOp, rowKey, []byte{}, true, false, false, false) - } - } - oldRowValues[i] = thisRowValue - oldRowKvMap[string(rowKey)] = thisRowValue - } - - // Generate new data. - for i := 0; i < numberOfRows; i++ { - keyOp := KeyOps[i] - thisRowValue := make([]types.Datum, len(basicRowValue)) - copy(thisRowValue, basicRowValue) - thisRowValue[0] = types.NewIntDatum(int64(i + 1)) - // New column e value should be different from old row values. - thisRowValue[4] = types.NewIntDatum(int64(i+1+4) * 20) - // New column f value should be different since it has a related unique index. - thisRowValue[8] = types.NewIntDatum(int64(i+1+4) * 20) - - var rowValue []byte - // Save new data. - rowKey := tablecodec.EncodeRowKeyWithHandle(oldTblInfo.Meta().ID, kv.IntHandle(int64(i+1))) - if keyOp == kvrpcpb.Op_Insert { - rowValue, err = tablecodec.EncodeOldRow(se.sessionVars.StmtCtx, thisRowValue, colIds, nil, nil) - } else { - rowValue, err = rd.Encode(se.sessionVars.StmtCtx, colIds, thisRowValue, nil) - } - require.NoError(t, err) - if keyOp == kvrpcpb.Op_Put || keyOp == kvrpcpb.Op_Insert { - mutations.Push(keyOp, rowKey, rowValue, true, false, false, false) - } else if keyOp == kvrpcpb.Op_Lock { - mutations.Push(keyOp, rowKey, []byte{}, true, false, false, false) - } - newRowValues[i] = thisRowValue - newRowKvMap[string(rowKey)] = thisRowValue - } - - // Prepare expected result mutations. - for _, op := range expectedAmendOps { - var info *amendOperationAddIndexInfo - expectedOp, ok := op.(*amendOperationAddIndex) - require.True(t, ok) - info = expectedOp.info - var idxVal []byte - genIndexKV := func(inputRow []types.Datum) ([]byte, []byte) { - indexDatums := make([]types.Datum, len(info.relatedOldIdxCols)) - for colIdx, col := range info.relatedOldIdxCols { - indexDatums[colIdx] = inputRow[col.Offset] - } - kvHandle := kv.IntHandle(inputRow[0].GetInt64()) - idxKey, _, err := tablecodec.GenIndexKey(se.sessionVars.StmtCtx, newTblInfo.Meta(), - info.indexInfoAtCommit.Meta(), newTblInfo.Meta().ID, indexDatums, kvHandle, nil) - require.NoError(t, err) - idxVal, err = tablecodec.GenIndexValuePortal(se.sessionVars.StmtCtx, newTblInfo.Meta(), info.indexInfoAtCommit.Meta(), false, info.indexInfoAtCommit.Meta().Unique, false, indexDatums, kvHandle, 0, nil) - require.NoError(t, err) - return idxKey, idxVal - } - for i := 0; i < len(mutations.GetKeys()); i++ { - oldIdxKeyMutation := transaction.PlainMutations{} - newIdxKeyMutation := transaction.PlainMutations{} - key := mutations.GetKeys()[i] - keyOp := mutations.GetOps()[i] - if addIndexNeedRemoveOp(info.AmendOpType) && mayGenDelIndexRowKeyOp(keyOp) { - thisRowValue := oldRowKvMap[string(key)] - if len(thisRowValue) > 0 { - idxKey, _ := genIndexKV(thisRowValue) - isPessimisticLock := false - if info.indexInfoAtCommit.Meta().Unique { - isPessimisticLock = true - } - oldIdxKeyMutation.Push(kvrpcpb.Op_Del, idxKey, []byte{}, isPessimisticLock, false, false, false) - } - } - if addIndexNeedAddOp(info.AmendOpType) && mayGenPutIndexRowKeyOp(keyOp) { - thisRowValue := newRowKvMap[string(key)] - idxKey, idxVal := genIndexKV(thisRowValue) - mutOp := kvrpcpb.Op_Put - isPessimisticLock := false - if info.indexInfoAtCommit.Meta().Unique { - mutOp = kvrpcpb.Op_Insert - isPessimisticLock = true - } - newIdxKeyMutation.Push(mutOp, idxKey, idxVal, isPessimisticLock, false, false, false) - } - skipMerge := false - if info.AmendOpType == AmendNeedAddDeleteAndInsert { - if len(oldIdxKeyMutation.GetKeys()) > 0 && len(newIdxKeyMutation.GetKeys()) > 0 { - if bytes.Equal(oldIdxKeyMutation.GetKeys()[0], newIdxKeyMutation.GetKeys()[0]) { - skipMerge = true - } - } - } - if !skipMerge { - if len(oldIdxKeyMutation.GetKeys()) > 0 { - expectedMutations.MergeMutations(oldIdxKeyMutation) - } - if len(newIdxKeyMutation.GetKeys()) > 0 { - expectedMutations.MergeMutations(newIdxKeyMutation) - } - } - } - } - - return oldData, expectedMutations -} - -func TestAmendCollectAndGenMutations(t *testing.T) { - ctx := context.Background() - store, err := mockstore.NewMockStore() - require.NoError(t, err) - defer func() { require.NoError(t, store.Close()) }() - se := &session{ - store: store, - sessionVars: variable.NewSessionVars(nil), - } - se.mu.values = make(map[fmt.Stringer]interface{}) - domain.BindDomain(se, domain.NewMockDomain()) - startStates := []model.SchemaState{model.StateNone, model.StateDeleteOnly, model.StateWriteOnly, model.StateWriteReorganization} - for _, startState := range startStates { - endStatMap := ConstOpAddIndex[startState] - var endStates []model.SchemaState - for st := range endStatMap { - endStates = append(endStates, st) - } - slices.Sort(endStates) - for _, endState := range endStates { - logutil.BgLogger().Info("[TEST]>>>>>>new round test", zap.Stringer("start", startState), zap.Stringer("end", endState)) - // column: a, b, c, d, e, c_str, d_str, e_str, f, g. - // PK: a. - // indices: c_d_e, e, f, g, f_g, c_d_e_str, c_d_e_str_prefix. - oldTblMeta := core.MockSignedTable() - initTblColIdxID(oldTblMeta) - // Indices[0] does not exist at the start. - oldTblMeta.Indices = oldTblMeta.Indices[1:] - oldTbInfo, err := table.TableFromMeta(autoid.NewAllocators(false), oldTblMeta) - require.NoError(t, err) - oldTblMeta.Indices[0].State = startState - oldTblMeta.Indices[2].State = endState - oldTblMeta.Indices[3].State = startState - - newTblMeta := core.MockSignedTable() - initTblColIdxID(newTblMeta) - // colh is newly added. - colh := &model.ColumnInfo{ - State: model.StatePublic, - Offset: 12, - Name: model.NewCIStr("b"), - FieldType: *(types.NewFieldType(mysql.TypeLong)), - ID: 13, - } - newTblMeta.Columns = append(newTblMeta.Columns, colh) - // The last index "c_d_e_str_prefix is dropped. - newTblMeta.Indices = newTblMeta.Indices[:len(newTblMeta.Indices)-1] - newTblMeta.Indices[0].Unique = false - newTblInfo, err := table.TableFromMeta(autoid.NewAllocators(false), newTblMeta) - require.NoError(t, err) - newTblMeta.Indices[0].State = endState - // Indices[1] is newly created. - newTblMeta.Indices[1].State = endState - // Indices[3] is dropped - newTblMeta.Indices[3].State = startState - // Indices[4] is newly created unique index. - newTblMeta.Indices[4].State = endState - - // Only the add index amend operations is collected in the results. - collector := newAmendCollector() - tblID := int64(1) - err = collector.collectTblAmendOps(se, tblID, oldTbInfo, newTblInfo, 1< 10", "2 20")) - - tk.MustExec("begin pessimistic") - tk.MustExec("insert into t1 values(5, 5, 5)") - tk2.MustExec("alter table t1 drop column c3") - tk2.MustExec("alter table t1 drop column c2") - tk.MustExec("commit") - tk.MustQuery("select * from t1").Check(testkit.Rows("1", "2", "5")) -} - -func TestPessimisticTxnWithDDLChangeColumn(t *testing.T) { - store := realtikvtest.CreateMockStoreAndSetup(t) - - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("set global tidb_enable_metadata_lock=0") - tk2 := testkit.NewTestKit(t, store) - tk2.MustExec("use test") - - tk.MustExec("drop table if exists t1") - tk.MustExec("create table t1 (c1 int primary key, c2 int, c3 varchar(10))") - tk.MustExec("insert t1 values (1, 77, 'a'), (2, 88, 'b')") - - // Extend column field length is acceptable. - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") - tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") - tk.MustExec("begin pessimistic") - tk.MustExec("update t1 set c2 = c1 * 10") - tk2.MustExec("alter table t1 modify column c2 bigint") - tk.MustExec("commit") - tk.MustExec("begin pessimistic") - tk.MustExec("update t1 set c3 = 'aba'") - tk2.MustExec("alter table t1 modify column c3 varchar(30)") - tk.MustExec("commit") - tk2.MustExec("admin check table t1") - tk.MustQuery("select * from t1").Check(testkit.Rows("1 10 aba", "2 20 aba")) - - // Change column from nullable to not null is not allowed by now. - tk.MustExec("begin pessimistic") - tk.MustExec("insert into t1(c1) values(100)") - tk2.MustExec("alter table t1 change column c2 cc2 bigint not null") - require.Error(t, tk.ExecToErr("commit")) - - // Change default value is rejected. - tk2.MustExec("create table ta(a bigint primary key auto_random(3), b varchar(255) default 'old');") - tk2.MustExec("insert into ta(b) values('a')") - tk.MustExec("begin pessimistic") - tk.MustExec("insert into ta values()") - tk2.MustExec("alter table ta modify column b varchar(300) default 'new';") - require.Error(t, tk.ExecToErr("commit")) - tk2.MustQuery("select b from ta").Check(testkit.Rows("a")) - - // Change default value with add index. There is a new MultipleKeyFlag flag on the index key, and the column is changed, - // the flag check will fail. - tk2.MustExec("insert into ta values()") - tk.MustExec("begin pessimistic") - tk.MustExec("insert into ta(b) values('inserted_value')") - tk.MustExec("insert into ta values()") - tk.MustExec("insert into ta values()") - tk2.MustExec("alter table ta add index i1(b)") - tk2.MustExec("alter table ta change column b b varchar(301) default 'newest'") - tk2.MustExec("alter table ta modify column b varchar(301) default 'new'") - require.Error(t, tk.ExecToErr("commit")) - tk2.MustExec("admin check table ta") - tk2.MustQuery("select count(b) from ta use index(i1) where b = 'new'").Check(testkit.Rows("1")) - - // Change default value to now(). - tk2.MustExec("create table tbl_time(c1 int, c_time timestamp)") - tk2.MustExec("insert into tbl_time(c1) values(1)") - tk.MustExec("begin pessimistic") - tk.MustExec("insert into tbl_time(c1) values(2)") - tk2.MustExec("alter table tbl_time modify column c_time timestamp default now()") - tk2.MustExec("insert into tbl_time(c1) values(3)") - tk2.MustExec("insert into tbl_time(c1) values(4)") - require.Error(t, tk.ExecToErr("commit")) - tk2.MustQuery("select count(1) from tbl_time where c_time is not null").Check(testkit.Rows("2")) -} - func TestPessimisticUnionForUpdate(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) @@ -2135,56 +2034,6 @@ func TestInsertDupKeyAfterLockBatchPointGet(t *testing.T) { require.True(t, terror.ErrorEqual(err, kv.ErrKeyExists)) } -func TestAmendTxnVariable(t *testing.T) { - store := realtikvtest.CreateMockStoreAndSetup(t) - - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("set global tidb_enable_metadata_lock=0") - tk2 := testkit.NewTestKit(t, store) - tk2.MustExec("use test") - tk3 := testkit.NewTestKit(t, store) - tk3.MustExec("use test") - - tk2.MustExec("drop table if exists t1") - tk2.MustExec("create table t1(c1 int primary key, c2 int, c3 int, unique key uk(c2));") - tk2.MustExec("insert into t1 values(1, 1, 1);") - tk2.MustExec("insert into t1 values(2, 2, 2);") - - // Set off the session variable. - tk3.MustExec("set tidb_enable_amend_pessimistic_txn = 0;") - tk3.MustExec("begin pessimistic") - tk3.MustExec("insert into t1 values(3, 3, 3)") - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") - tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") - tk.MustExec("begin pessimistic") - tk.MustExec("insert into t1 values(4, 4, 4)") - tk2.MustExec("alter table t1 add column new_col int") - require.Error(t, tk3.ExecToErr("commit")) - tk.MustExec("commit") - tk2.MustQuery("select * from t1").Check(testkit.Rows("1 1 1 ", "2 2 2 ", "4 4 4 ")) - tk.MustExec("set tidb_enable_amend_pessimistic_txn = 0;") - - // Set off the global variable. - tk2.MustExec("set global tidb_enable_amend_pessimistic_txn = 0;") - - tk4 := testkit.NewTestKit(t, store) - tk4.MustExec("use test") - - tk4.MustQuery(`show variables like "tidb_enable_amend_pessimistic_txn"`).Check(testkit.Rows("tidb_enable_amend_pessimistic_txn OFF")) - tk4.MustExec("begin pessimistic") - tk4.MustExec("insert into t1 values(5, 5, 5, 5)") - tk2.MustExec("alter table t1 drop column new_col") - require.Error(t, tk4.ExecToErr("commit")) - tk4.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") - tk4.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") - tk4.MustExec("begin pessimistic") - tk4.MustExec("insert into t1 values(5, 5, 5)") - tk2.MustExec("alter table t1 add column new_col2 int") - tk4.MustExec("commit") - tk2.MustQuery("select * from t1").Check(testkit.Rows("1 1 1 ", "2 2 2 ", "4 4 4 ", "5 5 5 ")) -} - func TestSelectForUpdateWaitSeconds(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) @@ -2308,9 +2157,6 @@ func TestAsyncCommitWithSchemaChange(t *testing.T) { tk2 := createAsyncCommitTestKit(t, store) tk3 := createAsyncCommitTestKit(t, store) tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") - tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") - tk2.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") - tk3.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") // The txn tk writes something but with failpoint the primary key is not committed. tk.MustExec("begin pessimistic") @@ -2367,12 +2213,6 @@ func Test1PCWithSchemaChange(t *testing.T) { t.Skip("This test is unstable as depending on time.Sleep") } - defer config.RestoreFunc()() - config.UpdateGlobal(func(conf *config.Config) { - conf.TiKVClient.AsyncCommit.SafeWindow = time.Second - conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0 - }) - store := realtikvtest.CreateMockStoreAndSetup(t) tk := create1PCTestKit(t, store) @@ -2383,9 +2223,6 @@ func Test1PCWithSchemaChange(t *testing.T) { tk.MustExec("create table tk (c1 int primary key, c2 int)") tk.MustExec("insert into tk values (1, 1)") tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") - tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") - tk2.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") - tk3.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") tk.MustExec("begin pessimistic") tk.MustExec("insert into tk values(2, 2)") @@ -2423,305 +2260,6 @@ func Test1PCWithSchemaChange(t *testing.T) { tk3.MustExec("admin check table tk") } -func TestAmendForUniqueIndex(t *testing.T) { - store := realtikvtest.CreateMockStoreAndSetup(t) - - tk := testkit.NewTestKit(t, store) - tk2 := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk2.MustExec("use test") - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") - tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") - - tk2.MustExec("drop table if exists t1") - tk2.MustExec("create table t1(c1 int primary key, c2 int, c3 int, unique key uk(c2));") - tk2.MustExec("insert into t1 values(1, 1, 1);") - tk2.MustExec("insert into t1 values(2, 2, 2);") - - // New value has duplicates. - tk.MustExec("begin pessimistic") - tk.MustExec("insert into t1 values(3, 3, 3)") - tk.MustExec("insert into t1 values(4, 4, 3)") - tk2.MustExec("alter table t1 add unique index uk1(c3)") - require.Error(t, tk.ExecToErr("commit")) - tk2.MustExec("alter table t1 drop index uk1") - tk2.MustExec("admin check table t1") - - // New values has duplicates with old values. - tk.MustExec("begin pessimistic") - tk.MustExec("insert into t1 values(3, 3, 3)") - tk.MustExec("insert into t1 values(4, 4, 1)") - tk2.MustExec("alter table t1 add unique index uk1(c3)") - require.Error(t, tk.ExecToErr("commit")) - tk2.MustExec("admin check table t1") - - // Put new values. - tk2.MustQuery("select * from t1 for update").Check(testkit.Rows("1 1 1", "2 2 2")) - tk2.MustExec("alter table t1 drop index uk1") - tk.MustExec("begin pessimistic") - tk2.MustExec("alter table t1 add unique index uk1(c3)") - tk.MustExec("insert into t1 values(5, 5, 5)") - tk.MustExec("commit") - tk2.MustExec("admin check table t1") - - // Update the old value with same unique key value, should abort. - tk2.MustExec("drop table if exists t;") - tk2.MustExec("create table t (id int auto_increment primary key, c int);") - tk2.MustExec("insert into t (id, c) values (1, 2), (3, 4);") - tk.MustExec("begin pessimistic") - tk2.MustExec("alter table t add unique index uk(c);") - tk.MustExec("update t set c = 2 where id = 3;") - require.Error(t, tk.ExecToErr("commit")) - tk2.MustExec("admin check table t") - - // Update the old value with same unique key, but the row key has changed. - tk2.MustExec("drop table if exists t;") - tk2.MustExec("create table t (id int auto_increment primary key, c int);") - tk2.MustExec("insert into t (id, c) values (1, 2), (3, 4);") - tk.MustExec("begin pessimistic") - tk.MustExec("insert into t values (3, 2) on duplicate key update id = values(id) and c = values(c)") - finishCh := make(chan error) - go func() { - err := tk2.ExecToErr("alter table t add unique index uk(c);") - finishCh <- err - }() - time.Sleep(300 * time.Millisecond) - tk.MustExec("commit") - err := <-finishCh - require.NoError(t, err) - tk2.MustExec("admin check table t") - - // Update the old value with same unique key, but the row key has changed. - /* TODO this case could not pass using unistore because of https://github.com/ngaut/unistore/issues/428. - // Reopen it after fix the unistore issue. - tk2.MustExec("drop table if exists t;") - tk2.MustExec("create table t (id int auto_increment primary key, c int);") - tk2.MustExec("insert into t (id, c) values (1, 2), (3, 4);") - tk.MustExec("begin pessimistic") - tk2.MustExec("alter table t add unique index uk(c);") - tk.MustExec("insert into t values (3, 2) on duplicate key update id = values(id) and c = values(c)") - tk.MustExec("commit") - tk2.MustExec("admin check table t") - */ - - // Test pessimistic retry for unique index amend. - tk2.MustExec("drop table if exists t;") - tk2.MustExec("create table t (id int key, c int);") - tk2.MustExec("insert into t (id, c) values (1, 1), (2, 2);") - tk.MustExec("begin pessimistic") - tk2.MustExec("alter table t add unique index uk(c)") - tk.MustExec("insert into t values(3, 5)") - tk.MustExec("update t set c = 4 where c = 2") - errCh := make(chan error, 1) - go func() { - var err error - err = tk2.ExecToErr("begin pessimistic") - if err != nil { - errCh <- err - return - } - err = tk2.ExecToErr("insert into t values(5, 5)") - if err != nil { - errCh <- err - return - } - err = tk2.ExecToErr("delete from t where id = 5") - if err != nil { - errCh <- err - return - } - // let commit in tk start. - errCh <- err - time.Sleep(time.Millisecond * 100) - err = tk2.ExecToErr("commit") - errCh <- err - }() - err = <-errCh - require.Equal(t, nil, err) - tk.MustExec("commit") - tk.MustExec("admin check table t") - err = <-errCh - require.Equal(t, nil, err) -} - -func TestAmendWithColumnTypeChange(t *testing.T) { - store := realtikvtest.CreateMockStoreAndSetup(t) - - tk := testkit.NewTestKit(t, store) - tk2 := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("set global tidb_enable_metadata_lock=0") - tk2.MustExec("use test") - - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") - tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1;") - - tk2.MustExec("drop table if exists t") - tk2.MustExec("create table t (id int primary key, v varchar(10));") - tk.MustExec("begin pessimistic") - tk.MustExec("insert into t values (1, \"123456789\")") - tk2.MustExec("alter table t modify column v varchar(5);") - require.Error(t, tk.ExecToErr("commit")) -} - -func TestIssue21498(t *testing.T) { - store := realtikvtest.CreateMockStoreAndSetup(t) - - tk := testkit.NewTestKit(t, store) - tk2 := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk2.MustExec("use test") - tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0;") - tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1") - - for _, partition := range []bool{false, true} { - // RC test - tk.MustExec("drop table if exists t, t1") - createTable := "create table t (id int primary key, v int, index iv (v))" - if partition { - createTable += " partition by range (id) (partition p0 values less than (0),partition p1 values less than (1),partition p2 values less than (2),partition p3 values less than (3),partition pn values less than MAXVALUE)" - } - tk.MustExec(createTable) - tk.MustExec("insert into t values (1, 10), (2, 20), (3, 30), (4, 40)") - tk.MustExec("create table t1(id int)") - tk.MustExec("insert into t1 values(1)") - - tk.MustExec("set tx_isolation = 'READ-COMMITTED'") - tk.MustExec("begin pessimistic") - tk.MustQuery("select * from t where v = 10").Check(testkit.Rows("1 10")) - - tk2.MustExec("alter table t drop index iv") - tk2.MustExec("update t set v = 11 where id = 1") - - tk.MustQuery("select * from t where v = 10").Check(testkit.Rows()) - tk.MustQuery("select * from t where v = 11").Check(testkit.Rows("1 11")) - tk.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 11")) - tk.MustExec("admin check table t") - tk.MustExec("commit") - - tk.MustExec("drop table if exists t") - createTable = "create table t (id int primary key, v int, index iv (v), v2 int)" - if partition { - createTable += " partition by range (id) (partition p0 values less than (0),partition p1 values less than (1),partition p2 values less than (2),partition p3 values less than (3),partition pn values less than MAXVALUE)" - } - tk.MustExec(createTable) - tk.MustExec("insert into t values (1, 10, 100), (2, 20, 200), (3, 30, 300), (4, 40, 400)") - - tk.MustExec("begin pessimistic") - tk.MustQuery("select * from t use index (iv) where v = 10").Check(testkit.Rows("1 10 100")) - tk2.MustExec("alter table t drop index iv") - tk2.MustExec("update t set v = 11 where id = 1") - err := tk.ExecToErr("select * from t use index (iv) where v = 10") - require.Equal(t, "[planner:1176]Key 'iv' doesn't exist in table 't'", err.Error()) - tk.MustQuery("select * from t where v = 10").Check(testkit.Rows()) - tk2.MustExec("update t set id = 5 where id = 1") - err = tk.ExecToErr("select * from t use index (iv) where v = 10") // select with - require.Equal(t, "[planner:1176]Key 'iv' doesn't exist in table 't'", err.Error()) - tk.MustQuery("select * from t where v = 10").Check(testkit.Rows()) - if !partition { - // amend transaction does not support partition table - tk.MustExec("insert into t(id, v, v2) select 6, v + 20, v2 + 200 from t where id = 4") // insert ... select with index unchanged - } - err = tk.ExecToErr("insert into t(id, v, v2) select 7, v + 30, v2 + 300 from t use index (iv) where id = 4") // insert ... select with index changed - require.Equal(t, "[planner:1176]Key 'iv' doesn't exist in table 't'", err.Error()) - tk.MustExec("admin check table t") // check consistency inside txn - tk.MustExec("commit") - if !partition { - tk.MustQuery("select * from t").Check(testkit.Rows("2 20 200", "3 30 300", "4 40 400", "5 11 100", "6 60 600")) - } - tk.MustExec("admin check table t") // check consistency out of txn - - // RR test for non partition - if partition { - continue - } - - tk.MustExec("set tx_isolation = 'REPEATABLE-READ'") - tk2.MustExec("alter table t add unique index iv(v)") - tk.MustExec("begin pessimistic") - tk2.MustExec("alter table t drop index iv") - tk2.MustExec("update t set v = 21 where v = 20") - tk2.MustExec("update t set v = 31 where v = 30") - tk.MustExec("update t set v = 22 where v = 21") // fast path - tk.CheckExecResult(1, 0) - tk.MustExec("update t set v = 23 where v = 22") - tk.CheckExecResult(1, 0) - tk.MustExec("update t set v = 32 where v >= 31 and v < 40") // common path - tk.CheckExecResult(1, 0) - tk.MustExec("commit") - tk.MustQuery("select * from t").Check(testkit.Rows("2 23 200", "3 32 300", "4 40 400", "5 11 100", "6 60 600")) - - tk2.MustExec("alter table t add unique index iv(v)") - tk.MustExec("begin pessimistic") - tk2.MustExec("alter table t drop index iv") - tk2.MustExec("update t set v = 24 where v = 23") - tk2.MustExec("update t set v = 41 where v = 40") - // fast path - tk.MustQuery("select * from t where v = 23").Check(testkit.Rows("2 23 200")) - tk.MustQuery("select * from t where v = 24").Check(testkit.Rows()) - tk.MustQuery("select * from t where v = 23 for update").Check(testkit.Rows()) - tk.MustQuery("select * from t where v = 24 for update").Check(testkit.Rows("2 24 200")) - tk.MustQuery("select (select id from t where v = 23), id from t1 for update").Check(testkit.Rows("2 1")) - tk.MustQuery("select (select id from t where v = 24), id from t1 for update").Check(testkit.Rows(" 1")) - tk.MustQuery("select (select id from t where v = 23 for update), id from t1").Check(testkit.Rows(" 1")) - tk.MustQuery("select (select id from t where v = 24 for update), id from t1").Check(testkit.Rows("2 1")) - tk.MustQuery("select (select id + 1 from t where v = 24 for update), id from t1").Check(testkit.Rows("3 1")) - // sub queries - tk.MustQuery("select (select id from (select id from t where v = 24 for update) tmp for update), (select id from t where v = 23), id from t where v = 23").Check(testkit.Rows("2 2 2")) - tk.MustQuery("select (select id + (select id from t where v = 23) from (select id from t where v = 24 for update) tmp), id from t where v = 23").Check(testkit.Rows("4 2")) - tk.MustQuery("select (select id + (select id from t where v = 23) from (select id from t where v = 24 for update) tmp for update), id from t where v = 23").Check(testkit.Rows("4 2")) - tk.MustQuery("select (select id + (select id from t where v = 23 for update) from (select id from t where v = 24 for update) tmp), id from t where v = 23").Check(testkit.Rows(" 2")) - tk.MustQuery("select (select id + (select id from t where v = 23 for update) from (select id from t where v = 24 for update) tmp for update), id from t where v = 23").Check(testkit.Rows(" 2")) - tk.MustQuery("select (select id + (select id from t where v = 23) from (select id from t where v = 23) tmp), id from t where v = 24 for update").Check(testkit.Rows("4 2")) - tk.MustQuery("select (select id + (select id from t where v = 23) from (select id from t where v = 24 for update) tmp), id from t where v = 24 for update").Check(testkit.Rows("4 2")) - tk.MustQuery("select (select id + (select id from t where v = 24 for update) from (select id from t where v = 23) tmp), id from t where v = 24 for update").Check(testkit.Rows("4 2")) - - // test index look up - tk.MustQuery("select * from t s, t t1 where s.v = 23 and s.id = t1.id").Check(testkit.Rows("2 23 200 2 23 200")) - tk.MustQuery("select * from t s, t t1 where s.v = 24 and s.id = t1.id").Check(testkit.Rows()) - tk.MustQuery("select * from t s, t t1 where s.v = 23 and s.id = t1.id for update").Check(testkit.Rows()) - // TODO: Do the same with Partitioned Table!!! Since this query leads to two columns in SelectLocExec.tblID2Handle!!! - tk.MustQuery("select * from t s, t t1 where s.v = 24 and s.id = t1.id for update").Check(testkit.Rows("2 24 200 2 24 200")) - tk.MustExec("delete from t where v = 24") - tk.CheckExecResult(1, 0) - // common path - tk.MustQuery("select * from t where v >= 41 and v < 50").Check(testkit.Rows()) - tk.MustQuery("select * from t where v >= 41 and v < 50 for update").Check(testkit.Rows("4 41 400")) - tk.MustExec("delete from t where v >= 41 and v < 50") - tk.CheckExecResult(1, 0) - tk.MustExec("commit") - tk.MustQuery("select * from t").Check(testkit.Rows("3 32 300", "5 11 100", "6 60 600")) - - tk2.MustExec("alter table t add unique index iv(v)") - tk.MustExec("begin pessimistic") - tk2.MustExec("alter table t drop index iv") - tk2.MustExec("update t set v = 33 where v = 32") - tk.MustExec("insert into t(id, v, v2) select 3 * id, 3 * v, 3 * v2 from t where v = 33") - tk.CheckExecResult(1, 0) - tk.MustExec("insert into t(id, v, v2) select (select 4 * id from t where v = 32) id, 4 * v, 4 * v2 from t where v = 33") - tk.CheckExecResult(1, 0) - err = tk.ExecToErr("insert into t(id, v, v2) select (select 4 * id from t where v = 33) id, 4 * v, 4 * v2 from t where v = 33") - require.Error(t, err) - require.Equal(t, "[table:1048]Column 'id' cannot be null", err.Error()) - tk.MustExec("commit") - tk.MustQuery("select * from t").Check(testkit.Rows("3 33 300", "5 11 100", "6 60 600", "9 99 900", "12 132 1200")) - - tk2.MustExec("alter table t add unique index iv(v)") - tk2.MustExec("drop table if exists t1") - tk2.MustExec("create table t1(id int primary key, v int, index iv (v), v2 int)") - tk.MustExec("begin pessimistic") - tk2.MustExec("alter table t drop index iv") - tk2.MustExec("update t set v = 34 where v = 33") - tk2.MustExec("update t set v = 12 where v = 11") - tk.MustExec("insert into t1(id, v, v2) select * from t where v = 33") - tk.CheckExecResult(0, 0) - tk.MustExec("insert into t1(id, v, v2) select * from t where v = 12") - tk.CheckExecResult(1, 0) - tk.MustExec("commit") - tk.MustQuery("select * from t1").Check(testkit.Rows("5 12 100")) - } -} - func TestPlanCacheSchemaChange(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) tmp := testkit.NewTestKit(t, store) @@ -2742,8 +2280,6 @@ func TestPlanCacheSchemaChange(t *testing.T) { tk.MustExec("insert into t values(1, 1, 1), (2, 2, 2), (4, 4, 4)") tk.MustExec("set global tidb_ddl_enable_fast_reorg = 0") - tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1") - tk2.MustExec("set tidb_enable_amend_pessimistic_txn = 1") // generate plan cache tk.MustExec("prepare update_stmt from 'update t set vv = vv + 1 where v = ?'") @@ -2895,177 +2431,6 @@ func createTable(part bool, columnNames []string, columnTypes []string) string { return str } -func TestAmendForIndexChange(t *testing.T) { - defer config.RestoreFunc()() - config.UpdateGlobal(func(conf *config.Config) { - conf.TiKVClient.AsyncCommit.SafeWindow = 0 - conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0 - }) - store := realtikvtest.CreateMockStoreAndSetup(t) - - tk := testkit.NewTestKit(t, store) - tk2 := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("set global tidb_enable_metadata_lock=0") - tk2.MustExec("use test") - - tk.MustExec("set global tidb_ddl_enable_fast_reorg = OFF;") - tk.MustExec("set tidb_enable_amend_pessimistic_txn = ON;") - tk.Session().GetSessionVars().EnableAsyncCommit = false - tk.Session().GetSessionVars().Enable1PC = false - - tk2.MustExec("drop table if exists t1") - - // Add some different column types. - columnNames := []string{"c_int", "c_str", "c_datetime", "c_timestamp", "c_double", "c_decimal", "c_float"} - columnTypes := []string{"int", "varchar(40)", "datetime", "timestamp", "double", "decimal(12, 6)", "float"} - - addIndexFunc := func(idxName string, part bool, a, b int) string { - var str string - str = "alter table t" - if part { - str = "alter table t_part" - } - str += " add index " + idxName + " (" - str += strings.Join(columnNames[a:b], ",") - str += ")" - return str - } - - for i := 0; i < len(columnTypes); i++ { - for j := i + 1; j <= len(columnTypes); j++ { - // Create table and prepare some data. - tk2.MustExec("drop table if exists t") - tk2.MustExec("drop table if exists t_part") - tk2.MustExec(createTable(false, columnNames, columnTypes)) - tk2.MustExec(createTable(true, columnNames, columnTypes)) - tk2.MustExec(`insert into t values(1, "1", "2000-01-01", "2020-01-01", "1.1", "123.321", 1.1)`) - tk2.MustExec(`insert into t values(2, "2", "2000-01-02", "2020-01-02", "2.2", "223.322", 2.2)`) - tk2.MustExec(`insert into t_part values(1, "1", "2000-01-01", "2020-01-01", "1.1", "123.321", 1.1)`) - tk2.MustExec(`insert into t_part values(2, "2", "2000-01-02", "2020-01-02", "2.2", "223.322", 2.2)`) - - // Start a pessimistic transaction, the amend should succeed for common table. - tk.MustExec("begin pessimistic") - tk.MustExec(`insert into t values(5, "555", "2000-01-05", "2020-01-05", "5.5", "555.555", 5.5)`) - idxName := fmt.Sprintf("index%d%d", i, j) - tk2.MustExec(addIndexFunc(idxName, false, i, j)) - tk.MustExec("commit") - tk2.MustExec("admin check table t") - - tk.MustExec("begin pessimistic") - tk.MustExec(`insert into t values(6, "666", "2000-01-06", "2020-01-06", "6.6", "666.666", 6.6)`) - tk2.MustExec(fmt.Sprintf(`alter table t drop index %s`, idxName)) - tk.MustExec("commit") - tk2.MustExec("admin check table t") - tk2.MustQuery("select count(*) from t").Check(testkit.Rows("4")) - - // Start a pessimistic transaction for partition table, the amend should fail. - tk.MustExec("begin pessimistic") - tk.MustExec(`insert into t_part values(5, "555", "2000-01-05", "2020-01-05", "5.5", "555.555", 5.5)`) - tk2.MustExec(addIndexFunc(idxName, true, i, j)) - require.Error(t, tk.ExecToErr("commit")) - tk2.MustExec("admin check table t_part") - - tk.MustExec("begin pessimistic") - tk.MustExec(`insert into t_part values(6, "666", "2000-01-06", "2020-01-06", "6.6", "666.666", 6.6)`) - tk2.MustExec(fmt.Sprintf(`alter table t_part drop index %s`, idxName)) - require.Error(t, tk.ExecToErr("commit")) - tk2.MustExec("admin check table t_part") - tk2.MustQuery("select count(*) from t_part").Check(testkit.Rows("2")) - } - } -} - -func TestAmendForColumnChange(t *testing.T) { - store := realtikvtest.CreateMockStoreAndSetup(t) - - tk := testkit.NewTestKit(t, store) - tk2 := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("set global tidb_enable_metadata_lock=0") - tk2.MustExec("use test") - - tk.MustExec("set global tidb_ddl_enable_fast_reorg = OFF;") - tk.MustExec("set tidb_enable_amend_pessimistic_txn = ON;") - tk2.MustExec("drop table if exists t1") - - // Add some different column types. - columnNames := []string{"c_int", "c_str", "c_datetime", "c_timestamp", "c_double", "c_decimal", "c_float"} - columnTypes := []string{"int", "varchar(40)", "datetime", "timestamp", "double", "decimal(12, 6)", "float"} - colChangeDDLs := []string{ - "alter table %s change column c_int c_int bigint", - "alter table %s modify column c_str varchar(55)", - "alter table %s modify column c_datetime datetime", - "alter table %s modify column c_timestamp timestamp", - "alter table %s modify column c_double double default NULL", - "alter table %s modify column c_int bigint(20) default 100", - "alter table %s change column c_float c_float float", - "alter table %s modify column c_int bigint(20)", - } - amendSucc := []bool{ - true, - true, - true, - true, - true, - false, - true, - true, - } - colChangeFunc := func(part bool, i int) string { - var sql string - sql = colChangeDDLs[i] - if part { - sql = fmt.Sprintf(sql, "t_part") - } else { - sql = fmt.Sprintf(sql, "t") - } - return sql - } - - for i := 0; i < len(colChangeDDLs); i++ { - // Create table and prepare some data. - tk2.MustExec("drop table if exists t") - tk2.MustExec("drop table if exists t_part") - tk2.MustExec(createTable(false, columnNames, columnTypes)) - tk2.MustExec(createTable(true, columnNames, columnTypes)) - tk2.MustExec(`insert into t values(1, "1", "2000-01-01", "2020-01-01", "1.1", "123.321", 1.1)`) - tk2.MustExec(`insert into t values(2, "2", "2000-01-02", "2020-01-02", "2.2", "223.322", 2.2)`) - tk2.MustExec(`insert into t_part values(1, "1", "2000-01-01", "2020-01-01", "1.1", "123.321", 1.1)`) - tk2.MustExec(`insert into t_part values(2, "2", "2000-01-02", "2020-01-02", "2.2", "223.322", 2.2)`) - - // Start a pessimistic transaction, the amend should succeed for common table. - tk.MustExec("begin pessimistic") - tk.MustExec(`insert into t values(5, "555", "2000-01-05", "2020-01-05", "5.5", "555.555", 5.5)`) - tk2.MustExec(colChangeFunc(false, i)) - if amendSucc[i] { - tk.MustExec("commit") - } else { - require.Error(t, tk.ExecToErr("commit")) - } - tk2.MustExec("admin check table t") - if amendSucc[i] { - tk2.MustQuery("select count(*) from t").Check(testkit.Rows("3")) - } else { - tk2.MustQuery("select count(*) from t").Check(testkit.Rows("2")) - } - - // Start a pessimistic transaction for partition table, the amend should fail. - if i == 5 { - // alter table t_part modify column c_int bigint(20) default 100 - // Unsupported modify column: can't change the partitioning column, since it would require reorganize all partitions - // Skip this case - continue - } - tk.MustExec("begin pessimistic") - tk.MustExec(`insert into t_part values(5, "555", "2000-01-05", "2020-01-05", "5.5", "555.555", 5.5)`) - tk2.MustExec(colChangeFunc(true, i)) - require.Error(t, tk.ExecToErr("commit")) - tk2.MustExec("admin check table t_part") - tk2.MustQuery("select count(*) from t_part").Check(testkit.Rows("2")) - } -} - func TestPessimisticAutoCommitTxn(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) From 1a429eb7041b62609e8d6c489d1d502746b02222 Mon Sep 17 00:00:00 2001 From: wjhuang2016 Date: Wed, 7 Dec 2022 16:10:08 +0800 Subject: [PATCH 2/3] fix test Signed-off-by: wjhuang2016 --- domain/schema_validator_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/domain/schema_validator_test.go b/domain/schema_validator_test.go index fc62a89b2df05..ddcc57634ab60 100644 --- a/domain/schema_validator_test.go +++ b/domain/schema_validator_test.go @@ -216,8 +216,6 @@ func subTestEnqueueActionType(t *testing.T) { // in schema version 10, so the resActions for tableID = 3 should be 0x3 & 0x4 = 0x7. isTablesChanged := validator.isRelatedTablesChanged(5, []int64{1, 2, 3, 4}) require.True(t, isTablesChanged) - require.Equal(t, []int64{1, 2, 3, 4}, relatedChanges.PhyTblIDS) - require.Equal(t, []uint64{15, 2, 7, 4}, relatedChanges.ActionTypes) } type leaseGrantItem struct { From 4d59ee418c6ec3d3a850c94e99ef3194c6998a52 Mon Sep 17 00:00:00 2001 From: wjhuang2016 Date: Wed, 7 Dec 2022 16:21:07 +0800 Subject: [PATCH 3/3] fix Signed-off-by: wjhuang2016 --- domain/schema_validator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/domain/schema_validator.go b/domain/schema_validator.go index 2c7614d8d00dc..3d4c5cb0d6a61 100644 --- a/domain/schema_validator.go +++ b/domain/schema_validator.go @@ -15,7 +15,6 @@ package domain import ( - "golang.org/x/exp/slices" "sync" "time" @@ -27,6 +26,7 @@ import ( "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/txnkv/transaction" "go.uber.org/zap" + "golang.org/x/exp/slices" ) type checkResult int