Skip to content

Commit

Permalink
Merge branch 'master' into feat-ignore-lock-for-temp-table
Browse files Browse the repository at this point in the history
  • Loading branch information
mmyj authored May 11, 2021
2 parents 7f27c38 + 5460b5c commit 99099a1
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 46 deletions.
44 changes: 22 additions & 22 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,65 +256,65 @@ func (s *testStaleTxnSerialSuite) TestTimeBoundedStalenessTxn(c *C) {
defer tk.MustExec(`drop table if exists t`)
tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:10'`)
testcases := []struct {
name string
sql string
injectResolveTS uint64
useResolveTS bool
name string
sql string
injectSafeTS uint64
useSafeTS bool
}{
{
name: "max 20 seconds ago, resolveTS 10 secs ago",
name: "max 20 seconds ago, safeTS 10 secs ago",
sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:20'`,
injectResolveTS: func() uint64 {
injectSafeTS: func() uint64 {
phy := time.Now().Add(-10*time.Second).Unix() * 1000
return oracle.ComposeTS(phy, 0)
}(),
useResolveTS: true,
useSafeTS: true,
},
{
name: "max 10 seconds ago, resolveTS 20 secs ago",
name: "max 10 seconds ago, safeTS 20 secs ago",
sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:10'`,
injectResolveTS: func() uint64 {
injectSafeTS: func() uint64 {
phy := time.Now().Add(-20*time.Second).Unix() * 1000
return oracle.ComposeTS(phy, 0)
}(),
useResolveTS: false,
useSafeTS: false,
},
{
name: "max 20 seconds ago, resolveTS 10 secs ago",
name: "max 20 seconds ago, safeTS 10 secs ago",
sql: func() string {
return fmt.Sprintf(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MIN READ TIMESTAMP '%v'`,
time.Now().Add(-20*time.Second).Format("2006-01-02 15:04:05"))
}(),
injectResolveTS: func() uint64 {
injectSafeTS: func() uint64 {
phy := time.Now().Add(-10*time.Second).Unix() * 1000
return oracle.ComposeTS(phy, 0)
}(),
useResolveTS: true,
useSafeTS: true,
},
{
name: "max 10 seconds ago, resolveTS 20 secs ago",
name: "max 10 seconds ago, safeTS 20 secs ago",
sql: func() string {
return fmt.Sprintf(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MIN READ TIMESTAMP '%v'`,
time.Now().Add(-10*time.Second).Format("2006-01-02 15:04:05"))
}(),
injectResolveTS: func() uint64 {
injectSafeTS: func() uint64 {
phy := time.Now().Add(-20*time.Second).Unix() * 1000
return oracle.ComposeTS(phy, 0)
}(),
useResolveTS: false,
useSafeTS: false,
},
}
for _, testcase := range testcases {
c.Log(testcase.name)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/injectResolveTS",
fmt.Sprintf("return(%v)", testcase.injectResolveTS)), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/injectSafeTS",
fmt.Sprintf("return(%v)", testcase.injectSafeTS)), IsNil)
tk.MustExec(testcase.sql)
if testcase.useResolveTS {
c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Equals, testcase.injectResolveTS)
if testcase.useSafeTS {
c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Equals, testcase.injectSafeTS)
} else {
c.Assert(oracle.CompareTS(tk.Se.GetSessionVars().TxnCtx.StartTS, testcase.injectResolveTS), Equals, 1)
c.Assert(oracle.CompareTS(tk.Se.GetSessionVars().TxnCtx.StartTS, testcase.injectSafeTS), Equals, 1)
}
tk.MustExec("commit")
failpoint.Disable("github.com/pingcap/tidb/store/tikv/injectResolveTS")
failpoint.Disable("github.com/pingcap/tidb/store/tikv/injectSafeTS")
}
}
8 changes: 4 additions & 4 deletions store/tikv/extract_start_ts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func (s *extractStartTsSuite) SetUpTest(c *C) {
Value: "Some Random Label",
}},
}
store.resolveTSMu.resolveTS[2] = 102
store.resolveTSMu.resolveTS[3] = 101
store.setSafeTS(2, 102)
store.setSafeTS(3, 101)
s.store = store
}

Expand Down Expand Up @@ -105,8 +105,8 @@ func (s *extractStartTsSuite) TestExtractStartTs(c *C) {
}

