store/tikv: rename func OnSendFailForRegion to OnSendFailForTiFlash #24992

Merged
17 changes: 1 addition & 16 deletions store/copr/region_cache.go
@@ -21,7 +21,6 @@ import (
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"go.uber.org/zap"
)

// RegionCache wraps tikv.RegionCache.
@@ -119,20 +118,6 @@ func (c *RegionCache) OnSendFailForBatchRegions(bo *Backoffer, store *tikv.Store
if ri.Meta == nil {
continue
}
r := c.GetCachedRegionWithRLock(ri.Region)
if r == nil {
return
}
peersNum := len(r.GetMeta().Peers)
if len(ri.Meta.Peers) != peersNum {
logutil.Logger(bo.GetCtx()).Info("retry and refresh current region after send request fail and up/down stores length changed",
zap.Stringer("region", &ri.Region),
zap.Bool("needReload", scheduleReload),
zap.Reflect("oldPeers", ri.Meta.Peers),
zap.Reflect("newPeers", r.GetMeta().Peers),
zap.Error(err))
continue
}
c.OnSendFailForRegion(bo.TiKVBackoffer(), store, ri.Region, r, scheduleReload, err)
c.OnSendFailForTiFlash(bo.TiKVBackoffer(), store, ri.Region, ri.Meta, scheduleReload, err)
}
}
172 changes: 95 additions & 77 deletions store/tikv/region_cache.go
@@ -733,32 +733,41 @@ func (c *RegionCache) findRegionByKey(bo *Backoffer, key []byte, isEndKey bool)
return r, nil
}

