diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index c68b8a5bfa511..ce5202ae58a75 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -256,65 +256,65 @@ func (s *testStaleTxnSerialSuite) TestTimeBoundedStalenessTxn(c *C) { defer tk.MustExec(`drop table if exists t`) tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:10'`) testcases := []struct { - name string - sql string - injectResolveTS uint64 - useResolveTS bool + name string + sql string + injectSafeTS uint64 + useSafeTS bool }{ { - name: "max 20 seconds ago, resolveTS 10 secs ago", + name: "max 20 seconds ago, safeTS 10 secs ago", sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:20'`, - injectResolveTS: func() uint64 { + injectSafeTS: func() uint64 { phy := time.Now().Add(-10*time.Second).Unix() * 1000 return oracle.ComposeTS(phy, 0) }(), - useResolveTS: true, + useSafeTS: true, }, { - name: "max 10 seconds ago, resolveTS 20 secs ago", + name: "max 10 seconds ago, safeTS 20 secs ago", sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:10'`, - injectResolveTS: func() uint64 { + injectSafeTS: func() uint64 { phy := time.Now().Add(-20*time.Second).Unix() * 1000 return oracle.ComposeTS(phy, 0) }(), - useResolveTS: false, + useSafeTS: false, }, { - name: "max 20 seconds ago, resolveTS 10 secs ago", + name: "max 20 seconds ago, safeTS 10 secs ago", sql: func() string { return fmt.Sprintf(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MIN READ TIMESTAMP '%v'`, time.Now().Add(-20*time.Second).Format("2006-01-02 15:04:05")) }(), - injectResolveTS: func() uint64 { + injectSafeTS: func() uint64 { phy := time.Now().Add(-10*time.Second).Unix() * 1000 return oracle.ComposeTS(phy, 0) }(), - useResolveTS: true, + useSafeTS: true, }, { - name: "max 10 seconds ago, resolveTS 20 secs ago", + name: "max 10 seconds ago, safeTS 20 secs ago", sql: func() string { return fmt.Sprintf(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MIN READ TIMESTAMP '%v'`, time.Now().Add(-10*time.Second).Format("2006-01-02 15:04:05")) }(), - injectResolveTS: func() uint64 { + injectSafeTS: func() uint64 { phy := time.Now().Add(-20*time.Second).Unix() * 1000 return oracle.ComposeTS(phy, 0) }(), - useResolveTS: false, + useSafeTS: false, }, } for _, testcase := range testcases { c.Log(testcase.name) - c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/injectResolveTS", - fmt.Sprintf("return(%v)", testcase.injectResolveTS)), IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/injectSafeTS", + fmt.Sprintf("return(%v)", testcase.injectSafeTS)), IsNil) tk.MustExec(testcase.sql) - if testcase.useResolveTS { - c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Equals, testcase.injectResolveTS) + if testcase.useSafeTS { + c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Equals, testcase.injectSafeTS) } else { - c.Assert(oracle.CompareTS(tk.Se.GetSessionVars().TxnCtx.StartTS, testcase.injectResolveTS), Equals, 1) + c.Assert(oracle.CompareTS(tk.Se.GetSessionVars().TxnCtx.StartTS, testcase.injectSafeTS), Equals, 1) } tk.MustExec("commit") - failpoint.Disable("github.com/pingcap/tidb/store/tikv/injectResolveTS") + failpoint.Disable("github.com/pingcap/tidb/store/tikv/injectSafeTS") } } diff --git a/store/tikv/extract_start_ts_test.go b/store/tikv/extract_start_ts_test.go index 1422e387bfda5..a108a0f7e41cb 100644 --- a/store/tikv/extract_start_ts_test.go +++ b/store/tikv/extract_start_ts_test.go @@ -56,8 +56,8 @@ func (s *extractStartTsSuite) SetUpTest(c *C) { Value: "Some Random Label", }}, } - store.resolveTSMu.resolveTS[2] = 102 - store.resolveTSMu.resolveTS[3] = 101 + store.setSafeTS(2, 102) + store.setSafeTS(3, 101) s.store = store } @@ -105,8 +105,8 @@ func (s *extractStartTsSuite) TestExtractStartTs(c *C) { } func (s *extractStartTsSuite) TestMaxPrevSecFallback(c *C) { - s.store.resolveTSMu.resolveTS[2] = 0x8000000000000002 - s.store.resolveTSMu.resolveTS[3] = 0x8000000000000001 + s.store.setSafeTS(2, 0x8000000000000002) + s.store.setSafeTS(3, 0x8000000000000001) i := uint64(100) cases := []kv.TransactionOption{ diff --git a/store/tikv/kv.go b/store/tikv/kv.go index 5ddca52726a04..a487b0024e3e9 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -82,10 +82,10 @@ type KVStore struct { spMutex sync.RWMutex // this is used to update safePoint and spTime closed chan struct{} // this is used to nofity when the store is closed - resolveTSMu struct { - sync.RWMutex - resolveTS map[uint64]uint64 // storeID -> resolveTS - } + // storeID -> safeTS, stored as map[uint64]uint64 + // safeTS here will be used during the Stale Read process, + // it indicates the safe timestamp point that can be used to read consistent but may not the latest data. + safeTSMap sync.Map replicaReadSeed uint32 // this is used to load balance followers / learners when replica read is enabled } @@ -142,7 +142,6 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Client replicaReadSeed: rand.Uint32(), } store.lockResolver = newLockResolver(store) - store.resolveTSMu.resolveTS = make(map[uint64]uint64) go store.runSafePointChecker() go store.safeTSUpdater() @@ -337,20 +336,30 @@ func (s *KVStore) GetTiKVClient() (client Client) { return s.client } -func (s *KVStore) getMinResolveTSByStores(stores []*Store) uint64 { - failpoint.Inject("injectResolveTS", func(val failpoint.Value) { +func (s *KVStore) getSafeTS(storeID uint64) uint64 { + safeTS, ok := s.safeTSMap.Load(storeID) + if !ok { + return 0 + } + return safeTS.(uint64) +} + +func (s *KVStore) setSafeTS(storeID, safeTS uint64) { + s.safeTSMap.Store(storeID, safeTS) +} + +func (s *KVStore) getMinSafeTSByStores(stores []*Store) uint64 { + failpoint.Inject("injectSafeTS", func(val failpoint.Value) { injectTS := val.(int) failpoint.Return(uint64(injectTS)) }) minSafeTS := uint64(math.MaxUint64) - s.resolveTSMu.RLock() - defer s.resolveTSMu.RUnlock() // when there is no store, return 0 in order to let minStartTS become startTS directly if len(stores) < 1 { return 0 } for _, store := range stores { - safeTS := s.resolveTSMu.resolveTS[store.storeID] + safeTS := s.getSafeTS(store.storeID) if safeTS < minSafeTS { minSafeTS = safeTS } @@ -368,12 +377,12 @@ func (s *KVStore) safeTSUpdater() { case <-s.Closed(): return case <-t.C: - s.updateResolveTS(ctx) + s.updateSafeTS(ctx) } } } -func (s *KVStore) updateResolveTS(ctx context.Context) { +func (s *KVStore) updateSafeTS(ctx context.Context) { stores := s.regionCache.getStoresByType(tikvrpc.TiKV) tikvClient := s.GetTiKVClient() wg := &sync.WaitGroup{} @@ -389,13 +398,11 @@ func (s *KVStore) updateResolveTS(ctx context.Context) { EndKey: []byte(""), }}), ReadTimeoutShort) if err != nil { - logutil.BgLogger().Debug("update resolveTS failed", zap.Error(err), zap.Uint64("store-id", storeID)) + logutil.BgLogger().Debug("update safeTS failed", zap.Error(err), zap.Uint64("store-id", storeID)) return } safeTSResp := resp.Resp.(*kvrpcpb.StoreSafeTSResponse) - s.resolveTSMu.Lock() - s.resolveTSMu.resolveTS[storeID] = safeTSResp.GetSafeTs() - s.resolveTSMu.Unlock() + s.setSafeTS(storeID, safeTSResp.GetSafeTs()) }(ctx, wg, storeID, storeAddr) } wg.Wait() diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 0ae2df13c12e9..1d678d010957e 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -112,12 +112,12 @@ func extractStartTs(store *KVStore, options kv.TransactionOption) (uint64, error } else { stores = allStores } - resolveTS := store.getMinResolveTSByStores(stores) + safeTS := store.getMinSafeTSByStores(stores) startTs = *options.MinStartTS - // If the resolveTS is larger than the minStartTS, we will use resolveTS as StartTS, otherwise we will use + // If the safeTS is larger than the minStartTS, we will use safeTS as StartTS, otherwise we will use // minStartTS directly. - if oracle.CompareTS(startTs, resolveTS) < 0 { - startTs = resolveTS + if oracle.CompareTS(startTs, safeTS) < 0 { + startTs = safeTS } } else if options.MaxPrevSec != nil { bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil)