Skip to content

Commit

Permalink
tikv: drop store's regions when resolve store with tombstone status (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored Mar 3, 2021
1 parent 39fc7db commit abbf3fe
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 12 deletions.
18 changes: 14 additions & 4 deletions store/mockstore/mocktikv/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,19 +180,20 @@ func (c *Cluster) GetStoreByAddr(addr string) *metapb.Store {
}

// GetAndCheckStoreByAddr checks and returns a Store's meta by an addr
func (c *Cluster) GetAndCheckStoreByAddr(addr string) (*metapb.Store, error) {
func (c *Cluster) GetAndCheckStoreByAddr(addr string) (ss []*metapb.Store, err error) {
c.RLock()
defer c.RUnlock()

for _, s := range c.stores {
if s.cancel {
return nil, context.Canceled
err = context.Canceled
return
}
if s.meta.GetAddress() == addr {
return proto.Clone(s.meta).(*metapb.Store), nil
ss = append(ss, proto.Clone(s.meta).(*metapb.Store))
}
}
return nil, nil
return
}

// AddStore add a new Store to the cluster.
Expand All @@ -211,6 +212,15 @@ func (c *Cluster) RemoveStore(storeID uint64) {
delete(c.stores, storeID)
}

// MarkTombstone marks store as tombstone.
func (c *Cluster) MarkTombstone(storeID uint64) {
c.Lock()
defer c.Unlock()
nm := *c.stores[storeID].meta
nm.State = metapb.StoreState_Tombstone
c.stores[storeID].meta = &nm
}

// UpdateStoreAddr updates store address for cluster.
func (c *Cluster) UpdateStoreAddr(storeID uint64, addr string, labels ...*metapb.StoreLabel) {
c.Lock()
Expand Down
14 changes: 8 additions & 6 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,18 +750,20 @@ func NewRPCClient(cluster *Cluster, mvccStore MVCCStore) *RPCClient {
}

func (c *RPCClient) getAndCheckStoreByAddr(addr string) (*metapb.Store, error) {
store, err := c.Cluster.GetAndCheckStoreByAddr(addr)
stores, err := c.Cluster.GetAndCheckStoreByAddr(addr)
if err != nil {
return nil, err
}
if store == nil {
if len(stores) == 0 {
return nil, errors.New("connect fail")
}
if store.GetState() == metapb.StoreState_Offline ||
store.GetState() == metapb.StoreState_Tombstone {
return nil, errors.New("connection refused")
for _, store := range stores {
if store.GetState() != metapb.StoreState_Offline &&
store.GetState() != metapb.StoreState_Tombstone {
return store, nil
}
}
return store, nil
return nil, errors.New("connection refused")
}

func (c *RPCClient) checkArgs(ctx context.Context, addr string) (*rpcHandler, error) {
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1516,7 +1516,7 @@ func (s *Store) reResolve(c *RegionCache) {
// we cannot do backoff in reResolve loop but try check other store and wait tick.
return
}
if store == nil {
if store == nil || store.State == metapb.StoreState_Tombstone {
// store has be removed in PD, we should invalidate all regions using those store.
logutil.BgLogger().Info("invalidate regions in removed store",
zap.Uint64("store", s.storeID), zap.String("add", s.addr))
Expand Down
27 changes: 27 additions & 0 deletions store/tikv/region_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,33 @@ func (s *testRegionCacheSuite) TestReplaceNewAddrAndOldOfflineImmediately(c *C)
c.Assert(getVal, BytesEquals, testValue)
}

func (s *testRegionCacheSuite) TestReplaceStore(c *C) {
mvccStore := mocktikv.MustNewMVCCStore()
defer mvccStore.Close()

client := &RawKVClient{
clusterID: 0,
regionCache: NewRegionCache(mocktikv.NewPDClient(s.cluster)),
rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore),
}
defer client.Close()
testKey := []byte("test_key")
testValue := []byte("test_value")
err := client.Put(testKey, testValue)
c.Assert(err, IsNil)

s.cluster.MarkTombstone(s.store1)
store3 := s.cluster.AllocID()
peer3 := s.cluster.AllocID()
s.cluster.AddStore(store3, s.storeAddr(s.store1))
s.cluster.AddPeer(s.region1, store3, peer3)
s.cluster.RemovePeer(s.region1, s.peer1)
s.cluster.ChangeLeader(s.region1, peer3)

err = client.Put(testKey, testValue)
c.Assert(err, IsNil)
}

func (s *testRegionCacheSuite) TestListRegionIDsInCache(c *C) {
// ['' - 'm' - 'z']
region2 := s.cluster.AllocID()
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed

if storeNotMatch := regionErr.GetStoreNotMatch(); storeNotMatch != nil {
// store not match
logutil.BgLogger().Warn("tikv reports `StoreNotMatch` retry later",
logutil.BgLogger().Debug("tikv reports `StoreNotMatch` retry later",
zap.Stringer("storeNotMatch", storeNotMatch),
zap.Stringer("ctx", ctx))
ctx.Store.markNeedCheck(s.regionCache.notifyCheckCh)
Expand Down

0 comments on commit abbf3fe

Please sign in to comment.