From 3cf1f230c9704d5bfb4c37e6b96def89f6c59b93 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Fri, 30 Jun 2023 14:46:35 +0800 Subject: [PATCH] *: refine non-global stale-read request retry logic Signed-off-by: crazycs520 --- internal/locate/region_request.go | 32 +++++++++++------------ internal/locate/region_request_test.go | 36 ++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 17 deletions(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index c877dfadc..aac428b48 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -532,12 +532,12 @@ func (state *tryNewProxy) onNoLeader(selector *replicaSelector) { type accessFollower struct { stateBase // If tryLeader is true, the request can also be sent to the leader when !leader.isSlow() - tryLeader bool - isGlobalStaleRead bool - option storeSelectorOp - leaderIdx AccessIndex - lastIdx AccessIndex - learnerOnly bool + tryLeader bool + isStaleRead bool + option storeSelectorOp + leaderIdx AccessIndex + lastIdx AccessIndex + learnerOnly bool } func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { @@ -558,12 +558,10 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector } } } else { - // Stale Read request will retry the leader or next peer on error, - // if txnScope is global, we will only retry the leader by using the WithLeaderOnly option, - // if txnScope is local, we will retry both other peers and the leader by the strategy of replicaSelector. - if state.isGlobalStaleRead { + // Stale Read request will retry the leader only by using the WithLeaderOnly option. + if state.isStaleRead { WithLeaderOnly()(&state.option) - // retry on the leader should not use stale read flag to avoid possible DataIsNotReady error as it always can serve any read + // retry on the leader should not use stale read flag to avoid possible DataIsNotReady error as it always can serve any read. resetStaleRead = true } state.lastIdx++ @@ -766,12 +764,12 @@ func newReplicaSelector( } tryLeader := req.ReplicaReadType == kv.ReplicaReadMixed || req.ReplicaReadType == kv.ReplicaReadPreferLeader state = &accessFollower{ - tryLeader: tryLeader, - isGlobalStaleRead: req.IsGlobalStaleRead(), - option: option, - leaderIdx: regionStore.workTiKVIdx, - lastIdx: -1, - learnerOnly: req.ReplicaReadType == kv.ReplicaReadLearner, + tryLeader: tryLeader, + isStaleRead: req.StaleRead, + option: option, + leaderIdx: regionStore.workTiKVIdx, + lastIdx: -1, + learnerOnly: req.ReplicaReadType == kv.ReplicaReadLearner, } } diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index b587e6c0f..7f52d34cd 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -657,3 +657,39 @@ func (s *testRegionRequestToSingleStoreSuite) TestCloseConnectionOnStoreNotMatch s.NotNil(regionErr) s.Equal(target, client.closedAddr) } + +func (s *testRegionRequestToSingleStoreSuite) TestStaleReadRetry() { + req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ + Key: []byte("key"), + }) + req.EnableStaleRead() + req.ReadReplicaScope = "z1" // not global stale read. + region, err := s.cache.LocateRegionByID(s.bo, s.region) + s.Nil(err) + s.NotNil(region) + + oc := s.regionRequestSender.client + defer func() { + s.regionRequestSender.client = oc + }() + + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + if req.StaleRead { + // Mock for stale-read request always return DataIsNotReady error when tikv `ResolvedTS` is blocked. + response = &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{ + RegionError: &errorpb.Error{DataIsNotReady: &errorpb.DataIsNotReady{}}, + }} + } else { + response = &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("value")}} + } + return response, nil + }} + + bo := retry.NewBackofferWithVars(context.Background(), 5, nil) + resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second) + s.Nil(err) + s.NotNil(resp) + regionErr, _ := resp.GetRegionError() + s.Nil(regionErr) + s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value) +}