Skip to content

Commit 79a02af

Browse files
authored
sessiontxn/staleread: more accurate stale ts (pingcap#44272) (pingcap#45760)
close pingcap#44215
1 parent 2b9eaf0 commit 79a02af

File tree

10 files changed

+92
-9
lines changed

10 files changed

+92
-9
lines changed

executor/calibrate_resource.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -130,19 +130,19 @@ type calibrateResourceExec struct {
130130
done bool
131131
}
132132

133-
func (e *calibrateResourceExec) parseCalibrateDuration() (startTime time.Time, endTime time.Time, err error) {
133+
func (e *calibrateResourceExec) parseCalibrateDuration(ctx context.Context) (startTime time.Time, endTime time.Time, err error) {
134134
var dur time.Duration
135135
var ts uint64
136136
for _, op := range e.optionList {
137137
switch op.Tp {
138138
case ast.CalibrateStartTime:
139-
ts, err = staleread.CalculateAsOfTsExpr(e.ctx, op.Ts)
139+
ts, err = staleread.CalculateAsOfTsExpr(ctx, e.ctx, op.Ts)
140140
if err != nil {
141141
return
142142
}
143143
startTime = oracle.GetTimeFromTS(ts)
144144
case ast.CalibrateEndTime:
145-
ts, err = staleread.CalculateAsOfTsExpr(e.ctx, op.Ts)
145+
ts, err = staleread.CalculateAsOfTsExpr(ctx, e.ctx, op.Ts)
146146
if err != nil {
147147
return
148148
}
@@ -200,7 +200,7 @@ var (
200200
)
201201

202202
func (e *calibrateResourceExec) dynamicCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error {
203-
startTs, endTs, err := e.parseCalibrateDuration()
203+
startTs, endTs, err := e.parseCalibrateDuration(ctx)
204204
if err != nil {
205205
return err
206206
}

executor/ddl.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ func (e *DDLExec) getRecoverTableByTableName(tableName *ast.TableName) (*model.J
538538
}
539539

540540
func (e *DDLExec) executeFlashBackCluster(s *ast.FlashBackToTimestampStmt) error {
541-
flashbackTS, err := staleread.CalculateAsOfTsExpr(e.ctx, s.FlashbackTS)
541+
flashbackTS, err := staleread.CalculateAsOfTsExpr(context.Background(), e.ctx, s.FlashbackTS)
542542
if err != nil {
543543
return err
544544
}

executor/stale_txn_test.go

+25
Original file line numberDiff line numberDiff line change
@@ -1396,3 +1396,28 @@ func TestStalePrepare(t *testing.T) {
13961396
tk.MustQuery("execute stmt").Check(expected)
13971397
}
13981398
}
1399+
1400+
func TestStaleTSO(t *testing.T) {
1401+
store := testkit.CreateMockStore(t)
1402+
tk := testkit.NewTestKit(t, store)
1403+
tk.MustExec("use test")
1404+
tk.MustExec("drop table if exists t")
1405+
defer tk.MustExec("drop table if exists t")
1406+
tk.MustExec("create table t (id int)")
1407+
1408+
tk.MustExec("insert into t values(1)")
1409+
1410+
asOfExprs := []string{
1411+
"now(3) - interval 1 second",
1412+
"current_time() - interval 1 second",
1413+
"curtime() - interval 1 second",
1414+
}
1415+
1416+
nextTSO := oracle.GoTimeToTS(time.Now().Add(2 * time.Second))
1417+
require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/sessiontxn/staleread/mockStaleReadTSO", fmt.Sprintf("return(%d)", nextTSO)))
1418+
defer failpoint.Disable("github.com/pingcap/tidb/sessiontxn/staleread/mockStaleReadTSO")
1419+
for _, expr := range asOfExprs {
1420+
// Make sure the now() expr is evaluated from the stale ts provider.
1421+
tk.MustQuery("select * from t as of timestamp " + expr + " order by id asc").Check(testkit.Rows("1"))
1422+
}
1423+
}

expression/helper.go

+12
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ import (
2727
"github.com/pingcap/tidb/sessionctx"
2828
"github.com/pingcap/tidb/types"
2929
driver "github.com/pingcap/tidb/types/parser_driver"
30+
"github.com/pingcap/tidb/util/logutil"
31+
"github.com/tikv/client-go/v2/oracle"
32+
"go.uber.org/zap"
3033
)
3134

3235
func boolToInt64(v bool) int64 {
@@ -158,6 +161,15 @@ func getStmtTimestamp(ctx sessionctx.Context) (time.Time, error) {
158161
failpoint.Return(v, nil)
159162
})
160163

164+
if ctx != nil {
165+
staleTSO, err := ctx.GetSessionVars().StmtCtx.GetStaleTSO()
166+
if staleTSO != 0 && err == nil {
167+
return oracle.GetTimeFromTS(staleTSO), nil
168+
} else if err != nil {
169+
logutil.BgLogger().Error("get stale tso failed", zap.Error(err))
170+
}
171+
}
172+
161173
now := time.Now()
162174

163175
if ctx == nil {

planner/core/planbuilder.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -3543,7 +3543,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan,
35433543
case *ast.BeginStmt:
35443544
readTS := b.ctx.GetSessionVars().TxnReadTS.PeakTxnReadTS()
35453545
if raw.AsOf != nil {
3546-
startTS, err := staleread.CalculateAsOfTsExpr(b.ctx, raw.AsOf.TsExpr)
3546+
startTS, err := staleread.CalculateAsOfTsExpr(ctx, b.ctx, raw.AsOf.TsExpr)
35473547
if err != nil {
35483548
return nil, err
35493549
}

sessionctx/context.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,10 @@ const allowedTimeFromNow = 100 * time.Millisecond
244244

245245
// ValidateStaleReadTS validates that readTS does not exceed the current time not strictly.
246246
func ValidateStaleReadTS(ctx context.Context, sctx Context, readTS uint64) error {
247-
currentTS, err := sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0)
247+
currentTS, err := sctx.GetSessionVars().StmtCtx.GetStaleTSO()
248+
if currentTS == 0 || err != nil {
249+
currentTS, err = sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0)
250+
}
248251
// If we fail to calculate currentTS from local time, fallback to get a timestamp from PD
249252
if err != nil {
250253
metrics.ValidateReadTSFromPDCount.Inc()

sessionctx/stmtctx/stmtctx.go

+32
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,12 @@ type StatementContext struct {
403403
useChunkAlloc bool
404404
// Check if TiFlash read engine is removed due to strict sql mode.
405405
TiFlashEngineRemovedDueToStrictSQLMode bool
406+
// StaleTSOProvider is used to provide stale timestamp oracle for read-only transactions.
407+
StaleTSOProvider struct {
408+
sync.Mutex
409+
value *uint64
410+
eval func() (uint64, error)
411+
}
406412
}
407413

408414
// StmtHints are SessionVars related sql hints.
@@ -1196,6 +1202,32 @@ func (sc *StatementContext) DetachMemDiskTracker() {
11961202
}
11971203
}
11981204

1205+
// SetStaleTSOProvider sets the stale TSO provider.
1206+
func (sc *StatementContext) SetStaleTSOProvider(eval func() (uint64, error)) {
1207+
sc.StaleTSOProvider.Lock()
1208+
defer sc.StaleTSOProvider.Unlock()
1209+
sc.StaleTSOProvider.value = nil
1210+
sc.StaleTSOProvider.eval = eval
1211+
}
1212+
1213+
// GetStaleTSO returns the TSO for stale-read usage which calculate from PD's last response.
1214+
func (sc *StatementContext) GetStaleTSO() (uint64, error) {
1215+
sc.StaleTSOProvider.Lock()
1216+
defer sc.StaleTSOProvider.Unlock()
1217+
if sc.StaleTSOProvider.value != nil {
1218+
return *sc.StaleTSOProvider.value, nil
1219+
}
1220+
if sc.StaleTSOProvider.eval == nil {
1221+
return 0, nil
1222+
}
1223+
tso, err := sc.StaleTSOProvider.eval()
1224+
if err != nil {
1225+
return 0, err
1226+
}
1227+
sc.StaleTSOProvider.value = &tso
1228+
return tso, nil
1229+
}
1230+
11991231
// CopTasksDetails collects some useful information of cop-tasks during execution.
12001232
type CopTasksDetails struct {
12011233
NumCopTasks int

sessiontxn/staleread/BUILD.bazel

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ go_library(
2929
"//types",
3030
"//util/dbterror",
3131
"@com_github_pingcap_errors//:errors",
32+
"@com_github_pingcap_failpoint//:failpoint",
3233
"@com_github_tikv_client_go_v2//oracle",
3334
],
3435
)

sessiontxn/staleread/processor.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ func parseAndValidateAsOf(ctx context.Context, sctx sessionctx.Context, asOf *as
280280
return 0, nil
281281
}
282282

283-
ts, err := CalculateAsOfTsExpr(sctx, asOf.TsExpr)
283+
ts, err := CalculateAsOfTsExpr(ctx, sctx, asOf.TsExpr)
284284
if err != nil {
285285
return 0, err
286286
}

sessiontxn/staleread/util.go

+11-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"time"
2020

21+
"github.com/pingcap/failpoint"
2122
"github.com/pingcap/tidb/expression"
2223
"github.com/pingcap/tidb/parser/ast"
2324
"github.com/pingcap/tidb/parser/mysql"
@@ -29,7 +30,16 @@ import (
2930
)
3031

3132
// CalculateAsOfTsExpr calculates the TsExpr of AsOfClause to get a StartTS.
32-
func CalculateAsOfTsExpr(sctx sessionctx.Context, tsExpr ast.ExprNode) (uint64, error) {
33+
func CalculateAsOfTsExpr(ctx context.Context, sctx sessionctx.Context, tsExpr ast.ExprNode) (uint64, error) {
34+
sctx.GetSessionVars().StmtCtx.SetStaleTSOProvider(func() (uint64, error) {
35+
failpoint.Inject("mockStaleReadTSO", func(val failpoint.Value) (uint64, error) {
36+
return uint64(val.(int)), nil
37+
})
38+
// this function accepts a context, but we don't need it when there is a valid cached ts.
39+
// in most cases, the stale read ts can be calculated from `cached ts + time since cache - staleness`,
40+
// this can be more accurate than `time.Now() - staleness`, because TiDB's local time can drift.
41+
return sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0)
42+
})
3343
tsVal, err := expression.EvalAstExpr(sctx, tsExpr)
3444
if err != nil {
3545
return 0, err

0 commit comments

Comments
 (0)