Skip to content

Commit

Permalink
Use seed to try on different peers
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Jun 2, 2021
1 parent f7d4718 commit 884d469
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 96 deletions.
46 changes: 14 additions & 32 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/pingcap/tidb/store/tikv/retry"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/pkg/slice"
atomic2 "go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/singleflight"
Expand Down Expand Up @@ -186,13 +185,7 @@ func (r *RegionStore) kvPeer(seed uint32, op *storeSelectorOp) AccessIndex {

func (r *RegionStore) filterStoreCandidate(aidx AccessIndex, op *storeSelectorOp) bool {
_, s := r.accessStore(TiKVOnly, aidx)
// Filter by the excludedStoreIDs first.
if slice.AnyOf(op.excludedStoreIDs, func(i int) bool {
return op.excludedStoreIDs[i] == s.storeID
}) {
return false
}
// Filter label unmatched store.
// filter label unmatched store
return s.IsLabelsMatch(op.labels)
}

Expand Down Expand Up @@ -427,9 +420,7 @@ type RPCContext struct {
ProxyAddr string // valid when ProxyStore is not nil
TiKVNum int // Number of TiKV nodes among the region's peers. Assuming non-TiKV peers are all TiFlash peers.

isStaleRead bool
storeSelectorOptions []StoreSelectorOption
lastStoreID uint64
isStaleRead bool
}

func (c *RPCContext) String() string {
Expand All @@ -446,8 +437,7 @@ func (c *RPCContext) String() string {
}

type storeSelectorOp struct {
labels []*metapb.StoreLabel
excludedStoreIDs []uint64
labels []*metapb.StoreLabel
}

// StoreSelectorOption configures storeSelectorOp.
Expand All @@ -460,13 +450,6 @@ func WithMatchLabels(labels []*metapb.StoreLabel) StoreSelectorOption {
}
}

// WithExcludedStoreIDs indicates selecting stores without these excluded StoreIDs.
func WithExcludedStoreIDs(storeIDs []uint64) StoreSelectorOption {
return func(op *storeSelectorOp) {
op.excludedStoreIDs = append(op.excludedStoreIDs, storeIDs...)
}
}

// GetTiKVRPCContext returns RPCContext for a region. If it returns nil, the region
// must be out of date and already dropped from cache.
func (c *RegionCache) GetTiKVRPCContext(bo *Backoffer, id RegionVerID, replicaRead kv.ReplicaReadType, followerStoreSeed uint32, opts ...StoreSelectorOption) (*RPCContext, error) {
Expand Down Expand Up @@ -564,18 +547,17 @@ func (c *RegionCache) GetTiKVRPCContext(bo *Backoffer, id RegionVerID, replicaRe
}

return &RPCContext{
Region: id,
Meta: cachedRegion.meta,
Peer: peer,
AccessIdx: accessIdx,
Store: store,
Addr: addr,
AccessMode: TiKVOnly,
ProxyStore: proxyStore,
ProxyAccessIdx: proxyAccessIdx,
ProxyAddr: proxyAddr,
TiKVNum: regionStore.accessStoreNum(TiKVOnly),
storeSelectorOptions: opts,
Region: id,
Meta: cachedRegion.meta,
Peer: peer,
AccessIdx: accessIdx,
Store: store,
Addr: addr,
AccessMode: TiKVOnly,
ProxyStore: proxyStore,
ProxyAccessIdx: proxyAccessIdx,
ProxyAddr: proxyAddr,
TiKVNum: regionStore.accessStoreNum(TiKVOnly),
}, nil
}

Expand Down
55 changes: 24 additions & 31 deletions store/tikv/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,17 +278,10 @@ func (s *RegionRequestSender) SendReqCtx(
logutil.Logger(bo.GetCtx()).Warn("retry get ", zap.Uint64("region = ", regionID.GetID()), zap.Int("times = ", tryTimes))
}

var lastStoreID uint64
if rpcCtx != nil && rpcCtx.Store != nil {
lastStoreID = rpcCtx.Store.storeID
}
rpcCtx, err = s.getRPCContext(bo, req, regionID, et, opts...)
if err != nil {
return nil, nil, err
}
if rpcCtx != nil {
rpcCtx.lastStoreID = lastStoreID
}

failpoint.Inject("invalidCacheAndRetry", func() {
// cooperate with github.com/pingcap/tidb/store/gcworker/setGcResolveMaxBackoff
Expand Down Expand Up @@ -344,7 +337,7 @@ func (s *RegionRequestSender) SendReqCtx(
}
})
if regionErr != nil {
retry, opts, err = s.onRegionError(bo, rpcCtx, req.ReplicaReadSeed, regionErr)
retry, err = s.onRegionError(bo, rpcCtx, req.ReplicaReadSeed, regionErr)
if err != nil {
return nil, nil, errors.Trace(err)
}
Expand Down Expand Up @@ -653,22 +646,14 @@ func regionErrorToLabel(e *errorpb.Error) string {
return "unknown"
}

func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed *uint32, regionErr *errorpb.Error) (shouldRetry bool, opts []StoreSelectorOption, err error) {
func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed *uint32, regionErr *errorpb.Error) (shouldRetry bool, err error) {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("tikv.onRegionError", opentracing.ChildOf(span.Context()))
defer span1.Finish()
// TODO(MyonKeminta): Make sure trace works without cloning the backoffer.
// bo = bo.Clone()
bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
}
opts = ctx.storeSelectorOptions
// Stale Read request will retry the leader or next peer on error,
// so we will exclude the StoreID of the requested peer every time.
// If the new StoreID keeps being the same with the last one, we
// should not continue excluding it to make the opts become bigger.
if ctx.Store != nil && ctx.isStaleRead && ctx.lastStoreID != ctx.Store.storeID {
opts = append(opts, WithExcludedStoreIDs([]uint64{ctx.Store.storeID}))
}

metrics.TiKVRegionErrorCounter.WithLabelValues(regionErrorToLabel(regionErr)).Inc()
if notLeader := regionErr.GetNotLeader(); notLeader != nil {
Expand All @@ -684,14 +669,14 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed
// the region from PD.
s.regionCache.InvalidateCachedRegionWithReason(ctx.Region, NoLeader)
if err = bo.Backoff(retry.BoRegionMiss, errors.Errorf("not leader: %v, ctx: %v", notLeader, ctx)); err != nil {
return false, opts, errors.Trace(err)
return false, errors.Trace(err)
}
} else {
// don't backoff if a new leader is returned.
s.regionCache.UpdateLeader(ctx.Region, notLeader.GetLeader().GetStoreId(), ctx.AccessIdx)
}

return true, opts, nil
return true, nil
}

