From e601a9d7f69926b27dbb9339d5f002dbe235b9d3 Mon Sep 17 00:00:00 2001
From: cfzjywxk
Date: Mon, 24 Jul 2023 17:28:36 +0800
Subject: [PATCH 1/2] Resume max retry time check for stale read retry with leader option

Signed-off-by: cfzjywxk
---
 internal/locate/region_request.go       | 15 ++++++-
 internal/locate/region_request3_test.go | 56 +++++++++++++++++++++++++
 2 files changed, 70 insertions(+), 1 deletion(-)

diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index 1edad08e8..f54320c08 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -574,7 +574,7 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
 			logutil.BgLogger().Warn("unable to find stores with given labels")
 		}
 		leader := selector.replicas[state.leaderIdx]
-		if leader.isEpochStale() || (!state.option.leaderOnly && leader.isExhausted(1)) {
+		if leader.isEpochStale() || state.IsLeaderExhausted(leader) {
 			metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
 			selector.invalidateRegion()
 			return nil, nil
@@ -593,6 +593,19 @@
 	return rpcCtx, nil
 }
 
+func (state *accessFollower) IsLeaderExhausted(leader *replica) bool {
+	// Allow one extra retry for the following case:
+	// 1. Stale read is enabled and the leader peer is selected as the target peer at first.
+	// 2. A data-is-not-ready error is returned from the leader peer.
+	// 3. The stale read flag is removed and processing falls back to snapshot read on the leader peer.
+	// 4. The leader peer should be retried again using snapshot read.
+	if state.isStaleRead && state.option.leaderOnly {
+		return leader.isExhausted(2)
+	} else {
+		return leader.isExhausted(1)
+	}
+}
+
 func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) {
 	if selector.checkLiveness(bo, selector.targetReplica()) != reachable {
 		selector.invalidateReplicaStore(selector.targetReplica(), cause)
diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index 62912d143..d85afd01c 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -36,6 +36,7 @@ package locate
 
 import (
 	"context"
+	"strconv"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -1018,3 +1019,58 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown()
 		s.Equal(0, bo.GetTotalBackoffTimes())
 	}
 }
+
+func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() {
+	leaderStore, _ := s.loadAndGetLeaderStore()
+	leaderLabel := []*metapb.StoreLabel{
+		{
+			Key:   "id",
+			Value: strconv.FormatUint(leaderStore.StoreID(), 10),
+		},
+	}
+	regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
+	s.Nil(err)
+	s.NotNil(regionLoc)
+	value := []byte("value")
+	isFirstReq := true
+
+	s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
+		select {
+		case <-ctx.Done():
+			return nil, errors.New("timeout")
+		default:
+		}
+		// Return `DataIsNotReady` for the first time on leader.
+		if isFirstReq {
+			isFirstReq = false
+			return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
+				DataIsNotReady: &errorpb.DataIsNotReady{},
+			}}}, nil
+		}
+		return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: value}}, nil
+	}}
+
+	region := s.cache.getRegionByIDFromCache(regionLoc.Region.GetID())
+	s.True(region.isValid())
+
+	req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil)
+	req.ReadReplicaScope = oracle.GlobalTxnScope
+	req.TxnScope = oracle.GlobalTxnScope
+	req.EnableStaleRead()
+	req.ReplicaReadType = kv.ReplicaReadMixed
+	var ops []StoreSelectorOption
+	ops = append(ops, WithMatchLabels(leaderLabel))
+
+	ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
+	bo := retry.NewBackoffer(ctx, -1)
+	s.Nil(err)
+	resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
+	s.Nil(err)
+
+	regionErr, err := resp.GetRegionError()
+	s.Nil(err)
+	s.Nil(regionErr)
+	getResp, ok := resp.Resp.(*kvrpcpb.GetResponse)
+	s.True(ok)
+	s.Equal(getResp.Value, value)
+}

From ecb880e2ae22e78d022b85cecbc2c7a0ea15ec70 Mon Sep 17 00:00:00 2001
From: cfzjywxk
Date: Wed, 26 Jul 2023 15:46:20 +0800
Subject: [PATCH 2/2] add fail path test

Signed-off-by: cfzjywxk
---
 internal/locate/region_request3_test.go | 34 ++++++++++++++++++++++---
 1 file changed, 30 insertions(+), 4 deletions(-)

diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index d85afd01c..850f24d30 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -1032,8 +1032,13 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() {
 	s.Nil(err)
 	s.NotNil(regionLoc)
 	value := []byte("value")
-	isFirstReq := true
+	type testState struct {
+		tryTimes uint8
+		succ     bool
+	}
+
+	state := &testState{}
 
 	s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
 		select {
 		case <-ctx.Done():
 			return nil, errors.New("timeout")
 		default:
 		}
 		// Return `DataIsNotReady` for the first time on leader.
-		if isFirstReq {
-			isFirstReq = false
+		if state.tryTimes == 0 {
+			state.tryTimes++
 			return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
 				DataIsNotReady: &errorpb.DataIsNotReady{},
 			}}}, nil
+		} else if state.tryTimes == 1 && state.succ {
+			state.tryTimes++
+			return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: value}}, nil
 		}
-		return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: value}}, nil
+		state.tryTimes++
+		return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
+			DiskFull: &errorpb.DiskFull{},
+		}}}, nil
 	}}
 
 	region := s.cache.getRegionByIDFromCache(regionLoc.Region.GetID())
 	s.True(region.isValid())
 
+	// Test the successful path.
+	state.succ = true
 	req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil)
 	req.ReadReplicaScope = oracle.GlobalTxnScope
 	req.TxnScope = oracle.GlobalTxnScope
 	req.EnableStaleRead()
 	req.ReplicaReadType = kv.ReplicaReadMixed
 	var ops []StoreSelectorOption
 	ops = append(ops, WithMatchLabels(leaderLabel))
 
 	ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
 	bo := retry.NewBackoffer(ctx, -1)
 	s.Nil(err)
 	resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
 	s.Nil(err)
 
 	regionErr, err := resp.GetRegionError()
 	s.Nil(err)
 	s.Nil(regionErr)
 	getResp, ok := resp.Resp.(*kvrpcpb.GetResponse)
 	s.True(ok)
 	s.Equal(getResp.Value, value)
+
+	// Test the failure path: when the leader retry limit is reached, an epoch-not-match error should be returned.
+	state.tryTimes = 0
+	state.succ = false
+	req.EnableStaleRead()
+	resp, _, err = s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
+	s.Nil(err)
+
+	regionErr, err = resp.GetRegionError()
+	s.Nil(err)
+	s.NotNil(regionErr)
+	s.NotNil(regionErr.GetEpochNotMatch())
+	s.Nil(regionErr.GetDiskFull())
 }