// OnSendFailForRegion handles send request fail logic on a region.
func (c *RegionCache) OnSendFailForRegion(bo *Backoffer, store *Store, rid RegionVerID, r *Region, scheduleReload bool, err error) {
// OnSendFailForTiFlash handles send request fail logic for tiflash.
func (c *RegionCache) OnSendFailForTiFlash(bo *Backoffer, store *Store, region RegionVerID, prev *metapb.Region, scheduleReload bool, err error) {

r := c.GetCachedRegionWithRLock(region)
if r == nil {
return
}

rs := r.getStore()
peersNum := len(r.GetMeta().Peers)
if len(prev.Peers) != peersNum {
logutil.Logger(bo.GetCtx()).Info("retry and refresh current region after send request fail and up/down stores length changed",
zap.Stringer("region", &region),
zap.Bool("needReload", scheduleReload),
zap.Reflect("oldPeers", prev.Peers),
zap.Reflect("newPeers", r.GetMeta().Peers),
zap.Error(err))
return
}

accessMode := TiFlashOnly
accessIdx := rs.getAccessIndex(accessMode, store)
if accessIdx == -1 {
logutil.Logger(bo.GetCtx()).Warn("can not get access index for region " + rid.String())
logutil.Logger(bo.GetCtx()).Warn("can not get access index for region " + region.String())
return
}
if err != nil {
storeIdx, s := rs.accessStore(accessMode, accessIdx)
epoch := rs.storeEpochs[storeIdx]
if atomic.CompareAndSwapUint32(&s.epoch, epoch, epoch+1) {
logutil.BgLogger().Info("mark store's regions need be refill", zap.String("store", s.addr))
metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
}
// schedule a store addr resolve.
s.markNeedCheck(c.notifyCheckCh)
c.markRegionNeedBeRefill(s, storeIdx, rs)
}

// try next peer
rs.switchNextFlashPeer(r, accessIdx)
logutil.Logger(bo.GetCtx()).Info("switch region tiflash peer to next due to send request fail",
zap.Stringer("region", &rid),
zap.Stringer("region", &region),
zap.Bool("needReload", scheduleReload),
zap.Error(err))

@@ -768,90 +777,99 @@ func (c *RegionCache) OnSendFailForRegion(bo *Backoffer, store *Store, rid Regio
}
}

func (c *RegionCache) markRegionNeedBeRefill(s *Store, storeIdx int, rs *RegionStore) int {
incEpochStoreIdx := -1
// invalidate regions in store.
epoch := rs.storeEpochs[storeIdx]
if atomic.CompareAndSwapUint32(&s.epoch, epoch, epoch+1) {
logutil.BgLogger().Info("mark store's regions need be refill", zap.String("store", s.addr))
incEpochStoreIdx = storeIdx
metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
}
// schedule a store addr resolve.
s.markNeedCheck(c.notifyCheckCh)
return incEpochStoreIdx
}

// OnSendFail handles send request fail logic.
func (c *RegionCache) OnSendFail(bo *Backoffer, ctx *RPCContext, scheduleReload bool, err error) {
metrics.RegionCacheCounterWithSendFail.Inc()
r := c.GetCachedRegionWithRLock(ctx.Region)
if r != nil {
peersNum := len(r.meta.Peers)
if len(ctx.Meta.Peers) != peersNum {
logutil.Logger(bo.GetCtx()).Info("retry and refresh current ctx after send request fail and up/down stores length changed",
zap.Stringer("current", ctx),
zap.Bool("needReload", scheduleReload),
zap.Reflect("oldPeers", ctx.Meta.Peers),
zap.Reflect("newPeers", r.meta.Peers),
zap.Error(err))
return
}

rs := r.getStore()
startForwarding := false
incEpochStoreIdx := -1
if r == nil {
return
}
peersNum := len(r.meta.Peers)
if len(ctx.Meta.Peers) != peersNum {
logutil.Logger(bo.GetCtx()).Info("retry and refresh current ctx after send request fail and up/down stores length changed",
zap.Stringer("current", ctx),
zap.Bool("needReload", scheduleReload),
zap.Reflect("oldPeers", ctx.Meta.Peers),
zap.Reflect("newPeers", r.meta.Peers),
zap.Error(err))
return
}

if err != nil {
storeIdx, s := rs.accessStore(ctx.AccessMode, ctx.AccessIdx)
leaderReq := ctx.Store.storeType == tikvrpc.TiKV && rs.workTiKVIdx == ctx.AccessIdx

// Mark the store as failure if it's not a redirection request because we
// can't know the status of the proxy store by it.
if ctx.ProxyStore == nil {
// send fail but store is reachable, keep retry current peer for replica leader request.
// but we still need switch peer for follower-read or learner-read(i.e. tiflash)
if leaderReq {
if s.requestLiveness(bo, c) == reachable {
return
} else if c.enableForwarding {
s.startHealthCheckLoopIfNeeded(c)
startForwarding = true
}
}
rs := r.getStore()
startForwarding := false
incEpochStoreIdx := -1

// invalidate regions in store.
epoch := rs.storeEpochs[storeIdx]
if atomic.CompareAndSwapUint32(&s.epoch, epoch, epoch+1) {
logutil.BgLogger().Info("mark store's regions need be refill", zap.String("store", s.addr))
incEpochStoreIdx = storeIdx
metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
if err != nil {
storeIdx, s := rs.accessStore(ctx.AccessMode, ctx.AccessIdx)
leaderReq := ctx.Store.storeType == tikvrpc.TiKV && rs.workTiKVIdx == ctx.AccessIdx

// Mark the store as failure if it's not a redirection request because we
// can't know the status of the proxy store by it.
if ctx.ProxyStore == nil {
// send fail but store is reachable, keep retry current peer for replica leader request.
// but we still need switch peer for follower-read or learner-read(i.e. tiflash)
if leaderReq {
if s.requestLiveness(bo, c) == reachable {
return
} else if c.enableForwarding {
s.startHealthCheckLoopIfNeeded(c)
startForwarding = true
}
// schedule a store addr resolve.
s.markNeedCheck(c.notifyCheckCh)
}

// invalidate regions in store.
incEpochStoreIdx = c.markRegionNeedBeRefill(s, storeIdx, rs)
}
}

// try next peer to found new leader.
if ctx.AccessMode == TiKVOnly {
if startForwarding || ctx.ProxyStore != nil {
var currentProxyIdx AccessIndex = -1
if ctx.ProxyStore != nil {
currentProxyIdx = ctx.ProxyAccessIdx
}
// In case the epoch of the store is increased, try to avoid reloading the current region by also
// increasing the epoch stored in `rs`.
rs.switchNextProxyStore(r, currentProxyIdx, incEpochStoreIdx)
logutil.Logger(bo.GetCtx()).Info("switch region proxy peer to next due to send request fail",
zap.Stringer("current", ctx),
zap.Bool("needReload", scheduleReload),
zap.Error(err))
} else {
rs.switchNextTiKVPeer(r, ctx.AccessIdx)
logutil.Logger(bo.GetCtx()).Info("switch region peer to next due to send request fail",
zap.Stringer("current", ctx),
zap.Bool("needReload", scheduleReload),
zap.Error(err))
// try next peer to found new leader.
if ctx.AccessMode == TiKVOnly {
if startForwarding || ctx.ProxyStore != nil {
var currentProxyIdx AccessIndex = -1
if ctx.ProxyStore != nil {
currentProxyIdx = ctx.ProxyAccessIdx
}
// In case the epoch of the store is increased, try to avoid reloading the current region by also
// increasing the epoch stored in `rs`.
rs.switchNextProxyStore(r, currentProxyIdx, incEpochStoreIdx)
logutil.Logger(bo.GetCtx()).Info("switch region proxy peer to next due to send request fail",
zap.Stringer("current", ctx),
zap.Bool("needReload", scheduleReload),
zap.Error(err))
} else {
rs.switchNextFlashPeer(r, ctx.AccessIdx)
logutil.Logger(bo.GetCtx()).Info("switch region tiflash peer to next due to send request fail",
rs.switchNextTiKVPeer(r, ctx.AccessIdx)
logutil.Logger(bo.GetCtx()).Info("switch region peer to next due to send request fail",
zap.Stringer("current", ctx),
zap.Bool("needReload", scheduleReload),
zap.Error(err))
}
} else {
rs.switchNextFlashPeer(r, ctx.AccessIdx)
logutil.Logger(bo.GetCtx()).Info("switch region tiflash peer to next due to send request fail",
zap.Stringer("current", ctx),
zap.Bool("needReload", scheduleReload),
zap.Error(err))
}

// force reload region when retry all known peers in region.
if scheduleReload {
r.scheduleReload()
}
// force reload region when retry all known peers in region.
if scheduleReload {
r.scheduleReload()
}

}

// LocateRegionByID searches for the region with ID.