Skip to content

Commit

Permalink
region_cache: filter peers on tombstone or dropped stores (pingcap#24726
Browse files Browse the repository at this point in the history
)

Signed-off-by: youjiali1995 <zlwgx1023@gmail.com>
  • Loading branch information
youjiali1995 committed Jun 30, 2021
1 parent e3e6989 commit 68254b0
Show file tree
Hide file tree
Showing 5 changed files with 266 additions and 89 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0
github.com/grpc-ecosystem/grpc-gateway v1.14.3 // indirect
github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334
github.com/kisielk/errcheck v1.2.0 // indirect
github.com/klauspost/cpuid v1.2.1
github.com/kr/pretty v0.2.0 // indirect
github.com/mattn/go-colorable v0.1.7 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ github.com/juju/ratelimit v1.0.1 h1:+7AIFJVQ0EQgq/K9+0Krm7m530Du7tIz0METWzN0RgY=
github.com/juju/ratelimit v1.0.1/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.2.0 h1:reN85Pxc5larApoH1keMBiu2GWtPqXQ1nc9gx+jOU+E=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/cpuid v1.2.1 h1:vJi+O/nMdFt0vqm8NZBI6wzALWdA2X+egi0ogNyrC/w=
Expand Down
1 change: 1 addition & 0 deletions store/mockstore/mocktikv/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func (c *pdClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store,
default:
}
store := c.cluster.GetStore(storeID)
// It's the same as PD's implementation.
if store == nil {
return nil, fmt.Errorf("invalid store ID %d, not found", storeID)
}
Expand Down
166 changes: 99 additions & 67 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -180,7 +181,7 @@ func (r *RegionStore) kvPeer(seed uint32) AccessIndex {
}

