Skip to content

Commit

Permalink
Address the comments
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Jun 2, 2021
1 parent 2d7bd46 commit aa554ca
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 47 deletions.
45 changes: 17 additions & 28 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,21 +167,12 @@ func (r *RegionStore) follower(seed uint32, op *storeSelectorOp) AccessIndex {
}

// return next leader or follower store's index
func (r *RegionStore) kvPeer(region *Region, seed uint32, op *storeSelectorOp) AccessIndex {
func (r *RegionStore) kvPeer(seed uint32, op *storeSelectorOp) AccessIndex {
candidates := make([]AccessIndex, 0, r.accessStoreNum(TiKVOnly))
for i := 0; i < r.accessStoreNum(TiKVOnly); i++ {
accessIdx := AccessIndex(i)
storeIdx, s := r.accessStore(TiKVOnly, accessIdx)
if r.storeEpochs[storeIdx] != atomic.LoadUint32(&s.epoch) || !r.filterStoreCandidate(AccessIndex(i), op) {
continue
}
// Exclude by the excludedPeerIDs.
if peer := region.meta.Peers[storeIdx]; slice.AnyOf(op.excludedPeerIDs, func(i int) bool {
return op.excludedPeerIDs[i] == peer.GetId()
}) ||
// Learner peer is only an intermediate state, so we should not send any request to it.
// TODO: better to retry by TiKV response's error message in (*RegionRequestSender).onRegionError.
peer.GetRole() == metapb.PeerRole_Learner {
if r.storeEpochs[storeIdx] != atomic.LoadUint32(&s.epoch) || !r.filterStoreCandidate(accessIdx, op) {
continue
}
candidates = append(candidates, accessIdx)
Expand All @@ -195,7 +186,13 @@ func (r *RegionStore) kvPeer(region *Region, seed uint32, op *storeSelectorOp) A

func (r *RegionStore) filterStoreCandidate(aidx AccessIndex, op *storeSelectorOp) bool {
_, s := r.accessStore(TiKVOnly, aidx)
// filter label unmatched store
// Filter by the excludedStoreIDs first.
if slice.AnyOf(op.excludedStoreIDs, func(i int) bool {
return op.excludedStoreIDs[i] == s.storeID
}) {
return false
}
// Filter label unmatched store.
return s.IsLabelsMatch(op.labels)
}

Expand Down Expand Up @@ -432,7 +429,7 @@ type RPCContext struct {

isStaleRead bool
storeSelectorOptions []StoreSelectorOption
lastPeerID uint64
lastStoreID uint64
}

func (c *RPCContext) String() string {
Expand All @@ -449,8 +446,8 @@ func (c *RPCContext) String() string {
}

type storeSelectorOp struct {
labels []*metapb.StoreLabel
excludedPeerIDs []uint64
labels []*metapb.StoreLabel
excludedStoreIDs []uint64
}

// StoreSelectorOption configures storeSelectorOp.
Expand All @@ -459,22 +456,14 @@ type StoreSelectorOption func(*storeSelectorOp)
// WithMatchLabels indicates selecting stores with matched labels.
func WithMatchLabels(labels []*metapb.StoreLabel) StoreSelectorOption {
return func(op *storeSelectorOp) {
if op.labels != nil {
op.labels = append(op.labels, labels...)
} else {
op.labels = labels
}
op.labels = append(op.labels, labels...)
}
}

// WithExcludedPeerIDs indicates selecting stores without any matched peerID on it.
func WithExcludedPeerIDs(storeIDs []uint64) StoreSelectorOption {
// WithExcludedStoreIDs indicates selecting stores without these excluded StoreIDs.
func WithExcludedStoreIDs(storeIDs []uint64) StoreSelectorOption {
return func(op *storeSelectorOp) {
if op.excludedPeerIDs != nil {
op.excludedPeerIDs = append(op.excludedPeerIDs, storeIDs...)
} else {
op.excludedPeerIDs = storeIDs
}
op.excludedStoreIDs = append(op.excludedStoreIDs, storeIDs...)
}
}

Expand Down Expand Up @@ -1698,7 +1687,7 @@ func (r *Region) FollowerStorePeer(rs *RegionStore, followerStoreSeed uint32, op

// AnyStorePeer returns a leader or follower store with the associated peer.
func (r *Region) AnyStorePeer(rs *RegionStore, followerStoreSeed uint32, op *storeSelectorOp) (store *Store, peer *metapb.Peer, accessIdx AccessIndex, storeIdx int) {
return r.getKvStorePeer(rs, rs.kvPeer(r, followerStoreSeed, op))
return r.getKvStorePeer(rs, rs.kvPeer(followerStoreSeed, op))
}

// RegionVerID is a unique ID that can identify a Region at a specific version.
Expand Down
14 changes: 7 additions & 7 deletions store/tikv/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,16 +278,16 @@ func (s *RegionRequestSender) SendReqCtx(
logutil.Logger(bo.GetCtx()).Warn("retry get ", zap.Uint64("region = ", regionID.GetID()), zap.Int("times = ", tryTimes))
}

var lastPeerID uint64
var lastStoreID uint64
if rpcCtx != nil {
lastPeerID = rpcCtx.Peer.GetId()
lastStoreID = rpcCtx.Store.storeID
}
rpcCtx, err = s.getRPCContext(bo, req, regionID, et, opts...)
if err != nil {
return nil, nil, err
}
if rpcCtx != nil {
rpcCtx.lastPeerID = lastPeerID
rpcCtx.lastStoreID = lastStoreID
}

failpoint.Inject("invalidCacheAndRetry", func() {
Expand Down Expand Up @@ -663,11 +663,11 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed
}
opts = ctx.storeSelectorOptions
// Stale Read request will retry the leader or next peer on error,
// so we will exclude the PeerID of the requested peer every time.
// If the new PeerID keeps being the same with the last one, we
// so we will exclude the StoreID of the requested peer every time.
// If the new StoreID keeps being the same with the last one, we
// should not continue excluding it to make the opts become bigger.
if ctx.isStaleRead && ctx.lastPeerID != ctx.Peer.GetId() {
opts = append(opts, WithExcludedPeerIDs([]uint64{ctx.Peer.GetId()}))
if ctx.isStaleRead && ctx.lastStoreID != ctx.Store.storeID {
opts = append(opts, WithExcludedStoreIDs([]uint64{ctx.Store.storeID}))
}

metrics.TiKVRegionErrorCounter.WithLabelValues(regionErrorToLabel(regionErr)).Inc()
Expand Down
24 changes: 12 additions & 12 deletions store/tikv/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestStoreTokenLimit(c *C) {
kv.StoreLimit.Store(oldStoreLimit)
}

func (s *testRegionRequestToThreeStoresSuite) TestGetRPCContextWithExcludedPeerIDs(c *C) {
func (s *testRegionRequestToThreeStoresSuite) TestGetRPCContextWithExcludedStoreIDs(c *C) {
// Load the bootstrapped region into the cache.
_, err := s.cache.BatchLoadRegionsFromKey(s.bo, []byte{}, 1)
c.Assert(err, IsNil)
Expand All @@ -198,24 +198,24 @@ func (s *testRegionRequestToThreeStoresSuite) TestGetRPCContextWithExcludedPeerI
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kv.ReplicaReadMixed, &seed)
rpcCtx, err := s.regionRequestSender.getRPCContext(s.bo, req, regionID, tikvrpc.TiKV)
c.Assert(err, IsNil)
peedID1 := rpcCtx.Peer.GetId()
storeID := rpcCtx.Store.storeID

rpcCtx, err = s.regionRequestSender.getRPCContext(s.bo, req, regionID, tikvrpc.TiKV, WithExcludedPeerIDs([]uint64{peedID1}))
rpcCtx, err = s.regionRequestSender.getRPCContext(s.bo, req, regionID, tikvrpc.TiKV, WithExcludedStoreIDs([]uint64{storeID}))
c.Assert(err, IsNil)
peedID2 := rpcCtx.Peer.GetId()
c.Assert(peedID1, Not(Equals), peedID2)
storeID2 := rpcCtx.Store.storeID
c.Assert(storeID, Not(Equals), storeID2)

rpcCtx, err = s.regionRequestSender.getRPCContext(s.bo, req, regionID, tikvrpc.TiKV, WithExcludedPeerIDs([]uint64{peedID1, peedID2}))
rpcCtx, err = s.regionRequestSender.getRPCContext(s.bo, req, regionID, tikvrpc.TiKV, WithExcludedStoreIDs([]uint64{storeID, storeID2}))
c.Assert(err, IsNil)
peedID3 := rpcCtx.Peer.GetId()
c.Assert(peedID1, Not(Equals), peedID3)
c.Assert(peedID2, Not(Equals), peedID3)
storeID3 := rpcCtx.Store.storeID
c.Assert(storeID, Not(Equals), storeID3)
c.Assert(storeID2, Not(Equals), storeID3)

// All stores are excluded, leader peer will be chosen.
rpcCtx, err = s.regionRequestSender.getRPCContext(s.bo, req, regionID, tikvrpc.TiKV, WithExcludedPeerIDs([]uint64{peedID1, peedID2, peedID3}))
rpcCtx, err = s.regionRequestSender.getRPCContext(s.bo, req, regionID, tikvrpc.TiKV, WithExcludedStoreIDs([]uint64{storeID, storeID2, storeID3}))
c.Assert(err, IsNil)
peedID4 := rpcCtx.Peer.GetId()
c.Assert(peedID1, Equals, peedID4)
storeID4 := rpcCtx.Store.storeID
c.Assert(storeID, Equals, storeID4)
c.Assert(rpcCtx.Peer.GetId(), Equals, s.leaderPeer)
}

Expand Down

0 comments on commit aa554ca

Please sign in to comment.