diff --git a/executor/split.go b/executor/split.go
index a560d3cb0d372..a36a048bbde50 100755
--- a/executor/split.go
+++ b/executor/split.go
@@ -724,7 +724,7 @@ func getRegionMeta(tikvStore tikv.Storage, regionMetas []*tikv.Region, uniqueReg
 		uniqueRegionMap[r.GetID()] = struct{}{}
 		regions = append(regions, regionMeta{
 			region:   r.GetMeta(),
-			leaderID: r.GetLeaderID(),
+			leaderID: r.GetLeaderPeerID(),
 			storeID:  r.GetLeaderStoreID(),
 		})
 	}
diff --git a/store/mockstore/mocktikv/cluster.go b/store/mockstore/mocktikv/cluster.go
index 1f53e577ae96f..cfc1f09e5405a 100644
--- a/store/mockstore/mocktikv/cluster.go
+++ b/store/mockstore/mocktikv/cluster.go
@@ -210,10 +210,10 @@ func (c *Cluster) RemoveStore(storeID uint64) {
 }
 
 // UpdateStoreAddr updates store address for cluster.
-func (c *Cluster) UpdateStoreAddr(storeID uint64, addr string) {
+func (c *Cluster) UpdateStoreAddr(storeID uint64, addr string, labels ...*metapb.StoreLabel) {
 	c.Lock()
 	defer c.Unlock()
-	c.stores[storeID] = newStore(storeID, addr)
+	c.stores[storeID] = newStore(storeID, addr, labels...)
 }
 
 // GetRegion returns a Region's meta and leader ID.
@@ -645,11 +645,12 @@ type Store struct {
 	tokenCount atomic.Int64
 }
 
-func newStore(storeID uint64, addr string) *Store {
+func newStore(storeID uint64, addr string, labels ...*metapb.StoreLabel) *Store {
 	return &Store{
 		meta: &metapb.Store{
 			Id:      storeID,
 			Address: addr,
+			Labels:  labels,
 		},
 	}
 }
diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go
index 6b3bfe8cd2133..eabab4071a8ac 100644
--- a/store/tikv/region_cache.go
+++ b/store/tikv/region_cache.go
@@ -78,43 +78,82 @@ type Region struct {
 	lastAccess int64 // last region access time, see checkRegionCacheTTL
 }
 
+// AccessMode is used to index stores for different region cache access requirements.
+type AccessMode int
+
+const (
+	// TiKvOnly indicates the store list used for TiKV access (both leader requests and follower reads).
+	TiKvOnly AccessMode = iota
+	// TiFlashOnly indicates the store list used for TiFlash requests.
+	TiFlashOnly
+	// NumAccessMode is reserved to keep the maximum access mode value.
+	NumAccessMode
+)
+
+func (a AccessMode) String() string {
+	switch a {
+	case TiKvOnly:
+		return "TiKvOnly"
+	case TiFlashOnly:
+		return "TiFlashOnly"
+	default:
+		return fmt.Sprintf("%d", a)
+	}
+}
+
+// AccessIndex represents the index in the accessIndex array.
+type AccessIndex int
+
 // RegionStore represents region stores info
 // it will be store as unsafe.Pointer and be load at once
 type RegionStore struct {
-	workTiKVIdx    int32    // point to current work peer in meta.Peers and work store in stores(same idx) for tikv peer
-	workTiFlashIdx int32    // point to current work peer in meta.Peers and work store in stores(same idx) for tiflash peer
-	stores         []*Store // stores in this region
-	storeEpochs    []uint32 // snapshots of store's epoch, need reload when `storeEpochs[curr] != stores[cur].fail`
+	workTiKVIdx    AccessIndex // point to current work peer in meta.Peers and work store in stores(same idx) for tikv peer
+	workTiFlashIdx int32       // point to current work peer in meta.Peers and work store in stores(same idx) for tiflash peer
+	stores         []*Store    // stores in this region
+	storeEpochs    []uint32    // snapshots of store's epoch, need reload when `storeEpochs[curr] != stores[cur].fail`
+	accessIndex    [NumAccessMode][]int // AccessMode => idx in stores
+}
+
+func (r *RegionStore) accessStore(mode AccessMode, idx AccessIndex) (int, *Store) {
+	sidx := r.accessIndex[mode][idx]
+	return sidx, r.stores[sidx]
+}
+
+func (r *RegionStore) accessStoreNum(mode AccessMode) int {
+	return len(r.accessIndex[mode])
 }
 
 // clone clones region store struct.
 func (r *RegionStore) clone() *RegionStore {
 	storeEpochs := make([]uint32, len(r.stores))
-	copy(storeEpochs, r.storeEpochs)
-	return &RegionStore{
+	rs := &RegionStore{
 		workTiFlashIdx: r.workTiFlashIdx,
 		workTiKVIdx:    r.workTiKVIdx,
 		stores:         r.stores,
 		storeEpochs:    storeEpochs,
 	}
+	copy(storeEpochs, r.storeEpochs)
+	for i := 0; i < int(NumAccessMode); i++ {
+		rs.accessIndex[i] = make([]int, len(r.accessIndex[i]))
+		copy(rs.accessIndex[i], r.accessIndex[i])
+	}
+	return rs
 }
 
 // return next follower store's index
-func (r *RegionStore) follower(seed uint32) int32 {
-	l := uint32(len(r.stores))
+func (r *RegionStore) follower(seed uint32) AccessIndex {
+	l := uint32(r.accessStoreNum(TiKvOnly))
 	if l <= 1 {
 		return r.workTiKVIdx
 	}
 	for retry := l - 1; retry > 0; retry-- {
-		followerIdx := int32(seed % (l - 1))
+		followerIdx := AccessIndex(seed % (l - 1))
 		if followerIdx >= r.workTiKVIdx {
 			followerIdx++
 		}
-		if r.stores[followerIdx].storeType != kv.TiKV {
-			continue
-		}
-		if r.storeEpochs[followerIdx] == atomic.LoadUint32(&r.stores[followerIdx].epoch) {
+		storeIdx, s := r.accessStore(TiKvOnly, followerIdx)
+		if r.storeEpochs[storeIdx] == atomic.LoadUint32(&s.epoch) {
 			return followerIdx
 		}
 		seed++
@@ -123,16 +162,14 @@ func (r *RegionStore) follower(seed uint32) int32 {
 }
 
 // return next leader or follower store's index
-func (r *RegionStore) peer(seed uint32) int32 {
-	candidates := make([]int32, 0, len(r.stores))
-	for i := 0; i < len(r.stores); i++ {
-		if r.stores[i].storeType != kv.TiKV {
+func (r *RegionStore) kvPeer(seed uint32) AccessIndex {
+	candidates := make([]AccessIndex, 0, r.accessStoreNum(TiKvOnly))
+	for i := 0; i < r.accessStoreNum(TiKvOnly); i++ {
+		storeIdx, s := r.accessStore(TiKvOnly, AccessIndex(i))
+		if r.storeEpochs[storeIdx] != atomic.LoadUint32(&s.epoch) {
 			continue
 		}
-		if r.storeEpochs[i] != atomic.LoadUint32(&r.stores[i].epoch) {
-			continue
-		}
-		candidates = append(candidates, int32(i))
+		candidates = append(candidates, AccessIndex(i))
 	}
 	if len(candidates) == 0 {
@@ -142,7 +179,7 @@ func (r *RegionStore) peer(seed uint32) int32 {
 }
 
 // init initializes region after constructed.
-func (r *Region) init(c *RegionCache) {
+func (r *Region) init(c *RegionCache) error {
 	// region store pull used store from global store map
 	// to avoid acquire storeMu in later access.
 	rs := &RegionStore{
@@ -158,6 +195,16 @@ func (r *Region) init(c *RegionCache) {
 		if !exists {
 			store = c.getStoreByStoreID(p.StoreId)
 		}
+		_, err := store.initResolve(NewNoopBackoff(context.Background()), c)
+		if err != nil {
+			return err
+		}
+		switch store.storeType {
+		case kv.TiKV:
+			rs.accessIndex[TiKvOnly] = append(rs.accessIndex[TiKvOnly], len(rs.stores))
+		case kv.TiFlash:
+			rs.accessIndex[TiFlashOnly] = append(rs.accessIndex[TiFlashOnly], len(rs.stores))
+		}
 		rs.stores = append(rs.stores, store)
 		rs.storeEpochs = append(rs.storeEpochs, atomic.LoadUint32(&store.epoch))
 	}
@@ -165,6 +212,7 @@ func (r *Region) init(c *RegionCache) {
 
 	// mark region has been init accessed.
 	r.lastAccess = time.Now().Unix()
+	return nil
 }
 
 func (r *Region) getStore() (store *RegionStore) {
@@ -290,12 +338,13 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store) {
 
 // RPCContext contains data that is needed to send RPC to a region.
 type RPCContext struct {
-	Region  RegionVerID
-	Meta    *metapb.Region
-	Peer    *metapb.Peer
-	PeerIdx int
-	Store   *Store
-	Addr    string
+	Region     RegionVerID
+	Meta       *metapb.Region
+	Peer       *metapb.Peer
+	AccessIdx  AccessIndex
+	Store      *Store
+	Addr       string
+	AccessMode AccessMode
 }
 
 // GetStoreID returns StoreID.
@@ -307,8 +356,12 @@ func (c *RPCContext) GetStoreID() uint64 {
 }
 
 func (c *RPCContext) String() string {
-	return fmt.Sprintf("region ID: %d, meta: %s, peer: %s, addr: %s, idx: %d",
-		c.Region.GetID(), c.Meta, c.Peer, c.Addr, c.PeerIdx)
+	var runStoreType string
+	if c.Store != nil {
+		runStoreType = c.Store.storeType.Name()
+	}
+	return fmt.Sprintf("region ID: %d, meta: %s, peer: %s, addr: %s, idx: %d, reqStoreType: %s, runStoreType: %s",
+		c.Region.GetID(), c.Meta, c.Peer, c.Addr, c.AccessIdx, c.AccessMode, runStoreType)
 }
 
 // GetTiKVRPCContext returns RPCContext for a region. If it returns nil, the region
@@ -326,16 +379,19 @@ func (c *RegionCache) GetTiKVRPCContext(bo *Backoffer, id RegionVerID, replicaRe
 	}
 
 	regionStore := cachedRegion.getStore()
-	var store *Store
-	var peer *metapb.Peer
-	var storeIdx int
+	var (
+		store     *Store
+		peer      *metapb.Peer
+		storeIdx  int
+		accessIdx AccessIndex
+	)
 	switch replicaRead {
 	case kv.ReplicaReadFollower:
-		store, peer, storeIdx = cachedRegion.FollowerStorePeer(regionStore, followerStoreSeed)
+		store, peer, accessIdx, storeIdx = cachedRegion.FollowerStorePeer(regionStore, followerStoreSeed)
 	case kv.ReplicaReadMixed:
-		store, peer, storeIdx = cachedRegion.AnyStorePeer(regionStore, followerStoreSeed)
+		store, peer, accessIdx, storeIdx = cachedRegion.AnyStorePeer(regionStore, followerStoreSeed)
 	default:
-		store, peer, storeIdx = cachedRegion.WorkStorePeer(regionStore)
+		store, peer, accessIdx, storeIdx = cachedRegion.WorkStorePeer(regionStore)
 	}
 	addr, err := c.getStoreAddr(bo, cachedRegion, store, storeIdx)
 	if err != nil {
@@ -363,12 +419,13 @@ func (c *RegionCache) GetTiKVRPCContext(bo *Backoffer, id RegionVerID, replicaRe
 	}
 
 	return &RPCContext{
-		Region:  id,
-		Meta:    cachedRegion.meta,
-		Peer:    peer,
-		PeerIdx: storeIdx,
-		Store:   store,
-		Addr:    addr,
+		Region:     id,
+		Meta:       cachedRegion.meta,
+		Peer:       peer,
+		AccessIdx:  accessIdx,
+		Store:      store,
+		Addr:       addr,
+		AccessMode: TiKvOnly,
 	}, nil
 }
 
@@ -389,9 +446,9 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID) (*RPCC
 
 	// sIdx is for load balance of TiFlash store.
 	sIdx := int(atomic.AddInt32(&regionStore.workTiFlashIdx, 1))
-	for i := range regionStore.stores {
-		storeIdx := (sIdx + i) % len(regionStore.stores)
-		store := regionStore.stores[storeIdx]
+	for i := 0; i < regionStore.accessStoreNum(TiFlashOnly); i++ {
+		accessIdx := AccessIndex((sIdx + i) % regionStore.accessStoreNum(TiFlashOnly))
+		storeIdx, store := regionStore.accessStore(TiFlashOnly, accessIdx)
 		addr, err := c.getStoreAddr(bo, cachedRegion, store, storeIdx)
 		if err != nil {
 			return nil, err
@@ -403,10 +460,7 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID) (*RPCC
 		if store.getResolveState() == needCheck {
 			store.reResolve(c)
 		}
-		if store.storeType != kv.TiFlash {
-			continue
-		}
-		atomic.StoreInt32(&regionStore.workTiFlashIdx, int32(storeIdx))
+		atomic.StoreInt32(&regionStore.workTiFlashIdx, int32(accessIdx))
 		peer := cachedRegion.meta.Peers[storeIdx]
 		storeFailEpoch := atomic.LoadUint32(&store.epoch)
 		if storeFailEpoch != regionStore.storeEpochs[storeIdx] {
@@ -417,12 +471,13 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID) (*RPCC
 			return nil, nil
 		}
 		return &RPCContext{
-			Region:  id,
-			Meta:    cachedRegion.meta,
-			Peer:    peer,
-			PeerIdx: storeIdx,
-			Store:   store,
-			Addr:    addr,
+			Region:     id,
+			Meta:       cachedRegion.meta,
+			Peer:       peer,
+			AccessIdx:  accessIdx,
+			Store:      store,
+			Addr:       addr,
+			AccessMode: TiFlashOnly,
 		}, nil
 	}
 
@@ -520,8 +575,8 @@ func (c *RegionCache) OnSendFail(bo *Backoffer, ctx *RPCContext, scheduleReload
 	if r != nil {
 		rs := r.getStore()
 		if err != nil {
-			s := rs.stores[ctx.PeerIdx]
-			followerRead := int(rs.workTiKVIdx) != ctx.PeerIdx
+			storeIdx, s := rs.accessStore(ctx.AccessMode, ctx.AccessIdx)
+			followerRead := rs.workTiKVIdx != ctx.AccessIdx
 
 			// send fail but store is reachable, keep retry current peer for replica leader request.
 			// but we still need switch peer for follower-read or learner-read(i.e. tiflash)
@@ -530,7 +585,7 @@ func (c *RegionCache) OnSendFail(bo *Backoffer, ctx *RPCContext, scheduleReload
 			}
 
 			// invalidate regions in store.
-			epoch := rs.storeEpochs[ctx.PeerIdx]
+			epoch := rs.storeEpochs[storeIdx]
 			if atomic.CompareAndSwapUint32(&s.epoch, epoch, epoch+1) {
 				logutil.BgLogger().Info("mark store's regions need be refill", zap.String("store", s.addr))
 				tikvRegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
@@ -541,10 +596,10 @@ func (c *RegionCache) OnSendFail(bo *Backoffer, ctx *RPCContext, scheduleReload
 		}
 
 		// try next peer to found new leader.
-		if ctx.Store.storeType == kv.TiKV {
-			rs.switchNextPeer(r, ctx.PeerIdx)
+		if ctx.AccessMode == TiKvOnly {
+			rs.switchNextTiKVPeer(r, ctx.AccessIdx)
 		} else {
-			rs.switchNextFlashPeer(r, ctx.PeerIdx)
+			rs.switchNextFlashPeer(r, ctx.AccessIdx)
 		}
 
 		// force reload region when retry all known peers in region.
@@ -746,7 +801,7 @@ func (c *RegionCache) InvalidateCachedRegion(id RegionVerID) {
 }
 
 // UpdateLeader update some region cache with newer leader info.
-func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64, currentPeerIdx int) {
+func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64, currentPeerIdx AccessIndex) {
 	r := c.getCachedRegionWithRLock(regionID)
 	if r == nil {
 		logutil.BgLogger().Debug("regionCache: cannot find region when updating leader",
@@ -757,23 +812,23 @@ func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64, c
 	if leaderStoreID == 0 {
 		rs := r.getStore()
-		rs.switchNextPeer(r, currentPeerIdx)
+		rs.switchNextTiKVPeer(r, currentPeerIdx)
 		logutil.BgLogger().Info("switch region peer to next due to NotLeader with NULL leader",
-			zap.Int("currIdx", currentPeerIdx),
+			zap.Int("currIdx", int(currentPeerIdx)),
 			zap.Uint64("regionID", regionID.GetID()))
 		return
 	}
 
-	if !c.switchToPeer(r, leaderStoreID) {
+	if !c.switchWorkLeaderToPeer(r, leaderStoreID) {
 		logutil.BgLogger().Info("invalidate region cache due to cannot find peer when updating leader",
 			zap.Uint64("regionID", regionID.GetID()),
-			zap.Int("currIdx", currentPeerIdx),
+			zap.Int("currIdx", int(currentPeerIdx)),
 			zap.Uint64("leaderStoreID", leaderStoreID))
 		r.invalidate()
 	} else {
 		logutil.BgLogger().Info("switch region leader to specific leader due to kv return NotLeader",
 			zap.Uint64("regionID", regionID.GetID()),
-			zap.Int("currIdx", currentPeerIdx),
+			zap.Int("currIdx", int(currentPeerIdx)),
 			zap.Uint64("leaderStoreID", leaderStoreID))
 	}
 }
@@ -917,9 +972,12 @@ func (c *RegionCache) loadRegion(bo *Backoffer, key []byte, isEndKey bool) (*Reg
 			continue
 		}
 		region := &Region{meta: reg.Meta}
-		region.init(c)
+		err = region.init(c)
+		if err != nil {
+			return nil, err
+		}
 		if reg.Leader != nil {
-			c.switchToPeer(region, reg.Leader.StoreId)
+			c.switchWorkLeaderToPeer(region, reg.Leader.StoreId)
 		}
 		return region, nil
 	}
@@ -953,9 +1011,12 @@ func (c *RegionCache) loadRegionByID(bo *Backoffer, regionID uint64) (*Region, e
 		return nil, errors.New("receive Region with no available peer")
 	}
 	region := &Region{meta: reg.Meta}
-	region.init(c)
+	err = region.init(c)
+	if err != nil {
+		return nil, err
+	}
 	if reg.Leader != nil {
-		c.switchToPeer(region, reg.Leader.GetStoreId())
+		c.switchWorkLeaderToPeer(region, reg.Leader.GetStoreId())
 	}
 	return region, nil
 }
@@ -998,11 +1059,14 @@ func (c *RegionCache) scanRegions(bo *Backoffer, startKey, endKey []byte, limit
 	regions := make([]*Region, 0, len(metas))
 	for i, meta := range metas {
 		region := &Region{meta: meta}
-		region.init(c)
+		err := region.init(c)
+		if err != nil {
+			return nil, err
+		}
 		leader := leaders[i]
 		// Leader id = 0 indicates no leader.
 		if leader.GetId() != 0 {
-			c.switchToPeer(region, leader.GetStoreId())
+			c.switchWorkLeaderToPeer(region, leader.GetStoreId())
 			regions = append(regions, region)
 		}
 	}
@@ -1103,8 +1167,17 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *Backoffer, ctx *RPCContext, curr
 			}
 		}
 		region := &Region{meta: meta}
-		region.init(c)
-		c.switchToPeer(region, ctx.Store.storeID)
+		err := region.init(c)
+		if err != nil {
+			return err
+		}
+		var initLeader uint64
+		if ctx.Store.storeType == kv.TiFlash {
+			initLeader = region.findElectableStoreID()
+		} else {
+			initLeader = ctx.Store.storeID
+		}
+		c.switchWorkLeaderToPeer(region, initLeader)
 		c.insertRegionToCache(region)
 		if ctx.Region == region.VerID() {
 			needInvalidateOld = false
@@ -1157,13 +1230,14 @@ func (r *Region) GetMeta() *metapb.Region {
 	return proto.Clone(r.meta).(*metapb.Region)
 }
 
-// GetLeaderID returns leader region ID.
-func (r *Region) GetLeaderID() uint64 {
+// GetLeaderPeerID returns leader peer ID.
+func (r *Region) GetLeaderPeerID() uint64 {
 	store := r.getStore()
 	if int(store.workTiKVIdx) >= len(r.meta.Peers) {
 		return 0
 	}
-	return r.meta.Peers[int(r.getStore().workTiKVIdx)].Id
+	storeIdx, _ := store.accessStore(TiKvOnly, store.workTiKVIdx)
+	return r.meta.Peers[storeIdx].Id
 }
 
 // GetLeaderStoreID returns the store ID of the leader region.
@@ -1172,29 +1246,30 @@ func (r *Region) GetLeaderStoreID() uint64 {
 	if int(store.workTiKVIdx) >= len(r.meta.Peers) {
 		return 0
 	}
-	return r.meta.Peers[int(r.getStore().workTiKVIdx)].StoreId
+	storeIdx, _ := store.accessStore(TiKvOnly, store.workTiKVIdx)
+	return r.meta.Peers[storeIdx].StoreId
 }
 
-func (r *Region) getStorePeer(rs *RegionStore, pidx int32) (store *Store, peer *metapb.Peer, idx int) {
-	store = rs.stores[pidx]
-	peer = r.meta.Peers[pidx]
-	idx = int(pidx)
+func (r *Region) getKvStorePeer(rs *RegionStore, aidx AccessIndex) (store *Store, peer *metapb.Peer, accessIdx AccessIndex, storeIdx int) {
+	storeIdx, store = rs.accessStore(TiKvOnly, aidx)
+	peer = r.meta.Peers[storeIdx]
+	accessIdx = aidx
 	return
 }
 
 // WorkStorePeer returns current work store with work peer.
-func (r *Region) WorkStorePeer(rs *RegionStore) (store *Store, peer *metapb.Peer, idx int) {
-	return r.getStorePeer(rs, rs.workTiKVIdx)
+func (r *Region) WorkStorePeer(rs *RegionStore) (store *Store, peer *metapb.Peer, accessIdx AccessIndex, storeIdx int) {
+	return r.getKvStorePeer(rs, rs.workTiKVIdx)
 }
 
 // FollowerStorePeer returns a follower store with follower peer.
-func (r *Region) FollowerStorePeer(rs *RegionStore, followerStoreSeed uint32) (*Store, *metapb.Peer, int) {
-	return r.getStorePeer(rs, rs.follower(followerStoreSeed))
+func (r *Region) FollowerStorePeer(rs *RegionStore, followerStoreSeed uint32) (store *Store, peer *metapb.Peer, accessIdx AccessIndex, storeIdx int) {
+	return r.getKvStorePeer(rs, rs.follower(followerStoreSeed))
 }
 
 // AnyStorePeer returns a leader or follower store with the associated peer.
-func (r *Region) AnyStorePeer(rs *RegionStore, followerStoreSeed uint32) (*Store, *metapb.Peer, int) {
-	return r.getStorePeer(rs, rs.peer(followerStoreSeed))
+func (r *Region) AnyStorePeer(rs *RegionStore, followerStoreSeed uint32) (store *Store, peer *metapb.Peer, accessIdx AccessIndex, storeIdx int) {
+	return r.getKvStorePeer(rs, rs.kvPeer(followerStoreSeed))
 }
 
 // RegionVerID is a unique ID that can identify a Region at a specific version.
@@ -1238,35 +1313,59 @@ func (r *Region) EndKey() []byte {
 	return r.meta.EndKey
 }
 
-// switchToPeer switches current store to the one on specific store. It returns
+// switchWorkLeaderToPeer switches the current work leader to the peer on the specified store. It returns
 // false if no peer matches the storeID.
-func (c *RegionCache) switchToPeer(r *Region, targetStoreID uint64) (found bool) {
-	leaderIdx, found := c.getPeerStoreIndex(r, targetStoreID)
-	c.switchWorkIdx(r, leaderIdx)
+func (c *RegionCache) switchWorkLeaderToPeer(r *Region, targetStoreID uint64) (found bool) {
+	globalStoreIdx, found := c.getPeerStoreIndex(r, targetStoreID)
+retry:
+	// switch to new leader.
+	oldRegionStore := r.getStore()
+	var leaderIdx AccessIndex
+	for i, gIdx := range oldRegionStore.accessIndex[TiKvOnly] {
+		if gIdx == globalStoreIdx {
+			leaderIdx = AccessIndex(i)
+		}
+	}
+	if oldRegionStore.workTiKVIdx == leaderIdx {
+		return
+	}
+	newRegionStore := oldRegionStore.clone()
+	newRegionStore.workTiKVIdx = leaderIdx
+	if !r.compareAndSwapStore(oldRegionStore, newRegionStore) {
+		goto retry
+	}
 	return
 }
 
-func (r *RegionStore) switchNextFlashPeer(rr *Region, currentPeerIdx int) {
-	nextIdx := (currentPeerIdx + 1) % len(r.stores)
+func (r *RegionStore) switchNextFlashPeer(rr *Region, currentPeerIdx AccessIndex) {
+	nextIdx := (currentPeerIdx + 1) % AccessIndex(r.accessStoreNum(TiFlashOnly))
 	newRegionStore := r.clone()
 	newRegionStore.workTiFlashIdx = int32(nextIdx)
 	rr.compareAndSwapStore(r, newRegionStore)
 }
 
-func (r *RegionStore) switchNextPeer(rr *Region, currentPeerIdx int) {
-	if int(r.workTiKVIdx) != currentPeerIdx {
+func (r *RegionStore) switchNextTiKVPeer(rr *Region, currentPeerIdx AccessIndex) {
+	if r.workTiKVIdx != currentPeerIdx {
 		return
 	}
-
-	nextIdx := (currentPeerIdx + 1) % len(r.stores)
-	for r.stores[nextIdx].storeType == kv.TiFlash {
-		nextIdx = (nextIdx + 1) % len(r.stores)
-	}
+	nextIdx := (currentPeerIdx + 1) % AccessIndex(r.accessStoreNum(TiKvOnly))
 	newRegionStore := r.clone()
-	newRegionStore.workTiKVIdx = int32(nextIdx)
+	newRegionStore.workTiKVIdx = nextIdx
 	rr.compareAndSwapStore(r, newRegionStore)
 }
 
+func (r *Region) findElectableStoreID() uint64 {
+	if len(r.meta.Peers) == 0 {
+		return 0
+	}
+	for _, p := range r.meta.Peers {
+		if !p.IsLearner {
+			return p.StoreId
+		}
+	}
+	return 0
+}
+
 func (c *RegionCache) getPeerStoreIndex(r *Region, id uint64) (idx int, found bool) {
 	if len(r.meta.Peers) == 0 {
 		return
@@ -1281,20 +1380,6 @@ func (c *RegionCache) getPeerStoreIndex(r *Region, id uint64) (idx int, found bo
 	return
 }
 
-func (c *RegionCache) switchWorkIdx(r *Region, leaderIdx int) {
-retry:
-	// switch to new leader.
-	oldRegionStore := r.getStore()
-	if oldRegionStore.workTiKVIdx == int32(leaderIdx) {
-		return
-	}
-	newRegionStore := oldRegionStore.clone()
-	newRegionStore.workTiKVIdx = int32(leaderIdx)
-	if !r.compareAndSwapStore(oldRegionStore, newRegionStore) {
-		goto retry
-	}
-}
-
 // Contains checks whether the key is in the region, for the maximum region endKey is empty.
 // startKey <= key < endKey.
 func (r *Region) Contains(key []byte) bool {
diff --git a/store/tikv/region_cache_test.go b/store/tikv/region_cache_test.go
index 06cdca6d56cdb..facfdc6ce3d92 100644
--- a/store/tikv/region_cache_test.go
+++ b/store/tikv/region_cache_test.go
@@ -129,7 +129,7 @@ func (s *testRegionCacheSuite) TestSimple(c *C) {
 	c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed), Equals, s.storeAddr(s.store2))
 	s.checkCache(c, 1)
 	c.Assert(r.GetMeta(), DeepEquals, r.meta)
-	c.Assert(r.GetLeaderID(), Equals, r.meta.Peers[r.getStore().workTiKVIdx].Id)
+	c.Assert(r.GetLeaderPeerID(), Equals, r.meta.Peers[r.getStore().workTiKVIdx].Id)
 	s.cache.mu.regions[r.VerID()].lastAccess = 0
 	r = s.cache.searchCachedRegion([]byte("a"), true)
 	c.Assert(r, IsNil)
@@ -259,16 +259,26 @@ func (s *testRegionCacheSuite) TestUpdateLeader3(c *C) {
 	c.Assert(err, IsNil)
 	c.Assert(r, NotNil)
 	c.Assert(r.GetID(), Equals, s.region1)
+	loc, err = s.cache.LocateKey(s.bo, []byte("a"))
+	c.Assert(err, IsNil)
+	// return resolved store2 address and send fail
+	ctx, err := s.cache.GetTiKVRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, seed)
+	c.Assert(err, IsNil)
+	c.Assert(ctx.Addr, Equals, "store2")
+	s.cache.OnSendFail(NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail"))
+	s.cache.checkAndResolve(nil)
+	s.cache.UpdateLeader(loc.Region, s.store2, 0)
 	addr := s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0)
 	c.Assert(addr, Equals, "")
-	s.getRegion(c, []byte("a"))
-	// pd-server should return the new leader.
-	c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(store3))
+	addr = s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0)
+	c.Assert(addr, Equals, s.storeAddr(store3))
+
 	addr = s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed)
-	c.Assert(addr == s.storeAddr(s.store1) || len(addr) == 0, IsTrue)
 	addr2 := s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed+1)
-	c.Assert(addr2 == s.storeAddr(s.store1) || len(addr2) == 0, IsTrue)
-	c.Assert((len(addr2) == 0 && len(addr) == 0) || addr != addr2, IsTrue)
+	c.Assert(addr, Not(Equals), s.storeAddr(store3))
+	c.Assert(addr2, Not(Equals), s.storeAddr(store3))
+	c.Assert(addr, Not(Equals), "")
+	c.Assert(addr2, Not(Equals), "")
 }
 
 func (s *testRegionCacheSuite) TestSendFailedButLeaderNotChange(c *C) {
@@ -328,7 +338,7 @@ func (s *testRegionCacheSuite) TestSendFailedButLeaderNotChange(c *C) {
 	c.Assert(ctxFollower1.Peer.Id, Not(Equals), ctxFollower2.Peer.Id)
 
 	// access 1 it will return NotLeader, leader back to 2 again
-	s.cache.UpdateLeader(loc.Region, s.store2, ctx.PeerIdx)
+	s.cache.UpdateLeader(loc.Region, s.store2, ctx.AccessIdx)
 	ctx, err = s.cache.GetTiKVRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0)
 	c.Assert(err, IsNil)
 	c.Assert(ctx.Peer.Id, Equals, s.peer2)
@@ -409,7 +419,7 @@ func (s *testRegionCacheSuite) TestSendFailedInHibernateRegion(c *C) {
 	c.Assert(ctxFollower1.Peer.Id, Not(Equals), ctxFollower2.Peer.Id)
 
 	// access 2, it's in hibernate and return 0 leader, so switch to 3
-	s.cache.UpdateLeader(loc.Region, 0, ctx.PeerIdx)
+	s.cache.UpdateLeader(loc.Region, 0, ctx.AccessIdx)
 	ctx, err = s.cache.GetTiKVRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0)
 	c.Assert(err, IsNil)
 	c.Assert(ctx.Peer.Id, Equals, peer3)
@@ -434,7 +444,7 @@ func (s *testRegionCacheSuite) TestSendFailedInHibernateRegion(c *C) {
 	// again peer back to 1
 	ctx, err = s.cache.GetTiKVRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0)
 	c.Assert(err, IsNil)
-	s.cache.UpdateLeader(loc.Region, 0, ctx.PeerIdx)
+	s.cache.UpdateLeader(loc.Region, 0, ctx.AccessIdx)
 	ctx, err = s.cache.GetTiKVRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0)
 	c.Assert(err, IsNil)
 	c.Assert(ctx.Peer.Id, Equals, s.peer1)
@@ -563,7 +573,7 @@ func (s *testRegionCacheSuite) TestSendFailedInMultipleNode(c *C) {
 	c.Assert(ctxFollower1.Peer.Id, Equals, ctxFollower2.Peer.Id)
 
 	// 3 can be access, so switch to 1
-	s.cache.UpdateLeader(loc.Region, s.store1, ctx.PeerIdx)
+	s.cache.UpdateLeader(loc.Region, s.store1, ctx.AccessIdx)
 	ctx, err = s.cache.GetTiKVRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0)
 	c.Assert(err, IsNil)
 	c.Assert(ctx.Peer.Id, Equals, s.peer1)
@@ -675,6 +685,39 @@ func (s *testRegionCacheSuite) TestRegionEpochAheadOfTiKV(c *C) {
 	c.Assert(len(bo.errors), Equals, 2)
 }
 
+func (s *testRegionCacheSuite) TestRegionEpochOnTiFlash(c *C) {
+	// mark store1 as a tiflash store and add store3 as a new tikv store
+	store3 := s.cluster.AllocID()
+	peer3 := s.cluster.AllocID()
+	s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store1), &metapb.StoreLabel{Key: "engine", Value: "tiflash"})
+	s.cluster.AddStore(store3, s.storeAddr(store3))
+	s.cluster.AddPeer(s.region1, store3, peer3)
+	s.cluster.ChangeLeader(s.region1, peer3)
+
+	// pre-load region cache
+	loc1, err := s.cache.LocateKey(s.bo, []byte("a"))
+	c.Assert(err, IsNil)
+	c.Assert(loc1.Region.id, Equals, s.region1)
+	lctx, err := s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadLeader, 0)
+	c.Assert(err, IsNil)
+	c.Assert(lctx.Peer.Id, Equals, peer3)
+
+	// epoch-not-match on tiflash
+	ctxTiFlash, err := s.cache.GetTiFlashRPCContext(s.bo, loc1.Region)
+	c.Assert(err, IsNil)
+	c.Assert(ctxTiFlash.Peer.Id, Equals, s.peer1)
+	ctxTiFlash.Peer.IsLearner = true
+	r := ctxTiFlash.Meta
+	reqSend := NewRegionRequestSender(s.cache, nil)
+	regionErr := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{CurrentRegions: []*metapb.Region{r}}}
+	reqSend.onRegionError(s.bo, ctxTiFlash, nil, regionErr)
+
+	// check leader read should not go to tiflash
+	lctx, err = s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadLeader, 0)
+	c.Assert(err, IsNil)
+	c.Assert(lctx.Peer.Id, Not(Equals), s.peer1)
+}
+
 const regionSplitKeyFormat = "t%08d"
 
 func createClusterWithStoresAndRegions(regionCnt, storeCount int) *mocktikv.Cluster {
@@ -827,7 +870,7 @@ func (s *testRegionCacheSuite) TestScanRegions(c *C) {
 	c.Assert(len(scannedRegions), Equals, 5)
 	for i := 0; i < 5; i++ {
 		r := scannedRegions[i]
-		_, p, _ := r.WorkStorePeer(r.getStore())
+		_, p, _, _ := r.WorkStorePeer(r.getStore())
 
 		c.Assert(r.meta.Id, Equals, regions[i])
 		c.Assert(p.Id, Equals, peers[i][0])
@@ -838,7 +881,7 @@ func (s *testRegionCacheSuite) TestScanRegions(c *C) {
 	c.Assert(len(scannedRegions), Equals, 3)
 	for i := 1; i < 4; i++ {
 		r := scannedRegions[i-1]
-		_, p, _ := r.WorkStorePeer(r.getStore())
+		_, p, _, _ := r.WorkStorePeer(r.getStore())
 
 		c.Assert(r.meta.Id, Equals, regions[i])
 		c.Assert(p.Id, Equals, peers[i][0])
@@ -849,7 +892,7 @@ func (s *testRegionCacheSuite) TestScanRegions(c *C) {
 	c.Assert(len(scannedRegions), Equals, 1)
 
 	r0 := scannedRegions[0]
-	_, p0, _ := r0.WorkStorePeer(r0.getStore())
+	_, p0, _, _ := r0.WorkStorePeer(r0.getStore())
 
 	c.Assert(r0.meta.Id, Equals, regions[1])
 	c.Assert(p0.Id, Equals, peers[1][0])
@@ -860,7 +903,7 @@ func (s *testRegionCacheSuite) TestScanRegions(c *C) {
 	c.Assert(err, IsNil)
 	for i := 0; i < 3; i++ {
 		r := scannedRegions[i]
-		_, p, _ := r.WorkStorePeer(r.getStore())
+		_, p, _, _ := r.WorkStorePeer(r.getStore())
 
 		c.Assert(r.meta.Id, Equals, regions[i*2])
 		c.Assert(p.Id, Equals, peers[i*2][0])
@@ -1128,19 +1171,20 @@ func BenchmarkOnRequestFail(b *testing.B) {
 	region := cache.getRegionByIDFromCache(loc.Region.id)
 	b.ResetTimer()
 	regionStore := region.getStore()
-	store, peer, idx := region.WorkStorePeer(regionStore)
+	store, peer, accessIdx, _ := region.WorkStorePeer(regionStore)
 	b.RunParallel(func(pb *testing.PB) {
 		for pb.Next() {
 			rpcCtx := &RPCContext{
-				Region:  loc.Region,
-				Meta:    region.meta,
-				PeerIdx: idx,
-				Peer:    peer,
-				Store:   store,
+				Region:     loc.Region,
+				Meta:       region.meta,
+				AccessIdx:  accessIdx,
+				Peer:       peer,
+				Store:      store,
+				AccessMode: TiKvOnly,
 			}
 			r := cache.getCachedRegionWithRLock(rpcCtx.Region)
 			if r != nil {
-				r.getStore().switchNextPeer(r, rpcCtx.PeerIdx)
+				r.getStore().switchNextTiKVPeer(r, rpcCtx.AccessIdx)
 			}
 		}
 	})
diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go
index 89ea74a582bd1..5fc9506421939 100644
--- a/store/tikv/region_request.go
+++ b/store/tikv/region_request.go
@@ -358,7 +358,7 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed
 			}
 		} else {
 			// don't backoff if a new leader is returned.
-			s.regionCache.UpdateLeader(ctx.Region, notLeader.GetLeader().GetStoreId(), ctx.PeerIdx)
+			s.regionCache.UpdateLeader(ctx.Region, notLeader.GetLeader().GetStoreId(), ctx.AccessIdx)
 		}
 		return true, nil
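
Editor's note (illustrative, not part of the patch): the core idea of the change is that RegionStore now keeps, besides the flat stores slice, one index list per AccessMode, so TiKV and TiFlash lookups never have to filter by storeType at request time. The standalone sketch below models that mapping with simplified stand-in types; `store`, `regionStore`, and the `engine` field are invented for the example and are not the real tidb structs.

```go
package main

import "fmt"

// AccessMode mirrors the patch's idea: a key into per-mode store index lists.
type AccessMode int

const (
	TiKvOnly AccessMode = iota
	TiFlashOnly
	NumAccessMode
)

// store is a stand-in for the real *Store; only the fields needed here.
type store struct {
	id     uint64
	engine string // "tikv" or "tiflash"
}

// regionStore models RegionStore: one flat stores slice plus, per access
// mode, a list of indexes into that slice.
type regionStore struct {
	stores      []store
	accessIndex [NumAccessMode][]int
}

// accessStore returns the position in stores plus the store itself for the
// idx-th entry of the given mode, matching the shape of the patched helper.
func (r *regionStore) accessStore(mode AccessMode, idx int) (int, store) {
	sidx := r.accessIndex[mode][idx]
	return sidx, r.stores[sidx]
}

func main() {
	rs := &regionStore{}
	// Build the per-mode index lists once, the way Region.init now does.
	for _, s := range []store{{1, "tikv"}, {2, "tiflash"}, {3, "tikv"}} {
		if s.engine == "tikv" {
			rs.accessIndex[TiKvOnly] = append(rs.accessIndex[TiKvOnly], len(rs.stores))
		} else {
			rs.accessIndex[TiFlashOnly] = append(rs.accessIndex[TiFlashOnly], len(rs.stores))
		}
		rs.stores = append(rs.stores, s)
	}
	// TiKV requests only ever see stores 1 and 3; TiFlash requests only see store 2.
	for i := 0; i < len(rs.accessIndex[TiKvOnly]); i++ {
		sidx, s := rs.accessStore(TiKvOnly, i)
		fmt.Printf("TiKvOnly[%d] -> stores[%d] = store %d\n", i, sidx, s.id)
	}
}
```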
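Editor's note (illustrative, not part of the patch): the follower selection in RegionStore.follower keeps the same shape as before; the seed picks one of the n-1 non-leader slots and any index at or past the leader is shifted by one, but it now walks the TiKvOnly access list instead of the raw stores slice. A minimal model of that selection follows; `pickFollower` and the `healthy` slice are invented stand-ins for the real store-epoch check.

```go
package main

import "fmt"

// pickFollower sketches the seed-based follower selection: workIdx is the
// current leader slot, n is the number of TiKV access slots, and healthy
// stands in for the storeEpochs comparison in the real code.
func pickFollower(workIdx int, n int, seed uint32, healthy []bool) int {
	if n <= 1 {
		return workIdx
	}
	for retry := n - 1; retry > 0; retry-- {
		idx := int(seed % uint32(n-1))
		if idx >= workIdx {
			idx++ // skip the current leader slot
		}
		if healthy[idx] {
			return idx
		}
		seed++ // epoch mismatch in the real code: try the next candidate
	}
	return workIdx // fall back to the leader if no healthy follower is found
}

func main() {
	healthy := []bool{true, true, true}
	for seed := uint32(0); seed < 4; seed++ {
		fmt.Println("seed", seed, "-> follower index", pickFollower(1, 3, seed, healthy))
	}
}
```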
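Editor's note (illustrative, not part of the patch): switchWorkLeaderToPeer folds the removed switchWorkIdx retry loop into itself; it clones the current RegionStore snapshot, updates workTiKVIdx, and publishes the clone with a compare-and-swap, retrying if another goroutine won the race. A simplified sketch of that copy-on-write pattern is below; the `snapshot` and `region` types are invented stand-ins.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

// snapshot stands in for RegionStore: an immutable value swapped atomically.
type snapshot struct {
	workTiKVIdx int
}

type region struct {
	store unsafe.Pointer // *snapshot
}

func (r *region) load() *snapshot { return (*snapshot)(atomic.LoadPointer(&r.store)) }

// switchLeader mimics the clone-then-compare-and-swap loop: readers never see
// a half-updated snapshot, and a lost race simply retries on the latest one.
func (r *region) switchLeader(newIdx int) {
	for {
		old := r.load()
		if old.workTiKVIdx == newIdx {
			return
		}
		clone := *old // copy, then mutate only the copy
		clone.workTiKVIdx = newIdx
		if atomic.CompareAndSwapPointer(&r.store, unsafe.Pointer(old), unsafe.Pointer(&clone)) {
			return
		}
		// another goroutine swapped first; loop and retry against the new snapshot
	}
}

func main() {
	r := &region{store: unsafe.Pointer(&snapshot{workTiKVIdx: 0})}
	r.switchLeader(2)
	fmt.Println("work TiKV index:", r.load().workTiKVIdx)
}
```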