store/tikv: rename func OnSendFailForRegion to OnSendFailForTiFlash (#…
AndreMouche authored May 31, 2021
1 parent 238cab8 commit 7af4fea
Showing 2 changed files with 96 additions and 93 deletions.
17 changes: 1 addition & 16 deletions store/copr/region_cache.go
@@ -21,7 +21,6 @@ import (
     "github.com/pingcap/tidb/store/tikv"
     "github.com/pingcap/tidb/store/tikv/logutil"
     "github.com/pingcap/tidb/store/tikv/metrics"
-    "go.uber.org/zap"
 )
 
 // RegionCache wraps tikv.RegionCache.
@@ -119,20 +118,6 @@ func (c *RegionCache) OnSendFailForBatchRegions(bo *Backoffer, store *tikv.Store
         if ri.Meta == nil {
             continue
         }
-        r := c.GetCachedRegionWithRLock(ri.Region)
-        if r == nil {
-            return
-        }
-        peersNum := len(r.GetMeta().Peers)
-        if len(ri.Meta.Peers) != peersNum {
-            logutil.Logger(bo.GetCtx()).Info("retry and refresh current region after send request fail and up/down stores length changed",
-                zap.Stringer("region", &ri.Region),
-                zap.Bool("needReload", scheduleReload),
-                zap.Reflect("oldPeers", ri.Meta.Peers),
-                zap.Reflect("newPeers", r.GetMeta().Peers),
-                zap.Error(err))
-            continue
-        }
-        c.OnSendFailForRegion(bo.TiKVBackoffer(), store, ri.Region, r, scheduleReload, err)
+        c.OnSendFailForTiFlash(bo.TiKVBackoffer(), store, ri.Region, ri.Meta, scheduleReload, err)
     }
 }
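
The copr-side hunk above shrinks OnSendFailForBatchRegions to a simple forwarding loop: the cached-region lookup and the peer-count staleness check now live in the tikv-side handler. A self-contained toy sketch of that caller/callee split (all types and names below are hypothetical stand-ins, not the tidb ones):

package main

import "fmt"

type regionMeta struct{ peers []string }

type cache struct{ regions map[uint64]regionMeta }

// onSendFailForTiFlash mirrors the new contract: it receives the metadata the
// caller last observed ("prev") and performs the staleness check itself.
func (c *cache) onSendFailForTiFlash(id uint64, prev regionMeta) {
	cur, ok := c.regions[id]
	if !ok {
		return // region no longer cached, nothing to do
	}
	if len(prev.peers) != len(cur.peers) {
		fmt.Printf("region %d changed, refresh instead of switching peer\n", id)
		return
	}
	fmt.Printf("region %d: switch to next tiflash peer\n", id)
}

// onSendFailForBatchRegions mirrors the slimmed-down copr loop: skip entries
// without metadata and forward everything else to the per-region handler.
func (c *cache) onSendFailForBatchRegions(failed map[uint64]*regionMeta) {
	for id, prev := range failed {
		if prev == nil {
			continue
		}
		c.onSendFailForTiFlash(id, *prev)
	}
}

func main() {
	c := &cache{regions: map[uint64]regionMeta{
		1: {peers: []string{"s1", "s2"}},
		2: {peers: []string{"s1", "s2", "s3"}},
	}}
	old := regionMeta{peers: []string{"s1", "s2"}}
	c.onSendFailForBatchRegions(map[uint64]*regionMeta{1: &old, 2: &old, 3: nil})
}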
172 changes: 95 additions & 77 deletions store/tikv/region_cache.go
@@ -733,32 +733,41 @@ func (c *RegionCache) findRegionByKey(bo *Backoffer, key []byte, isEndKey bool)
     return r, nil
 }
 
-// OnSendFailForRegion handles send request fail logic on a region.
-func (c *RegionCache) OnSendFailForRegion(bo *Backoffer, store *Store, rid RegionVerID, r *Region, scheduleReload bool, err error) {
+// OnSendFailForTiFlash handles send request fail logic for tiflash.
+func (c *RegionCache) OnSendFailForTiFlash(bo *Backoffer, store *Store, region RegionVerID, prev *metapb.Region, scheduleReload bool, err error) {
+
+    r := c.GetCachedRegionWithRLock(region)
+    if r == nil {
+        return
+    }
+
     rs := r.getStore()
+    peersNum := len(r.GetMeta().Peers)
+    if len(prev.Peers) != peersNum {
+        logutil.Logger(bo.GetCtx()).Info("retry and refresh current region after send request fail and up/down stores length changed",
+            zap.Stringer("region", &region),
+            zap.Bool("needReload", scheduleReload),
+            zap.Reflect("oldPeers", prev.Peers),
+            zap.Reflect("newPeers", r.GetMeta().Peers),
+            zap.Error(err))
+        return
+    }
+
     accessMode := TiFlashOnly
     accessIdx := rs.getAccessIndex(accessMode, store)
     if accessIdx == -1 {
-        logutil.Logger(bo.GetCtx()).Warn("can not get access index for region " + rid.String())
+        logutil.Logger(bo.GetCtx()).Warn("can not get access index for region " + region.String())
         return
     }
     if err != nil {
         storeIdx, s := rs.accessStore(accessMode, accessIdx)
-        epoch := rs.storeEpochs[storeIdx]
-        if atomic.CompareAndSwapUint32(&s.epoch, epoch, epoch+1) {
-            logutil.BgLogger().Info("mark store's regions need be refill", zap.String("store", s.addr))
-            metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
-        }
-        // schedule a store addr resolve.
-        s.markNeedCheck(c.notifyCheckCh)
+        c.markRegionNeedBeRefill(s, storeIdx, rs)
     }
 
     // try next peer
     rs.switchNextFlashPeer(r, accessIdx)
     logutil.Logger(bo.GetCtx()).Info("switch region tiflash peer to next due to send request fail",
-        zap.Stringer("region", &rid),
+        zap.Stringer("region", &region),
         zap.Bool("needReload", scheduleReload),
         zap.Error(err))
 
@@ -768,90 +777,99 @@ func (c *RegionCache) OnSendFailForRegion(bo *Backoffer, store *Store, rid Regio
     }
 }
 
+func (c *RegionCache) markRegionNeedBeRefill(s *Store, storeIdx int, rs *RegionStore) int {
+    incEpochStoreIdx := -1
+    // invalidate regions in store.
+    epoch := rs.storeEpochs[storeIdx]
+    if atomic.CompareAndSwapUint32(&s.epoch, epoch, epoch+1) {
+        logutil.BgLogger().Info("mark store's regions need be refill", zap.String("store", s.addr))
+        incEpochStoreIdx = storeIdx
+        metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
+    }
+    // schedule a store addr resolve.
+    s.markNeedCheck(c.notifyCheckCh)
+    return incEpochStoreIdx
+}
+
 // OnSendFail handles send request fail logic.
 func (c *RegionCache) OnSendFail(bo *Backoffer, ctx *RPCContext, scheduleReload bool, err error) {
     metrics.RegionCacheCounterWithSendFail.Inc()
     r := c.GetCachedRegionWithRLock(ctx.Region)
-    if r != nil {
-        peersNum := len(r.meta.Peers)
-        if len(ctx.Meta.Peers) != peersNum {
-            logutil.Logger(bo.GetCtx()).Info("retry and refresh current ctx after send request fail and up/down stores length changed",
-                zap.Stringer("current", ctx),
-                zap.Bool("needReload", scheduleReload),
-                zap.Reflect("oldPeers", ctx.Meta.Peers),
-                zap.Reflect("newPeers", r.meta.Peers),
-                zap.Error(err))
-            return
-        }
-
-        rs := r.getStore()
-        startForwarding := false
-        incEpochStoreIdx := -1
-
-        if err != nil {
-            storeIdx, s := rs.accessStore(ctx.AccessMode, ctx.AccessIdx)
-            leaderReq := ctx.Store.storeType == tikvrpc.TiKV && rs.workTiKVIdx == ctx.AccessIdx
-
-            // Mark the store as failure if it's not a redirection request because we
-            // can't know the status of the proxy store by it.
-            if ctx.ProxyStore == nil {
-                // send fail but store is reachable, keep retry current peer for replica leader request.
-                // but we still need switch peer for follower-read or learner-read(i.e. tiflash)
-                if leaderReq {
-                    if s.requestLiveness(bo, c) == reachable {
-                        return
-                    } else if c.enableForwarding {
-                        s.startHealthCheckLoopIfNeeded(c)
-                        startForwarding = true
-                    }
-                }
-
-                // invalidate regions in store.
-                epoch := rs.storeEpochs[storeIdx]
-                if atomic.CompareAndSwapUint32(&s.epoch, epoch, epoch+1) {
-                    logutil.BgLogger().Info("mark store's regions need be refill", zap.String("store", s.addr))
-                    incEpochStoreIdx = storeIdx
-                    metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
-                }
-                // schedule a store addr resolve.
-                s.markNeedCheck(c.notifyCheckCh)
-            }
-        }
-
-        // try next peer to found new leader.
-        if ctx.AccessMode == TiKVOnly {
-            if startForwarding || ctx.ProxyStore != nil {
-                var currentProxyIdx AccessIndex = -1
-                if ctx.ProxyStore != nil {
-                    currentProxyIdx = ctx.ProxyAccessIdx
-                }
-                // In case the epoch of the store is increased, try to avoid reloading the current region by also
-                // increasing the epoch stored in `rs`.
-                rs.switchNextProxyStore(r, currentProxyIdx, incEpochStoreIdx)
-                logutil.Logger(bo.GetCtx()).Info("switch region proxy peer to next due to send request fail",
-                    zap.Stringer("current", ctx),
-                    zap.Bool("needReload", scheduleReload),
-                    zap.Error(err))
-            } else {
-                rs.switchNextTiKVPeer(r, ctx.AccessIdx)
-                logutil.Logger(bo.GetCtx()).Info("switch region peer to next due to send request fail",
-                    zap.Stringer("current", ctx),
-                    zap.Bool("needReload", scheduleReload),
-                    zap.Error(err))
-            }
-        } else {
-            rs.switchNextFlashPeer(r, ctx.AccessIdx)
-            logutil.Logger(bo.GetCtx()).Info("switch region tiflash peer to next due to send request fail",
-                zap.Stringer("current", ctx),
-                zap.Bool("needReload", scheduleReload),
-                zap.Error(err))
-        }
-
-        // force reload region when retry all known peers in region.
-        if scheduleReload {
-            r.scheduleReload()
-        }
-    }
+    if r == nil {
+        return
+    }
+    peersNum := len(r.meta.Peers)
+    if len(ctx.Meta.Peers) != peersNum {
+        logutil.Logger(bo.GetCtx()).Info("retry and refresh current ctx after send request fail and up/down stores length changed",
+            zap.Stringer("current", ctx),
+            zap.Bool("needReload", scheduleReload),
+            zap.Reflect("oldPeers", ctx.Meta.Peers),
+            zap.Reflect("newPeers", r.meta.Peers),
+            zap.Error(err))
+        return
+    }
+
+    rs := r.getStore()
+    startForwarding := false
+    incEpochStoreIdx := -1
+
+    if err != nil {
+        storeIdx, s := rs.accessStore(ctx.AccessMode, ctx.AccessIdx)
+        leaderReq := ctx.Store.storeType == tikvrpc.TiKV && rs.workTiKVIdx == ctx.AccessIdx
+
+        // Mark the store as failure if it's not a redirection request because we
+        // can't know the status of the proxy store by it.
+        if ctx.ProxyStore == nil {
+            // send fail but store is reachable, keep retry current peer for replica leader request.
+            // but we still need switch peer for follower-read or learner-read(i.e. tiflash)
+            if leaderReq {
+                if s.requestLiveness(bo, c) == reachable {
+                    return
+                } else if c.enableForwarding {
+                    s.startHealthCheckLoopIfNeeded(c)
+                    startForwarding = true
+                }
+            }
+
+            // invalidate regions in store.
+            incEpochStoreIdx = c.markRegionNeedBeRefill(s, storeIdx, rs)
+        }
+    }
+
+    // try next peer to found new leader.
+    if ctx.AccessMode == TiKVOnly {
+        if startForwarding || ctx.ProxyStore != nil {
+            var currentProxyIdx AccessIndex = -1
+            if ctx.ProxyStore != nil {
+                currentProxyIdx = ctx.ProxyAccessIdx
+            }
+            // In case the epoch of the store is increased, try to avoid reloading the current region by also
+            // increasing the epoch stored in `rs`.
+            rs.switchNextProxyStore(r, currentProxyIdx, incEpochStoreIdx)
+            logutil.Logger(bo.GetCtx()).Info("switch region proxy peer to next due to send request fail",
+                zap.Stringer("current", ctx),
+                zap.Bool("needReload", scheduleReload),
+                zap.Error(err))
+        } else {
+            rs.switchNextTiKVPeer(r, ctx.AccessIdx)
+            logutil.Logger(bo.GetCtx()).Info("switch region peer to next due to send request fail",
+                zap.Stringer("current", ctx),
+                zap.Bool("needReload", scheduleReload),
+                zap.Error(err))
+        }
+    } else {
+        rs.switchNextFlashPeer(r, ctx.AccessIdx)
+        logutil.Logger(bo.GetCtx()).Info("switch region tiflash peer to next due to send request fail",
+            zap.Stringer("current", ctx),
+            zap.Bool("needReload", scheduleReload),
+            zap.Error(err))
+    }
+
+    // force reload region when retry all known peers in region.
+    if scheduleReload {
+        r.scheduleReload()
+    }
 }
 
 // LocateRegionByID searches for the region with ID.
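
The tikv-side hunk also factors the epoch bump and store re-resolve scheduling into markRegionNeedBeRefill, which returns the store index whose epoch it bumped (or -1) so OnSendFail can hand it to switchNextProxyStore. A runnable toy sketch of that compare-and-swap, return-the-bumped-index pattern (names below are hypothetical stand-ins, not the tidb types):

package main

import (
	"fmt"
	"sync/atomic"
)

type store struct{ epoch uint32 }

// markNeedBeRefill is a toy analogue of the extracted helper: bump the store's
// epoch exactly once (CAS guards against concurrent bumps) and report which
// index was bumped, or -1 if another caller already bumped it.
func markNeedBeRefill(s *store, storeIdx int, observedEpoch uint32) int {
	if atomic.CompareAndSwapUint32(&s.epoch, observedEpoch, observedEpoch+1) {
		fmt.Printf("store %d: regions marked for refill (epoch now %d)\n", storeIdx, observedEpoch+1)
		return storeIdx
	}
	return -1
}

func main() {
	s := &store{}
	observed := atomic.LoadUint32(&s.epoch)

	// The first failure path bumps the epoch and learns which index changed...
	fmt.Println("bumped index:", markNeedBeRefill(s, 0, observed))

	// ...a second caller holding the same stale observation is a no-op.
	fmt.Println("bumped index:", markNeedBeRefill(s, 0, observed))
}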
