From 563999ac5e755553354f20c51ac64e6d2e3bb3b7 Mon Sep 17 00:00:00 2001 From: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com> Date: Tue, 12 Nov 2024 15:46:24 +0800 Subject: [PATCH] *: Use strict validation for stale read ts & flashback ts (#57050) close pingcap/tidb#56809 --- pkg/ddl/cluster.go | 13 ++----- pkg/executor/set.go | 6 +-- pkg/executor/stale_txn_test.go | 25 ++++++++++-- pkg/planner/core/plan_cache_utils.go | 2 +- pkg/planner/core/planbuilder.go | 4 +- pkg/planner/core/preprocess.go | 2 +- pkg/sessionctx/BUILD.bazel | 3 -- pkg/sessionctx/context.go | 44 +--------------------- pkg/sessiontxn/staleread/processor.go | 20 +++++----- pkg/sessiontxn/staleread/processor_test.go | 30 ++++++++------- pkg/sessiontxn/staleread/util.go | 17 +++++++-- 11 files changed, 73 insertions(+), 93 deletions(-) diff --git a/pkg/ddl/cluster.go b/pkg/ddl/cluster.go index 565b092172a22..e38326cd18d43 100644 --- a/pkg/ddl/cluster.go +++ b/pkg/ddl/cluster.go @@ -36,7 +36,6 @@ import ( "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/meta/model" - "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/tablecodec" @@ -113,16 +112,12 @@ func getStoreGlobalMinSafeTS(s kv.Storage) time.Time { // ValidateFlashbackTS validates that flashBackTS in range [gcSafePoint, currentTS). func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBackTS uint64) error { - currentTS, err := sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0) - // If we fail to calculate currentTS from local time, fallback to get a timestamp from PD. + currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope) if err != nil { - metrics.ValidateReadTSFromPDCount.Inc() - currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope) - if err != nil { - return errors.Errorf("fail to validate flashback timestamp: %v", err) - } - currentTS = currentVer.Ver + return errors.Errorf("fail to validate flashback timestamp: %v", err) } + currentTS := currentVer.Ver + oracleFlashbackTS := oracle.GetTimeFromTS(flashBackTS) if oracleFlashbackTS.After(oracle.GetTimeFromTS(currentTS)) { return errors.Errorf("cannot set flashback timestamp to future time") diff --git a/pkg/executor/set.go b/pkg/executor/set.go index 8d88a2d782b45..d39ce5bffe43c 100644 --- a/pkg/executor/set.go +++ b/pkg/executor/set.go @@ -222,10 +222,8 @@ func (e *SetExecutor) setSysVariable(ctx context.Context, name string, v *expres newSnapshotTS := getSnapshotTSByName() newSnapshotIsSet := newSnapshotTS > 0 && newSnapshotTS != oldSnapshotTS if newSnapshotIsSet { - if name == variable.TiDBTxnReadTS { - err = sessionctx.ValidateStaleReadTS(ctx, e.Ctx().GetSessionVars().StmtCtx, e.Ctx().GetStore(), newSnapshotTS) - } else { - err = sessionctx.ValidateSnapshotReadTS(ctx, e.Ctx(), newSnapshotTS) + err = sessionctx.ValidateSnapshotReadTS(ctx, e.Ctx().GetStore(), newSnapshotTS) + if name != variable.TiDBTxnReadTS { // Also check gc safe point for snapshot read. // We don't check snapshot with gc safe point for read_ts // Client-go will automatically check the snapshotTS with gc safe point. It's unnecessary to check gc safe point during set executor. diff --git a/pkg/executor/stale_txn_test.go b/pkg/executor/stale_txn_test.go index 0b5f959f09d40..3de2cdb16ea64 100644 --- a/pkg/executor/stale_txn_test.go +++ b/pkg/executor/stale_txn_test.go @@ -18,6 +18,7 @@ import ( "bytes" "context" "fmt" + "strconv" "testing" "time" @@ -1372,14 +1373,30 @@ func TestStaleTSO(t *testing.T) { tk.MustExec("create table t (id int)") tk.MustExec("insert into t values(1)") + ts1, err := strconv.ParseUint(tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.commit_ts')").Rows()[0][0].(string), 10, 64) + require.NoError(t, err) + + // Wait until the physical advances for 1s + var currentTS uint64 + for { + tk.MustExec("begin") + currentTS, err = strconv.ParseUint(tk.MustQuery("select @@tidb_current_ts").Rows()[0][0].(string), 10, 64) + require.NoError(t, err) + tk.MustExec("rollback") + if oracle.GetTimeFromTS(currentTS).After(oracle.GetTimeFromTS(ts1).Add(time.Second)) { + break + } + time.Sleep(time.Millisecond * 100) + } asOfExprs := []string{ - "now(3) - interval 1 second", - "current_time() - interval 1 second", - "curtime() - interval 1 second", + "now(3) - interval 10 second", + "current_time() - interval 10 second", + "curtime() - interval 10 second", } - nextTSO := oracle.GoTimeToTS(time.Now().Add(2 * time.Second)) + nextPhysical := oracle.GetPhysical(oracle.GetTimeFromTS(currentTS).Add(10 * time.Second)) + nextTSO := oracle.ComposeTS(nextPhysical, oracle.ExtractLogical(currentTS)) require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/pkg/sessiontxn/staleread/mockStaleReadTSO", fmt.Sprintf("return(%d)", nextTSO))) defer failpoint.Disable("github.com/pingcap/tidb/pkg/sessiontxn/staleread/mockStaleReadTSO") for _, expr := range asOfExprs { diff --git a/pkg/planner/core/plan_cache_utils.go b/pkg/planner/core/plan_cache_utils.go index c5911254ff6ae..73af4a3dd4d62 100644 --- a/pkg/planner/core/plan_cache_utils.go +++ b/pkg/planner/core/plan_cache_utils.go @@ -570,7 +570,7 @@ type PlanCacheStmt struct { SQLDigest *parser.Digest PlanDigest *parser.Digest ForUpdateRead bool - SnapshotTSEvaluator func(sessionctx.Context) (uint64, error) + SnapshotTSEvaluator func(context.Context, sessionctx.Context) (uint64, error) BindingInfo bindinfo.BindingMatchInfo diff --git a/pkg/planner/core/planbuilder.go b/pkg/planner/core/planbuilder.go index 1d39b6e546159..591c5420b7b51 100644 --- a/pkg/planner/core/planbuilder.go +++ b/pkg/planner/core/planbuilder.go @@ -3643,7 +3643,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (base. if err != nil { return nil, err } - if err := sessionctx.ValidateStaleReadTS(ctx, b.ctx.GetSessionVars().StmtCtx, b.ctx.GetStore(), startTS); err != nil { + if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS); err != nil { return nil, err } p.StaleTxnStartTS = startTS @@ -3657,7 +3657,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (base. if err != nil { return nil, err } - if err := sessionctx.ValidateStaleReadTS(ctx, b.ctx.GetSessionVars().StmtCtx, b.ctx.GetStore(), startTS); err != nil { + if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS); err != nil { return nil, err } p.StaleTxnStartTS = startTS diff --git a/pkg/planner/core/preprocess.go b/pkg/planner/core/preprocess.go index ad1bf32e066b8..209f540a7b25f 100644 --- a/pkg/planner/core/preprocess.go +++ b/pkg/planner/core/preprocess.go @@ -180,7 +180,7 @@ var _ = PreprocessorReturn{}.initedLastSnapshotTS type PreprocessorReturn struct { initedLastSnapshotTS bool IsStaleness bool - SnapshotTSEvaluator func(sessionctx.Context) (uint64, error) + SnapshotTSEvaluator func(context.Context, sessionctx.Context) (uint64, error) // LastSnapshotTS is the last evaluated snapshotTS if any // otherwise it defaults to zero LastSnapshotTS uint64 diff --git a/pkg/sessionctx/BUILD.bazel b/pkg/sessionctx/BUILD.bazel index 723a23f8b0fc0..2536dfeb64937 100644 --- a/pkg/sessionctx/BUILD.bazel +++ b/pkg/sessionctx/BUILD.bazel @@ -13,11 +13,9 @@ go_library( "//pkg/kv", "//pkg/lock/context", "//pkg/meta/model", - "//pkg/metrics", "//pkg/planner/planctx", "//pkg/session/cursor", "//pkg/sessionctx/sessionstates", - "//pkg/sessionctx/stmtctx", "//pkg/sessionctx/variable", "//pkg/statistics/handle/usage/indexusage", "//pkg/table/tblctx", @@ -27,7 +25,6 @@ go_library( "//pkg/util/sli", "//pkg/util/sqlexec", "//pkg/util/topsql/stmtstats", - "@com_github_pingcap_errors//:errors", "@com_github_tikv_client_go_v2//oracle", ], ) diff --git a/pkg/sessionctx/context.go b/pkg/sessionctx/context.go index 6b5fc97674bd7..b3cc26cb95376 100644 --- a/pkg/sessionctx/context.go +++ b/pkg/sessionctx/context.go @@ -18,9 +18,7 @@ import ( "context" "iter" "sync" - "time" - "github.com/pingcap/errors" distsqlctx "github.com/pingcap/tidb/pkg/distsql/context" "github.com/pingcap/tidb/pkg/expression/exprctx" "github.com/pingcap/tidb/pkg/extension" @@ -28,11 +26,9 @@ import ( "github.com/pingcap/tidb/pkg/kv" tablelock "github.com/pingcap/tidb/pkg/lock/context" "github.com/pingcap/tidb/pkg/meta/model" - "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/planner/planctx" "github.com/pingcap/tidb/pkg/session/cursor" "github.com/pingcap/tidb/pkg/sessionctx/sessionstates" - "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/statistics/handle/usage/indexusage" "github.com/pingcap/tidb/pkg/table/tblctx" @@ -245,42 +241,6 @@ const ( ) // ValidateSnapshotReadTS strictly validates that readTS does not exceed the PD timestamp -func ValidateSnapshotReadTS(ctx context.Context, sctx Context, readTS uint64) error { - latestTS, err := sctx.GetStore().GetOracle().GetLowResolutionTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) - // If we fail to get latestTS or the readTS exceeds it, get a timestamp from PD to double check - if err != nil || readTS > latestTS { - metrics.ValidateReadTSFromPDCount.Inc() - currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope) - if err != nil { - return errors.Errorf("fail to validate read timestamp: %v", err) - } - if readTS > currentVer.Ver { - return errors.Errorf("cannot set read timestamp to a future time") - } - } - return nil -} - -// How far future from now ValidateStaleReadTS allows at most -const allowedTimeFromNow = 100 * time.Millisecond - -// ValidateStaleReadTS validates that readTS does not exceed the current time not strictly. -func ValidateStaleReadTS(ctx context.Context, sc *stmtctx.StatementContext, store kv.Storage, readTS uint64) error { - currentTS, err := sc.GetStaleTSO() - if currentTS == 0 || err != nil { - currentTS, err = store.GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0) - } - // If we fail to calculate currentTS from local time, fallback to get a timestamp from PD - if err != nil { - metrics.ValidateReadTSFromPDCount.Inc() - currentVer, err := store.CurrentVersion(oracle.GlobalTxnScope) - if err != nil { - return errors.Errorf("fail to validate read timestamp: %v", err) - } - currentTS = currentVer.Ver - } - if oracle.GetTimeFromTS(readTS).After(oracle.GetTimeFromTS(currentTS).Add(allowedTimeFromNow)) { - return errors.Errorf("cannot set read timestamp to a future time") - } - return nil +func ValidateSnapshotReadTS(ctx context.Context, store kv.Storage, readTS uint64) error { + return store.GetOracle().ValidateSnapshotReadTS(ctx, readTS, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) } diff --git a/pkg/sessiontxn/staleread/processor.go b/pkg/sessiontxn/staleread/processor.go index 78620657d2840..e1e87b958c345 100644 --- a/pkg/sessiontxn/staleread/processor.go +++ b/pkg/sessiontxn/staleread/processor.go @@ -31,7 +31,7 @@ import ( var _ Processor = &staleReadProcessor{} // StalenessTSEvaluator is a function to get staleness ts -type StalenessTSEvaluator func(sctx sessionctx.Context) (uint64, error) +type StalenessTSEvaluator func(ctx context.Context, sctx sessionctx.Context) (uint64, error) // Processor is an interface used to process stale read type Processor interface { @@ -101,7 +101,7 @@ func (p *baseProcessor) setEvaluatedTS(ts uint64) (err error) { return err } - return p.setEvaluatedValues(ts, is, func(sctx sessionctx.Context) (uint64, error) { + return p.setEvaluatedValues(ts, is, func(_ context.Context, sctx sessionctx.Context) (uint64, error) { return ts, nil }) } @@ -117,7 +117,7 @@ func (p *baseProcessor) setEvaluatedTSWithoutEvaluator(ts uint64) (err error) { } func (p *baseProcessor) setEvaluatedEvaluator(evaluator StalenessTSEvaluator) error { - ts, err := evaluator(p.sctx) + ts, err := evaluator(p.ctx, p.sctx) if err != nil { return err } @@ -168,10 +168,10 @@ func (p *staleReadProcessor) OnSelectTable(tn *ast.TableName) error { } // If `stmtAsOfTS` is not 0, it means we use 'select ... from xxx as of timestamp ...' - evaluateTS := func(sctx sessionctx.Context) (uint64, error) { - return parseAndValidateAsOf(context.Background(), p.sctx, tn.AsOf) + evaluateTS := func(ctx context.Context, sctx sessionctx.Context) (uint64, error) { + return parseAndValidateAsOf(ctx, p.sctx, tn.AsOf) } - stmtAsOfTS, err := evaluateTS(p.sctx) + stmtAsOfTS, err := evaluateTS(p.ctx, p.sctx) if err != nil { return err } @@ -201,7 +201,7 @@ func (p *staleReadProcessor) OnExecutePreparedStmt(preparedTSEvaluator Staleness var stmtTS uint64 if preparedTSEvaluator != nil { // If the `preparedTSEvaluator` is not nil, it means the prepared statement is stale read - if stmtTS, err = preparedTSEvaluator(p.sctx); err != nil { + if stmtTS, err = preparedTSEvaluator(p.ctx, p.sctx); err != nil { return err } } @@ -286,7 +286,7 @@ func parseAndValidateAsOf(ctx context.Context, sctx sessionctx.Context, asOf *as return 0, err } - if err = sessionctx.ValidateStaleReadTS(ctx, sctx.GetSessionVars().StmtCtx, sctx.GetStore(), ts); err != nil { + if err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), ts); err != nil { return 0, err } @@ -299,8 +299,8 @@ func getTsEvaluatorFromReadStaleness(sctx sessionctx.Context) StalenessTSEvaluat return nil } - return func(sctx sessionctx.Context) (uint64, error) { - return CalculateTsWithReadStaleness(sctx, readStaleness) + return func(ctx context.Context, sctx sessionctx.Context) (uint64, error) { + return CalculateTsWithReadStaleness(ctx, sctx, readStaleness) } } diff --git a/pkg/sessiontxn/staleread/processor_test.go b/pkg/sessiontxn/staleread/processor_test.go index 9f6adc817a46d..441528aca1ce8 100644 --- a/pkg/sessiontxn/staleread/processor_test.go +++ b/pkg/sessiontxn/staleread/processor_test.go @@ -52,7 +52,7 @@ func (p *staleReadPoint) checkMatchProcessor(t *testing.T, processor staleread.P evaluator := processor.GetStalenessTSEvaluatorForPrepare() if hasEvaluator { require.NotNil(t, evaluator) - ts, err := evaluator(p.tk.Session()) + ts, err := evaluator(context.Background(), p.tk.Session()) require.NoError(t, err) require.Equal(t, processor.GetStalenessReadTS(), ts) } else { @@ -109,6 +109,7 @@ func TestStaleReadProcessorWithSelectTable(t *testing.T) { tn := astTableWithAsOf(t, "") p1 := genStaleReadPoint(t, tk) p2 := genStaleReadPoint(t, tk) + ctx := context.Background() // create local temporary table to check processor's infoschema will consider temporary table tk.MustExec("create temporary table test.t2(a int)") @@ -158,19 +159,19 @@ func TestStaleReadProcessorWithSelectTable(t *testing.T) { err = processor.OnSelectTable(tn) require.True(t, processor.IsStaleness()) require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion()) - expectedTS, err := staleread.CalculateTsWithReadStaleness(tk.Session(), -100*time.Second) + expectedTS, err := staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -100*time.Second) require.NoError(t, err) require.Equal(t, expectedTS, processor.GetStalenessReadTS()) evaluator := processor.GetStalenessTSEvaluatorForPrepare() - evaluatorTS, err := evaluator(tk.Session()) + evaluatorTS, err := evaluator(ctx, tk.Session()) require.NoError(t, err) require.Equal(t, expectedTS, evaluatorTS) tk.MustExec("set @@tidb_read_staleness=''") tk.MustExec("do sleep(0.01)") - evaluatorTS, err = evaluator(tk.Session()) + evaluatorTS, err = evaluator(ctx, tk.Session()) require.NoError(t, err) - expectedTS2, err := staleread.CalculateTsWithReadStaleness(tk.Session(), -100*time.Second) + expectedTS2, err := staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -100*time.Second) require.NoError(t, err) require.Equal(t, expectedTS2, evaluatorTS) @@ -217,11 +218,11 @@ func TestStaleReadProcessorWithSelectTable(t *testing.T) { err = processor.OnSelectTable(tn) require.True(t, processor.IsStaleness()) require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion()) - expectedTS, err = staleread.CalculateTsWithReadStaleness(tk.Session(), -5*time.Second) + expectedTS, err = staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -5*time.Second) require.NoError(t, err) require.Equal(t, expectedTS, processor.GetStalenessReadTS()) evaluator = processor.GetStalenessTSEvaluatorForPrepare() - evaluatorTS, err = evaluator(tk.Session()) + evaluatorTS, err = evaluator(ctx, tk.Session()) require.NoError(t, err) require.Equal(t, expectedTS, evaluatorTS) tk.MustExec("set @@tidb_read_staleness=''") @@ -234,13 +235,14 @@ func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) { tk := testkit.NewTestKit(t, store) p1 := genStaleReadPoint(t, tk) //p2 := genStaleReadPoint(t, tk) + ctx := context.Background() // create local temporary table to check processor's infoschema will consider temporary table tk.MustExec("create temporary table test.t2(a int)") // execute prepared stmt with ts evaluator processor := createProcessor(t, tk.Session()) - err := processor.OnExecutePreparedStmt(func(sctx sessionctx.Context) (uint64, error) { + err := processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) { return p1.ts, nil }) require.NoError(t, err) @@ -248,7 +250,7 @@ func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) { // will get an error when ts evaluator fails processor = createProcessor(t, tk.Session()) - err = processor.OnExecutePreparedStmt(func(sctx sessionctx.Context) (uint64, error) { + err = processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) { return 0, errors.New("mock error") }) require.Error(t, err) @@ -273,7 +275,7 @@ func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) { // prepared ts is not allowed when @@txn_read_ts is set tk.MustExec(fmt.Sprintf("SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", p1.dt)) processor = createProcessor(t, tk.Session()) - err = processor.OnExecutePreparedStmt(func(sctx sessionctx.Context) (uint64, error) { + err = processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) { return p1.ts, nil }) require.Error(t, err) @@ -286,7 +288,7 @@ func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) { err = processor.OnExecutePreparedStmt(nil) require.True(t, processor.IsStaleness()) require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion()) - expectedTS, err := staleread.CalculateTsWithReadStaleness(tk.Session(), -100*time.Second) + expectedTS, err := staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -100*time.Second) require.NoError(t, err) require.Equal(t, expectedTS, processor.GetStalenessReadTS()) tk.MustExec("set @@tidb_read_staleness=''") @@ -294,7 +296,7 @@ func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) { // `@@tidb_read_staleness` will be ignored when `as of` or `@@tx_read_ts` tk.MustExec("set @@tidb_read_staleness=-100") processor = createProcessor(t, tk.Session()) - err = processor.OnExecutePreparedStmt(func(sctx sessionctx.Context) (uint64, error) { + err = processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) { return p1.ts, nil }) require.NoError(t, err) @@ -337,7 +339,7 @@ func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) { err = processor.OnExecutePreparedStmt(nil) require.True(t, processor.IsStaleness()) require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion()) - expectedTS, err = staleread.CalculateTsWithReadStaleness(tk.Session(), -5*time.Second) + expectedTS, err = staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -5*time.Second) require.NoError(t, err) require.Equal(t, expectedTS, processor.GetStalenessReadTS()) tk.MustExec("set @@tidb_read_staleness=''") @@ -377,7 +379,7 @@ func TestStaleReadProcessorInTxn(t *testing.T) { // return an error when execute prepared stmt with as of processor = createProcessor(t, tk.Session()) - err = processor.OnExecutePreparedStmt(func(sctx sessionctx.Context) (uint64, error) { + err = processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) { return p1.ts, nil }) require.Error(t, err) diff --git a/pkg/sessiontxn/staleread/util.go b/pkg/sessiontxn/staleread/util.go index a02a6110e9d81..57320ddbbf274 100644 --- a/pkg/sessiontxn/staleread/util.go +++ b/pkg/sessiontxn/staleread/util.go @@ -67,15 +67,26 @@ func CalculateAsOfTsExpr(ctx context.Context, sctx planctx.PlanContext, tsExpr a } // CalculateTsWithReadStaleness calculates the TsExpr for readStaleness duration -func CalculateTsWithReadStaleness(sctx sessionctx.Context, readStaleness time.Duration) (uint64, error) { +func CalculateTsWithReadStaleness(ctx context.Context, sctx sessionctx.Context, readStaleness time.Duration) (uint64, error) { nowVal, err := expression.GetStmtTimestamp(sctx.GetExprCtx().GetEvalCtx()) if err != nil { return 0, err } tsVal := nowVal.Add(readStaleness) sc := sctx.GetSessionVars().StmtCtx - minTsVal := expression.GetStmtMinSafeTime(sc, sctx.GetStore(), sc.TimeZone()) - return oracle.GoTimeToTS(expression.CalAppropriateTime(tsVal, nowVal, minTsVal)), nil + minSafeTSVal := expression.GetStmtMinSafeTime(sc, sctx.GetStore(), sc.TimeZone()) + calculatedTime := expression.CalAppropriateTime(tsVal, nowVal, minSafeTSVal) + readTS := oracle.GoTimeToTS(calculatedTime) + if calculatedTime.After(minSafeTSVal) { + // If the final calculated exceeds the min safe ts, we are not sure whether the ts is safe to read (note that + // reading with a ts larger than PD's max allocated ts + 1 is unsafe and may break linearizability). + // So in this case, do an extra check on it. + err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), readTS) + if err != nil { + return 0, err + } + } + return readTS, nil } // IsStmtStaleness indicates whether the current statement is staleness or not