diff --git a/executor/show_test.go b/executor/show_test.go index a343779245a3f..ea6d6734159b6 100644 --- a/executor/show_test.go +++ b/executor/show_test.go @@ -1102,9 +1102,10 @@ func (s *testSuite5) TestShowBuiltin(c *C) { res := tk.MustQuery("show builtins;") c.Assert(res, NotNil) rows := res.Rows() - c.Assert(268, Equals, len(rows)) + const builtinFuncNum = 269 + c.Assert(builtinFuncNum, Equals, len(rows)) c.Assert("abs", Equals, rows[0][0].(string)) - c.Assert("yearweek", Equals, rows[267][0].(string)) + c.Assert("yearweek", Equals, rows[builtinFuncNum-1][0].(string)) } func (s *testSuite5) TestShowClusterConfig(c *C) { diff --git a/expression/builtin.go b/expression/builtin.go index 9c530f92949e1..a33650eef7b1f 100644 --- a/expression/builtin.go +++ b/expression/builtin.go @@ -687,6 +687,9 @@ var funcs = map[string]functionClass{ ast.Year: &yearFunctionClass{baseFunctionClass{ast.Year, 1, 1}}, ast.YearWeek: &yearWeekFunctionClass{baseFunctionClass{ast.YearWeek, 1, 2}}, ast.LastDay: &lastDayFunctionClass{baseFunctionClass{ast.LastDay, 1, 1}}, + // TSO functions + ast.TiDBBoundedStaleness: &tidbBoundedStalenessFunctionClass{baseFunctionClass{ast.TiDBBoundedStaleness, 2, 2}}, + ast.TiDBParseTso: &tidbParseTsoFunctionClass{baseFunctionClass{ast.TiDBParseTso, 1, 1}}, // string functions ast.ASCII: &asciiFunctionClass{baseFunctionClass{ast.ASCII, 1, 1}}, @@ -881,7 +884,6 @@ var funcs = map[string]functionClass{ // This function is used to show tidb-server version info. ast.TiDBVersion: &tidbVersionFunctionClass{baseFunctionClass{ast.TiDBVersion, 0, 0}}, ast.TiDBIsDDLOwner: &tidbIsDDLOwnerFunctionClass{baseFunctionClass{ast.TiDBIsDDLOwner, 0, 0}}, - ast.TiDBParseTso: &tidbParseTsoFunctionClass{baseFunctionClass{ast.TiDBParseTso, 1, 1}}, ast.TiDBDecodePlan: &tidbDecodePlanFunctionClass{baseFunctionClass{ast.TiDBDecodePlan, 1, 1}}, // TiDB Sequence function. diff --git a/expression/builtin_time.go b/expression/builtin_time.go index 1d52cf6adc2c3..13b3d1eef3def 100644 --- a/expression/builtin_time.go +++ b/expression/builtin_time.go @@ -27,6 +27,7 @@ import ( "github.com/cznic/mathutil" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" @@ -7113,3 +7114,97 @@ func handleInvalidZeroTime(ctx sessionctx.Context, t types.Time) (bool, error) { } return true, handleInvalidTimeError(ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, t.String())) } + +// tidbBoundedStalenessFunctionClass reads a time window [a, b] and compares it with the latest SafeTS +// to determine which TS to use in a read only transaction. +type tidbBoundedStalenessFunctionClass struct { + baseFunctionClass +} + +func (c *tidbBoundedStalenessFunctionClass) getFunction(ctx sessionctx.Context, args []Expression) (builtinFunc, error) { + if err := c.verifyArgs(args); err != nil { + return nil, err + } + bf, err := newBaseBuiltinFuncWithTp(ctx, c.funcName, args, types.ETDatetime, types.ETDatetime, types.ETDatetime) + if err != nil { + return nil, err + } + sig := &builtinTiDBBoundedStalenessSig{bf} + return sig, nil +} + +type builtinTiDBBoundedStalenessSig struct { + baseBuiltinFunc +} + +func (b *builtinTiDBBoundedStalenessSig) Clone() builtinFunc { + newSig := &builtinTidbParseTsoSig{} + newSig.cloneFrom(&b.baseBuiltinFunc) + return newSig +} + +func (b *builtinTiDBBoundedStalenessSig) evalTime(row chunk.Row) (types.Time, bool, error) { + leftTime, isNull, err := b.args[0].EvalTime(b.ctx, row) + if isNull || err != nil { + return types.ZeroTime, true, handleInvalidTimeError(b.ctx, err) + } + rightTime, isNull, err := b.args[1].EvalTime(b.ctx, row) + if isNull || err != nil { + return types.ZeroTime, true, handleInvalidTimeError(b.ctx, err) + } + if invalidLeftTime, invalidRightTime := leftTime.InvalidZero(), rightTime.InvalidZero(); invalidLeftTime || invalidRightTime { + if invalidLeftTime { + err = handleInvalidTimeError(b.ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, leftTime.String())) + } + if invalidRightTime { + err = handleInvalidTimeError(b.ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, rightTime.String())) + } + return types.ZeroTime, true, err + } + timeZone := getTimeZone(b.ctx) + minTime, err := leftTime.GoTime(timeZone) + if err != nil { + return types.ZeroTime, true, err + } + maxTime, err := rightTime.GoTime(timeZone) + if err != nil { + return types.ZeroTime, true, err + } + if minTime.After(maxTime) { + return types.ZeroTime, true, nil + } + // Because the minimum unit of a TSO is millisecond, so we only need fsp to be 3. + return types.NewTime(types.FromGoTime(calAppropriateTime(minTime, maxTime, getMinSafeTime(b.ctx, timeZone))), mysql.TypeDatetime, 3), false, nil +} + +func getMinSafeTime(sessionCtx sessionctx.Context, timeZone *time.Location) time.Time { + var minSafeTS uint64 + if store := sessionCtx.GetStore(); store != nil { + minSafeTS = store.GetMinSafeTS(sessionCtx.GetSessionVars().CheckAndGetTxnScope()) + } + // Inject mocked SafeTS for test. + failpoint.Inject("injectSafeTS", func(val failpoint.Value) { + injectTS := val.(int) + minSafeTS = uint64(injectTS) + }) + // Try to get from the stmt cache to make sure this function is deterministic. + stmtCtx := sessionCtx.GetSessionVars().StmtCtx + minSafeTS = stmtCtx.GetOrStoreStmtCache(stmtctx.StmtSafeTSCacheKey, minSafeTS).(uint64) + return oracle.GetTimeFromTS(minSafeTS).In(timeZone) +} + +// For a SafeTS t and a time range [t1, t2]: +// 1. If t < t1, we will use t1 as the result, +// and with it, a read request may fail because it's an unreached SafeTS. +// 2. If t1 <= t <= t2, we will use t as the result, and with it, +// a read request won't fail. +// 2. If t2 < t, we will use t2 as the result, +// and with it, a read request won't fail because it's bigger than the latest SafeTS. +func calAppropriateTime(minTime, maxTime, minSafeTime time.Time) time.Time { + if minSafeTime.Before(minTime) { + return minTime + } else if minSafeTime.After(maxTime) { + return maxTime + } + return minSafeTime +} diff --git a/expression/builtin_time_test.go b/expression/builtin_time_test.go index f82f6fb8f76ea..161912b07e973 100644 --- a/expression/builtin_time_test.go +++ b/expression/builtin_time_test.go @@ -14,12 +14,14 @@ package expression import ( + "fmt" "math" "strings" "time" . "github.com/pingcap/check" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/charset" "github.com/pingcap/parser/mysql" @@ -27,6 +29,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/mock" @@ -804,7 +807,7 @@ func (s *testEvaluatorSuite) TestTime(c *C) { } func resetStmtContext(ctx sessionctx.Context) { - ctx.GetSessionVars().StmtCtx.ResetNowTs() + ctx.GetSessionVars().StmtCtx.ResetStmtCache() } func (s *testEvaluatorSuite) TestNowAndUTCTimestamp(c *C) { @@ -2854,6 +2857,103 @@ func (s *testEvaluatorSuite) TestTidbParseTso(c *C) { } } +func (s *testEvaluatorSuite) TestTiDBBoundedStaleness(c *C) { + t1, err := time.Parse(types.TimeFormat, "2015-09-21 09:53:04") + c.Assert(err, IsNil) + // time.Parse uses UTC time zone by default, we need to change it to Local manually. + t1 = t1.Local() + t1Str := t1.Format(types.TimeFormat) + t2 := time.Now() + t2Str := t2.Format(types.TimeFormat) + timeZone := time.Local + s.ctx.GetSessionVars().TimeZone = timeZone + tests := []struct { + leftTime interface{} + rightTime interface{} + injectSafeTS uint64 + isNull bool + expect time.Time + }{ + // SafeTS is in the range. + { + leftTime: t1Str, + rightTime: t2Str, + injectSafeTS: oracle.GoTimeToTS(t2.Add(-1 * time.Second)), + isNull: false, + expect: t2.Add(-1 * time.Second), + }, + // SafeTS is less than the left time. + { + leftTime: t1Str, + rightTime: t2Str, + injectSafeTS: oracle.GoTimeToTS(t1.Add(-1 * time.Second)), + isNull: false, + expect: t1, + }, + // SafeTS is bigger than the right time. + { + leftTime: t1Str, + rightTime: t2Str, + injectSafeTS: oracle.GoTimeToTS(t2.Add(time.Second)), + isNull: false, + expect: t2, + }, + // Wrong time order. + { + leftTime: t2Str, + rightTime: t1Str, + injectSafeTS: 0, + isNull: true, + expect: time.Time{}, + }, + } + + fc := funcs[ast.TiDBBoundedStaleness] + for _, test := range tests { + c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + fmt.Sprintf("return(%v)", test.injectSafeTS)), IsNil) + f, err := fc.getFunction(s.ctx, s.datumsToConstants([]types.Datum{types.NewDatum(test.leftTime), types.NewDatum(test.rightTime)})) + c.Assert(err, IsNil) + d, err := evalBuiltinFunc(f, chunk.Row{}) + c.Assert(err, IsNil) + if test.isNull { + c.Assert(d.IsNull(), IsTrue) + } else { + goTime, err := d.GetMysqlTime().GoTime(timeZone) + c.Assert(err, IsNil) + c.Assert(goTime.Format(types.TimeFormat), Equals, test.expect.Format(types.TimeFormat)) + } + resetStmtContext(s.ctx) + } + + // Test whether it's deterministic. + safeTime1 := t2.Add(-1 * time.Second) + safeTS1 := oracle.ComposeTS(safeTime1.Unix()*1000, 0) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + fmt.Sprintf("return(%v)", safeTS1)), IsNil) + f, err := fc.getFunction(s.ctx, s.datumsToConstants([]types.Datum{types.NewDatum(t1Str), types.NewDatum(t2Str)})) + c.Assert(err, IsNil) + d, err := evalBuiltinFunc(f, chunk.Row{}) + c.Assert(err, IsNil) + goTime, err := d.GetMysqlTime().GoTime(timeZone) + c.Assert(err, IsNil) + resultTime := goTime.Format(types.TimeFormat) + c.Assert(resultTime, Equals, safeTime1.Format(types.TimeFormat)) + // SafeTS updated. + safeTime2 := t2.Add(1 * time.Second) + safeTS2 := oracle.ComposeTS(safeTime2.Unix()*1000, 0) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + fmt.Sprintf("return(%v)", safeTS2)), IsNil) + f, err = fc.getFunction(s.ctx, s.datumsToConstants([]types.Datum{types.NewDatum(t1Str), types.NewDatum(t2Str)})) + c.Assert(err, IsNil) + d, err = evalBuiltinFunc(f, chunk.Row{}) + c.Assert(err, IsNil) + // Still safeTime1 + c.Assert(resultTime, Equals, safeTime1.Format(types.TimeFormat)) + resetStmtContext(s.ctx) + failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS") +} + func (s *testEvaluatorSuite) TestGetIntervalFromDecimal(c *C) { du := baseDateArithmitical{} diff --git a/expression/builtin_time_vec.go b/expression/builtin_time_vec.go index 94c1cd8b6f0c4..6f74a8f587e50 100644 --- a/expression/builtin_time_vec.go +++ b/expression/builtin_time_vec.go @@ -854,6 +854,70 @@ func (b *builtinTidbParseTsoSig) vecEvalTime(input *chunk.Chunk, result *chunk.C return nil } +func (b *builtinTiDBBoundedStalenessSig) vectorized() bool { + return true +} + +func (b *builtinTiDBBoundedStalenessSig) vecEvalTime(input *chunk.Chunk, result *chunk.Column) error { + n := input.NumRows() + buf0, err := b.bufAllocator.get(types.ETDatetime, n) + if err != nil { + return err + } + defer b.bufAllocator.put(buf0) + if err = b.args[0].VecEvalTime(b.ctx, input, buf0); err != nil { + return err + } + buf1, err := b.bufAllocator.get(types.ETDatetime, n) + if err != nil { + return err + } + defer b.bufAllocator.put(buf1) + if err = b.args[1].VecEvalTime(b.ctx, input, buf1); err != nil { + return err + } + args0 := buf0.Times() + args1 := buf1.Times() + timeZone := getTimeZone(b.ctx) + minSafeTime := getMinSafeTime(b.ctx, timeZone) + result.ResizeTime(n, false) + result.MergeNulls(buf0, buf1) + times := result.Times() + for i := 0; i < n; i++ { + if result.IsNull(i) { + continue + } + if invalidArg0, invalidArg1 := args0[i].InvalidZero(), args1[i].InvalidZero(); invalidArg0 || invalidArg1 { + if invalidArg0 { + err = handleInvalidTimeError(b.ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, args0[i].String())) + } + if invalidArg1 { + err = handleInvalidTimeError(b.ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, args1[i].String())) + } + if err != nil { + return err + } + result.SetNull(i, true) + continue + } + minTime, err := args0[i].GoTime(timeZone) + if err != nil { + return err + } + maxTime, err := args1[i].GoTime(timeZone) + if err != nil { + return err + } + if minTime.After(maxTime) { + result.SetNull(i, true) + continue + } + // Because the minimum unit of a TSO is millisecond, so we only need fsp to be 3. + times[i] = types.NewTime(types.FromGoTime(calAppropriateTime(minTime, maxTime, minSafeTime)), mysql.TypeDatetime, 3) + } + return nil +} + func (b *builtinFromDaysSig) vectorized() bool { return true } diff --git a/expression/builtin_time_vec_test.go b/expression/builtin_time_vec_test.go index 593cce162d7ff..a757b867b783c 100644 --- a/expression/builtin_time_vec_test.go +++ b/expression/builtin_time_vec_test.go @@ -519,6 +519,13 @@ var vecBuiltinTimeCases = map[string][]vecExprBenchCase{ geners: []dataGenerator{newRangeInt64Gener(0, math.MaxInt64)}, }, }, + // Todo: how to inject the safeTS for better testing. + ast.TiDBBoundedStaleness: { + { + retEvalType: types.ETDatetime, + childrenTypes: []types.EvalType{types.ETDatetime, types.ETDatetime}, + }, + }, ast.LastDay: { {retEvalType: types.ETDatetime, childrenTypes: []types.EvalType{types.ETDatetime}}, }, diff --git a/expression/helper.go b/expression/helper.go index c5f91dbd090b5..d9f1e22610b62 100644 --- a/expression/helper.go +++ b/expression/helper.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/types" driver "github.com/pingcap/tidb/types/parser_driver" @@ -155,5 +156,5 @@ func getStmtTimestamp(ctx sessionctx.Context) (time.Time, error) { return time.Unix(timestamp, 0), nil } stmtCtx := ctx.GetSessionVars().StmtCtx - return stmtCtx.GetNowTsCached(), nil + return stmtCtx.GetOrStoreStmtCache(stmtctx.StmtNowTsCacheKey, time.Now()).(time.Time), nil } diff --git a/expression/integration_test.go b/expression/integration_test.go index 80e39b76ce746..69142c01c3f35 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -41,6 +41,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/mockstore" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" @@ -2263,6 +2264,79 @@ func (s *testIntegrationSuite2) TestTimeBuiltin(c *C) { result = tk.MustQuery(`select tidb_parse_tso(-1)`) result.Check(testkit.Rows("")) + // for tidb_bounded_staleness + tk.MustExec("SET time_zone = '+00:00';") + t := time.Now().UTC() + ts := oracle.GoTimeToTS(t) + tidbBoundedStalenessTests := []struct { + sql string + injectSafeTS uint64 + expect string + }{ + { + sql: `select tidb_bounded_staleness(DATE_SUB(NOW(), INTERVAL 600 SECOND), DATE_ADD(NOW(), INTERVAL 600 SECOND))`, + injectSafeTS: ts, + expect: t.Format(types.TimeFSPFormat[:len(types.TimeFSPFormat)-3]), + }, + { + sql: `select tidb_bounded_staleness("2021-04-27 12:00:00.000", "2021-04-27 13:00:00.000")`, + injectSafeTS: func() uint64 { + t, err := time.Parse("2006-01-02 15:04:05.000", "2021-04-27 13:30:04.877") + c.Assert(err, IsNil) + return oracle.GoTimeToTS(t) + }(), + expect: "2021-04-27 13:00:00.000", + }, + { + sql: `select tidb_bounded_staleness("2021-04-27 12:00:00.000", "2021-04-27 13:00:00.000")`, + injectSafeTS: func() uint64 { + t, err := time.Parse("2006-01-02 15:04:05.000", "2021-04-27 11:30:04.877") + c.Assert(err, IsNil) + return oracle.GoTimeToTS(t) + }(), + expect: "2021-04-27 12:00:00.000", + }, + { + sql: `select tidb_bounded_staleness("2021-04-27 12:00:00.000", "2021-04-27 11:00:00.000")`, + injectSafeTS: 0, + expect: "", + }, + // Time is too small. + { + sql: `select tidb_bounded_staleness("0020-04-27 12:00:00.000", "2021-04-27 11:00:00.000")`, + injectSafeTS: 0, + expect: "1970-01-01 00:00:00.000", + }, + // Wrong value. + { + sql: `select tidb_bounded_staleness(1, 2)`, + injectSafeTS: 0, + expect: "", + }, + { + sql: `select tidb_bounded_staleness("invalid_time_1", "invalid_time_2")`, + injectSafeTS: 0, + expect: "", + }, + } + for _, test := range tidbBoundedStalenessTests { + c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + fmt.Sprintf("return(%v)", test.injectSafeTS)), IsNil) + result = tk.MustQuery(test.sql) + result.Check(testkit.Rows(test.expect)) + } + failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS") + // test whether tidb_bounded_staleness is deterministic + result = tk.MustQuery(`select tidb_bounded_staleness(NOW(), DATE_ADD(NOW(), INTERVAL 600 SECOND)), tidb_bounded_staleness(NOW(), DATE_ADD(NOW(), INTERVAL 600 SECOND))`) + c.Assert(result.Rows()[0], HasLen, 2) + c.Assert(result.Rows()[0][0], Equals, result.Rows()[0][1]) + preResult := result.Rows()[0][0] + time.Sleep(time.Second) + result = tk.MustQuery(`select tidb_bounded_staleness(NOW(), DATE_ADD(NOW(), INTERVAL 600 SECOND)), tidb_bounded_staleness(NOW(), DATE_ADD(NOW(), INTERVAL 600 SECOND))`) + c.Assert(result.Rows()[0], HasLen, 2) + c.Assert(result.Rows()[0][0], Equals, result.Rows()[0][1]) + c.Assert(result.Rows()[0][0], Not(Equals), preResult) + // fix issue 10308 result = tk.MustQuery("select time(\"- -\");") result.Check(testkit.Rows("00:00:00")) diff --git a/go.mod b/go.mod index fe8e08ae42e47..f82a8a187775f 100644 --- a/go.mod +++ b/go.mod @@ -47,7 +47,7 @@ require ( github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 github.com/pingcap/kvproto v0.0.0-20210507054410-a8152f8a876c github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 - github.com/pingcap/parser v0.0.0-20210513020953-ae2c4497c07b + github.com/pingcap/parser v0.0.0-20210518053259-92fa6fe07eb6 github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible github.com/pingcap/tipb v0.0.0-20210422074242-57dd881b81b1 diff --git a/go.sum b/go.sum index 3ee71da011a54..14986c3d1f025 100644 --- a/go.sum +++ b/go.sum @@ -443,8 +443,8 @@ github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIf github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 h1:ERrF0fTuIOnwfGbt71Ji3DKbOEaP189tjym50u8gpC8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= -github.com/pingcap/parser v0.0.0-20210513020953-ae2c4497c07b h1:eLuDQ6eJCEKCbGwhGrkjzagwev1GJGU2Y2kFkAsBzV0= -github.com/pingcap/parser v0.0.0-20210513020953-ae2c4497c07b/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= +github.com/pingcap/parser v0.0.0-20210518053259-92fa6fe07eb6 h1:wsH3psMH5ksDowsN9VUE9ZqSrX6oF4AYQQfOunkvSfU= +github.com/pingcap/parser v0.0.0-20210518053259-92fa6fe07eb6/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= github.com/pingcap/sysutil v0.0.0-20200206130906-2bfa6dc40bcd/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI= github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 h1:A9KL9R+lWSVPH8IqUuH1QSTRJ5FGoY1bT2IcfPKsWD8= github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041UWP+NqYzrJ3fMgC/Hw9wnmQ/tUkp/JaHly8= diff --git a/kv/interface_mock_test.go b/kv/interface_mock_test.go index e1d41f1693088..5d85261bc2111 100644 --- a/kv/interface_mock_test.go +++ b/kv/interface_mock_test.go @@ -213,6 +213,10 @@ func (s *mockStorage) GetMemCache() MemManager { return nil } +func (s *mockStorage) GetMinSafeTS(txnScope string) uint64 { + return 0 +} + // newMockStorage creates a new mockStorage. func newMockStorage() Storage { return &mockStorage{} diff --git a/kv/kv.go b/kv/kv.go index 572fe104024bc..20b0fc84b7144 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -417,6 +417,8 @@ type Storage interface { ShowStatus(ctx context.Context, key string) (interface{}, error) // GetMemCache return memory manager of the storage. GetMemCache() MemManager + // GetMinSafeTS return the minimal SafeTS of the storage with given txnScope. + GetMinSafeTS(txnScope string) uint64 } // EtcdBackend is used for judging a storage is a real TiKV. diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 8df0001427173..d8a75aec48610 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -140,8 +140,6 @@ type StatementContext struct { RuntimeStatsColl *execdetails.RuntimeStatsColl TableIDs []int64 IndexNames []string - nowTs time.Time // use this variable for now/current_timestamp calculation/cache for one stmt - stmtTimeCached bool StmtType string OriginalSQL string digestMemo struct { @@ -164,6 +162,9 @@ type StatementContext struct { TblInfo2UnionScan map[*model.TableInfo]bool TaskID uint64 // unique ID for an execution of a statement TaskMapBakTS uint64 // counter for + + // stmtCache is used to store some statement-related values. + stmtCache map[StmtCacheKey]interface{} } // StmtHints are SessionVars related sql hints. @@ -195,19 +196,35 @@ func (sh *StmtHints) TaskMapNeedBackUp() bool { return sh.ForceNthPlan != -1 } -// GetNowTsCached getter for nowTs, if not set get now time and cache it -func (sc *StatementContext) GetNowTsCached() time.Time { - if !sc.stmtTimeCached { - now := time.Now() - sc.nowTs = now - sc.stmtTimeCached = true +// StmtCacheKey represents the key type in the StmtCache. +type StmtCacheKey int + +const ( + // StmtNowTsCacheKey is a variable for now/current_timestamp calculation/cache of one stmt. + StmtNowTsCacheKey StmtCacheKey = iota + // StmtSafeTSCacheKey is a variable for safeTS calculation/cache of one stmt. + StmtSafeTSCacheKey +) + +// GetOrStoreStmtCache gets the cached value of the given key if it exists, otherwise stores the value. +func (sc *StatementContext) GetOrStoreStmtCache(key StmtCacheKey, value interface{}) interface{} { + if sc.stmtCache == nil { + sc.stmtCache = make(map[StmtCacheKey]interface{}) + } + if _, ok := sc.stmtCache[key]; !ok { + sc.stmtCache[key] = value } - return sc.nowTs + return sc.stmtCache[key] +} + +// ResetInStmtCache resets the cache of given key. +func (sc *StatementContext) ResetInStmtCache(key StmtCacheKey) { + delete(sc.stmtCache, key) } -// ResetNowTs resetter for nowTs, clear cached time flag -func (sc *StatementContext) ResetNowTs() { - sc.stmtTimeCached = false +// ResetStmtCache resets all cached values. +func (sc *StatementContext) ResetStmtCache() { + sc.stmtCache = make(map[StmtCacheKey]interface{}) } // SQLDigest gets normalized and digest for provided sql. diff --git a/store/helper/helper.go b/store/helper/helper.go index e96ad4ae21851..49aa7cf2107e0 100644 --- a/store/helper/helper.go +++ b/store/helper/helper.go @@ -71,6 +71,7 @@ type Storage interface { SetTiKVClient(client tikv.Client) GetTiKVClient() tikv.Client Closed() <-chan struct{} + GetMinSafeTS(txnScope string) uint64 } // Helper is a middleware to get some information from tikv/pd. It can be used for TiDB's http api or mem table. diff --git a/store/mockstore/mockstorage/storage.go b/store/mockstore/mockstorage/storage.go index 36ded5e434817..6221ef855707d 100644 --- a/store/mockstore/mockstorage/storage.go +++ b/store/mockstore/mockstorage/storage.go @@ -99,6 +99,11 @@ func (s *mockStorage) CurrentVersion(txnScope string) (kv.Version, error) { return kv.NewVersion(ver), err } +// GetMinSafeTS return the minimal SafeTS of the storage with given txnScope. +func (s *mockStorage) GetMinSafeTS(txnScope string) uint64 { + return 0 +} + func newTiKVTxn(txn *tikv.KVTxn, err error) (kv.Transaction, error) { if err != nil { return nil, err diff --git a/store/tikv/kv.go b/store/tikv/kv.go index bbf8517a42a8c..8cec3dfbca964 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" tidbkv "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/config" tikverr "github.com/pingcap/tidb/store/tikv/error" @@ -84,7 +85,7 @@ type KVStore struct { safePoint uint64 spTime time.Time spMutex sync.RWMutex // this is used to update safePoint and spTime - closed chan struct{} // this is used to nofity when the store is closed + closed chan struct{} // this is used to notify when the store is closed // storeID -> safeTS, stored as map[uint64]uint64 // safeTS here will be used during the Stale Read process, @@ -358,6 +359,27 @@ func (s *KVStore) GetTiKVClient() (client Client) { return s.clientMu.client } +// GetMinSafeTS return the minimal safeTS of the storage with given txnScope. +func (s *KVStore) GetMinSafeTS(txnScope string) uint64 { + stores := make([]*Store, 0) + allStores := s.regionCache.getStoresByType(tikvrpc.TiKV) + if txnScope != oracle.GlobalTxnScope { + for _, store := range allStores { + if store.IsLabelsMatch([]*metapb.StoreLabel{ + { + Key: DCLabelKey, + Value: txnScope, + }, + }) { + stores = append(stores, store) + } + } + } else { + stores = allStores + } + return s.getMinSafeTSByStores(stores) +} + func (s *KVStore) getSafeTS(storeID uint64) uint64 { safeTS, ok := s.safeTSMap.Load(storeID) if !ok { diff --git a/util/mock/store.go b/util/mock/store.go index 804f3d6a3f2d3..7c86de4b3cb72 100644 --- a/util/mock/store.go +++ b/util/mock/store.go @@ -72,3 +72,8 @@ func (s *Store) GetMemCache() kv.MemManager { // ShowStatus implements kv.Storage interface. func (s *Store) ShowStatus(ctx context.Context, key string) (interface{}, error) { return nil, nil } + +// GetMinSafeTS implements kv.Storage interface. +func (s *Store) GetMinSafeTS(txnScope string) uint64 { + return 0 +}