From db9a8f16b2f069ae6ce18f06de31f722de765cbe Mon Sep 17 00:00:00 2001
From: lysu
Date: Wed, 10 Jun 2020 21:07:20 +0800
Subject: [PATCH 1/3] tikv: fix switch region leader to tiflash when tiflash returns EpochNotMatch

---
 store/mockstore/mocktikv/cluster.go |  3 ++-
 store/tikv/region_cache.go          | 20 ++++++++++++++++++-
 store/tikv/region_cache_test.go     | 31 +++++++++++++++++++++++++++++
 3 files changed, 52 insertions(+), 2 deletions(-)

diff --git a/store/mockstore/mocktikv/cluster.go b/store/mockstore/mocktikv/cluster.go
index d2d9a4227c574..3bb9b782afc65 100644
--- a/store/mockstore/mocktikv/cluster.go
+++ b/store/mockstore/mocktikv/cluster.go
@@ -213,10 +213,11 @@ func (c *Cluster) RemoveStore(storeID uint64) {
 }
 
 // UpdateStoreAddr updates store address for cluster.
-func (c *Cluster) UpdateStoreAddr(storeID uint64, addr string) {
+func (c *Cluster) UpdateStoreAddr(storeID uint64, addr string, labels ...*metapb.StoreLabel) {
 	c.Lock()
 	defer c.Unlock()
 	c.stores[storeID] = newStore(storeID, addr)
+	c.stores[storeID].meta.Labels = labels
 }
 
 // GetRegion returns a Region's meta and leader ID.
diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go
index 46fd7b38c89ee..f7a700d88f51a 100644
--- a/store/tikv/region_cache.go
+++ b/store/tikv/region_cache.go
@@ -1085,7 +1085,13 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *Backoffer, ctx *RPCContext, curr
 		}
 		region := &Region{meta: meta}
 		region.init(c)
-		c.switchToPeer(region, ctx.Store.storeID)
+		var initLeader uint64
+		if ctx.Store.storeType == kv.TiFlash {
+			initLeader = region.findElectableStoreIndex()
+		} else {
+			initLeader = ctx.Store.storeID
+		}
+		c.switchToPeer(region, initLeader)
 		c.insertRegionToCache(region)
 		if ctx.Region == region.VerID() {
 			needInvalidateOld = false
@@ -1248,6 +1254,18 @@ func (r *RegionStore) switchNextPeer(rr *Region, currentPeerIdx int) {
 	rr.compareAndSwapStore(r, newRegionStore)
 }
 
+func (r *Region) findElectableStoreIndex() uint64 {
+	if len(r.meta.Peers) == 0 {
+		return 0
+	}
+	for i, p := range r.meta.Peers {
+		if !p.IsLearner {
+			return uint64(i)
+		}
+	}
+	return 0
+}
+
 func (c *RegionCache) getPeerStoreIndex(r *Region, id uint64) (idx int, found bool) {
 	if len(r.meta.Peers) == 0 {
 		return
diff --git a/store/tikv/region_cache_test.go b/store/tikv/region_cache_test.go
index 20770de4931a5..961222a8045b3 100644
--- a/store/tikv/region_cache_test.go
+++ b/store/tikv/region_cache_test.go
@@ -675,6 +675,37 @@ func (s *testRegionCacheSuite) TestRegionEpochAheadOfTiKV(c *C) {
 	c.Assert(len(bo.errors), Equals, 2)
 }
 
+func (s *testRegionCacheSuite) TestRegionEpochOnTiFlash(c *C) {
+	// add store3 as tiflash
+	store3 := s.cluster.AllocID()
+	peer3 := s.cluster.AllocID()
+	s.cluster.AddStore(store3, s.storeAddr(store3))
+	s.cluster.UpdateStoreAddr(store3, s.storeAddr(store3), &metapb.StoreLabel{Key: "engine", Value: "tiflash"})
+	s.cluster.AddPeer(s.region1, store3, peer3)
+	s.cluster.ChangeLeader(s.region1, s.peer1)
+
+	// pre-load region cache
+	loc1, err := s.cache.LocateKey(s.bo, []byte("a"))
+	c.Assert(err, IsNil)
+	c.Assert(loc1.Region.id, Equals, s.region1)
+	lctx, err := s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadLeader, 0)
+	c.Assert(err, IsNil)
+	c.Assert(lctx.Peer.Id, Not(Equals), peer3)
+
+	// epoch-not-match on tiflash
+	ctxTiFlash, err := s.cache.GetTiFlashRPCContext(s.bo, loc1.Region)
+	c.Assert(err, IsNil)
+	r := ctxTiFlash.Meta
+	reqSend := NewRegionRequestSender(s.cache, nil)
+	regionErr := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{CurrentRegions: []*metapb.Region{r}}}
+	reqSend.onRegionError(s.bo, ctxTiFlash, nil, regionErr)
+
+	// check leader read should not go to tiflash
+	lctx, err = s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadLeader, 0)
+	c.Assert(err, IsNil)
+	c.Assert(lctx.Peer.Id, Not(Equals), peer3)
+}
+
 const regionSplitKeyFormat = "t%08d"
 
 func createClusterWithStoresAndRegions(regionCnt, storeCount int) *mocktikv.Cluster {

From 9de1157e43491d50ec46fb73d29a805b5052fecd Mon Sep 17 00:00:00 2001
From: lysu
Date: Thu, 11 Jun 2020 11:09:45 +0800
Subject: [PATCH 2/3] fix idx/id bug & address comments

---
 store/mockstore/mocktikv/cluster.go |  6 +++---
 store/tikv/region_cache.go          |  8 ++++----
 store/tikv/region_cache_test.go     | 10 ++++++----
 3 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/store/mockstore/mocktikv/cluster.go b/store/mockstore/mocktikv/cluster.go
index 3bb9b782afc65..c4803640a8db5 100644
--- a/store/mockstore/mocktikv/cluster.go
+++ b/store/mockstore/mocktikv/cluster.go
@@ -216,8 +216,7 @@ func (c *Cluster) RemoveStore(storeID uint64) {
 func (c *Cluster) UpdateStoreAddr(storeID uint64, addr string, labels ...*metapb.StoreLabel) {
 	c.Lock()
 	defer c.Unlock()
-	c.stores[storeID] = newStore(storeID, addr)
-	c.stores[storeID].meta.Labels = labels
+	c.stores[storeID] = newStore(storeID, addr, labels...)
 }
 
 // GetRegion returns a Region's meta and leader ID.
@@ -649,11 +648,12 @@ type Store struct {
 	tokenCount atomic.Int64
 }
 
-func newStore(storeID uint64, addr string) *Store {
+func newStore(storeID uint64, addr string, labels ...*metapb.StoreLabel) *Store {
 	return &Store{
 		meta: &metapb.Store{
 			Id:      storeID,
 			Address: addr,
+			Labels:  labels,
 		},
 	}
 }
diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go
index f7a700d88f51a..0398669c3bb38 100644
--- a/store/tikv/region_cache.go
+++ b/store/tikv/region_cache.go
@@ -1087,7 +1087,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *Backoffer, ctx *RPCContext, curr
 		region.init(c)
 		var initLeader uint64
 		if ctx.Store.storeType == kv.TiFlash {
-			initLeader = region.findElectableStoreIndex()
+			initLeader = region.findElectableStoreID()
 		} else {
 			initLeader = ctx.Store.storeID
 		}
@@ -1254,13 +1254,13 @@ func (r *RegionStore) switchNextPeer(rr *Region, currentPeerIdx int) {
 	rr.compareAndSwapStore(r, newRegionStore)
 }
 
-func (r *Region) findElectableStoreIndex() uint64 {
+func (r *Region) findElectableStoreID() uint64 {
 	if len(r.meta.Peers) == 0 {
 		return 0
 	}
-	for i, p := range r.meta.Peers {
+	for _, p := range r.meta.Peers {
 		if !p.IsLearner {
-			return uint64(i)
+			return p.StoreId
 		}
 	}
 	return 0
diff --git a/store/tikv/region_cache_test.go b/store/tikv/region_cache_test.go
index 961222a8045b3..b8db40eba6b15 100644
--- a/store/tikv/region_cache_test.go
+++ b/store/tikv/region_cache_test.go
@@ -679,10 +679,10 @@ func (s *testRegionCacheSuite) TestRegionEpochOnTiFlash(c *C) {
 	// add store3 as tiflash
 	store3 := s.cluster.AllocID()
 	peer3 := s.cluster.AllocID()
+	s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store1), &metapb.StoreLabel{Key: "engine", Value: "tiflash"})
 	s.cluster.AddStore(store3, s.storeAddr(store3))
-	s.cluster.UpdateStoreAddr(store3, s.storeAddr(store3), &metapb.StoreLabel{Key: "engine", Value: "tiflash"})
 	s.cluster.AddPeer(s.region1, store3, peer3)
-	s.cluster.ChangeLeader(s.region1, s.peer1)
+	s.cluster.ChangeLeader(s.region1, peer3)
 
 	// pre-load region cache
 	loc1, err := s.cache.LocateKey(s.bo, []byte("a"))
 	c.Assert(err, IsNil)
 	c.Assert(loc1.Region.id, Equals, s.region1)
 	lctx, err := s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadLeader, 0)
 	c.Assert(err, IsNil)
-	c.Assert(lctx.Peer.Id, Not(Equals), peer3)
+	c.Assert(lctx.Peer.Id, Equals, peer3)
 
 	// epoch-not-match on tiflash
 	ctxTiFlash, err := s.cache.GetTiFlashRPCContext(s.bo, loc1.Region)
 	c.Assert(err, IsNil)
+	c.Assert(ctxTiFlash.Peer.Id, Equals, s.peer1)
+	ctxTiFlash.Peer.IsLearner = true
 	r := ctxTiFlash.Meta
 	reqSend := NewRegionRequestSender(s.cache, nil)
 	regionErr := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{CurrentRegions: []*metapb.Region{r}}}
@@ -703,7 +705,7 @@ func (s *testRegionCacheSuite) TestRegionEpochOnTiFlash(c *C) {
 	// check leader read should not go to tiflash
 	lctx, err = s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadLeader, 0)
 	c.Assert(err, IsNil)
-	c.Assert(lctx.Peer.Id, Not(Equals), peer3)
+	c.Assert(lctx.Peer.Id, Not(Equals), s.peer1)
 }
 
 const regionSplitKeyFormat = "t%08d"

From 5bfecf547fbbb8ef7f3528fda7748f45721c601a Mon Sep 17 00:00:00 2001
From: lysu
Date: Fri, 12 Jun 2020 20:35:32 +0800
Subject: [PATCH 3/3] give a chance to retry the right store when something goes wrong

---
 store/tikv/region_cache.go   | 23 ++++++++++++++---------
 store/tikv/region_request.go |  1 +
 2 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go
index 0398669c3bb38..d9f2f69391d98 100644
--- a/store/tikv/region_cache.go
+++ b/store/tikv/region_cache.go
@@ -290,17 +290,22 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store) {
 
 // RPCContext contains data that is needed to send RPC to a region.
 type RPCContext struct {
-	Region  RegionVerID
-	Meta    *metapb.Region
-	Peer    *metapb.Peer
-	PeerIdx int
-	Store   *Store
-	Addr    string
+	Region       RegionVerID
+	Meta         *metapb.Region
+	Peer         *metapb.Peer
+	PeerIdx      int
+	Store        *Store
+	Addr         string
+	ReqStoreType kv.StoreType
 }
 
 func (c *RPCContext) String() string {
-	return fmt.Sprintf("region ID: %d, meta: %s, peer: %s, addr: %s, idx: %d",
-		c.Region.GetID(), c.Meta, c.Peer, c.Addr, c.PeerIdx)
+	var runStoreType string
+	if c.Store != nil {
+		runStoreType = c.Store.storeType.Name()
+	}
+	return fmt.Sprintf("region ID: %d, meta: %s, peer: %s, addr: %s, idx: %d, reqStoreType: %s, runStoreType: %s",
+		c.Region.GetID(), c.Meta, c.Peer, c.Addr, c.PeerIdx, c.ReqStoreType.Name(), runStoreType)
 }
 
 // GetTiKVRPCContext returns RPCContext for a region. If it returns nil, the region
@@ -522,7 +527,7 @@ func (c *RegionCache) OnSendFail(bo *Backoffer, ctx *RPCContext, scheduleReload
 	}
 
 	// try next peer to found new leader.
-	if ctx.Store.storeType == kv.TiKV {
+	if ctx.ReqStoreType == kv.TiKV {
 		rs.switchNextPeer(r, ctx.PeerIdx)
 	} else {
 		rs.switchNextFlashPeer(r, ctx.PeerIdx)
diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go
index d50ab28bbf2f0..0122454630033 100644
--- a/store/tikv/region_request.go
+++ b/store/tikv/region_request.go
@@ -211,6 +211,7 @@ func (s *RegionRequestSender) SendReqCtx(
 		}
 
 		logutil.Eventf(bo.ctx, "send %s request to region %d at %s", req.Type, regionID.id, rpcCtx.Addr)
+		rpcCtx.ReqStoreType = sType
 		s.storeAddr = rpcCtx.Addr
 		var retry bool
 		resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout)
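
Taken together, patches 1/3 and 2/3 change the leader choice in OnRegionEpochNotMatch: when the EpochNotMatch error is reported by a TiFlash store, the rebuilt region's working peer must point at an electable (non-learner) store rather than the reporting TiFlash learner, otherwise later leader reads get routed to TiFlash. The sketch below illustrates that selection logic in isolation; it uses invented stand-in types and names (peer, region, chooseInitLeader) instead of the real metapb and kv packages, so it is only a model of the patched code path, not the actual implementation.

package main

import "fmt"

// Simplified stand-ins for metapb.Peer / metapb.Region; fields and names here
// are illustrative only.
type peer struct {
	id        uint64
	storeID   uint64
	isLearner bool
}

type region struct {
	peers []peer
}

// findElectableStoreID mirrors the helper from PATCH 2/3: return the store ID
// of the first voter (non-learner) peer, or 0 when there is none.
func (r *region) findElectableStoreID() uint64 {
	for _, p := range r.peers {
		if !p.isLearner {
			return p.storeID
		}
	}
	return 0
}

// chooseInitLeader models the decision added to OnRegionEpochNotMatch: if the
// error came from a TiFlash (learner) store, pick an electable store instead
// of the reporting store.
func chooseInitLeader(r *region, fromTiFlash bool, reporterStoreID uint64) uint64 {
	if fromTiFlash {
		return r.findElectableStoreID()
	}
	return reporterStoreID
}

func main() {
	r := &region{peers: []peer{
		{id: 3, storeID: 30, isLearner: true}, // TiFlash learner listed first
		{id: 1, storeID: 10},                  // TiKV voter
	}}
	fmt.Println(chooseInitLeader(r, true, 30))  // 10: leader must not switch to TiFlash
	fmt.Println(chooseInitLeader(r, false, 10)) // 10: a TiKV reporter stays the target
}

Returning the peer's StoreId rather than its slice index is exactly the idx/id mix-up that PATCH 2/3 corrects in findElectableStoreID.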
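
PATCH 3/3 additionally records on RPCContext which engine the request was built for (ReqStoreType), so that OnSendFail rotates through the right peer list even if the cached peer momentarily resolves to the wrong engine. Below is a minimal sketch of that routing decision, again with invented stand-in types rather than the real kv.StoreType and RegionCache API.

package main

import "fmt"

// storeType stands in for kv.StoreType; only the two engines matter here.
type storeType int

const (
	tiKV storeType = iota
	tiFlash
)

// rpcContext keeps the requested engine separate from the engine of the store
// the cached peer currently points at, as PATCH 3/3 does with ReqStoreType.
type rpcContext struct {
	reqStoreType storeType // engine the request targets
	runStoreType storeType // engine of the store actually addressed
}

// onSendFail models the changed branch in RegionCache.OnSendFail: the retry
// path follows the requested engine, not the engine that happened to fail.
func onSendFail(ctx rpcContext) string {
	if ctx.reqStoreType == tiKV {
		return "switchNextPeer" // probe the next TiKV peer for the leader
	}
	return "switchNextFlashPeer" // rotate among TiFlash peers
}

func main() {
	// A TiKV request that was mis-routed to a TiFlash store still retries on
	// TiKV peers, which is the "retry the right store" of the commit message.
	fmt.Println(onSendFail(rpcContext{reqStoreType: tiKV, runStoreType: tiFlash}))
	fmt.Println(onSendFail(rpcContext{reqStoreType: tiFlash, runStoreType: tiFlash}))
}

The extra runStoreType in the String() output only aids log readability; the retry decision depends solely on the requested engine.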