// init initializes region after constructed.
func (r *Region) init(c *RegionCache) error {
func (r *Region) init(bo *Backoffer, c *RegionCache) error {
// region store pull used store from global store map
// to avoid acquire storeMu in later access.
rs := &RegionStore{
Expand All @@ -189,17 +190,23 @@ func (r *Region) init(c *RegionCache) error {
stores: make([]*Store, 0, len(r.meta.Peers)),
storeEpochs: make([]uint32, 0, len(r.meta.Peers)),
}
availablePeers := r.meta.GetPeers()[:0]
for _, p := range r.meta.Peers {
c.storeMu.RLock()
store, exists := c.storeMu.stores[p.StoreId]
c.storeMu.RUnlock()
if !exists {
store = c.getStoreByStoreID(p.StoreId)
}
_, err := store.initResolve(NewNoopBackoff(context.Background()), c)
addr, err := store.initResolve(bo, c)
if err != nil {
return err
}
// Filter the peer on a tombstone store.
if addr == "" {
continue
}
availablePeers = append(availablePeers, p)
switch store.storeType {
case kv.TiKV:
rs.accessIndex[TiKvOnly] = append(rs.accessIndex[TiKvOnly], len(rs.stores))
Expand All @@ -209,6 +216,13 @@ func (r *Region) init(c *RegionCache) error {
rs.stores = append(rs.stores, store)
rs.storeEpochs = append(rs.storeEpochs, atomic.LoadUint32(&store.epoch))
}
// TODO(youjiali1995): It's possible the region info in PD is stale for now but it can recover.
// Maybe we need backoff here.
if len(availablePeers) == 0 {
return errors.Errorf("no available peers, region: {%v}", r.meta)
}
r.meta.Peers = availablePeers

atomic.StorePointer(&r.store, unsafe.Pointer(rs))

// mark region has been init accessed.
Expand Down Expand Up @@ -294,6 +308,17 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
return c
}

// clear resets the RegionCache to an empty state by replacing the region map,
// the sorted region tree and the store map with fresh, empty instances.
// It's only used in tests.
func (c *RegionCache) clear() {
	// Build the empty replacements up front so each critical section is
	// nothing more than a pointer swap.
	freshRegions := make(map[RegionVerID]*Region)
	freshSorted := btree.New(btreeDegree)
	freshStores := make(map[uint64]*Store)

	// Region data is guarded by c.mu.
	c.mu.Lock()
	c.mu.regions, c.mu.sorted = freshRegions, freshSorted
	c.mu.Unlock()

	// Store data is guarded by the separate c.storeMu.
	c.storeMu.Lock()
	c.storeMu.stores = freshStores
	c.storeMu.Unlock()
}

// Close releases region cache's resource.
func (c *RegionCache) Close() {
close(c.closeCh)
Expand All @@ -303,19 +328,21 @@ func (c *RegionCache) Close() {
func (c *RegionCache) asyncCheckAndResolveLoop() {
var needCheckStores []*Store
for {
needCheckStores = needCheckStores[:0]
select {
case <-c.closeCh:
return
case <-c.notifyCheckCh:
needCheckStores = needCheckStores[:0]
c.checkAndResolve(needCheckStores)
c.checkAndResolve(needCheckStores, func(s *Store) bool {
return s.getResolveState() == needCheck
})
}
}
}

// checkAndResolve checks and resolves the addresses of failed stores.
// This method isn't thread-safe and must only be used by one goroutine.
func (c *RegionCache) checkAndResolve(needCheckStores []*Store) {
func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*Store) bool) {
defer func() {
r := recover()
if r != nil {
Expand All @@ -327,15 +354,15 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store) {

c.storeMu.RLock()
for _, store := range c.storeMu.stores {
state := store.getResolveState()
if state == needCheck {
if needCheck(store) {
needCheckStores = append(needCheckStores, store)
}
}
c.storeMu.RUnlock()

for _, store := range needCheckStores {
store.reResolve(c)
_, err := store.reResolve(c)
terror.Log(err)
}
}

Expand Down Expand Up @@ -461,7 +488,8 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *Backoffer, id RegionVerID) (*RPCC
return nil, nil
}
if store.getResolveState() == needCheck {
store.reResolve(c)
_, err := store.reResolve(c)
terror.Log(err)
}
atomic.StoreInt32(&regionStore.workTiFlashIdx, int32(accessIdx))
peer := cachedRegion.meta.Peers[storeIdx]
Expand Down Expand Up @@ -945,9 +973,6 @@ func filterUnavailablePeers(region *pd.Region) {
new = append(new, p)
}
}
for i := len(new); i < len(region.Meta.Peers); i++ {
region.Meta.Peers[i] = nil
}
region.Meta.Peers = new
}

Expand Down Expand Up @@ -1000,7 +1025,7 @@ func (c *RegionCache) loadRegion(bo *Backoffer, key []byte, isEndKey bool) (*Reg
continue
}
region := &Region{meta: reg.Meta}
err = region.init(c)
err = region.init(bo, c)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1045,7 +1070,7 @@ func (c *RegionCache) loadRegionByID(bo *Backoffer, regionID uint64) (*Region, e
return nil, errors.New("receive Region with no available peer")
}
region := &Region{meta: reg.Meta}
err = region.init(c)
err = region.init(bo, c)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1099,7 +1124,7 @@ func (c *RegionCache) scanRegions(bo *Backoffer, startKey, endKey []byte, limit
regions := make([]*Region, 0, len(metas))
for i, meta := range metas {
region := &Region{meta: meta}
err := region.init(c)
err := region.init(bo, c)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1140,11 +1165,15 @@ func (c *RegionCache) getStoreAddr(bo *Backoffer, region *Region, store *Store,
case deleted:
addr = c.changeToActiveStore(region, store, storeIdx)
return
case tombstone:
return "", nil
default:
panic("unsupported resolve state")
}
}

// changeToActiveStore replaces the deleted store in the region with an up-to-date store from the stores map.
// The order is guaranteed by reResolve(), which adds the new store before marking the old store deleted.
func (c *RegionCache) changeToActiveStore(region *Region, store *Store, storeIdx int) (addr string) {
c.storeMu.RLock()
store = c.storeMu.stores[store.storeID]
Expand Down Expand Up @@ -1207,7 +1236,7 @@ func (c *RegionCache) OnRegionEpochNotMatch(bo *Backoffer, ctx *RPCContext, curr
}
}
region := &Region{meta: meta}
err := region.init(c)
err := region.init(bo, c)
if err != nil {
return err
}
Expand Down Expand Up @@ -1460,19 +1489,31 @@ type Store struct {
// resolveState describes how far a Store has progressed through address
// resolution. It is stored in Store.state as a uint64 so that it can be
// loaded and stored atomically.
type resolveState uint64

const (
	// The store has just been created and is normally in the middle of being resolved.
	// A store in this state is only resolved by initResolve().
	unresolved resolveState = iota
	// The store is resolved and its address is valid.
	resolved
	// A request failed on this store, so it will be re-resolved by asyncCheckAndResolveLoop().
	needCheck
	// The store's address or label has changed and the store is marked deleted.
	// A new Store struct has replaced it in the RegionCache; callers should
	// call changeToActiveStore() to get the new struct.
	deleted
	// The store is a tombstone. The region should be invalidated when an access to this store is attempted.
	tombstone
)

// initResolve resolves addr for store that never resolved.
// initResolve resolves the address of a store that has never been resolved and
// returns an empty string if the store is a tombstone.
func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err error) {
s.resolveMutex.Lock()
state := s.getResolveState()
defer s.resolveMutex.Unlock()
if state != unresolved {
addr = s.addr
if state != tombstone {
addr = s.addr
}
return
}
var store *metapb.Store
Expand All @@ -1483,34 +1524,32 @@ func (s *Store) initResolve(bo *Backoffer, c *RegionCache) (addr string, err err
} else {
tikvRegionCacheCounterWithGetStoreOK.Inc()
}
if bo.ctx.Err() != nil && errors.Cause(bo.ctx.Err()) == context.Canceled {
return
}
if err != nil && !isStoreNotFoundError(err) {
// TODO: more refine PD error status handle.
if errors.Cause(err) == context.Canceled {
return
}
err = errors.Errorf("loadStore from PD failed, id: %d, err: %v", s.storeID, err)
if err = bo.Backoff(BoPDRPC, err); err != nil {
return
}
continue
}
// The store is a tombstone.
if store == nil {
s.setResolveState(tombstone)
return "", nil
}
addr = store.GetAddress()
if addr == "" {
return "", errors.Errorf("empty store(%d) address", s.storeID)
}
s.addr = addr
s.saddr = store.GetStatusAddress()
s.storeType = GetStoreTypeByMeta(store)
retry:
state = s.getResolveState()
if state != unresolved {
addr = s.addr
return
}
if !s.compareAndSwapState(state, resolved) {
goto retry
}
return
// No one else should be changing its state concurrently, but we still use changeResolveStateTo for safety.
s.changeResolveStateTo(unresolved, resolved)
return s.addr, nil
}
}

Expand Down Expand Up @@ -1557,41 +1596,22 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
logutil.BgLogger().Info("invalidate regions in removed store",
zap.Uint64("store", s.storeID), zap.String("add", s.addr))
atomic.AddUint32(&s.epoch, 1)
atomic.StoreUint64(&s.state, uint64(deleted))
s.setResolveState(tombstone)
tikvRegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
return false, nil
}

storeType := GetStoreTypeByMeta(store)
addr = store.GetAddress()
if s.addr != addr {
state := resolved
newStore := &Store{storeID: s.storeID, addr: addr, saddr: store.GetStatusAddress(), storeType: storeType}
newStore.state = *(*uint64)(&state)
newStore := &Store{storeID: s.storeID, addr: addr, saddr: store.GetStatusAddress(), storeType: storeType, state: uint64(resolved)}
c.storeMu.Lock()
c.storeMu.stores[newStore.storeID] = newStore
c.storeMu.Unlock()
retryMarkDel:
// all region used those
oldState := s.getResolveState()
if oldState == deleted {
return false, nil
}
newState := deleted
if !s.compareAndSwapState(oldState, newState) {
goto retryMarkDel
}
s.setResolveState(deleted)
return false, nil
}
retryMarkResolved:
oldState := s.getResolveState()
if oldState != needCheck {
return true, nil
}
newState := resolved
if !s.compareAndSwapState(oldState, newState) {
goto retryMarkResolved
}
s.changeResolveStateTo(needCheck, resolved)
return true, nil
}

Expand All @@ -1603,23 +1623,35 @@ func (s *Store) getResolveState() resolveState {
return resolveState(atomic.LoadUint64(&s.state))
}

func (s *Store) compareAndSwapState(oldState, newState resolveState) bool {
return atomic.CompareAndSwapUint64(&s.state, uint64(oldState), uint64(newState))
// setResolveState unconditionally overwrites the store's resolve state with
// the given state, using an atomic store on s.state.
func (s *Store) setResolveState(state resolveState) {
	raw := uint64(state)
	atomic.StoreUint64(&s.state, raw)
}

// changeResolveStateTo moves the store's resolveState from `from` to `to`
// with a compare-and-swap loop.
//
// It returns true when the store ends up in `to`: either this call swapped it,
// or the state was already `to` when observed. It returns false when the
// observed state is neither `from` nor `to`, i.e. someone else moved the store
// to a different state.
func (s *Store) changeResolveStateTo(from, to resolveState) bool {
	oldVal, newVal := uint64(from), uint64(to)
	for {
		switch cur := s.getResolveState(); cur {
		case to:
			// Already in the target state; nothing left to do.
			return true
		case from:
			if atomic.CompareAndSwapUint64(&s.state, oldVal, newVal) {
				return true
			}
			// Lost the race with a concurrent writer; re-read and decide again.
		default:
			return false
		}
	}
}

// markNeedCheck marks a resolved store as needing an asynchronous re-resolve to check whether its address has changed.
func (s *Store) markNeedCheck(notifyCheckCh chan struct{}) {
retry:
oldState := s.getResolveState()
if oldState != resolved {
return
}
if !s.compareAndSwapState(oldState, needCheck) {
goto retry
}
select {
case notifyCheckCh <- struct{}{}:
default:
if s.changeResolveStateTo(resolved, needCheck) {
select {
case notifyCheckCh <- struct{}{}:
default:
}
}

}
Expand Down
Loading

0 comments on commit 68254b0

Please sign in to comment.