if storeNotMatch := regionErr.GetStoreNotMatch(); storeNotMatch != nil {
Expand All @@ -701,7 +686,7 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed
zap.Stringer("ctx", ctx))
ctx.Store.markNeedCheck(s.regionCache.notifyCheckCh)
s.regionCache.InvalidateCachedRegion(ctx.Region)
return true, opts, nil
return true, nil
}

if epochNotMatch := regionErr.GetEpochNotMatch(); epochNotMatch != nil {
Expand All @@ -712,7 +697,7 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed
*seed = *seed + 1
}
err = s.regionCache.OnRegionEpochNotMatch(bo, ctx, epochNotMatch.CurrentRegions)
return false, opts, errors.Trace(err)
return false, errors.Trace(err)
}
if regionErr.GetServerIsBusy() != nil {
logutil.BgLogger().Warn("tikv reports `ServerIsBusy` retry later",
Expand All @@ -724,36 +709,44 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed
err = bo.Backoff(retry.BoTiKVServerBusy, errors.Errorf("server is busy, ctx: %v", ctx))
}
if err != nil {
return false, opts, errors.Trace(err)
return false, errors.Trace(err)
}
return true, opts, nil
return true, nil
}
if regionErr.GetStaleCommand() != nil {
logutil.BgLogger().Debug("tikv reports `StaleCommand`", zap.Stringer("ctx", ctx))
err = bo.Backoff(retry.BoStaleCmd, errors.Errorf("stale command, ctx: %v", ctx))
if err != nil {
return false, opts, errors.Trace(err)
return false, errors.Trace(err)
}
return true, opts, nil
return true, nil
}
if regionErr.GetRaftEntryTooLarge() != nil {
logutil.BgLogger().Warn("tikv reports `RaftEntryTooLarge`", zap.Stringer("ctx", ctx))
return false, opts, errors.New(regionErr.String())
return false, errors.New(regionErr.String())
}
if regionErr.GetDataIsNotReady() != nil && ctx.isStaleRead {
logutil.BgLogger().Warn("tikv reports `DataIsNotReady` retry later",
zap.Uint64("peer-id", regionErr.GetDataIsNotReady().GetPeerId()),
zap.Uint64("region-id", regionErr.GetDataIsNotReady().GetRegionId()),
zap.Uint64("safe-ts", regionErr.GetDataIsNotReady().GetSafeTs()),
zap.Stringer("ctx", ctx))
return true, opts, nil
// Stale Read request will retry the leader or next peer on error,
// so we will add seed every time.
if seed != nil {
*seed = *seed + 1
}
return true, nil
}
// A stale read request may be sent to a peer which has not been initialized yet, we should retry in this case.
if regionErr.GetRegionNotInitialized() != nil && ctx.isStaleRead {
logutil.BgLogger().Warn("tikv reports `RegionNotInitialized` retry later",
zap.Uint64("region-id", regionErr.GetRegionNotInitialized().GetRegionId()),
zap.Stringer("ctx", ctx))
return true, opts, nil
if seed != nil {
*seed = *seed + 1
}
return true, nil
}
if regionErr.GetRegionNotFound() != nil && seed != nil {
logutil.BgLogger().Debug("tikv reports `RegionNotFound` in follow-reader",
Expand All @@ -764,9 +757,9 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed
logutil.BgLogger().Warn("tikv reports `MaxTimestampNotSynced`", zap.Stringer("ctx", ctx))
err = bo.Backoff(retry.BoMaxTsNotSynced, errors.Errorf("max timestamp not synced, ctx: %v", ctx))
if err != nil {
return false, opts, errors.Trace(err)
return false, errors.Trace(err)
}
return true, opts, nil
return true, nil
}
// For other errors, we only drop cache here.
// Because caller may need to re-split the request.
Expand All @@ -778,5 +771,5 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed
if ctx.Region.id != 0 {
s.regionCache.InvalidateCachedRegion(ctx.Region)
}
return false, opts, nil
return false, nil
}
33 changes: 0 additions & 33 deletions store/tikv/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,39 +186,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestStoreTokenLimit(c *C) {
kv.StoreLimit.Store(oldStoreLimit)
}

