Skip to content

Commit

Permalink
flashback: retry getStoreGlobalMinSafeTS during execute flashback (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Feb 7, 2023
1 parent df204de commit 45f2d06
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 43 deletions.
39 changes: 35 additions & 4 deletions ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
Expand Down Expand Up @@ -96,6 +95,16 @@ func recoverPDSchedule(pdScheduleParam map[string]interface{}) error {
return infosync.SetPDScheduleConfig(context.Background(), pdScheduleParam)
}

func getStoreGlobalMinSafeTS(s kv.Storage) time.Time {
minSafeTS := s.GetMinSafeTS(kv.GlobalTxnScope)
// Inject mocked SafeTS for test.
failpoint.Inject("injectSafeTS", func(val failpoint.Value) {
injectTS := val.(int)
minSafeTS = uint64(injectTS)
})
return oracle.GetTimeFromTS(minSafeTS)
}

// ValidateFlashbackTS validates that flashBackTS in range [gcSafePoint, currentTS).
func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBackTS uint64) error {
currentTS, err := sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0)
Expand All @@ -108,12 +117,34 @@ func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBack
}
currentTS = currentVer.Ver
}
if oracle.GetTimeFromTS(flashBackTS).After(oracle.GetTimeFromTS(currentTS)) {
oracleFlashbackTS := oracle.GetTimeFromTS(flashBackTS)
if oracleFlashbackTS.After(oracle.GetTimeFromTS(currentTS)) {
return errors.Errorf("cannot set flashback timestamp to future time")
}
if oracle.GetTimeFromTS(flashBackTS).After(expression.GetMinSafeTime(sctx)) {
return errors.Errorf("cannot set flashback timestamp to too close to present time")

flashbackGetMinSafeTimeTimeout := time.Minute
failpoint.Inject("changeFlashbackGetMinSafeTimeTimeout", func(val failpoint.Value) {
t := val.(int)
flashbackGetMinSafeTimeTimeout = time.Duration(t)
})

start := time.Now()
minSafeTime := getStoreGlobalMinSafeTS(sctx.GetStore())
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for oracleFlashbackTS.After(minSafeTime) {
if time.Since(start) >= flashbackGetMinSafeTimeTimeout {
return errors.Errorf("cannot set flashback timestamp after min-resolved-ts(%s)", minSafeTime)
}
select {
case <-ticker.C:
minSafeTime = getStoreGlobalMinSafeTS(sctx.GetStore())
break
case <-ctx.Done():
return ctx.Err()
}
}

gcSafePoint, err := gcutil.GetGCSafePoint(sctx)
if err != nil {
return err
Expand Down
28 changes: 8 additions & 20 deletions ddl/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,7 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) {

injectSafeTS := oracle.GoTimeToTS(time.Now().Add(10 * time.Second))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`))
require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))

oldValue := map[string]interface{}{
Expand Down Expand Up @@ -131,8 +129,7 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) {
require.EqualValues(t, finishValue["merge-schedule-limit"], 1)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
}

func TestAddDDLDuringFlashback(t *testing.T) {
Expand All @@ -147,9 +144,7 @@ func TestAddDDLDuringFlashback(t *testing.T) {

injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`))
require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))

timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
Expand All @@ -169,8 +164,7 @@ func TestAddDDLDuringFlashback(t *testing.T) {

dom.DDL().SetHook(originHook)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
}

func TestGlobalVariablesOnFlashback(t *testing.T) {
Expand All @@ -185,9 +179,7 @@ func TestGlobalVariablesOnFlashback(t *testing.T) {

injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`))
require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))

timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
Expand Down Expand Up @@ -239,8 +231,7 @@ func TestGlobalVariablesOnFlashback(t *testing.T) {

dom.DDL().SetHook(originHook)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
}

func TestCancelFlashbackCluster(t *testing.T) {
Expand All @@ -252,9 +243,7 @@ func TestCancelFlashbackCluster(t *testing.T) {

injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`))
require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))

timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
Expand All @@ -280,6 +269,5 @@ func TestCancelFlashbackCluster(t *testing.T) {
dom.DDL().SetHook(originHook)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
}
61 changes: 47 additions & 14 deletions executor/recover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,7 @@ func TestRecoverClusterMeetError(t *testing.T) {

injectSafeTS := oracle.GoTimeToTS(flashbackTs.Add(10 * time.Second))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`))
require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))

// Get GC safe point error.
Expand Down Expand Up @@ -337,8 +335,7 @@ func TestRecoverClusterMeetError(t *testing.T) {
tk.MustExec("create table t(a int);")
tk.MustMatchErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs), "Detected schema change due to another DDL job during \\[.*, now\\), can't do flashback")

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
}

Expand All @@ -347,6 +344,7 @@ func TestFlashbackWithSafeTs(t *testing.T) {
tk := testkit.NewTestKit(t, store)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/changeFlashbackGetMinSafeTimeTimeout", `return(0)`))

timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
defer resetGC()
Expand All @@ -371,9 +369,8 @@ func TestFlashbackWithSafeTs(t *testing.T) {
compareWithSafeTS: 0,
},
{
name: "10 seconds ago to now, safeTS 5 secs ago",
// Add flashbackTs.Add(-500*time.Millisecond) to avoid flashback time range overlapped.
sql: fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs.Add(-500*time.Millisecond)),
name: "10 seconds ago to now, safeTS 5 secs ago",
sql: fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs),
injectSafeTS: oracle.GoTimeToTS(flashbackTs.Add(10 * time.Second)),
compareWithSafeTS: -1,
},
Expand All @@ -386,19 +383,55 @@ func TestFlashbackWithSafeTs(t *testing.T) {
}
for _, testcase := range testcases {
t.Log(testcase.name)
require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS",
fmt.Sprintf("return(%v)", testcase.injectSafeTS)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS",
fmt.Sprintf("return(%v)", testcase.injectSafeTS)))
if testcase.compareWithSafeTS == 1 {
start := time.Now()
tk.MustContainErrMsg(testcase.sql,
"cannot set flashback timestamp to too close to present time")
"cannot set flashback timestamp after min-resolved-ts")
// When set `flashbackGetMinSafeTimeTimeout` = 0, no retry for `getStoreGlobalMinSafeTS`.
require.Less(t, time.Since(start), time.Second)
} else {
tk.MustExec(testcase.sql)
}
}
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/changeFlashbackGetMinSafeTimeTimeout"))
}

func TestFlashbackRetryGetMinSafeTime(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`))

timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
defer resetGC()

// Set GC safe point.
tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))

time.Sleep(time.Second)
ts, _ := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
flashbackTs := oracle.GetTimeFromTS(ts)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS",
fmt.Sprintf("return(%v)", oracle.GoTimeToTS(flashbackTs.Add(-10*time.Minute)))))

go func() {
time.Sleep(2 * time.Second)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS",
fmt.Sprintf("return(%v)", oracle.GoTimeToTS(flashbackTs.Add(10*time.Minute)))))
}()

start := time.Now()
tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs))
duration := time.Since(start)
require.Greater(t, duration, 2*time.Second)
require.Less(t, duration, 5*time.Second)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
}

Expand Down
7 changes: 2 additions & 5 deletions tests/realtikvtest/brietest/flashback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ func TestFlashback(t *testing.T) {
require.NoError(t, err)

injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second))
require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))

tk.MustExec("insert t values (4), (5), (6)")
Expand All @@ -86,7 +84,6 @@ func TestFlashback(t *testing.T) {
require.Equal(t, tk.MustQuery("select max(a) from t").Rows()[0][0], "3")
require.Equal(t, tk.MustQuery("select max(a) from t use index(i)").Rows()[0][0], "3")

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
}
}

0 comments on commit 45f2d06

Please sign in to comment.