Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tikv: fix switching region leader to TiFlash when TiFlash returns EpochNotMatch #17931

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions store/mockstore/mocktikv/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,10 @@ func (c *Cluster) RemoveStore(storeID uint64) {
}

// UpdateStoreAddr updates store address for cluster.
func (c *Cluster) UpdateStoreAddr(storeID uint64, addr string) {
func (c *Cluster) UpdateStoreAddr(storeID uint64, addr string, labels ...*metapb.StoreLabel) {
c.Lock()
defer c.Unlock()
c.stores[storeID] = newStore(storeID, addr)
c.stores[storeID] = newStore(storeID, addr, labels...)
}

// GetRegion returns a Region's meta and leader ID.
Expand Down Expand Up @@ -648,11 +648,12 @@ type Store struct {
tokenCount atomic.Int64
}

func newStore(storeID uint64, addr string) *Store {
func newStore(storeID uint64, addr string, labels ...*metapb.StoreLabel) *Store {
return &Store{
meta: &metapb.Store{
Id: storeID,
Address: addr,
Labels: labels,
},
}
}
43 changes: 33 additions & 10 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,17 +290,22 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store) {

// RPCContext contains data that is needed to send RPC to a region.
type RPCContext struct {
Region RegionVerID
Meta *metapb.Region
Peer *metapb.Peer
PeerIdx int
Store *Store
Addr string
Region RegionVerID
Meta *metapb.Region
Peer *metapb.Peer
PeerIdx int
Store *Store
Addr string
ReqStoreType kv.StoreType
}

func (c *RPCContext) String() string {
return fmt.Sprintf("region ID: %d, meta: %s, peer: %s, addr: %s, idx: %d",
c.Region.GetID(), c.Meta, c.Peer, c.Addr, c.PeerIdx)
var runStoreType string
if c.Store != nil {
runStoreType = c.Store.storeType.Name()
}
return fmt.Sprintf("region ID: %d, meta: %s, peer: %s, addr: %s, idx: %d, reqStoreType: %s, runStoreType: %s",
c.Region.GetID(), c.Meta, c.Peer, c.Addr, c.PeerIdx, c.ReqStoreType.Name(), runStoreType)
}

// GetTiKVRPCContext returns RPCContext for a region. If it returns nil, the region
Expand Down Expand Up @@ -522,7 +527,7 @@ func (c *RegionCache) OnSendFail(bo *Backoffer, ctx *RPCContext, scheduleReload
}

// try the next peer to find the new leader.
if ctx.Store.storeType == kv.TiKV {
if ctx.ReqStoreType == kv.TiKV {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear what the difference is between ReqStoreType and Store.storeType.

rs.switchNextPeer(r, ctx.PeerIdx)
} else {
rs.switchNextFlashPeer(r, ctx.PeerIdx)
Expand Down Expand Up @@ -1085,7 +1090,13 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *Backoffer, ctx *RPCContext, curr
}
region := &Region{meta: meta}
region.init(c)
c.switchToPeer(region, ctx.Store.storeID)
var initLeader uint64
if ctx.Store.storeType == kv.TiFlash {
initLeader = region.findElectableStoreID()
} else {
initLeader = ctx.Store.storeID
}
c.switchToPeer(region, initLeader)
c.insertRegionToCache(region)
if ctx.Region == region.VerID() {
needInvalidateOld = false
Expand Down Expand Up @@ -1248,6 +1259,18 @@ func (r *RegionStore) switchNextPeer(rr *Region, currentPeerIdx int) {
rr.compareAndSwapStore(r, newRegionStore)
}

// findElectableStoreID scans the region's peers in order and returns the
// store ID of the first peer that is not a learner. It returns 0 when the
// region has no peers or when every peer is a learner.
func (r *Region) findElectableStoreID() uint64 {
	peers := r.meta.Peers
	for i := range peers {
		if peers[i].IsLearner {
			// learners are skipped; only non-learner peers are candidates.
			continue
		}
		return peers[i].StoreId
	}
	return 0
}

func (c *RegionCache) getPeerStoreIndex(r *Region, id uint64) (idx int, found bool) {
if len(r.meta.Peers) == 0 {
return
Expand Down
33 changes: 33 additions & 0 deletions store/tikv/region_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,39 @@ func (s *testRegionCacheSuite) TestRegionEpochAheadOfTiKV(c *C) {
c.Assert(len(bo.errors), Equals, 2)
}

// TestRegionEpochOnTiFlash checks that after an EpochNotMatch region error is
// handled for a request context obtained via GetTiFlashRPCContext, a
// subsequent leader read does not resolve to the TiFlash peer (peer1).
func (s *testRegionCacheSuite) TestRegionEpochOnTiFlash(c *C) {
// relabel store1 as tiflash (engine=tiflash), then add store3 without labels,
// give it a peer of region1 and make that peer the region leader
store3 := s.cluster.AllocID()
peer3 := s.cluster.AllocID()
s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store1), &metapb.StoreLabel{Key: "engine", Value: "tiflash"})
s.cluster.AddStore(store3, s.storeAddr(store3))
s.cluster.AddPeer(s.region1, store3, peer3)
s.cluster.ChangeLeader(s.region1, peer3)

// pre-load region cache and confirm the leader read resolves to peer3
loc1, err := s.cache.LocateKey(s.bo, []byte("a"))
c.Assert(err, IsNil)
c.Assert(loc1.Region.id, Equals, s.region1)
lctx, err := s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadLeader, 0)
c.Assert(err, IsNil)
c.Assert(lctx.Peer.Id, Equals, peer3)

// epoch-not-match on tiflash: the TiFlash context must point at peer1
ctxTiFlash, err := s.cache.GetTiFlashRPCContext(s.bo, loc1.Region)
c.Assert(err, IsNil)
c.Assert(ctxTiFlash.Peer.Id, Equals, s.peer1)
// mark the TiFlash peer as a learner before reporting the region back
// NOTE(review): presumably ctxTiFlash.Peer aliases an entry of ctxTiFlash.Meta.Peers,
// so the flag is visible in the region carried by the error below — confirm
ctxTiFlash.Peer.IsLearner = true
r := ctxTiFlash.Meta
reqSend := NewRegionRequestSender(s.cache, nil)
// feed an EpochNotMatch error whose current regions list contains only r
regionErr := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{CurrentRegions: []*metapb.Region{r}}}
reqSend.onRegionError(s.bo, ctxTiFlash, nil, regionErr)

// check leader read should not go to tiflash
lctx, err = s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadLeader, 0)
c.Assert(err, IsNil)
c.Assert(lctx.Peer.Id, Not(Equals), s.peer1)
}

const regionSplitKeyFormat = "t%08d"

func createClusterWithStoresAndRegions(regionCnt, storeCount int) *mocktikv.Cluster {
Expand Down
1 change: 1 addition & 0 deletions store/tikv/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ func (s *RegionRequestSender) SendReqCtx(
}

logutil.Eventf(bo.ctx, "send %s request to region %d at %s", req.Type, regionID.id, rpcCtx.Addr)
rpcCtx.ReqStoreType = sType
s.storeAddr = rpcCtx.Addr
var retry bool
resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout)
Expand Down