Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Use strict validation for stale read ts & flashback ts #57050

Merged
merged 7 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -6946,26 +6946,26 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "81065eb2f65e0874e7519d41580bdbe279bc93b1b9631bdc6c3092bc9f9458a1",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20241023023120-691e80ae0ea9",
sha256 = "537e3204b8178e2ce0ce43c744fc2699883bb33e718e267da2f1dd6c389968c2",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20241111090227-70049ae310bf",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241023023120-691e80ae0ea9.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241023023120-691e80ae0ea9.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241023023120-691e80ae0ea9.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241023023120-691e80ae0ea9.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241111090227-70049ae310bf.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241111090227-70049ae310bf.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241111090227-70049ae310bf.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241111090227-70049ae310bf.zip",
],
)
go_repository(
name = "com_github_tikv_pd_client",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/pd/client",
sha256 = "da76c552e8a9cfcc707d462d7cdc1ea6bea4b8e9017a60fc0a23200db500978d",
strip_prefix = "github.com/tikv/pd/client@v0.0.0-20241016064947-b70107ec31e6",
sha256 = "52a62b6f6247ce31ee9d0a5dbde941ba3be3db74a713fd79643d015d98a15c5f",
strip_prefix = "github.com/tikv/pd/client@v0.0.0-20241111073742-238d4d79ea31",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241016064947-b70107ec31e6.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241016064947-b70107ec31e6.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241016064947-b70107ec31e6.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241016064947-b70107ec31e6.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241111073742-238d4d79ea31.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241111073742-238d4d79ea31.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241111073742-238d4d79ea31.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241111073742-238d4d79ea31.zip",
],
)
go_repository(
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ require (
github.com/tdakkota/asciicheck v0.2.0
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tidwall/btree v1.7.0
github.com/tikv/client-go/v2 v2.0.8-0.20241023023120-691e80ae0ea9
github.com/tikv/pd/client v0.0.0-20241016064947-b70107ec31e6
github.com/tikv/client-go/v2 v2.0.8-0.20241111090227-70049ae310bf
github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31
github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a
github.com/twmb/murmur3 v1.1.6
github.com/uber/jaeger-client-go v2.22.1+incompatible
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -826,10 +826,10 @@ github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI=
github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/tikv/client-go/v2 v2.0.8-0.20241023023120-691e80ae0ea9 h1:1Fgp6FqjgXEj/CKcegdXu3wLo77sx7JM9NPC7sF0io0=
github.com/tikv/client-go/v2 v2.0.8-0.20241023023120-691e80ae0ea9/go.mod h1:WAp0oZxDL+3GX+QhJdG0quubJUzEH8LrFofmIxleJhs=
github.com/tikv/pd/client v0.0.0-20241016064947-b70107ec31e6 h1:u0z6yR68sg0pextuabJv/bD4mvwBe8iFeQOdymBUy0E=
github.com/tikv/pd/client v0.0.0-20241016064947-b70107ec31e6/go.mod h1:W5a0sDadwUpI9k8p7M77d3jo253ZHdmua+u4Ho4Xw8U=
github.com/tikv/client-go/v2 v2.0.8-0.20241111090227-70049ae310bf h1:qCi6BiBUPk3Ky4f2CCgBxgUmi3ZpuQLYDLgxw1ilXPA=
github.com/tikv/client-go/v2 v2.0.8-0.20241111090227-70049ae310bf/go.mod h1:p9zPFlKBrxhp3b/cBmKBWL9M0X4HtJjgi1ThUtQYF7o=
github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31 h1:oAYc4m5Eu1OY9ogJ103VO47AYPHvhtzbUPD8L8B67Qk=
github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31/go.mod h1:W5a0sDadwUpI9k8p7M77d3jo253ZHdmua+u4Ho4Xw8U=
github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a h1:A6uKudFIfAEpoPdaal3aSqGxBzLyU8TqyXImLwo6dIo=
github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a/go.mod h1:mkjARE7Yr8qU23YcGMSALbIxTQ9r9QBVahQOBRfU460=
github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
Expand Down
13 changes: 4 additions & 9 deletions pkg/ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/tablecodec"
Expand Down Expand Up @@ -113,16 +112,12 @@ func getStoreGlobalMinSafeTS(s kv.Storage) time.Time {

// ValidateFlashbackTS validates that flashBackTS in range [gcSafePoint, currentTS).
func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBackTS uint64) error {
currentTS, err := sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0)
// If we fail to calculate currentTS from local time, fallback to get a timestamp from PD.
currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope)
if err != nil {
metrics.ValidateReadTSFromPDCount.Inc()
currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope)
if err != nil {
return errors.Errorf("fail to validate flashback timestamp: %v", err)
}
currentTS = currentVer.Ver
return errors.Errorf("fail to validate flashback timestamp: %v", err)
}
currentTS := currentVer.Ver

