From afc8953c547c4b8e12917430fac36fc84e2054d8 Mon Sep 17 00:00:00 2001
From: shirly
Date: Mon, 31 May 2021 17:11:25 +0800
Subject: [PATCH] store/tikv: rename func OnSendFailForRegion to OnSendFailForTiFlash

Signed-off-by: shirly
---
 store/copr/region_cache.go |  17 +---
 store/tikv/region_cache.go | 172 ++++++++++++++++++++-----------------
 2 files changed, 96 insertions(+), 93 deletions(-)

diff --git a/store/copr/region_cache.go b/store/copr/region_cache.go
index 9d6bd4dc973c6..23c49725b40e6 100644
--- a/store/copr/region_cache.go
+++ b/store/copr/region_cache.go
@@ -21,7 +21,6 @@ import (
 	"github.com/pingcap/tidb/store/tikv"
 	"github.com/pingcap/tidb/store/tikv/logutil"
 	"github.com/pingcap/tidb/store/tikv/metrics"
-	"go.uber.org/zap"
 )
 
 // RegionCache wraps tikv.RegionCache.
@@ -119,20 +118,6 @@ func (c *RegionCache) OnSendFailForBatchRegions(bo *Backoffer, store *tikv.Store
 		if ri.Meta == nil {
 			continue
 		}
-		r := c.GetCachedRegionWithRLock(ri.Region)
-		if r == nil {
-			return
-		}
-		peersNum := len(r.GetMeta().Peers)
-		if len(ri.Meta.Peers) != peersNum {
-			logutil.Logger(bo.GetCtx()).Info("retry and refresh current region after send request fail and up/down stores length changed",
-				zap.Stringer("region", &ri.Region),
-				zap.Bool("needReload", scheduleReload),
-				zap.Reflect("oldPeers", ri.Meta.Peers),
-				zap.Reflect("newPeers", r.GetMeta().Peers),
-				zap.Error(err))
-			continue
-		}
-		c.OnSendFailForRegion(bo.TiKVBackoffer(), store, ri.Region, r, scheduleReload, err)
+		c.OnSendFailForTiFlash(bo.TiKVBackoffer(), store, ri.Region, ri.Meta, scheduleReload, err)
 	}
 }
diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go
index d3eead0444940..2fb006138dff5 100644
--- a/store/tikv/region_cache.go
+++ b/store/tikv/region_cache.go
@@ -733,32 +733,41 @@ func (c *RegionCache) findRegionByKey(bo *Backoffer, key []byte, isEndKey bool)
 	return r, nil
 }
 
-// OnSendFailForRegion handles send request fail logic on a region.
-func (c *RegionCache) OnSendFailForRegion(bo *Backoffer, store *Store, rid RegionVerID, r *Region, scheduleReload bool, err error) {
+// OnSendFailForTiFlash handles send request fail logic for tiflash.
+func (c *RegionCache) OnSendFailForTiFlash(bo *Backoffer, store *Store, region RegionVerID, prev *metapb.Region, scheduleReload bool, err error) {
+
+	r := c.GetCachedRegionWithRLock(region)
+	if r == nil {
+		return
+	}
 	rs := r.getStore()
+	peersNum := len(r.GetMeta().Peers)
+	if len(prev.Peers) != peersNum {
+		logutil.Logger(bo.GetCtx()).Info("retry and refresh current region after send request fail and up/down stores length changed",
+			zap.Stringer("region", &region),
+			zap.Bool("needReload", scheduleReload),
+			zap.Reflect("oldPeers", prev.Peers),
+			zap.Reflect("newPeers", r.GetMeta().Peers),
+			zap.Error(err))
+		return
+	}
 	accessMode := TiFlashOnly
 	accessIdx := rs.getAccessIndex(accessMode, store)
 	if accessIdx == -1 {
-		logutil.Logger(bo.GetCtx()).Warn("can not get access index for region " + rid.String())
+		logutil.Logger(bo.GetCtx()).Warn("can not get access index for region " + region.String())
 		return
 	}
 	if err != nil {
 		storeIdx, s := rs.accessStore(accessMode, accessIdx)
-		epoch := rs.storeEpochs[storeIdx]
-		if atomic.CompareAndSwapUint32(&s.epoch, epoch, epoch+1) {
-			logutil.BgLogger().Info("mark store's regions need be refill", zap.String("store", s.addr))
-			metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
-		}
-		// schedule a store addr resolve.
-		s.markNeedCheck(c.notifyCheckCh)
+		c.markRegionNeedBeRefill(s, storeIdx, rs)
 	}
 
 	// try next peer
 	rs.switchNextFlashPeer(r, accessIdx)
 	logutil.Logger(bo.GetCtx()).Info("switch region tiflash peer to next due to send request fail",
-		zap.Stringer("region", &rid),
+		zap.Stringer("region", &region),
 		zap.Bool("needReload", scheduleReload),
 		zap.Error(err))
 
@@ -768,90 +777,99 @@ func (c *RegionCache) OnSendFailForRegio
 	}
 }
 