func (s *testRegionRequestToThreeStoresSuite) TestGetRPCContextWithExcludedStoreIDs(c *C) {
// Load the bootstrapped region into the cache.
_, err := s.cache.BatchLoadRegionsFromKey(s.bo, []byte{}, 1)
c.Assert(err, IsNil)

var (
seed uint32 = 0
regionID = RegionVerID{s.regionID, 0, 0}
)
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kv.ReplicaReadMixed, &seed)
rpcCtx, err := s.regionRequestSender.getRPCContext(s.bo, req, regionID, tikvrpc.TiKV)
c.Assert(err, IsNil)
storeID := rpcCtx.Store.storeID

rpcCtx, err = s.regionRequestSender.getRPCContext(s.bo, req, regionID, tikvrpc.TiKV, WithExcludedStoreIDs([]uint64{storeID}))
c.Assert(err, IsNil)
storeID2 := rpcCtx.Store.storeID
c.Assert(storeID, Not(Equals), storeID2)

rpcCtx, err = s.regionRequestSender.getRPCContext(s.bo, req, regionID, tikvrpc.TiKV, WithExcludedStoreIDs([]uint64{storeID, storeID2}))
c.Assert(err, IsNil)
storeID3 := rpcCtx.Store.storeID
c.Assert(storeID, Not(Equals), storeID3)
c.Assert(storeID2, Not(Equals), storeID3)

// All stores are excluded, leader peer will be chosen.
rpcCtx, err = s.regionRequestSender.getRPCContext(s.bo, req, regionID, tikvrpc.TiKV, WithExcludedStoreIDs([]uint64{storeID, storeID2, storeID3}))
c.Assert(err, IsNil)
storeID4 := rpcCtx.Store.storeID
c.Assert(storeID, Equals, storeID4)
c.Assert(rpcCtx.Peer.GetId(), Equals, s.leaderPeer)
}

// Test whether the Stale Read request will retry the leader or next peer on error.
func (s *testRegionRequestToThreeStoresSuite) TestStaleReadRetry(c *C) {
region, err := s.cache.LocateRegionByID(s.bo, s.regionID)
Expand Down

0 comments on commit 884d469

Please sign in to comment.