oracleFlashbackTS := oracle.GetTimeFromTS(flashBackTS)
if oracleFlashbackTS.After(oracle.GetTimeFromTS(currentTS)) {
return errors.Errorf("cannot set flashback timestamp to future time")
Expand Down
6 changes: 2 additions & 4 deletions pkg/executor/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,8 @@ func (e *SetExecutor) setSysVariable(ctx context.Context, name string, v *expres
newSnapshotTS := getSnapshotTSByName()
newSnapshotIsSet := newSnapshotTS > 0 && newSnapshotTS != oldSnapshotTS
if newSnapshotIsSet {
if name == variable.TiDBTxnReadTS {
err = sessionctx.ValidateStaleReadTS(ctx, e.Ctx().GetSessionVars().StmtCtx, e.Ctx().GetStore(), newSnapshotTS)
} else {
err = sessionctx.ValidateSnapshotReadTS(ctx, e.Ctx(), newSnapshotTS)
err = sessionctx.ValidateSnapshotReadTS(ctx, e.Ctx().GetStore(), newSnapshotTS)
if name != variable.TiDBTxnReadTS {
// Also check gc safe point for snapshot read.
// We don't check snapshot with gc safe point for read_ts
// Client-go will automatically check the snapshotTS with gc safe point. It's unnecessary to check gc safe point during set executor.
Expand Down
25 changes: 21 additions & 4 deletions pkg/executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"fmt"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -1372,14 +1373,30 @@ func TestStaleTSO(t *testing.T) {
tk.MustExec("create table t (id int)")

tk.MustExec("insert into t values(1)")
ts1, err := strconv.ParseUint(tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.commit_ts')").Rows()[0][0].(string), 10, 64)
require.NoError(t, err)

// Wait until the physical advances for 1s
var currentTS uint64
for {
tk.MustExec("begin")
currentTS, err = strconv.ParseUint(tk.MustQuery("select @@tidb_current_ts").Rows()[0][0].(string), 10, 64)
require.NoError(t, err)
tk.MustExec("rollback")
if oracle.GetTimeFromTS(currentTS).After(oracle.GetTimeFromTS(ts1).Add(time.Second)) {
break
}
time.Sleep(time.Millisecond * 100)
}

asOfExprs := []string{
"now(3) - interval 1 second",
"current_time() - interval 1 second",
"curtime() - interval 1 second",
"now(3) - interval 10 second",
"current_time() - interval 10 second",
"curtime() - interval 10 second",
}

nextTSO := oracle.GoTimeToTS(time.Now().Add(2 * time.Second))
nextPhysical := oracle.GetPhysical(oracle.GetTimeFromTS(currentTS).Add(10 * time.Second))
nextTSO := oracle.ComposeTS(nextPhysical, oracle.ExtractLogical(currentTS))
require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/pkg/sessiontxn/staleread/mockStaleReadTSO", fmt.Sprintf("return(%d)", nextTSO)))
defer failpoint.Disable("github.com/pingcap/tidb/pkg/sessiontxn/staleread/mockStaleReadTSO")
for _, expr := range asOfExprs {
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/plan_cache_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ type PlanCacheStmt struct {
SQLDigest *parser.Digest
PlanDigest *parser.Digest
ForUpdateRead bool
SnapshotTSEvaluator func(sessionctx.Context) (uint64, error)
SnapshotTSEvaluator func(context.Context, sessionctx.Context) (uint64, error)

BindingInfo bindinfo.BindingMatchInfo

Expand Down
4 changes: 2 additions & 2 deletions pkg/planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3643,7 +3643,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (base.
if err != nil {
return nil, err
}
if err := sessionctx.ValidateStaleReadTS(ctx, b.ctx.GetSessionVars().StmtCtx, b.ctx.GetStore(), startTS); err != nil {
if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS); err != nil {
return nil, err
}
p.StaleTxnStartTS = startTS
Expand All @@ -3657,7 +3657,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (base.
if err != nil {
return nil, err
}
if err := sessionctx.ValidateStaleReadTS(ctx, b.ctx.GetSessionVars().StmtCtx, b.ctx.GetStore(), startTS); err != nil {
if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS); err != nil {
return nil, err
}
p.StaleTxnStartTS = startTS
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ var _ = PreprocessorReturn{}.initedLastSnapshotTS
type PreprocessorReturn struct {
initedLastSnapshotTS bool
IsStaleness bool
SnapshotTSEvaluator func(sessionctx.Context) (uint64, error)
SnapshotTSEvaluator func(context.Context, sessionctx.Context) (uint64, error)
// LastSnapshotTS is the last evaluated snapshotTS if any
// otherwise it defaults to zero
LastSnapshotTS uint64
Expand Down
3 changes: 0 additions & 3 deletions pkg/sessionctx/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@ go_library(
"//pkg/kv",
"//pkg/lock/context",
"//pkg/meta/model",
"//pkg/metrics",
"//pkg/planner/planctx",
"//pkg/session/cursor",
"//pkg/sessionctx/sessionstates",
"//pkg/sessionctx/stmtctx",
"//pkg/sessionctx/variable",
"//pkg/statistics/handle/usage/indexusage",
"//pkg/table/tblctx",
Expand All @@ -27,7 +25,6 @@ go_library(
"//pkg/util/sli",
"//pkg/util/sqlexec",
"//pkg/util/topsql/stmtstats",
"@com_github_pingcap_errors//:errors",
"@com_github_tikv_client_go_v2//oracle",
],
)
Expand Down
44 changes: 2 additions & 42 deletions pkg/sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,17 @@ package sessionctx
import (
"context"
"sync"
"time"

"github.com/pingcap/errors"
distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
"github.com/pingcap/tidb/pkg/expression/exprctx"
"github.com/pingcap/tidb/pkg/extension"
infoschema "github.com/pingcap/tidb/pkg/infoschema/context"
"github.com/pingcap/tidb/pkg/kv"
tablelock "github.com/pingcap/tidb/pkg/lock/context"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/planner/planctx"
"github.com/pingcap/tidb/pkg/session/cursor"
"github.com/pingcap/tidb/pkg/sessionctx/sessionstates"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics/handle/usage/indexusage"
"github.com/pingcap/tidb/pkg/table/tblctx"
Expand Down Expand Up @@ -244,42 +240,6 @@ const (
)

// ValidateSnapshotReadTS strictly validates that readTS does not exceed the PD timestamp
func ValidateSnapshotReadTS(ctx context.Context, sctx Context, readTS uint64) error {
latestTS, err := sctx.GetStore().GetOracle().GetLowResolutionTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
// If we fail to get latestTS or the readTS exceeds it, get a timestamp from PD to double check
if err != nil || readTS > latestTS {
metrics.ValidateReadTSFromPDCount.Inc()
currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope)
if err != nil {
return errors.Errorf("fail to validate read timestamp: %v", err)
}
if readTS > currentVer.Ver {
return errors.Errorf("cannot set read timestamp to a future time")
}
}
return nil
}

// How far future from now ValidateStaleReadTS allows at most
const allowedTimeFromNow = 100 * time.Millisecond

// ValidateStaleReadTS validates that readTS does not exceed the current time not strictly.
func ValidateStaleReadTS(ctx context.Context, sc *stmtctx.StatementContext, store kv.Storage, readTS uint64) error {
currentTS, err := sc.GetStaleTSO()
if currentTS == 0 || err != nil {
currentTS, err = store.GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0)
}
// If we fail to calculate currentTS from local time, fallback to get a timestamp from PD
if err != nil {
metrics.ValidateReadTSFromPDCount.Inc()
currentVer, err := store.CurrentVersion(oracle.GlobalTxnScope)
if err != nil {
return errors.Errorf("fail to validate read timestamp: %v", err)
}
currentTS = currentVer.Ver
}
if oracle.GetTimeFromTS(readTS).After(oracle.GetTimeFromTS(currentTS).Add(allowedTimeFromNow)) {
return errors.Errorf("cannot set read timestamp to a future time")
}
return nil
func ValidateSnapshotReadTS(ctx context.Context, store kv.Storage, readTS uint64) error {
return store.GetOracle().ValidateSnapshotReadTS(ctx, readTS, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
}
20 changes: 10 additions & 10 deletions pkg/sessiontxn/staleread/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
var _ Processor = &staleReadProcessor{}

// StalenessTSEvaluator is a function to get staleness ts
type StalenessTSEvaluator func(sctx sessionctx.Context) (uint64, error)
type StalenessTSEvaluator func(ctx context.Context, sctx sessionctx.Context) (uint64, error)

// Processor is an interface used to process stale read
type Processor interface {
Expand Down Expand Up @@ -101,7 +101,7 @@ func (p *baseProcessor) setEvaluatedTS(ts uint64) (err error) {
return err
}

return p.setEvaluatedValues(ts, is, func(sctx sessionctx.Context) (uint64, error) {
return p.setEvaluatedValues(ts, is, func(_ context.Context, sctx sessionctx.Context) (uint64, error) {
return ts, nil
})
}
Expand All @@ -117,7 +117,7 @@ func (p *baseProcessor) setEvaluatedTSWithoutEvaluator(ts uint64) (err error) {
}

func (p *baseProcessor) setEvaluatedEvaluator(evaluator StalenessTSEvaluator) error {
ts, err := evaluator(p.sctx)
ts, err := evaluator(p.ctx, p.sctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -168,10 +168,10 @@ func (p *staleReadProcessor) OnSelectTable(tn *ast.TableName) error {
}

// If `stmtAsOfTS` is not 0, it means we use 'select ... from xxx as of timestamp ...'
evaluateTS := func(sctx sessionctx.Context) (uint64, error) {
return parseAndValidateAsOf(context.Background(), p.sctx, tn.AsOf)
evaluateTS := func(ctx context.Context, sctx sessionctx.Context) (uint64, error) {
return parseAndValidateAsOf(ctx, p.sctx, tn.AsOf)
}
stmtAsOfTS, err := evaluateTS(p.sctx)
stmtAsOfTS, err := evaluateTS(p.ctx, p.sctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -201,7 +201,7 @@ func (p *staleReadProcessor) OnExecutePreparedStmt(preparedTSEvaluator Staleness
var stmtTS uint64
if preparedTSEvaluator != nil {
// If the `preparedTSEvaluator` is not nil, it means the prepared statement is stale read
if stmtTS, err = preparedTSEvaluator(p.sctx); err != nil {
if stmtTS, err = preparedTSEvaluator(p.ctx, p.sctx); err != nil {
return err
}
}
Expand Down Expand Up @@ -286,7 +286,7 @@ func parseAndValidateAsOf(ctx context.Context, sctx sessionctx.Context, asOf *as
return 0, err
}

if err = sessionctx.ValidateStaleReadTS(ctx, sctx.GetSessionVars().StmtCtx, sctx.GetStore(), ts); err != nil {
if err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), ts); err != nil {
return 0, err
}

Expand All @@ -299,8 +299,8 @@ func getTsEvaluatorFromReadStaleness(sctx sessionctx.Context) StalenessTSEvaluat
return nil
}

return func(sctx sessionctx.Context) (uint64, error) {
return CalculateTsWithReadStaleness(sctx, readStaleness)
return func(ctx context.Context, sctx sessionctx.Context) (uint64, error) {
return CalculateTsWithReadStaleness(ctx, sctx, readStaleness)
}
}

Expand Down
Loading