diff --git a/DEPS.bzl b/DEPS.bzl index 198e6b351fd8b..2f63c983fca97 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -7041,13 +7041,13 @@ def go_deps(): name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sha256 = "ec8fd9b0cdf1352758305c7bb2107d2d5a6782937853ecd2e76a561a4f054b40", - strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20241125064441-5ce6bf1f099c", + sha256 = "587d22d21daa1f44b18b0c2325fcb4233af71a985f397a36aa9db8796777b8f2", + strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20241212025239-0dc41295f929", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241125064441-5ce6bf1f099c.zip", - "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241125064441-5ce6bf1f099c.zip", - "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241125064441-5ce6bf1f099c.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241125064441-5ce6bf1f099c.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241212025239-0dc41295f929.zip", + "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241212025239-0dc41295f929.zip", + "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241212025239-0dc41295f929.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241212025239-0dc41295f929.zip", ], ) go_repository( diff --git a/go.mod b/go.mod index 7ccef2210891c..e6e71c18592fa 100644 --- a/go.mod +++ b/go.mod @@ -104,7 +104,7 @@ require ( github.com/stretchr/testify v1.9.0 github.com/tdakkota/asciicheck v0.2.0 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.8-0.20241125064441-5ce6bf1f099c + github.com/tikv/client-go/v2 v2.0.8-0.20241212025239-0dc41295f929 github.com/tikv/pd/client v0.0.0-20240724132535-fcb34c90790c github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 github.com/twmb/murmur3 v1.1.6 diff --git a/go.sum b/go.sum index 72ffc887aa0a8..eeae6d24601bf 100644 --- a/go.sum +++ b/go.sum @@ -997,8 +997,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= -github.com/tikv/client-go/v2 v2.0.8-0.20241125064441-5ce6bf1f099c h1:c5/4ODu6lcuueN7lUtIyHrGeWLCHVZpAbrfFoNfP+UI= -github.com/tikv/client-go/v2 v2.0.8-0.20241125064441-5ce6bf1f099c/go.mod h1:37p0ryKaieJbBpVDWnaPi2ZS6UFqkgpsemBLkGX2FvM= +github.com/tikv/client-go/v2 v2.0.8-0.20241212025239-0dc41295f929 h1:dvY5kl35L+aroDl3HQzOx4J7N6sdd8TiXEV+GhNemtU= +github.com/tikv/client-go/v2 v2.0.8-0.20241212025239-0dc41295f929/go.mod h1:37p0ryKaieJbBpVDWnaPi2ZS6UFqkgpsemBLkGX2FvM= github.com/tikv/pd/client v0.0.0-20240724132535-fcb34c90790c h1:oZygf/SCdTUhjoHuZRE85EBgK0oA6LjikpWuJqqjM8U= github.com/tikv/pd/client v0.0.0-20240724132535-fcb34c90790c/go.mod h1:NW6Af689Jw1FDxjq+WL0nqOdmQ1XT0ly2R1SIKfQuUw= github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 h1:quvGphlmUVU+nhpFa4gg4yJyTRJ13reZMDHrKwYw53M= diff --git a/pkg/ddl/cluster.go b/pkg/ddl/cluster.go index e05d0c4006be0..1798aa087bc7b 100644 --- a/pkg/ddl/cluster.go +++ b/pkg/ddl/cluster.go @@ -34,7 +34,6 @@ import ( "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta" - "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" @@ -112,16 +111,12 @@ func getStoreGlobalMinSafeTS(s kv.Storage) time.Time { // ValidateFlashbackTS validates that flashBackTS in range [gcSafePoint, currentTS). func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBackTS uint64) error { - currentTS, err := sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0) - // If we fail to calculate currentTS from local time, fallback to get a timestamp from PD. + currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope) if err != nil { - metrics.ValidateReadTSFromPDCount.Inc() - currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope) - if err != nil { - return errors.Errorf("fail to validate flashback timestamp: %v", err) - } - currentTS = currentVer.Ver + return errors.Errorf("fail to validate flashback timestamp: %v", err) } + currentTS := currentVer.Ver + oracleFlashbackTS := oracle.GetTimeFromTS(flashBackTS) if oracleFlashbackTS.After(oracle.GetTimeFromTS(currentTS)) { return errors.Errorf("cannot set flashback timestamp to future time") diff --git a/pkg/executor/set.go b/pkg/executor/set.go index 4b63d9728101a..1fcce5863c6f6 100644 --- a/pkg/executor/set.go +++ b/pkg/executor/set.go @@ -213,10 +213,8 @@ func (e *SetExecutor) setSysVariable(ctx context.Context, name string, v *expres newSnapshotTS := getSnapshotTSByName() newSnapshotIsSet := newSnapshotTS > 0 && newSnapshotTS != oldSnapshotTS if newSnapshotIsSet { - if name == variable.TiDBTxnReadTS { - err = sessionctx.ValidateStaleReadTS(ctx, e.Ctx(), newSnapshotTS) - } else { - err = sessionctx.ValidateSnapshotReadTS(ctx, e.Ctx(), newSnapshotTS) + err = sessionctx.ValidateSnapshotReadTS(ctx, e.Ctx().GetStore(), newSnapshotTS) + if name != variable.TiDBTxnReadTS { // Also check gc safe point for snapshot read. // We don't check snapshot with gc safe point for read_ts // Client-go will automatically check the snapshotTS with gc safe point. It's unnecessary to check gc safe point during set executor. diff --git a/pkg/executor/stale_txn_test.go b/pkg/executor/stale_txn_test.go index ca743c9bd55a0..f1570f05f6e97 100644 --- a/pkg/executor/stale_txn_test.go +++ b/pkg/executor/stale_txn_test.go @@ -17,6 +17,7 @@ package executor_test import ( "context" "fmt" + "strconv" "testing" "time" @@ -1409,14 +1410,30 @@ func TestStaleTSO(t *testing.T) { tk.MustExec("create table t (id int)") tk.MustExec("insert into t values(1)") + ts1, err := strconv.ParseUint(tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.commit_ts')").Rows()[0][0].(string), 10, 64) + require.NoError(t, err) + + // Wait until the physical advances for 1s + var currentTS uint64 + for { + tk.MustExec("begin") + currentTS, err = strconv.ParseUint(tk.MustQuery("select @@tidb_current_ts").Rows()[0][0].(string), 10, 64) + require.NoError(t, err) + tk.MustExec("rollback") + if oracle.GetTimeFromTS(currentTS).After(oracle.GetTimeFromTS(ts1).Add(time.Second)) { + break + } + time.Sleep(time.Millisecond * 100) + } asOfExprs := []string{ - "now(3) - interval 1 second", - "current_time() - interval 1 second", - "curtime() - interval 1 second", + "now(3) - interval 10 second", + "current_time() - interval 10 second", + "curtime() - interval 10 second", } - nextTSO := oracle.GoTimeToTS(time.Now().Add(2 * time.Second)) + nextPhysical := oracle.GetPhysical(oracle.GetTimeFromTS(currentTS).Add(10 * time.Second)) + nextTSO := oracle.ComposeTS(nextPhysical, oracle.ExtractLogical(currentTS)) require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/pkg/sessiontxn/staleread/mockStaleReadTSO", fmt.Sprintf("return(%d)", nextTSO))) defer failpoint.Disable("github.com/pingcap/tidb/pkg/sessiontxn/staleread/mockStaleReadTSO") for _, expr := range asOfExprs { diff --git a/pkg/planner/core/plan_cache_utils.go b/pkg/planner/core/plan_cache_utils.go index f1409a3964d7b..659c507b811c2 100644 --- a/pkg/planner/core/plan_cache_utils.go +++ b/pkg/planner/core/plan_cache_utils.go @@ -535,7 +535,7 @@ type PlanCacheStmt struct { SQLDigest *parser.Digest PlanDigest *parser.Digest ForUpdateRead bool - SnapshotTSEvaluator func(sessionctx.Context) (uint64, error) + SnapshotTSEvaluator func(context.Context, sessionctx.Context) (uint64, error) NormalizedSQL4PC string SQLDigest4PC string diff --git a/pkg/planner/core/planbuilder.go b/pkg/planner/core/planbuilder.go index 7cfe1ada57122..4e2bb5ab3dd08 100644 --- a/pkg/planner/core/planbuilder.go +++ b/pkg/planner/core/planbuilder.go @@ -3693,7 +3693,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan, if err != nil { return nil, err } - if err := sessionctx.ValidateStaleReadTS(ctx, b.ctx, startTS); err != nil { + if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS); err != nil { return nil, err } p.StaleTxnStartTS = startTS @@ -3707,7 +3707,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan, if err != nil { return nil, err } - if err := sessionctx.ValidateStaleReadTS(ctx, b.ctx, startTS); err != nil { + if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS); err != nil { return nil, err } p.StaleTxnStartTS = startTS diff --git a/pkg/planner/core/preprocess.go b/pkg/planner/core/preprocess.go index e68b3df13b67d..fc4eca7d9e16e 100644 --- a/pkg/planner/core/preprocess.go +++ b/pkg/planner/core/preprocess.go @@ -170,7 +170,7 @@ var _ = PreprocessorReturn{}.initedLastSnapshotTS type PreprocessorReturn struct { initedLastSnapshotTS bool IsStaleness bool - SnapshotTSEvaluator func(sessionctx.Context) (uint64, error) + SnapshotTSEvaluator func(context.Context, sessionctx.Context) (uint64, error) // LastSnapshotTS is the last evaluated snapshotTS if any // otherwise it defaults to zero LastSnapshotTS uint64 diff --git a/pkg/sessionctx/BUILD.bazel b/pkg/sessionctx/BUILD.bazel index b18e2816cd015..2b51d38cb15ab 100644 --- a/pkg/sessionctx/BUILD.bazel +++ b/pkg/sessionctx/BUILD.bazel @@ -8,7 +8,6 @@ go_library( deps = [ "//pkg/extension", "//pkg/kv", - "//pkg/metrics", "//pkg/parser/model", "//pkg/sessionctx/sessionstates", "//pkg/sessionctx/variable", @@ -17,7 +16,6 @@ go_library( "//pkg/util/plancache", "//pkg/util/sli", "//pkg/util/topsql/stmtstats", - "@com_github_pingcap_errors//:errors", "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_pingcap_tipb//go-binlog", "@com_github_tikv_client_go_v2//oracle", diff --git a/pkg/sessionctx/context.go b/pkg/sessionctx/context.go index 15e646e799618..5ebb1052cd384 100644 --- a/pkg/sessionctx/context.go +++ b/pkg/sessionctx/context.go @@ -17,13 +17,10 @@ package sessionctx import ( "context" "fmt" - "time" - "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/pkg/extension" "github.com/pingcap/tidb/pkg/kv" - "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx/sessionstates" "github.com/pingcap/tidb/pkg/sessionctx/variable" @@ -217,44 +214,8 @@ const ( ) // ValidateSnapshotReadTS strictly validates that readTS does not exceed the PD timestamp -func ValidateSnapshotReadTS(ctx context.Context, sctx Context, readTS uint64) error { - latestTS, err := sctx.GetStore().GetOracle().GetLowResolutionTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) - // If we fail to get latestTS or the readTS exceeds it, get a timestamp from PD to double check - if err != nil || readTS > latestTS { - metrics.ValidateReadTSFromPDCount.Inc() - currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope) - if err != nil { - return errors.Errorf("fail to validate read timestamp: %v", err) - } - if readTS > currentVer.Ver { - return errors.Errorf("cannot set read timestamp to a future time") - } - } - return nil -} - -// How far future from now ValidateStaleReadTS allows at most -const allowedTimeFromNow = 100 * time.Millisecond - -// ValidateStaleReadTS validates that readTS does not exceed the current time not strictly. -func ValidateStaleReadTS(ctx context.Context, sctx Context, readTS uint64) error { - currentTS, err := sctx.GetSessionVars().StmtCtx.GetStaleTSO() - if currentTS == 0 || err != nil { - currentTS, err = sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0) - } - // If we fail to calculate currentTS from local time, fallback to get a timestamp from PD - if err != nil { - metrics.ValidateReadTSFromPDCount.Inc() - currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope) - if err != nil { - return errors.Errorf("fail to validate read timestamp: %v", err) - } - currentTS = currentVer.Ver - } - if oracle.GetTimeFromTS(readTS).After(oracle.GetTimeFromTS(currentTS).Add(allowedTimeFromNow)) { - return errors.Errorf("cannot set read timestamp to a future time") - } - return nil +func ValidateSnapshotReadTS(ctx context.Context, store kv.Storage, readTS uint64) error { + return store.GetOracle().ValidateSnapshotReadTS(ctx, readTS, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) } // SysProcTracker is used to track background sys processes diff --git a/pkg/sessiontxn/staleread/processor.go b/pkg/sessiontxn/staleread/processor.go index 969d2269d8c59..393c3e7c378bb 100644 --- a/pkg/sessiontxn/staleread/processor.go +++ b/pkg/sessiontxn/staleread/processor.go @@ -30,7 +30,7 @@ import ( var _ Processor = &staleReadProcessor{} // StalenessTSEvaluator is a function to get staleness ts -type StalenessTSEvaluator func(sctx sessionctx.Context) (uint64, error) +type StalenessTSEvaluator func(ctx context.Context, sctx sessionctx.Context) (uint64, error) // Processor is an interface used to process stale read type Processor interface { @@ -100,7 +100,7 @@ func (p *baseProcessor) setEvaluatedTS(ts uint64) (err error) { return err } - return p.setEvaluatedValues(ts, is, func(sctx sessionctx.Context) (uint64, error) { + return p.setEvaluatedValues(ts, is, func(_ context.Context, sctx sessionctx.Context) (uint64, error) { return ts, nil }) } @@ -116,7 +116,7 @@ func (p *baseProcessor) setEvaluatedTSWithoutEvaluator(ts uint64) (err error) { } func (p *baseProcessor) setEvaluatedEvaluator(evaluator StalenessTSEvaluator) error { - ts, err := evaluator(p.sctx) + ts, err := evaluator(p.ctx, p.sctx) if err != nil { return err } @@ -167,10 +167,10 @@ func (p *staleReadProcessor) OnSelectTable(tn *ast.TableName) error { } // If `stmtAsOfTS` is not 0, it means we use 'select ... from xxx as of timestamp ...' - evaluateTS := func(sctx sessionctx.Context) (uint64, error) { - return parseAndValidateAsOf(context.Background(), p.sctx, tn.AsOf) + evaluateTS := func(ctx context.Context, sctx sessionctx.Context) (uint64, error) { + return parseAndValidateAsOf(ctx, p.sctx, tn.AsOf) } - stmtAsOfTS, err := evaluateTS(p.sctx) + stmtAsOfTS, err := evaluateTS(p.ctx, p.sctx) if err != nil { return err } @@ -200,7 +200,7 @@ func (p *staleReadProcessor) OnExecutePreparedStmt(preparedTSEvaluator Staleness var stmtTS uint64 if preparedTSEvaluator != nil { // If the `preparedTSEvaluator` is not nil, it means the prepared statement is stale read - if stmtTS, err = preparedTSEvaluator(p.sctx); err != nil { + if stmtTS, err = preparedTSEvaluator(p.ctx, p.sctx); err != nil { return err } } @@ -285,7 +285,7 @@ func parseAndValidateAsOf(ctx context.Context, sctx sessionctx.Context, asOf *as return 0, err } - if err = sessionctx.ValidateStaleReadTS(ctx, sctx, ts); err != nil { + if err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), ts); err != nil { return 0, err } @@ -298,8 +298,8 @@ func getTsEvaluatorFromReadStaleness(sctx sessionctx.Context) StalenessTSEvaluat return nil } - return func(sctx sessionctx.Context) (uint64, error) { - return CalculateTsWithReadStaleness(sctx, readStaleness) + return func(ctx context.Context, sctx sessionctx.Context) (uint64, error) { + return CalculateTsWithReadStaleness(ctx, sctx, readStaleness) } } diff --git a/pkg/sessiontxn/staleread/processor_test.go b/pkg/sessiontxn/staleread/processor_test.go index 8a7e9813301ba..6f4996868b9e0 100644 --- a/pkg/sessiontxn/staleread/processor_test.go +++ b/pkg/sessiontxn/staleread/processor_test.go @@ -51,7 +51,7 @@ func (p *staleReadPoint) checkMatchProcessor(t *testing.T, processor staleread.P evaluator := processor.GetStalenessTSEvaluatorForPrepare() if hasEvaluator { require.NotNil(t, evaluator) - ts, err := evaluator(p.tk.Session()) + ts, err := evaluator(context.Background(), p.tk.Session()) require.NoError(t, err) require.Equal(t, processor.GetStalenessReadTS(), ts) } else { @@ -108,6 +108,7 @@ func TestStaleReadProcessorWithSelectTable(t *testing.T) { tn := astTableWithAsOf(t, "") p1 := genStaleReadPoint(t, tk) p2 := genStaleReadPoint(t, tk) + ctx := context.Background() // create local temporary table to check processor's infoschema will consider temporary table tk.MustExec("create temporary table test.t2(a int)") @@ -157,19 +158,19 @@ func TestStaleReadProcessorWithSelectTable(t *testing.T) { err = processor.OnSelectTable(tn) require.True(t, processor.IsStaleness()) require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion()) - expectedTS, err := staleread.CalculateTsWithReadStaleness(tk.Session(), -100*time.Second) + expectedTS, err := staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -100*time.Second) require.NoError(t, err) require.Equal(t, expectedTS, processor.GetStalenessReadTS()) evaluator := processor.GetStalenessTSEvaluatorForPrepare() - evaluatorTS, err := evaluator(tk.Session()) + evaluatorTS, err := evaluator(ctx, tk.Session()) require.NoError(t, err) require.Equal(t, expectedTS, evaluatorTS) tk.MustExec("set @@tidb_read_staleness=''") tk.MustExec("do sleep(0.01)") - evaluatorTS, err = evaluator(tk.Session()) + evaluatorTS, err = evaluator(ctx, tk.Session()) require.NoError(t, err) - expectedTS2, err := staleread.CalculateTsWithReadStaleness(tk.Session(), -100*time.Second) + expectedTS2, err := staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -100*time.Second) require.NoError(t, err) require.Equal(t, expectedTS2, evaluatorTS) @@ -216,11 +217,11 @@ func TestStaleReadProcessorWithSelectTable(t *testing.T) { err = processor.OnSelectTable(tn) require.True(t, processor.IsStaleness()) require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion()) - expectedTS, err = staleread.CalculateTsWithReadStaleness(tk.Session(), -5*time.Second) + expectedTS, err = staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -5*time.Second) require.NoError(t, err) require.Equal(t, expectedTS, processor.GetStalenessReadTS()) evaluator = processor.GetStalenessTSEvaluatorForPrepare() - evaluatorTS, err = evaluator(tk.Session()) + evaluatorTS, err = evaluator(ctx, tk.Session()) require.NoError(t, err) require.Equal(t, expectedTS, evaluatorTS) tk.MustExec("set @@tidb_read_staleness=''") @@ -233,13 +234,14 @@ func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) { tk := testkit.NewTestKit(t, store) p1 := genStaleReadPoint(t, tk) //p2 := genStaleReadPoint(t, tk) + ctx := context.Background() // create local temporary table to check processor's infoschema will consider temporary table tk.MustExec("create temporary table test.t2(a int)") // execute prepared stmt with ts evaluator processor := createProcessor(t, tk.Session()) - err := processor.OnExecutePreparedStmt(func(sctx sessionctx.Context) (uint64, error) { + err := processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) { return p1.ts, nil }) require.NoError(t, err) @@ -247,7 +249,7 @@ func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) { // will get an error when ts evaluator fails processor = createProcessor(t, tk.Session()) - err = processor.OnExecutePreparedStmt(func(sctx sessionctx.Context) (uint64, error) { + err = processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) { return 0, errors.New("mock error") }) require.Error(t, err) @@ -272,7 +274,7 @@ func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) { // prepared ts is not allowed when @@txn_read_ts is set tk.MustExec(fmt.Sprintf("SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", p1.dt)) processor = createProcessor(t, tk.Session()) - err = processor.OnExecutePreparedStmt(func(sctx sessionctx.Context) (uint64, error) { + err = processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) { return p1.ts, nil }) require.Error(t, err) @@ -285,7 +287,7 @@ func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) { err = processor.OnExecutePreparedStmt(nil) require.True(t, processor.IsStaleness()) require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion()) - expectedTS, err := staleread.CalculateTsWithReadStaleness(tk.Session(), -100*time.Second) + expectedTS, err := staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -100*time.Second) require.NoError(t, err) require.Equal(t, expectedTS, processor.GetStalenessReadTS()) tk.MustExec("set @@tidb_read_staleness=''") @@ -293,7 +295,7 @@ func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) { // `@@tidb_read_staleness` will be ignored when `as of` or `@@tx_read_ts` tk.MustExec("set @@tidb_read_staleness=-100") processor = createProcessor(t, tk.Session()) - err = processor.OnExecutePreparedStmt(func(sctx sessionctx.Context) (uint64, error) { + err = processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) { return p1.ts, nil }) require.NoError(t, err) @@ -336,7 +338,7 @@ func TestStaleReadProcessorWithExecutePreparedStmt(t *testing.T) { err = processor.OnExecutePreparedStmt(nil) require.True(t, processor.IsStaleness()) require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion()) - expectedTS, err = staleread.CalculateTsWithReadStaleness(tk.Session(), -5*time.Second) + expectedTS, err = staleread.CalculateTsWithReadStaleness(ctx, tk.Session(), -5*time.Second) require.NoError(t, err) require.Equal(t, expectedTS, processor.GetStalenessReadTS()) tk.MustExec("set @@tidb_read_staleness=''") @@ -376,7 +378,7 @@ func TestStaleReadProcessorInTxn(t *testing.T) { // return an error when execute prepared stmt with as of processor = createProcessor(t, tk.Session()) - err = processor.OnExecutePreparedStmt(func(sctx sessionctx.Context) (uint64, error) { + err = processor.OnExecutePreparedStmt(func(_ctx context.Context, sctx sessionctx.Context) (uint64, error) { return p1.ts, nil }) require.Error(t, err) diff --git a/pkg/sessiontxn/staleread/util.go b/pkg/sessiontxn/staleread/util.go index fde24b7b7c8ed..09f3edc2dbe0b 100644 --- a/pkg/sessiontxn/staleread/util.go +++ b/pkg/sessiontxn/staleread/util.go @@ -64,14 +64,25 @@ func CalculateAsOfTsExpr(ctx context.Context, sctx sessionctx.Context, tsExpr as } // CalculateTsWithReadStaleness calculates the TsExpr for readStaleness duration -func CalculateTsWithReadStaleness(sctx sessionctx.Context, readStaleness time.Duration) (uint64, error) { +func CalculateTsWithReadStaleness(ctx context.Context, sctx sessionctx.Context, readStaleness time.Duration) (uint64, error) { nowVal, err := expression.GetStmtTimestamp(sctx) if err != nil { return 0, err } tsVal := nowVal.Add(readStaleness) - minTsVal := expression.GetMinSafeTime(sctx) - return oracle.GoTimeToTS(expression.CalAppropriateTime(tsVal, nowVal, minTsVal)), nil + minSafeTSVal := expression.GetMinSafeTime(sctx) + calculatedTime := expression.CalAppropriateTime(tsVal, nowVal, minSafeTSVal) + readTS := oracle.GoTimeToTS(calculatedTime) + if calculatedTime.After(minSafeTSVal) { + // If the final calculated exceeds the min safe ts, we are not sure whether the ts is safe to read (note that + // reading with a ts larger than PD's max allocated ts + 1 is unsafe and may break linearizability). + // So in this case, do an extra check on it. + err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), readTS) + if err != nil { + return 0, err + } + } + return readTS, nil } // IsStmtStaleness indicates whether the current statement is staleness or not