diff --git a/ddl/cluster.go b/ddl/cluster.go index 96a7cd8544abb..e05964c8826fa 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/domain/infosync" - "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" @@ -96,6 +95,16 @@ func recoverPDSchedule(pdScheduleParam map[string]interface{}) error { return infosync.SetPDScheduleConfig(context.Background(), pdScheduleParam) } +func getStoreGlobalMinSafeTS(s kv.Storage) time.Time { + minSafeTS := s.GetMinSafeTS(kv.GlobalTxnScope) + // Inject mocked SafeTS for test. + failpoint.Inject("injectSafeTS", func(val failpoint.Value) { + injectTS := val.(int) + minSafeTS = uint64(injectTS) + }) + return oracle.GetTimeFromTS(minSafeTS) +} + // ValidateFlashbackTS validates that flashBackTS in range [gcSafePoint, currentTS). func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBackTS uint64) error { currentTS, err := sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0) @@ -108,12 +117,34 @@ func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBack } currentTS = currentVer.Ver } - if oracle.GetTimeFromTS(flashBackTS).After(oracle.GetTimeFromTS(currentTS)) { + oracleFlashbackTS := oracle.GetTimeFromTS(flashBackTS) + if oracleFlashbackTS.After(oracle.GetTimeFromTS(currentTS)) { return errors.Errorf("cannot set flashback timestamp to future time") } - if oracle.GetTimeFromTS(flashBackTS).After(expression.GetMinSafeTime(sctx)) { - return errors.Errorf("cannot set flashback timestamp to too close to present time") + + flashbackGetMinSafeTimeTimeout := time.Minute + failpoint.Inject("changeFlashbackGetMinSafeTimeTimeout", func(val failpoint.Value) { + t := val.(int) + flashbackGetMinSafeTimeTimeout = time.Duration(t) + }) + + start := time.Now() + minSafeTime := getStoreGlobalMinSafeTS(sctx.GetStore()) + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for oracleFlashbackTS.After(minSafeTime) { + if time.Since(start) >= flashbackGetMinSafeTimeTimeout { + return errors.Errorf("cannot set flashback timestamp after min-resolved-ts(%s)", minSafeTime) + } + select { + case <-ticker.C: + minSafeTime = getStoreGlobalMinSafeTS(sctx.GetStore()) + break + case <-ctx.Done(): + return ctx.Err() + } } + gcSafePoint, err := gcutil.GetGCSafePoint(sctx) if err != nil { return err diff --git a/ddl/cluster_test.go b/ddl/cluster_test.go index 4c1ec291f87f2..ae57d20e2eb48 100644 --- a/ddl/cluster_test.go +++ b/ddl/cluster_test.go @@ -92,9 +92,7 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) { injectSafeTS := oracle.GoTimeToTS(time.Now().Add(10 * time.Second)) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) oldValue := map[string]interface{}{ @@ -131,8 +129,7 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) { require.EqualValues(t, finishValue["merge-schedule-limit"], 1) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) } func TestAddDDLDuringFlashback(t *testing.T) { @@ -147,9 +144,7 @@ func TestAddDDLDuringFlashback(t *testing.T) { injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second)) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) @@ -169,8 +164,7 @@ func TestAddDDLDuringFlashback(t *testing.T) { dom.DDL().SetHook(originHook) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) } func TestGlobalVariablesOnFlashback(t *testing.T) { @@ -185,9 +179,7 @@ func TestGlobalVariablesOnFlashback(t *testing.T) { injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second)) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) @@ -239,8 +231,7 @@ func TestGlobalVariablesOnFlashback(t *testing.T) { dom.DDL().SetHook(originHook) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) } func TestCancelFlashbackCluster(t *testing.T) { @@ -252,9 +243,7 @@ func TestCancelFlashbackCluster(t *testing.T) { injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second)) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) @@ -280,6 +269,5 @@ func TestCancelFlashbackCluster(t *testing.T) { dom.DDL().SetHook(originHook) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) } diff --git a/ddl/ddl_tiflash_test.go b/ddl/ddl_tiflash_test.go index accf7cc038ebd..25b2f546b0d22 100644 --- a/ddl/ddl_tiflash_test.go +++ b/ddl/ddl_tiflash_test.go @@ -442,6 +442,44 @@ func TestTiFlashDropPartition(t *testing.T) { CheckTableAvailableWithTableName(s.dom, t, 1, []string{}, "test", "ddltiflash") } +<<<<<<< HEAD:ddl/ddl_tiflash_test.go +======= +func TestTiFlashFlashbackCluster(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int)") + tk.MustExec("insert into t values (1), (2), (3)") + + ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + tk.MustExec("alter table t set tiflash replica 1") + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable) + CheckTableAvailableWithTableName(s.dom, t, 1, []string{}, "test", "t") + + injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + + ChangeGCSafePoint(tk, time.Now().Add(-10*time.Second), "true", "10m0s") + defer func() { + ChangeGCSafePoint(tk, time.Now(), "true", "10m0s") + }() + + errorMsg := fmt.Sprintf("[ddl:-1]Detected unsupported DDL job type(%s) during [%s, now), can't do flashback", + model.ActionSetTiFlashReplica.String(), oracle.GetTimeFromTS(ts).String()) + tk.MustGetErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)), errorMsg) + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) +} + +>>>>>>> c996d6437fe (flashback: retry `getStoreGlobalMinSafeTS` during execute flashback (#41100)):ddl/tiflashtest/ddl_tiflash_test.go func CheckTableAvailableWithTableName(dom *domain.Domain, t *testing.T, count uint64, labels []string, db string, table string) { tb, err := dom.InfoSchema().TableByName(model.NewCIStr(db), model.NewCIStr(table)) require.NoError(t, err) diff --git a/executor/recover_test.go b/executor/recover_test.go index aad1c93d9fb87..55977190cfc06 100644 --- a/executor/recover_test.go +++ b/executor/recover_test.go @@ -307,9 +307,7 @@ func TestRecoverClusterMeetError(t *testing.T) { injectSafeTS := oracle.GoTimeToTS(flashbackTs.Add(10 * time.Second)) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))) // Get GC safe point error. @@ -337,8 +335,7 @@ func TestRecoverClusterMeetError(t *testing.T) { tk.MustExec("create table t(a int);") tk.MustMatchErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs), "Detected schema change due to another DDL job during \\[.*, now\\), can't do flashback") - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) } @@ -347,6 +344,7 @@ func TestFlashbackWithSafeTs(t *testing.T) { tk := testkit.NewTestKit(t, store) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/changeFlashbackGetMinSafeTimeTimeout", `return(0)`)) timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) defer resetGC() @@ -371,9 +369,8 @@ func TestFlashbackWithSafeTs(t *testing.T) { compareWithSafeTS: 0, }, { - name: "10 seconds ago to now, safeTS 5 secs ago", - // Add flashbackTs.Add(-500*time.Millisecond) to avoid flashback time range overlapped. - sql: fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs.Add(-500*time.Millisecond)), + name: "10 seconds ago to now, safeTS 5 secs ago", + sql: fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs), injectSafeTS: oracle.GoTimeToTS(flashbackTs.Add(10 * time.Second)), compareWithSafeTS: -1, }, @@ -386,19 +383,55 @@ func TestFlashbackWithSafeTs(t *testing.T) { } for _, testcase := range testcases { t.Log(testcase.name) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", testcase.injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", fmt.Sprintf("return(%v)", testcase.injectSafeTS))) if testcase.compareWithSafeTS == 1 { + start := time.Now() tk.MustContainErrMsg(testcase.sql, - "cannot set flashback timestamp to too close to present time") + "cannot set flashback timestamp after min-resolved-ts") + // When set `flashbackGetMinSafeTimeTimeout` = 0, no retry for `getStoreGlobalMinSafeTS`. + require.Less(t, time.Since(start), time.Second) } else { tk.MustExec(testcase.sql) } } - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/changeFlashbackGetMinSafeTimeTimeout")) +} + +func TestFlashbackRetryGetMinSafeTime(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) + + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + + // Set GC safe point. + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + + time.Sleep(time.Second) + ts, _ := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + flashbackTs := oracle.GetTimeFromTS(ts) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", + fmt.Sprintf("return(%v)", oracle.GoTimeToTS(flashbackTs.Add(-10*time.Minute))))) + + go func() { + time.Sleep(2 * time.Second) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", + fmt.Sprintf("return(%v)", oracle.GoTimeToTS(flashbackTs.Add(10*time.Minute))))) + }() + + start := time.Now() + tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs)) + duration := time.Since(start) + require.Greater(t, duration, 2*time.Second) + require.Less(t, duration, 5*time.Second) + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) } diff --git a/tests/realtikvtest/flashbacktest/flashback_test.go b/tests/realtikvtest/flashbacktest/flashback_test.go new file mode 100644 index 0000000000000..e3350f4a1774a --- /dev/null +++ b/tests/realtikvtest/flashbacktest/flashback_test.go @@ -0,0 +1,526 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flashbacktest + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/pingcap/failpoint" + ddlutil "github.com/pingcap/tidb/ddl/util" + "github.com/pingcap/tidb/errno" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/tests/realtikvtest" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" + tikvutil "github.com/tikv/client-go/v2/util" +) + +// MockGC is used to make GC work in the test environment. +func MockGC(tk *testkit.TestKit) (string, string, string, func()) { + originGC := ddlutil.IsEmulatorGCEnable() + resetGC := func() { + if originGC { + ddlutil.EmulatorGCEnable() + } else { + ddlutil.EmulatorGCDisable() + } + } + + // disable emulator GC. + // Otherwise emulator GC will delete table record as soon as possible after execute drop table ddl. + ddlutil.EmulatorGCDisable() + timeBeforeDrop := time.Now().Add(0 - 48*60*60*time.Second).Format(tikvutil.GCTimeFormat) + timeAfterDrop := time.Now().Add(48 * 60 * 60 * time.Second).Format(tikvutil.GCTimeFormat) + safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '') + ON DUPLICATE KEY + UPDATE variable_value = '%[1]s'` + // clear GC variables first. + tk.MustExec("delete from mysql.tidb where variable_name in ( 'tikv_gc_safe_point','tikv_gc_enable' )") + return timeBeforeDrop, timeAfterDrop, safePointSQL, resetGC +} + +func TestFlashback(t *testing.T) { + if *realtikvtest.WithRealTiKV { + store := realtikvtest.CreateMockStoreAndSetup(t) + + tk := testkit.NewTestKit(t, store) + + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, index i(a))") + tk.MustExec("insert t values (1), (2), (3)") + + time.Sleep(1 * time.Second) + + ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + + tk.MustExec("insert t values (4), (5), (6)") + tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) + + tk.MustExec("admin check table t") + require.Equal(t, tk.MustQuery("select max(a) from t").Rows()[0][0], "3") + require.Equal(t, tk.MustQuery("select max(a) from t use index(i)").Rows()[0][0], "3") + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) + } +} + +func TestPrepareFlashbackFailed(t *testing.T) { + if *realtikvtest.WithRealTiKV { + store := realtikvtest.CreateMockStoreAndSetup(t) + + tk := testkit.NewTestKit(t, store) + + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, index i(a))") + tk.MustExec("insert t values (1), (2), (3)") + + time.Sleep(1 * time.Second) + + ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockPrepareMeetsEpochNotMatch", `return(true)`)) + + tk.MustExec("insert t values (4), (5), (6)") + tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) + + tk.MustExec("admin check table t") + require.Equal(t, tk.MustQuery("select max(a) from t").Rows()[0][0], "3") + require.Equal(t, tk.MustQuery("select max(a) from t use index(i)").Rows()[0][0], "3") + + jobMeta := tk.MustQuery("select job_meta from mysql.tidb_ddl_history order by job_id desc limit 1").Rows()[0][0].(string) + job := model.Job{} + require.NoError(t, job.Decode([]byte(jobMeta))) + require.Equal(t, job.ErrorCount, int64(0)) + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockPrepareMeetsEpochNotMatch")) + } +} + +func TestFlashbackAddDropIndex(t *testing.T) { + if *realtikvtest.WithRealTiKV { + store := realtikvtest.CreateMockStoreAndSetup(t) + + tk := testkit.NewTestKit(t, store) + + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, index i(a))") + tk.MustExec("insert t values (1), (2), (3)") + prevGCCount := tk.MustQuery("select count(*) from mysql.gc_delete_range").Rows()[0][0] + + time.Sleep(1 * time.Second) + + ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + tk.MustExec("alter table t add index k(a)") + require.Equal(t, tk.MustQuery("select max(a) from t use index(k)").Rows()[0][0], "3") + tk.MustExec("alter table t drop index i") + tk.MustGetErrCode("select max(a) from t use index(i)", errno.ErrKeyDoesNotExist) + require.Greater(t, tk.MustQuery("select count(*) from mysql.gc_delete_range").Rows()[0][0], prevGCCount) + + injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + + tk.MustExec("insert t values (4), (5), (6)") + tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) + + tk.MustExec("admin check table t") + require.Equal(t, tk.MustQuery("select max(a) from t use index(i)").Rows()[0][0], "3") + tk.MustGetErrCode("select max(a) from t use index(k)", errno.ErrKeyDoesNotExist) + require.Equal(t, tk.MustQuery("select count(*) from mysql.gc_delete_range").Rows()[0][0], prevGCCount) + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) + } +} + +func TestFlashbackAddDropModifyColumn(t *testing.T) { + if *realtikvtest.WithRealTiKV { + store := realtikvtest.CreateMockStoreAndSetup(t) + + tk := testkit.NewTestKit(t, store) + + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, b int, index i(a))") + tk.MustExec("insert t values (1, 1), (2, 2), (3, 3)") + + time.Sleep(1 * time.Second) + + ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + tk.MustExec("alter table t add column c int") + tk.MustExec("alter table t drop column b") + tk.MustExec("alter table t modify column a tinyint") + require.Equal(t, tk.MustQuery("show create table t").Rows()[0][1], "CREATE TABLE `t` (\n"+ + " `a` tinyint(4) DEFAULT NULL,\n"+ + " `c` int(11) DEFAULT NULL,\n"+ + " KEY `i` (`a`)\n"+ + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin") + + injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + + tk.MustExec("insert t values (4, 4), (5, 5), (6, 6)") + tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) + + tk.MustExec("admin check table t") + require.Equal(t, tk.MustQuery("show create table t").Rows()[0][1], "CREATE TABLE `t` (\n"+ + " `a` int(11) DEFAULT NULL,\n"+ + " `b` int(11) DEFAULT NULL,\n"+ + " KEY `i` (`a`)\n"+ + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin") + require.Equal(t, tk.MustQuery("select max(b) from t").Rows()[0][0], "3") + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) + } +} + +func TestFlashbackBasicRenameDropCreateTable(t *testing.T) { + if *realtikvtest.WithRealTiKV { + store := realtikvtest.CreateMockStoreAndSetup(t) + + tk := testkit.NewTestKit(t, store) + + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + tk.MustExec("use test") + tk.MustExec("drop table if exists t, t1, t2, t3") + tk.MustExec("create table t(a int, index i(a))") + tk.MustExec("insert t values (1), (2), (3)") + tk.MustExec("create table t1(a int, index i(a))") + tk.MustExec("insert t1 values (4), (5), (6)") + prevGCCount := tk.MustQuery("select count(*) from mysql.gc_delete_range").Rows()[0][0] + + time.Sleep(1 * time.Second) + + ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + tk.MustExec("rename table t to t3") + tk.MustExec("drop table t1") + tk.MustExec("create table t2(a int, index i(a))") + tk.MustExec("insert t2 values (7), (8), (9)") + + require.Equal(t, tk.MustQuery("select max(a) from t3").Rows()[0][0], "3") + require.Equal(t, tk.MustQuery("select max(a) from t2").Rows()[0][0], "9") + + require.Greater(t, tk.MustQuery("select count(*) from mysql.gc_delete_range").Rows()[0][0], prevGCCount) + + injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) + + tk.MustExec("admin check table t") + require.Equal(t, tk.MustQuery("select max(a) from t").Rows()[0][0], "3") + tk.MustExec("admin check table t1") + require.Equal(t, tk.MustQuery("select max(a) from t1").Rows()[0][0], "6") + require.Equal(t, tk.MustQuery("select count(*) from mysql.gc_delete_range").Rows()[0][0], prevGCCount) + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) + } +} + +func TestFlashbackCreateDropTableWithData(t *testing.T) { + if *realtikvtest.WithRealTiKV { + store := realtikvtest.CreateMockStoreAndSetup(t) + + tk := testkit.NewTestKit(t, store) + + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + tk.MustExec("use test") + tk.MustExec("create table t(a int)") + + time.Sleep(1 * time.Second) + ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + tk.MustExec("insert into t values (1)") + tk.MustExec("drop table t") + tk.MustExec("create table t(b int)") + tk.MustExec("insert into t(b) values (1)") + + injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) + + tk.MustExec("admin check table t") + require.Equal(t, tk.MustQuery("select count(a) from t").Rows()[0][0], "0") + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) + } +} + +func TestFlashbackCreateDropSchema(t *testing.T) { + if *realtikvtest.WithRealTiKV { + store := realtikvtest.CreateMockStoreAndSetup(t) + + tk := testkit.NewTestKit(t, store) + + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + tk.MustExec("use test") + tk.MustExec("create table t(a int, index k(a))") + tk.MustExec("insert into t values (1),(2)") + + time.Sleep(1 * time.Second) + ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + tk.MustExec("drop schema test") + tk.MustExec("create schema test1") + tk.MustExec("create schema test2") + tk.MustExec("use test1") + tk.MustGetErrCode("use test", errno.ErrBadDB) + tk.MustExec("use test2") + tk.MustExec("drop schema test2") + + injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) + + tk.MustExec("admin check table test.t") + res := tk.MustQuery("select max(a) from test.t").Rows() + require.Equal(t, res[0][0], "2") + tk.MustGetErrCode("use test1", errno.ErrBadDB) + tk.MustGetErrCode("use test2", errno.ErrBadDB) + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) + } +} + +func TestFlashbackAutoID(t *testing.T) { + if *realtikvtest.WithRealTiKV { + store := realtikvtest.CreateMockStoreAndSetup(t) + + tk := testkit.NewTestKit(t, store) + + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + tk.MustExec("use test") + tk.MustExec("create table t(a int auto_increment, primary key(a)) auto_id_cache 100") + tk.MustExec("insert into t values (),()") + + time.Sleep(1 * time.Second) + ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + tk.MustExec("insert into t values (),()") + res := tk.MustQuery("select max(a) from test.t").Rows() + require.Equal(t, res[0][0], "4") + tk.MustExec("drop table t") + + injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) + + tk.MustExec("admin check table t") + res = tk.MustQuery("select max(a) from t").Rows() + require.Equal(t, res[0][0], "2") + tk.MustExec("insert into t values ()") + res = tk.MustQuery("select max(a) from t").Rows() + require.Equal(t, res[0][0], "101") + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) + } +} + +func TestFlashbackSequence(t *testing.T) { + if *realtikvtest.WithRealTiKV { + store := realtikvtest.CreateMockStoreAndSetup(t) + + tk := testkit.NewTestKit(t, store) + + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + tk.MustExec("use test") + tk.MustExec("create sequence seq cache 100") + res := tk.MustQuery("select nextval(seq)").Rows() + require.Equal(t, res[0][0], "1") + + time.Sleep(1 * time.Second) + ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + res = tk.MustQuery("select nextval(seq)").Rows() + require.Equal(t, res[0][0], "2") + tk.MustExec("drop sequence seq") + + injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) + + // flashback schema and skip cached values + res = tk.MustQuery("select nextval(seq)").Rows() + require.Equal(t, res[0][0], "101") + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) + } +} + +func TestFlashbackPartitionTable(t *testing.T) { + if *realtikvtest.WithRealTiKV { + store := realtikvtest.CreateMockStoreAndSetup(t) + + tk := testkit.NewTestKit(t, store) + + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + tk.MustExec("use test") + tk.MustExec("create table t(a int) partition by range(`a`) " + + "(partition `a_1` values less than (25), " + + "partition `a_2` values less than (75), " + + "partition `a_3` values less than (200))") + + for i := 0; i < 100; i++ { + tk.MustExec(fmt.Sprintf("insert into t values (%d)", i)) + } + + time.Sleep(1 * time.Second) + ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + tk.MustExec("alter table t drop partition `a_3`") + tk.MustExec("alter table t add partition (partition `a_3` values less than (300))") + res := tk.MustQuery("select max(a) from t").Rows() + require.Equal(t, res[0][0], "74") + tk.MustExec("drop table t") + + injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) + + tk.MustExec("admin check table t") + res = tk.MustQuery("select max(a), min(a), count(*) from t").Rows() + require.Equal(t, res[0][0], "99") + require.Equal(t, res[0][1], "0") + require.Equal(t, res[0][2], "100") + tk.MustExec("insert into t values (100), (-1)") + res = tk.MustQuery("select max(a), min(a), count(*) from t").Rows() + require.Equal(t, res[0][0], "100") + require.Equal(t, res[0][1], "-1") + require.Equal(t, res[0][2], "102") + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) + } +} + +func TestFlashbackTmpTable(t *testing.T) { + if *realtikvtest.WithRealTiKV { + store := realtikvtest.CreateMockStoreAndSetup(t) + + tk := testkit.NewTestKit(t, store) + + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + tk.MustExec("use test") + tk.MustExec("create temporary table t(a int)") + + // test flashback tmp table data + time.Sleep(1 * time.Second) + ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + tk.MustExec("insert into t values (1), (2), (3)") + + injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) + + res := tk.MustQuery("select max(a) from t").Rows() + require.Equal(t, res[0][0], "3") + + // test flashback tmp table schema + time.Sleep(1 * time.Second) + ts, err = tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + tk.MustExec("drop table t") + + injectSafeTS = oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) + + tk.MustGetErrCode("select * from t", errno.ErrNoSuchTable) + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS")) + } +}