From d2c6232391ba251f2eed04364dfd68a7686989eb Mon Sep 17 00:00:00 2001
From: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com>
Date: Thu, 18 Jan 2024 20:48:43 +0800
Subject: [PATCH] Check time spent on attempting RPC to avoid spending too
 much time on retrying (#1117)

* Check time spent on attempting RPC to avoid spending too much time on retrying

Signed-off-by: MyonKeminta

* Handle refreshRegionStore

Signed-off-by: MyonKeminta

* Add test

Signed-off-by: MyonKeminta

* Address comments

Signed-off-by: MyonKeminta

---------

Signed-off-by: MyonKeminta
Co-authored-by: MyonKeminta
---
 internal/locate/main_test.go            |   3 +
 internal/locate/region_request.go       |  72 +++++++++++-----
 internal/locate/region_request3_test.go | 105 ++++++++++++++++++++++++
 3 files changed, 160 insertions(+), 20 deletions(-)

diff --git a/internal/locate/main_test.go b/internal/locate/main_test.go
index e60db8d2e..d160281fc 100644
--- a/internal/locate/main_test.go
+++ b/internal/locate/main_test.go
@@ -17,10 +17,13 @@ package locate
 import (
 	"testing"
 
+	"github.com/tikv/client-go/v2/util"
 	"go.uber.org/goleak"
 )
 
 func TestMain(m *testing.M) {
+	util.EnableFailpoints()
+
 	opts := []goleak.Option{
 		goleak.IgnoreTopFunction("github.com/pingcap/goleveldb/leveldb.(*DB).mpoolDrain"),
 	}
diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index 514c76e17..21bada053 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -245,10 +245,11 @@ func (s *RegionRequestSender) SendReq(
 }
 
 type replica struct {
-	store    *Store
-	peer     *metapb.Peer
-	epoch    uint32
-	attempts int
+	store         *Store
+	peer          *metapb.Peer
+	epoch         uint32
+	attempts      int
+	attemptedTime time.Duration
 	// deadlineErrUsingConfTimeout indicates the replica is already tried, but the received deadline exceeded error.
 	deadlineErrUsingConfTimeout bool
 }
@@ -261,8 +262,8 @@ func (r *replica) isEpochStale() bool {
 	return r.epoch != atomic.LoadUint32(&r.store.epoch)
 }
 
-func (r *replica) isExhausted(maxAttempt int) bool {
-	return r.attempts >= maxAttempt
+func (r *replica) isExhausted(maxAttempt int, maxAttemptTime time.Duration) bool {
+	return r.attempts >= maxAttempt || (maxAttemptTime > 0 && r.attemptedTime >= maxAttemptTime)
 }
 
 type replicaSelector struct {
@@ -412,7 +413,7 @@ func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelec
 	// will not be wakened up and re-elect the leader until the follower receives
 	// a request. So, before the new leader is elected, we should not send requests
 	// to the unreachable old leader to avoid unnecessary timeout.
-	if liveness != reachable || leader.isExhausted(maxReplicaAttempt) {
+	if liveness != reachable || leader.isExhausted(maxReplicaAttempt, maxReplicaAttemptTime) {
 		selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true}
 		return nil, stateChanged{}
 	}
@@ -436,7 +437,7 @@ func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *rep
 		selector.state = &accessByKnownProxy{leaderIdx: state.leaderIdx}
 		return
 	}
-	if liveness != reachable || selector.targetReplica().isExhausted(maxReplicaAttempt) {
+	if liveness != reachable || selector.targetReplica().isExhausted(maxReplicaAttempt, maxReplicaAttemptTime) {
 		selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true}
 	}
 	if liveness != reachable {
@@ -497,7 +498,7 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
 	if selector.targetIdx < 0 {
 		// Search replica that is not attempted from the last accessed replica
 		idx, selectReplica := filterReplicas(func(selectReplica *replica) bool {
-			return !selectReplica.isExhausted(1)
+			return !selectReplica.isExhausted(1, 0)
 		})
 		if selectReplica != nil && idx >= 0 {
 			state.lastIdx = idx
@@ -627,7 +628,7 @@ func (state *tryNewProxy) next(bo *retry.Backoffer, selector *replicaSelector) (
 
 func (state *tryNewProxy) isCandidate(idx AccessIndex, replica *replica) bool {
 	// Try each peer only once
-	return idx != state.leaderIdx && !replica.isExhausted(1)
+	return idx != state.leaderIdx && !replica.isExhausted(1, 0)
 }
 
 func (state *tryNewProxy) onSendSuccess(selector *replicaSelector) {
@@ -791,9 +792,9 @@ func (state *accessFollower) IsLeaderExhausted(leader *replica) bool {
 	// 3. Stale read flag is removed and processing falls back to snapshot read on the leader peer.
 	// 4. The leader peer should be retried again using snapshot read.
 	if state.isStaleRead && state.option.leaderOnly {
-		return leader.isExhausted(2)
+		return leader.isExhausted(2, 0)
 	} else {
-		return leader.isExhausted(1)
+		return leader.isExhausted(1, 0)
 	}
 }
 
@@ -805,7 +806,7 @@ func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replic
 
 func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool {
 	// the epoch is staled or retry exhausted, or the store is unreachable.
-	if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable || replica.deadlineErrUsingConfTimeout {
+	if replica.isEpochStale() || replica.isExhausted(1, 0) || replica.store.getLivenessState() == unreachable || replica.deadlineErrUsingConfTimeout {
 		return false
 	}
 	if state.option.leaderOnly {
@@ -848,7 +849,7 @@ func (state *tryIdleReplica) next(bo *retry.Backoffer, selector *replicaSelector
 			continue
 		}
 		// Skip replicas that have been tried.
-		if r.isExhausted(1) {
+		if r.isExhausted(1, 0) {
 			continue
 		}
 		estimated := r.store.EstimatedWaitTime()
@@ -957,6 +958,16 @@ func newReplicaSelector(
 		}
 	}
 
+	if val, err := util.EvalFailpoint("newReplicaSelectorInitialAttemptedTime"); err == nil {
+		attemptedTime, err := time.ParseDuration(val.(string))
+		if err != nil {
+			panic(err)
+		}
+		for _, r := range replicas {
+			r.attemptedTime = attemptedTime
+		}
+	}
+
 	return &replicaSelector{
 		regionCache,
 		cachedRegion,
@@ -970,7 +981,13 @@ func newReplicaSelector(
 	}, nil
 }
 
-const maxReplicaAttempt = 10
+const (
+	maxReplicaAttempt = 10
+	// The maximum time to allow retrying sending requests after an RPC failure. In case an RPC request fails after
+	// a timeout (there might be a network issue, or the TiKV node might be stuck), this avoids retrying 10 times, which may cost too much time.
+	// For a request using `client.ReadTimeoutShort`, which is 30s, it might retry twice, which costs 1min.
+	maxReplicaAttemptTime = time.Second * 50
+)
 
 // next creates the RPCContext of the current candidate replica.
 // It returns a SendError if runs out of all replicas or the cached region is invalidated.
@@ -1033,8 +1050,9 @@ func (s *replicaSelector) refreshRegionStore() {
 		// request is sent to the leader.
 		newLeaderIdx := newRegionStore.workTiKVIdx
 		s.state = &accessKnownLeader{leaderIdx: newLeaderIdx}
-		if s.replicas[newLeaderIdx].attempts == maxReplicaAttempt {
-			s.replicas[newLeaderIdx].attempts--
+		if s.replicas[newLeaderIdx].isExhausted(maxReplicaAttempt, maxReplicaAttemptTime) {
+			s.replicas[newLeaderIdx].attempts = maxReplicaAttempt - 1
+			s.replicas[newLeaderIdx].attemptedTime = 0
 		}
 	}
 }
@@ -1176,10 +1194,11 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) {
 			if replica.store.getLivenessState() != reachable {
 				return
 			}
-			if replica.isExhausted(maxReplicaAttempt) {
+			if replica.isExhausted(maxReplicaAttempt, maxReplicaAttemptTime) {
 				// Give the replica one more chance and because each follower is tried only once,
 				// it won't result in infinite retry.
 				replica.attempts = maxReplicaAttempt - 1
+				replica.attemptedTime = 0
 			}
 			s.state = &accessKnownLeader{leaderIdx: AccessIndex(i)}
 			// Update the workTiKVIdx so that following requests can be sent to the leader immediately.
@@ -1678,12 +1697,16 @@ func (s *RegionRequestSender) sendReqToRegion(
 	if !injectFailOnSend {
 		start := time.Now()
 		resp, err = s.client.SendRequest(ctx, sendToAddr, req, timeout)
+		rpcDuration := time.Since(start)
+		if s.replicaSelector != nil {
+			s.replicaSelector.recordAttemptedTime(rpcDuration)
+		}
 		// Record timecost of external requests on related Store when `ReplicaReadMode == "PreferLeader"`.
 		if rpcCtx.Store != nil && req.ReplicaReadType == kv.ReplicaReadPreferLeader && !util.IsInternalRequest(req.RequestSource) {
-			rpcCtx.Store.recordSlowScoreStat(time.Since(start))
+			rpcCtx.Store.recordSlowScoreStat(rpcDuration)
 		}
 		if s.Stats != nil {
-			RecordRegionRequestRuntimeStats(s.Stats, req.Type, time.Since(start))
+			RecordRegionRequestRuntimeStats(s.Stats, req.Type, rpcDuration)
 			if val, fpErr := util.EvalFailpoint("tikvStoreRespResult"); fpErr == nil {
 				if val.(bool) {
 					if req.Type == tikvrpc.CmdCop && bo.GetTotalSleep() == 0 {
@@ -2386,3 +2409,12 @@ func (s *replicaSelector) patchRequestSource(req *tikvrpc.Request, rpcCtx *RPCCo
 	}
 	sb.WriteString(req.ReadType)
 }
+
+func (s *replicaSelector) recordAttemptedTime(duration time.Duration) {
+	if targetReplica := s.targetReplica(); targetReplica != nil {
+		targetReplica.attemptedTime += duration
+	}
+	if proxyReplica := s.proxyReplica(); proxyReplica != nil {
+		proxyReplica.attemptedTime += duration
+	}
+}
diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index 2cb73d059..a865f78bf 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -38,11 +38,13 @@ import (
 	"context"
 	"fmt"
 	"strconv"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
 	"unsafe"
 
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/errorpb"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
@@ -1620,3 +1622,106 @@ func (s *testRegionRequestToThreeStoresSuite) TestDoNotTryUnreachableLeader() {
 	// `tryFollower` always try the local peer firstly
 	s.Equal(follower.addr, string(resp.Resp.(*kvrpcpb.GetResponse).Value))
 }
+
+func (s *testRegionRequestToThreeStoresSuite) TestLeaderStuck() {
+	key := []byte("key")
+	value := []byte("value1")
+
+	s.NoError(failpoint.Enable("tikvclient/injectLiveness", `return("reachable")`))
+	defer func() {
+		s.NoError(failpoint.Disable("tikvclient/injectLiveness"))
+	}()
+
+	region, err := s.regionRequestSender.regionCache.findRegionByKey(s.bo, key, false)
+	s.Nil(err)
+	regionStore := region.getStore()
+	oldLeader, oldLeaderPeer, _, _ := region.WorkStorePeer(regionStore)
+	// The follower will become the new leader later
+	follower, followerPeer, _, _ := region.FollowerStorePeer(regionStore, 0, &storeSelectorOp{})
+
+	currLeader := struct {
+		sync.Mutex
+		addr string
+		peer *metapb.Peer
+	}{
+		addr: oldLeader.addr,
+		peer: oldLeaderPeer,
+	}
+
+	requestHandled := false
+
+	s.regionRequestSender.client = &fnClient{
+		fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
+			if addr == oldLeader.addr {
+				time.Sleep(timeout)
+				return nil, context.DeadlineExceeded
+			}
+
+			currLeader.Lock()
+			leaderAddr := currLeader.addr
+			leaderPeer := currLeader.peer
+			currLeader.Unlock()
+
+			if addr != leaderAddr {
+				return &tikvrpc.Response{Resp: &kvrpcpb.PrewriteResponse{RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{
+					RegionId: region.GetID(),
+					Leader:   leaderPeer,
+				}}}}, nil
+			}
+
+			requestHandled = true
+			return &tikvrpc.Response{Resp: &kvrpcpb.PrewriteResponse{}}, nil
+		},
+	}
+
+	// Simulate that the attempted time has nearly reached the limit so that the test won't take too long to run.
+	// However, the `replicaSelector` of the request sender is not initialized before any request is sent,
+	// so control its initial attempted time via a failpoint.
+	s.NoError(failpoint.Enable("tikvclient/newReplicaSelectorInitialAttemptedTime", fmt.Sprintf(`return("%s")`, (maxReplicaAttemptTime-time.Second).String())))
+	defer func() {
+		s.NoError(failpoint.Disable("tikvclient/newReplicaSelectorInitialAttemptedTime"))
+	}()
+
+	resCh := make(chan struct {
+		resp *tikvrpc.Response
+		err  error
+	})
+	startTime := time.Now()
+	go func() {
+		bo := retry.NewBackoffer(context.Background(), -1)
+		req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{
+			Mutations: []*kvrpcpb.Mutation{{
+				Op:    kvrpcpb.Op_Put,
+				Key:   key,
+				Value: value,
+			}},
+			StartVersion: 100,
+		})
+		resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, region.VerID(), time.Second*2, tikvrpc.TiKV)
+		resCh <- struct {
+			resp *tikvrpc.Response
+			err  error
+		}{resp: resp, err: err}
+	}()
+
+	select {
+	case res := <-resCh:
+		s.Fail("request finished too early", fmt.Sprintf("resp: %s, error: %+q", res.resp, res.err))
+	case <-time.After(time.Millisecond * 200):
+	}
+
+	s.cluster.ChangeLeader(region.GetID(), followerPeer.GetId())
+	currLeader.Lock()
+	currLeader.addr = follower.addr
+	currLeader.peer = followerPeer
+	currLeader.Unlock()
+
+	res := <-resCh
+	elapsed := time.Since(startTime)
+
+	s.NoError(res.err)
+	s.Nil(res.resp.GetRegionError())
+	s.IsType(&kvrpcpb.PrewriteResponse{}, res.resp.Resp)
+	s.Less(elapsed, time.Millisecond*2500)
+	s.True(requestHandled)
+}
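
Note: the patch above bounds retries by cumulative attempt time in addition to attempt count. Below is a minimal standalone Go sketch of that idea; it is not part of the patch, and the names retryBudget and sendWithBudget are hypothetical. It only mirrors the isExhausted(maxAttempt, maxAttemptTime) check added to region_request.go, with a shorter budget so it runs quickly.

package main

import (
	"errors"
	"fmt"
	"time"
)

// retryBudget mirrors the per-replica accounting in the patch: another attempt is
// allowed only while both the attempt count and the cumulative attempted time stay
// under their limits (a zero maxAttemptTime disables the time check).
type retryBudget struct {
	attempts      int
	attemptedTime time.Duration
}

func (b *retryBudget) exhausted(maxAttempt int, maxAttemptTime time.Duration) bool {
	return b.attempts >= maxAttempt ||
		(maxAttemptTime > 0 && b.attemptedTime >= maxAttemptTime)
}

// sendWithBudget is a hypothetical caller: it retries send until it succeeds
// or the budget (10 attempts or 500ms of total attempted time) runs out.
func sendWithBudget(send func() error) error {
	const (
		maxAttempt     = 10
		maxAttemptTime = 500 * time.Millisecond
	)
	var b retryBudget
	for !b.exhausted(maxAttempt, maxAttemptTime) {
		start := time.Now()
		err := send()
		b.attempts++
		// Only the time spent inside the RPC itself is charged to the budget,
		// matching how the patch records the SendRequest duration.
		b.attemptedTime += time.Since(start)
		if err == nil {
			return nil
		}
	}
	return errors.New("retry budget exhausted")
}

func main() {
	// Each simulated RPC takes 200ms and fails; the time budget stops the loop
	// after 3 attempts even though the attempt-count budget would allow 10.
	err := sendWithBudget(func() error {
		time.Sleep(200 * time.Millisecond)
		return errors.New("deadline exceeded")
	})
	fmt.Println(err)
}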