diff --git a/store/tikv/mockstore/mocktikv/pd.go b/store/tikv/mockstore/mocktikv/pd.go
index b2db0de8fb343..6531a4ca71cc3 100644
--- a/store/tikv/mockstore/mocktikv/pd.go
+++ b/store/tikv/mockstore/mocktikv/pd.go
@@ -15,6 +15,7 @@ package mocktikv
 import (
     "context"
+    "fmt"
     "math"
     "sync"
     "time"
@@ -126,6 +127,13 @@ func (c *pdClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store,
     default:
     }
     store := c.cluster.GetStore(storeID)
+    // It's the same as PD's implementation.
+    if store == nil {
+        return nil, fmt.Errorf("invalid store ID %d, not found", storeID)
+    }
+    if store.GetState() == metapb.StoreState_Tombstone {
+        return nil, nil
+    }
     return store, nil
 }
diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go
index 0d9423a9f5a7e..5dc27503465d4 100644
--- a/store/tikv/region_cache.go
+++ b/store/tikv/region_cache.go
@@ -187,7 +187,7 @@ func (r *RegionStore) filterStoreCandidate(aidx AccessIndex, op *storeSelectorOp
 }
 
 // init initializes region after constructed.
-func (r *Region) init(c *RegionCache) error {
+func (r *Region) init(bo *Backoffer, c *RegionCache) error {
     // region store pull used store from global store map
     // to avoid acquire storeMu in later access.
     rs := &RegionStore{
@@ -197,6 +197,7 @@ func (r *Region) init(c *RegionCache) error {
         stores:      make([]*Store, 0, len(r.meta.Peers)),
         storeEpochs: make([]uint32, 0, len(r.meta.Peers)),
     }
+    availablePeers := r.meta.GetPeers()[:0]
     for _, p := range r.meta.Peers {
         c.storeMu.RLock()
         store, exists := c.storeMu.stores[p.StoreId]
@@ -204,10 +205,15 @@ func (r *Region) init(c *RegionCache) error {
         if !exists {
             store = c.getStoreByStoreID(p.StoreId)
         }
-        _, err := store.initResolve(retry.NewNoopBackoff(context.Background()), c)
+        addr, err := store.initResolve(bo, c)
         if err != nil {
             return err
         }
+        // Filter the peer on a tombstone store.
+        if addr == "" {
+            continue
+        }
+        availablePeers = append(availablePeers, p)
         switch store.storeType {
         case tikvrpc.TiKV:
             rs.accessIndex[TiKVOnly] = append(rs.accessIndex[TiKVOnly], len(rs.stores))
@@ -217,6 +223,13 @@ func (r *Region) init(c *RegionCache) error {
         rs.stores = append(rs.stores, store)
         rs.storeEpochs = append(rs.storeEpochs, atomic.LoadUint32(&store.epoch))
     }
+    // TODO(youjiali1995): It's possible the region info in PD is stale for now, but it can recover.
+    // Maybe we need to back off here.
+    if len(availablePeers) == 0 {
+        return errors.Errorf("no available peers, region: {%v}", r.meta)
+    }
+    r.meta.Peers = availablePeers
+
     atomic.StorePointer(&r.store, unsafe.Pointer(rs))
 
     // mark region has been init accessed.
@@ -321,6 +334,18 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
     return c
 }
 
+// clear clears all cached data in the RegionCache. It's only used in tests.
+func (c *RegionCache) clear() {
+    c.mu.Lock()
+    c.mu.regions = make(map[RegionVerID]*Region)
+    c.mu.latestVersions = make(map[uint64]RegionVerID)
+    c.mu.sorted = btree.New(btreeDegree)
+    c.mu.Unlock()
+    c.storeMu.Lock()
+    c.storeMu.stores = make(map[uint64]*Store)
+    c.storeMu.Unlock()
+}
+
 // Close releases region cache's resource.
 func (c *RegionCache) Close() {
     close(c.closeCh)
@@ -332,32 +357,29 @@ func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
     defer ticker.Stop()
     var needCheckStores []*Store
     for {
+        needCheckStores = needCheckStores[:0]
         select {
         case <-c.closeCh:
             return
         case <-c.notifyCheckCh:
-            needCheckStores = needCheckStores[:0]
-            c.checkAndResolve(needCheckStores)
+            c.checkAndResolve(needCheckStores, func(s *Store) bool {
+                return s.getResolveState() == needCheck
+            })
        case <-ticker.C:
-            // refresh store once a minute to update labels
-            var stores []*Store
-            c.storeMu.RLock()
-            stores = make([]*Store, 0, len(c.storeMu.stores))
-            for _, s := range c.storeMu.stores {
-                stores = append(stores, s)
-            }
-            c.storeMu.RUnlock()
-            for _, store := range stores {
-                _, err := store.reResolve(c)
-                terror.Log(err)
-            }
+            // Refresh the stores to update their labels.
+            c.checkAndResolve(needCheckStores, func(s *Store) bool {
+                state := s.getResolveState()
+                // Only valid stores should be re-resolved. In fact, there can't be a
+                // deleted store in the stores map, which is guaranteed by reResolve().
+                return state != unresolved && state != tombstone && state != deleted
+            })
         }
     }
 }
 
 // checkAndResolve checks and resolve addr of failed stores.
 // this method isn't thread-safe and only be used by one goroutine.
-func (c *RegionCache) checkAndResolve(needCheckStores []*Store) {
+func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*Store) bool) {
     defer func() {
         r := recover()
         if r != nil {
@@ -369,8 +391,7 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store) {
 
     c.storeMu.RLock()
     for _, store := range c.storeMu.stores {
-        state := store.getResolveState()
-        if state == needCheck {
+        if needCheck(store) {
             needCheckStores = append(needCheckStores, store)
         }
     }
@@ -1217,9 +1238,6 @@ func filterUnavailablePeers(region *pd.Region) {
             new = append(new, p)
         }
     }
-    for i := len(new); i < len(region.Meta.Peers); i++ {
-        region.Meta.Peers[i] = nil
-    }
     region.Meta.Peers = new
 }
 
@@ -1272,7 +1290,7 @@ func (c *RegionCache) loadRegion(bo *Backoffer, key []byte, isEndKey bool) (*Reg
             continue
         }
         region := &Region{meta: reg.Meta}
-        err = region.init(c)
+        err = region.init(bo, c)
         if err != nil {
             return nil, err
         }
@@ -1317,7 +1335,7 @@ func (c *RegionCache) loadRegionByID(bo *Backoffer, regionID uint64) (*Region, e
             return nil, errors.New("receive Region with no available peer")
         }
         region := &Region{meta: reg.Meta}
-        err = region.init(c)
+        err = region.init(bo, c)
         if err != nil {
             return nil, err
         }
@@ -1368,7 +1386,7 @@ func (c *RegionCache) scanRegions(bo *Backoffer, startKey, endKey []byte, limit
     regions := make([]*Region, 0, len(regionsInfo))
     for _, r := range regionsInfo {
         region := &Region{meta: r.Meta}
-        err := region.init(c)
+        err := region.init(bo, c)
         if err != nil {
             return nil, err
         }
@@ -1409,6 +1427,8 @@ func (c *RegionCache) getStoreAddr(bo *Backoffer, region *Region, store *Store,
     case deleted:
         addr = c.changeToActiveStore(region, store, storeIdx)
         return
+    case tombstone:
+        return "", nil
     default:
         panic("unsupported resolve state")
     }
@@ -1456,6 +1476,8 @@ func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *RegionStor
     return nil, 0, 0
 }
 
+// changeToActiveStore replaces the deleted store in the region with an up-to-date store from the stores map.
+// The order is guaranteed by reResolve(), which adds the new store before marking the old store deleted.
 func (c *RegionCache) changeToActiveStore(region *Region, store *Store, storeIdx int) (addr string) {
     c.storeMu.RLock()
     store = c.storeMu.stores[store.storeID]
@@ -1530,7 +1552,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *Backoffer, ctx *RPCContext, curr
             }
         }
         region := &Region{meta: meta}
-        err := region.init(c)
+        err := region.init(bo, c)
         if err != nil {
             return err
         }
@@ -1860,19 +1882,31 @@ type Store struct {
 type resolveState uint64
 
 const (
+    // The store is just created and has not been resolved yet.
+    // A store in this state will only be resolved by initResolve().
     unresolved resolveState = iota
+    // The store is resolved and its address is valid.
     resolved
+    // A request failed on this store and it will be re-resolved by asyncCheckAndResolveLoop().
     needCheck
+    // The store's address or labels have changed and it is marked deleted.
+    // A new store struct has replaced it in the RegionCache, and callers should
+    // call changeToActiveStore() to get the new struct.
     deleted
+    // The store is a tombstone. Regions referencing it should be invalidated on access.
+    tombstone
 )
 
-// initResolve resolves addr for store that never resolved.
+// initResolve resolves the address of a store that has never been resolved and returns an
+// empty string if it's a tombstone.
 func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err error) {
     s.resolveMutex.Lock()
     state := s.getResolveState()
     defer s.resolveMutex.Unlock()
     if state != unresolved {
-        addr = s.addr
+        if state != tombstone {
+            addr = s.addr
+        }
         return
     }
     var store *metapb.Store
@@ -1883,35 +1917,33 @@ func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err err
         } else {
             metrics.RegionCacheCounterWithGetStoreOK.Inc()
         }
-        if err != nil {
+        if bo.GetCtx().Err() != nil && errors.Cause(bo.GetCtx().Err()) == context.Canceled {
+            return
+        }
+        if err != nil && !isStoreNotFoundError(err) {
             // TODO: more refine PD error status handle.
-            if errors.Cause(err) == context.Canceled {
-                return
-            }
             err = errors.Errorf("loadStore from PD failed, id: %d, err: %v", s.storeID, err)
             if err = bo.Backoff(retry.BoPDRPC, err); err != nil {
                 return
             }
             continue
         }
+        // The store is a tombstone.
         if store == nil {
-            return
+            s.setResolveState(tombstone)
+            return "", nil
         }
         addr = store.GetAddress()
+        if addr == "" {
+            return "", errors.Errorf("empty store(%d) address", s.storeID)
+        }
         s.addr = addr
         s.saddr = store.GetStatusAddress()
         s.storeType = GetStoreTypeByMeta(store)
         s.labels = store.GetLabels()
-    retry:
-        state = s.getResolveState()
-        if state != unresolved {
-            addr = s.addr
-            return
-        }
-        if !s.compareAndSwapState(state, resolved) {
-            goto retry
-        }
-        return
+        // No other goroutine should change the state concurrently, but we still use changeResolveStateTo() for safety.
+        s.changeResolveStateTo(unresolved, resolved)
+        return s.addr, nil
     }
 }
 
@@ -1944,7 +1976,7 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
         logutil.BgLogger().Info("invalidate regions in removed store",
             zap.Uint64("store", s.storeID), zap.String("add", s.addr))
         atomic.AddUint32(&s.epoch, 1)
-        atomic.StoreUint64(&s.state, uint64(deleted))
+        s.setResolveState(tombstone)
         metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
         return false, nil
     }
@@ -1952,33 +1984,14 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
     storeType := GetStoreTypeByMeta(store)
     addr = store.GetAddress()
     if s.addr != addr || !s.IsSameLabels(store.GetLabels()) {
-        state := resolved
-        newStore := &Store{storeID: s.storeID, addr: addr, saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels()}
-        newStore.state = *(*uint64)(&state)
+        newStore := &Store{storeID: s.storeID, addr: addr, saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)}
         c.storeMu.Lock()
         c.storeMu.stores[newStore.storeID] = newStore
         c.storeMu.Unlock()
-    retryMarkDel:
-        // all region used those
-        oldState := s.getResolveState()
-        if oldState == deleted {
-            return false, nil
-        }
-        newState := deleted
-        if !s.compareAndSwapState(oldState, newState) {
-            goto retryMarkDel
-        }
+        s.setResolveState(deleted)
         return false, nil
     }
-retryMarkResolved:
-    oldState := s.getResolveState()
-    if oldState != needCheck {
-        return true, nil
-    }
-    newState := resolved
-    if !s.compareAndSwapState(oldState, newState) {
-        goto retryMarkResolved
-    }
+    s.changeResolveStateTo(needCheck, resolved)
     return true, nil
 }
 
@@ -1990,23 +2003,35 @@ func (s *Store) getResolveState() resolveState {
     return resolveState(atomic.LoadUint64(&s.state))
 }
 
-func (s *Store) compareAndSwapState(oldState, newState resolveState) bool {
-    return atomic.CompareAndSwapUint64(&s.state, uint64(oldState), uint64(newState))
+func (s *Store) setResolveState(state resolveState) {
+    atomic.StoreUint64(&s.state, uint64(state))
+}
+
+// changeResolveStateTo changes the store's resolveState from the old state to the new state.
+// It returns true if the state is changed successfully, and false if the store's state
+// has been changed by another goroutine.
+func (s *Store) changeResolveStateTo(from, to resolveState) bool {
+    for {
+        state := s.getResolveState()
+        if state == to {
+            return true
+        }
+        if state != from {
+            return false
+        }
+        if atomic.CompareAndSwapUint64(&s.state, uint64(from), uint64(to)) {
+            return true
+        }
+    }
 }
 
 // markNeedCheck marks resolved store to be async resolve to check store addr change.
 func (s *Store) markNeedCheck(notifyCheckCh chan struct{}) {
-retry:
-    oldState := s.getResolveState()
-    if oldState != resolved {
-        return
-    }
-    if !s.compareAndSwapState(oldState, needCheck) {
-        goto retry
-    }
-    select {
-    case notifyCheckCh <- struct{}{}:
-    default:
+    if s.changeResolveStateTo(resolved, needCheck) {
+        select {
+        case notifyCheckCh <- struct{}{}:
+        default:
+        }
     }
 }
 
diff --git a/store/tikv/region_cache_test.go b/store/tikv/region_cache_test.go
index 7b0e5884abb2a..acd267ceed760 100644
--- a/store/tikv/region_cache_test.go
+++ b/store/tikv/region_cache_test.go
@@ -181,32 +181,176 @@ func (s *testRegionCacheSuite) TestSimple(c *C) {
     c.Assert(r, IsNil)
 }
 
-func (s *testRegionCacheSuite) TestDropStore(c *C) {
-    bo := NewBackofferWithVars(context.Background(), 100, nil)
+// TestResolveStateTransition verifies store's resolve state transitions. For example,
+// a newly added store is in unresolved state and will be resolved soon if it's an up store,
+// or in tombstone state if it's a tombstone.
+func (s *testRegionCacheSuite) TestResolveStateTransition(c *C) {
+    cache := s.cache
+    bo := retry.NewNoopBackoff(context.Background())
+
+    // Check resolving normal stores. The resolve state should be resolved.
+    for _, storeMeta := range s.cluster.GetAllStores() {
+        store := cache.getStoreByStoreID(storeMeta.GetId())
+        c.Assert(store.getResolveState(), Equals, unresolved)
+        addr, err := store.initResolve(bo, cache)
+        c.Assert(err, IsNil)
+        c.Assert(addr, Equals, storeMeta.GetAddress())
+        c.Assert(store.getResolveState(), Equals, resolved)
+    }
+
+    waitResolve := func(s *Store) {
+        for i := 0; i < 10; i++ {
+            if s.getResolveState() != needCheck {
+                break
+            }
+            time.Sleep(50 * time.Millisecond)
+        }
+    }
+
+    // Mark the store needCheck. The resolve state should be resolved soon.
+    store := cache.getStoreByStoreID(s.store1)
+    store.markNeedCheck(cache.notifyCheckCh)
+    waitResolve(store)
+    c.Assert(store.getResolveState(), Equals, resolved)
+
+    // Mark the store needCheck and it becomes a tombstone. The resolve state should be tombstone.
+    s.cluster.MarkTombstone(s.store1)
+    store.markNeedCheck(cache.notifyCheckCh)
+    waitResolve(store)
+    c.Assert(store.getResolveState(), Equals, tombstone)
+    s.cluster.StartStore(s.store1)
+
+    // Mark the store needCheck and it's deleted from PD. The resolve state should be tombstone.
+    cache.clear()
+    store = cache.getStoreByStoreID(s.store1)
+    store.initResolve(bo, cache)
+    c.Assert(store.getResolveState(), Equals, resolved)
+    storeMeta := s.cluster.GetStore(s.store1)
     s.cluster.RemoveStore(s.store1)
-    loc, err := s.cache.LocateKey(bo, []byte("a"))
-    c.Assert(err, IsNil)
-    ctx, err := s.cache.GetTiKVRPCContext(bo, loc.Region, kv.ReplicaReadLeader, 0)
+    store.markNeedCheck(cache.notifyCheckCh)
+    waitResolve(store)
+    c.Assert(store.getResolveState(), Equals, tombstone)
+    s.cluster.AddStore(storeMeta.GetId(), storeMeta.GetAddress(), storeMeta.GetLabels()...)
+
+    // Mark the store needCheck and its address and labels are changed.
+    // The resolve state should be deleted and a new store is added to the cache.
+    cache.clear()
+    store = cache.getStoreByStoreID(s.store1)
+    store.initResolve(bo, cache)
+    c.Assert(store.getResolveState(), Equals, resolved)
+    s.cluster.UpdateStoreAddr(s.store1, store.addr+"0", &metapb.StoreLabel{Key: "k", Value: "v"})
+    store.markNeedCheck(cache.notifyCheckCh)
+    waitResolve(store)
+    c.Assert(store.getResolveState(), Equals, deleted)
+    newStore := cache.getStoreByStoreID(s.store1)
+    c.Assert(newStore.getResolveState(), Equals, resolved)
+    c.Assert(newStore.addr, Equals, store.addr+"0")
+    c.Assert(newStore.labels, DeepEquals, []*metapb.StoreLabel{{Key: "k", Value: "v"}})
+
+    // Check initResolve()ing a tombstone store. The resolve state should be tombstone.
+    cache.clear()
+    s.cluster.MarkTombstone(s.store1)
+    store = cache.getStoreByStoreID(s.store1)
+    for i := 0; i < 2; i++ {
+        addr, err := store.initResolve(bo, cache)
+        c.Assert(err, IsNil)
+        c.Assert(addr, Equals, "")
+        c.Assert(store.getResolveState(), Equals, tombstone)
+    }
+    s.cluster.StartStore(s.store1)
+    cache.clear()
+
+    // Check initResolve()ing a dropped store. The resolve state should be tombstone.
+    cache.clear()
+    storeMeta = s.cluster.GetStore(s.store1)
+    s.cluster.RemoveStore(s.store1)
+    store = cache.getStoreByStoreID(s.store1)
+    for i := 0; i < 2; i++ {
+        addr, err := store.initResolve(bo, cache)
+        c.Assert(err, IsNil)
+        c.Assert(addr, Equals, "")
+        c.Assert(store.getResolveState(), Equals, tombstone)
+    }
+    s.cluster.AddStore(storeMeta.GetId(), storeMeta.GetAddress(), storeMeta.GetLabels()...)
+}
+
+// TestFilterDownPeersOrPeersOnTombstoneOrDroppedStores verifies that the RegionCache filters
+// out a region's down peers and peers on tombstone or dropped stores. The RegionCache shouldn't
+// report errors in such cases if there are still available peers.
+func (s *testRegionCacheSuite) TestFilterDownPeersOrPeersOnTombstoneOrDroppedStores(c *C) {
+    key := []byte("a")
+    bo := NewBackofferWithVars(context.Background(), 100, nil)
+
+    verifyGetRPCCtx := func(meta *metapb.Region) {
+        loc, err := s.cache.LocateKey(bo, key)
+        c.Assert(loc, NotNil)
+        c.Assert(err, IsNil)
+        ctx, err := s.cache.GetTiKVRPCContext(bo, loc.Region, kv.ReplicaReadLeader, 0)
+        c.Assert(err, IsNil)
+        c.Assert(ctx, NotNil)
+        c.Assert(ctx.Meta, DeepEquals, meta)
+        ctx, err = s.cache.GetTiKVRPCContext(bo, loc.Region, kv.ReplicaReadFollower, rand.Uint32())
+        c.Assert(err, IsNil)
+        c.Assert(ctx, NotNil)
+        c.Assert(ctx.Meta, DeepEquals, meta)
+    }
+
+    // When all peers are normal, the cached region should contain all peers.
+    reg, err := s.cache.findRegionByKey(bo, key, false)
+    c.Assert(reg, NotNil)
     c.Assert(err, IsNil)
-    c.Assert(ctx, IsNil)
-    ctx, err = s.cache.GetTiKVRPCContext(bo, loc.Region, kv.ReplicaReadFollower, rand.Uint32())
+    regInPD, _ := s.cluster.GetRegion(reg.GetID())
+    c.Assert(reg.meta, DeepEquals, regInPD)
+    c.Assert(len(reg.meta.GetPeers()), Equals, len(reg.getStore().stores))
+    verifyGetRPCCtx(reg.meta)
+    s.checkCache(c, 1)
+    s.cache.clear()
+
+    // Shouldn't contain the peer on the tombstone store.
+    s.cluster.MarkTombstone(s.store1)
+    reg, err = s.cache.findRegionByKey(bo, key, false)
+    c.Assert(reg, NotNil)
     c.Assert(err, IsNil)
-    c.Assert(ctx, IsNil)
-    s.checkCache(c, 0)
-}
+    c.Assert(len(reg.meta.GetPeers()), Equals, len(regInPD.GetPeers())-1)
+    c.Assert(len(reg.meta.GetPeers()), Equals, len(reg.getStore().stores))
+    for _, peer := range reg.meta.GetPeers() {
+        c.Assert(peer.GetStoreId(), Not(Equals), s.store1)
+    }
+    for _, store := range reg.getStore().stores {
+        c.Assert(store.storeID, Not(Equals), s.store1)
+    }
+    verifyGetRPCCtx(reg.meta)
+    s.checkCache(c, 1)
+    s.cache.clear()
+    s.cluster.StartStore(s.store1)
 
-func (s *testRegionCacheSuite) TestDropStoreRetry(c *C) {
+    // Shouldn't contain the peer on the dropped store.
+    store := s.cluster.GetStore(s.store1)
     s.cluster.RemoveStore(s.store1)
-    done := make(chan struct{})
-    go func() {
-        time.Sleep(time.Millisecond * 10)
-        s.cluster.AddStore(s.store1, s.storeAddr(s.store1))
-        close(done)
-    }()
-    loc, err := s.cache.LocateKey(s.bo, []byte("a"))
+    reg, err = s.cache.findRegionByKey(bo, key, false)
+    c.Assert(reg, NotNil)
     c.Assert(err, IsNil)
-    c.Assert(loc.Region.id, Equals, s.region1)
-    <-done
+    c.Assert(len(reg.meta.GetPeers()), Equals, len(regInPD.GetPeers())-1)
+    c.Assert(len(reg.meta.GetPeers()), Equals, len(reg.getStore().stores))
+    for _, peer := range reg.meta.GetPeers() {
+        c.Assert(peer.GetStoreId(), Not(Equals), s.store1)
+    }
+    for _, store := range reg.getStore().stores {
+        c.Assert(store.storeID, Not(Equals), s.store1)
+    }
+    verifyGetRPCCtx(reg.meta)
+    s.checkCache(c, 1)
+    s.cache.clear()
+    s.cluster.AddStore(store.GetId(), store.GetAddress(), store.GetLabels()...)
+
+    // Report an error when there are no available peers.
+    s.cluster.MarkTombstone(s.store1)
+    s.cluster.MarkTombstone(s.store2)
+    _, err = s.cache.findRegionByKey(bo, key, false)
+    c.Assert(err, NotNil)
+    c.Assert(err.Error(), Matches, ".*no available peers.*")
+    s.cluster.StartStore(s.store1)
+    s.cluster.StartStore(s.store2)
 }
 
 func (s *testRegionCacheSuite) TestUpdateLeader(c *C) {
@@ -312,7 +456,7 @@ func (s *testRegionCacheSuite) TestUpdateLeader3(c *C) {
     c.Assert(err, IsNil)
     c.Assert(ctx.Addr, Equals, "store2")
     s.cache.OnSendFail(retry.NewNoopBackoff(context.Background()), ctx, false, errors.New("send fail"))
-    s.cache.checkAndResolve(nil)
+    s.cache.checkAndResolve(nil, func(*Store) bool { return true })
     s.cache.UpdateLeader(loc.Region, s.store2, 0)
     addr := s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0)
     c.Assert(addr, Equals, "")
@@ -1325,7 +1469,7 @@ func (s *testRegionCacheSuite) TestPeersLenChange(c *C) {
     }
     filterUnavailablePeers(cpRegion)
     region := &Region{meta: cpRegion.Meta}
-    err = region.init(s.cache)
+    err = region.init(s.bo, s.cache)
     c.Assert(err, IsNil)
     s.cache.insertRegionToCache(region)
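The core of the Store changes above is the lock-free resolve-state machine behind changeResolveStateTo(). The following standalone sketch (not part of the patch; the stripped-down store type and constants below are illustrative stand-ins) shows the compare-and-swap retry pattern that replaces the goto-based loops and the three outcomes a caller can observe.

// Minimal sketch of the CAS-based state transition pattern, under the
// assumption that the state is a single uint64 updated atomically.
package main

import (
	"fmt"
	"sync/atomic"
)

type resolveState uint64

const (
	unresolved resolveState = iota
	resolved
	needCheck
	deleted
	tombstone
)

// store is a hypothetical stand-in for the real Store struct.
type store struct {
	state uint64
}

func (s *store) getResolveState() resolveState {
	return resolveState(atomic.LoadUint64(&s.state))
}

// changeResolveStateTo retries until the state is already `to` (success),
// is no longer `from` (another goroutine won, failure), or the CAS succeeds.
func (s *store) changeResolveStateTo(from, to resolveState) bool {
	for {
		state := s.getResolveState()
		if state == to {
			return true
		}
		if state != from {
			return false
		}
		if atomic.CompareAndSwapUint64(&s.state, uint64(from), uint64(to)) {
			return true
		}
	}
}

func main() {
	s := &store{state: uint64(resolved)}
	fmt.Println(s.changeResolveStateTo(resolved, needCheck))  // true: resolved -> needCheck
	fmt.Println(s.changeResolveStateTo(resolved, needCheck))  // true: already needCheck
	fmt.Println(s.changeResolveStateTo(unresolved, resolved)) // false: state is needCheck, not unresolved
}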