+func (c *RegionCache) markRegionNeedBeRefill(s *Store, storeIdx int, rs *RegionStore) int {
+	incEpochStoreIdx := -1
+	// invalidate regions in store.
+	epoch := rs.storeEpochs[storeIdx]
+	if atomic.CompareAndSwapUint32(&s.epoch, epoch, epoch+1) {
+		logutil.BgLogger().Info("mark store's regions need be refill", zap.String("store", s.addr))
+		incEpochStoreIdx = storeIdx
+		metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
+	}
+	// schedule a store addr resolve.
+	s.markNeedCheck(c.notifyCheckCh)
+	return incEpochStoreIdx
+}
+
 // OnSendFail handles send request fail logic.
 func (c *RegionCache) OnSendFail(bo *Backoffer, ctx *RPCContext, scheduleReload bool, err error) {
 	metrics.RegionCacheCounterWithSendFail.Inc()
 	r := c.GetCachedRegionWithRLock(ctx.Region)
-	if r != nil {
-		peersNum := len(r.meta.Peers)
-		if len(ctx.Meta.Peers) != peersNum {
-			logutil.Logger(bo.GetCtx()).Info("retry and refresh current ctx after send request fail and up/down stores length changed",
-				zap.Stringer("current", ctx),
-				zap.Bool("needReload", scheduleReload),
-				zap.Reflect("oldPeers", ctx.Meta.Peers),
-				zap.Reflect("newPeers", r.meta.Peers),
-				zap.Error(err))
-			return
-		}
-
-		rs := r.getStore()
-		startForwarding := false
-		incEpochStoreIdx := -1
+	if r == nil {
+		return
+	}
+	peersNum := len(r.meta.Peers)
+	if len(ctx.Meta.Peers) != peersNum {
+		logutil.Logger(bo.GetCtx()).Info("retry and refresh current ctx after send request fail and up/down stores length changed",
+			zap.Stringer("current", ctx),
+			zap.Bool("needReload", scheduleReload),
+			zap.Reflect("oldPeers", ctx.Meta.Peers),
+			zap.Reflect("newPeers", r.meta.Peers),
+			zap.Error(err))
+		return
+	}
 
-		if err != nil {
-			storeIdx, s := rs.accessStore(ctx.AccessMode, ctx.AccessIdx)
-			leaderReq := ctx.Store.storeType == tikvrpc.TiKV && rs.workTiKVIdx == ctx.AccessIdx
-
-			// Mark the store as failure if it's not a redirection request because we
-			// can't know the status of the proxy store by it.
-			if ctx.ProxyStore == nil {
-				// send fail but store is reachable, keep retry current peer for replica leader request.
-				// but we still need switch peer for follower-read or learner-read(i.e. tiflash)
-				if leaderReq {
-					if s.requestLiveness(bo, c) == reachable {
-						return
-					} else if c.enableForwarding {
-						s.startHealthCheckLoopIfNeeded(c)
-						startForwarding = true
-					}
-				}
+	rs := r.getStore()
+	startForwarding := false
+	incEpochStoreIdx := -1
 
-				// invalidate regions in store.
-				epoch := rs.storeEpochs[storeIdx]
-				if atomic.CompareAndSwapUint32(&s.epoch, epoch, epoch+1) {
-					logutil.BgLogger().Info("mark store's regions need be refill", zap.String("store", s.addr))
-					incEpochStoreIdx = storeIdx
-					metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
+	if err != nil {
+		storeIdx, s := rs.accessStore(ctx.AccessMode, ctx.AccessIdx)
+		leaderReq := ctx.Store.storeType == tikvrpc.TiKV && rs.workTiKVIdx == ctx.AccessIdx
+
+		// Mark the store as failure if it's not a redirection request because we
+		// can't know the status of the proxy store by it.
+		if ctx.ProxyStore == nil {
+			// send fail but store is reachable, keep retry current peer for replica leader request.
+			// but we still need switch peer for follower-read or learner-read(i.e. tiflash)
+			if leaderReq {
+				if s.requestLiveness(bo, c) == reachable {
+					return
+				} else if c.enableForwarding {
+					s.startHealthCheckLoopIfNeeded(c)
+					startForwarding = true
 				}
-				// schedule a store addr resolve.
-				s.markNeedCheck(c.notifyCheckCh)
 			}
+
+			// invalidate regions in store.
+			incEpochStoreIdx = c.markRegionNeedBeRefill(s, storeIdx, rs)
 		}
+	}
 
-		// try next peer to found new leader.
-		if ctx.AccessMode == TiKVOnly {
-			if startForwarding || ctx.ProxyStore != nil {
-				var currentProxyIdx AccessIndex = -1
-				if ctx.ProxyStore != nil {
-					currentProxyIdx = ctx.ProxyAccessIdx
-				}
-				// In case the epoch of the store is increased, try to avoid reloading the current region by also
-				// increasing the epoch stored in `rs`.
-				rs.switchNextProxyStore(r, currentProxyIdx, incEpochStoreIdx)
-				logutil.Logger(bo.GetCtx()).Info("switch region proxy peer to next due to send request fail",
-					zap.Stringer("current", ctx),
-					zap.Bool("needReload", scheduleReload),
-					zap.Error(err))
-			} else {
-				rs.switchNextTiKVPeer(r, ctx.AccessIdx)
-				logutil.Logger(bo.GetCtx()).Info("switch region peer to next due to send request fail",
-					zap.Stringer("current", ctx),
-					zap.Bool("needReload", scheduleReload),
-					zap.Error(err))
+	// try next peer to found new leader.
+	if ctx.AccessMode == TiKVOnly {
+		if startForwarding || ctx.ProxyStore != nil {
+			var currentProxyIdx AccessIndex = -1
+			if ctx.ProxyStore != nil {
+				currentProxyIdx = ctx.ProxyAccessIdx
 			}
+			// In case the epoch of the store is increased, try to avoid reloading the current region by also
+			// increasing the epoch stored in `rs`.
+			rs.switchNextProxyStore(r, currentProxyIdx, incEpochStoreIdx)
+			logutil.Logger(bo.GetCtx()).Info("switch region proxy peer to next due to send request fail",
+				zap.Stringer("current", ctx),
+				zap.Bool("needReload", scheduleReload),
+				zap.Error(err))
 		} else {
-			rs.switchNextFlashPeer(r, ctx.AccessIdx)
-			logutil.Logger(bo.GetCtx()).Info("switch region tiflash peer to next due to send request fail",
+			rs.switchNextTiKVPeer(r, ctx.AccessIdx)
+			logutil.Logger(bo.GetCtx()).Info("switch region peer to next due to send request fail",
 				zap.Stringer("current", ctx),
 				zap.Bool("needReload", scheduleReload),
 				zap.Error(err))
 		}
+	} else {
+		rs.switchNextFlashPeer(r, ctx.AccessIdx)
+		logutil.Logger(bo.GetCtx()).Info("switch region tiflash peer to next due to send request fail",
+			zap.Stringer("current", ctx),
+			zap.Bool("needReload", scheduleReload),
+			zap.Error(err))
+	}
 
-		// force reload region when retry all known peers in region.
-		if scheduleReload {
-			r.scheduleReload()
-		}
+	// force reload region when retry all known peers in region.
+	if scheduleReload {
+		r.scheduleReload()
 	}
+
 }
 
 // LocateRegionByID searches for the region with ID.