func (s *extractStartTsSuite) TestMaxPrevSecFallback(c *C) {
s.store.resolveTSMu.resolveTS[2] = 0x8000000000000002
s.store.resolveTSMu.resolveTS[3] = 0x8000000000000001
s.store.setSafeTS(2, 0x8000000000000002)
s.store.setSafeTS(3, 0x8000000000000001)

i := uint64(100)
cases := []kv.TransactionOption{
Expand Down
39 changes: 23 additions & 16 deletions store/tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ type KVStore struct {
spMutex sync.RWMutex // this is used to update safePoint and spTime
closed chan struct{} // this is used to nofity when the store is closed

resolveTSMu struct {
sync.RWMutex
resolveTS map[uint64]uint64 // storeID -> resolveTS
}
// storeID -> safeTS, stored as map[uint64]uint64
// safeTS here will be used during the Stale Read process,
// it indicates the safe timestamp point that can be used to read consistent but may not the latest data.
safeTSMap sync.Map

replicaReadSeed uint32 // this is used to load balance followers / learners when replica read is enabled
}
Expand Down Expand Up @@ -142,7 +142,6 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Client
replicaReadSeed: rand.Uint32(),
}
store.lockResolver = newLockResolver(store)
store.resolveTSMu.resolveTS = make(map[uint64]uint64)

go store.runSafePointChecker()
go store.safeTSUpdater()
Expand Down Expand Up @@ -337,20 +336,30 @@ func (s *KVStore) GetTiKVClient() (client Client) {
return s.client
}

func (s *KVStore) getMinResolveTSByStores(stores []*Store) uint64 {
failpoint.Inject("injectResolveTS", func(val failpoint.Value) {
func (s *KVStore) getSafeTS(storeID uint64) uint64 {
safeTS, ok := s.safeTSMap.Load(storeID)
if !ok {
return 0
}
return safeTS.(uint64)
}

func (s *KVStore) setSafeTS(storeID, safeTS uint64) {
s.safeTSMap.Store(storeID, safeTS)
}

func (s *KVStore) getMinSafeTSByStores(stores []*Store) uint64 {
failpoint.Inject("injectSafeTS", func(val failpoint.Value) {
injectTS := val.(int)
failpoint.Return(uint64(injectTS))
})
minSafeTS := uint64(math.MaxUint64)
s.resolveTSMu.RLock()
defer s.resolveTSMu.RUnlock()
// when there is no store, return 0 in order to let minStartTS become startTS directly
if len(stores) < 1 {
return 0
}
for _, store := range stores {
safeTS := s.resolveTSMu.resolveTS[store.storeID]
safeTS := s.getSafeTS(store.storeID)
if safeTS < minSafeTS {
minSafeTS = safeTS
}
Expand All @@ -368,12 +377,12 @@ func (s *KVStore) safeTSUpdater() {
case <-s.Closed():
return
case <-t.C:
s.updateResolveTS(ctx)
s.updateSafeTS(ctx)
}
}
}

func (s *KVStore) updateResolveTS(ctx context.Context) {
func (s *KVStore) updateSafeTS(ctx context.Context) {
stores := s.regionCache.getStoresByType(tikvrpc.TiKV)
tikvClient := s.GetTiKVClient()
wg := &sync.WaitGroup{}
Expand All @@ -389,13 +398,11 @@ func (s *KVStore) updateResolveTS(ctx context.Context) {
EndKey: []byte(""),
}}), ReadTimeoutShort)
if err != nil {
logutil.BgLogger().Debug("update resolveTS failed", zap.Error(err), zap.Uint64("store-id", storeID))
logutil.BgLogger().Debug("update safeTS failed", zap.Error(err), zap.Uint64("store-id", storeID))
return
}
safeTSResp := resp.Resp.(*kvrpcpb.StoreSafeTSResponse)
s.resolveTSMu.Lock()
s.resolveTSMu.resolveTS[storeID] = safeTSResp.GetSafeTs()
s.resolveTSMu.Unlock()
s.setSafeTS(storeID, safeTSResp.GetSafeTs())
}(ctx, wg, storeID, storeAddr)
}
wg.Wait()
Expand Down
8 changes: 4 additions & 4 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,12 @@ func extractStartTs(store *KVStore, options kv.TransactionOption) (uint64, error
} else {
stores = allStores
}
resolveTS := store.getMinResolveTSByStores(stores)
safeTS := store.getMinSafeTSByStores(stores)
startTs = *options.MinStartTS
// If the resolveTS is larger than the minStartTS, we will use resolveTS as StartTS, otherwise we will use
// If the safeTS is larger than the minStartTS, we will use safeTS as StartTS, otherwise we will use
// minStartTS directly.
if oracle.CompareTS(startTs, resolveTS) < 0 {
startTs = resolveTS
if oracle.CompareTS(startTs, safeTS) < 0 {
startTs = safeTS
}
} else if options.MaxPrevSec != nil {
bo := NewBackofferWithVars(context.Background(), tsoMaxBackoff, nil)
Expand Down

0 comments on commit 99099a1

Please sign in to comment.