From dff471aa3e3d74b38db0001d901e96dc5d03b8db Mon Sep 17 00:00:00 2001
From: you06
Date: Wed, 26 Jul 2023 20:11:24 +0800
Subject: [PATCH 1/5] fallback to follower when leader is busy

Signed-off-by: you06
---
 internal/locate/region_cache.go         | 10 +++-
 internal/locate/region_request.go       | 44 ++++++++++++++--
 internal/locate/region_request3_test.go | 68 ++++++++++++++++++++++++-
 3 files changed, 114 insertions(+), 8 deletions(-)

diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index 8482c81d0..c12dd436f 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -572,13 +572,17 @@ func (c *RPCContext) String() string {
 }
 
 type contextPatcher struct {
-	staleRead *bool
+	staleRead   *bool
+	replicaRead *bool
 }
 
 func (patcher *contextPatcher) applyTo(pbCtx *kvrpcpb.Context) {
 	if patcher.staleRead != nil {
 		pbCtx.StaleRead = *patcher.staleRead
 	}
+	if patcher.replicaRead != nil {
+		pbCtx.ReplicaRead = *patcher.replicaRead
+	}
 }
 
 type storeSelectorOp struct {
@@ -1191,9 +1195,11 @@ func (c *RegionCache) reloadRegion(regionID uint64) {
 		// ignore error and use old region info.
 		logutil.Logger(bo.GetCtx()).Error("load region failure",
 			zap.Uint64("regionID", regionID), zap.Error(err))
+		c.mu.RLock()
 		if oldRegion := c.getRegionByIDFromCache(regionID); oldRegion != nil {
-			atomic.StoreInt32(&oldRegion.asyncReload, 0)
+			atomic.CompareAndSwapInt32(&oldRegion.asyncReload, 1, 0)
 		}
+		c.mu.RUnlock()
 		return
 	}
 	c.mu.Lock()
diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index f54320c08..8e3aca793 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -371,8 +371,9 @@ func (state *accessKnownLeader) onNoLeader(selector *replicaSelector) {
 // the leader will be updated to replicas[0] and give it another chance.
 type tryFollower struct {
 	stateBase
-	leaderIdx AccessIndex
-	lastIdx   AccessIndex
+	fallbackFromLeader bool
+	leaderIdx          AccessIndex
+	lastIdx            AccessIndex
 }
 
 func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
@@ -397,12 +398,25 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
 		selector.invalidateRegion()
 		return nil, nil
 	}
-	return selector.buildRPCContext(bo)
+	rpcCtx, err := selector.buildRPCContext(bo)
+	if err != nil || rpcCtx == nil {
+		return rpcCtx, err
+	}
+	if state.fallbackFromLeader {
+		replicaRead := true
+		rpcCtx.contextPatcher.replicaRead = &replicaRead
+	}
+	return rpcCtx, err
 }
 
 func (state *tryFollower) onSendSuccess(selector *replicaSelector) {
-	if !selector.region.switchWorkLeaderToPeer(selector.targetReplica().peer) {
-		panic("the store must exist")
+	if !state.fallbackFromLeader {
+		peer := selector.targetReplica().peer
+		if !selector.region.switchWorkLeaderToPeer(peer) {
+			logutil.BgLogger().Warn("the store must exist",
+				zap.Uint64("store", peer.StoreId),
+				zap.Uint64("peer", peer.Id))
+		}
 	}
 }
 
@@ -888,6 +902,22 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) {
 	s.region.invalidate(StoreNotFound)
 }
 
+// The leader is unavailable for now for some reason, try followers instead.
+func (s *replicaSelector) fallback2Follower(ctx *RPCContext) bool {
+	if ctx == nil || s == nil || s.state == nil {
+		return false
+	}
+	state, ok := s.state.(*accessFollower)
+	if !ok {
+		return false
+	}
+	if state.lastIdx != state.leaderIdx {
+		return false
+	}
+	s.state = &tryFollower{fallbackFromLeader: true, leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
+	return true
+}
+
 func (s *replicaSelector) invalidateRegion() {
 	if s.region != nil {
 		s.region.invalidate(Other)
@@ -1566,6 +1596,10 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
 		logutil.BgLogger().Warn("tikv reports `ServerIsBusy` retry later",
 			zap.String("reason", regionErr.GetServerIsBusy().GetReason()),
 			zap.Stringer("ctx", ctx))
+		if s.replicaSelector.fallback2Follower(ctx) {
+			// Immediately retry on the followers, without backing off.
+			return true, nil
+		}
 		if ctx != nil && ctx.Store != nil && ctx.Store.storeType.IsTiFlashRelatedType() {
 			err = bo.Backoff(retry.BoTiFlashServerBusy, errors.Errorf("server is busy, ctx: %v", ctx))
 		} else {
diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index 850f24d30..4b63641a8 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -1020,7 +1020,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown()
 	}
 }
 
-func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() {
+func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback2Leader() {
 	leaderStore, _ := s.loadAndGetLeaderStore()
 	leaderLabel := []*metapb.StoreLabel{
 		{
 			Key:   "id",
 			Value: strconv.FormatUint(leaderStore.StoreID(), 10),
 		},
 	}
@@ -1100,3 +1100,69 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() {
 	s.NotNil(regionErr.GetEpochNotMatch())
 	s.Nil(regionErr.GetDiskFull())
 }
+
+func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback2Follower() {
+	leaderStore, _ := s.loadAndGetLeaderStore()
+	leaderLabel := []*metapb.StoreLabel{
+		{
+			Key:   "id",
+			Value: strconv.FormatUint(leaderStore.StoreID(), 10),
+		},
+	}
+	regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
+	s.Nil(err)
+	s.NotNil(regionLoc)
+	value := []byte("value")
+
+	dataIsNotReady := false
+	s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
+		select {
+		case <-ctx.Done():
+			return nil, errors.New("timeout")
+		default:
+		}
+		if addr == leaderStore.addr {
+			if dataIsNotReady && req.StaleRead {
+				dataIsNotReady = false
+				return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
+					DataIsNotReady: &errorpb.DataIsNotReady{},
+				}}}, nil
+			}
+			return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
+				ServerIsBusy: &errorpb.ServerIsBusy{},
+			}}}, nil
+		}
+		if !req.ReplicaRead {
+			return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
+				NotLeader: &errorpb.NotLeader{},
+			}}}, nil
+		}
+		return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: value}}, nil
+	}}
+
+	dataIsNotReady = true
+	// Round one: the leader returns data-is-not-ready to the stale read, then server-is-busy to the retry;
+	// round two: the leader returns server-is-busy directly.
+	for i := 0; i < 2; i++ {
+		req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil)
+		req.ReadReplicaScope = oracle.GlobalTxnScope
+		req.TxnScope = oracle.GlobalTxnScope
+		req.EnableStaleRead()
+		req.ReplicaReadType = kv.ReplicaReadMixed
+		var ops []StoreSelectorOption
+		ops = append(ops, WithMatchLabels(leaderLabel))
+
+		ctx, _ := context.WithTimeout(context.Background(), 10000*time.Second)
+		bo := retry.NewBackoffer(ctx, -1)
+		s.Nil(err)
+		resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
+		s.Nil(err)
+
+		regionErr, err := resp.GetRegionError()
+		s.Nil(err)
+		s.Nil(regionErr)
+		getResp, ok := resp.Resp.(*kvrpcpb.GetResponse)
+		s.True(ok)
+		s.Equal(getResp.Value, value)
+	}
+}
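The heart of this first patch is a state transition: a read that keeps hitting a busy leader is switched to the tryFollower state with fallbackFromLeader set, so the retry goes out as a replica read instead of promoting the follower to leader. The following self-contained sketch reproduces just that guard and transition; every type here is a simplified stand-in for the client-go internals, not the real API.

    package main

    import "fmt"

    type AccessIndex int

    // selectorState stands in for the selector's private state-machine interface.
    type selectorState interface{ isState() }

    // accessFollower models the read state whose last attempt hit the leader.
    type accessFollower struct{ leaderIdx, lastIdx AccessIndex }

    // tryFollower carries the new flag: the leader still holds its leadership
    // but is too busy to serve, so retries must be flagged as replica reads.
    type tryFollower struct {
    	fallbackFromLeader bool
    	leaderIdx, lastIdx AccessIndex
    }

    func (*accessFollower) isState() {}
    func (*tryFollower) isState()    {}

    type replicaSelector struct{ state selectorState }

    // fallback2Follower mirrors the guard in the patch: fall back only when the
    // current state is accessFollower and the previous attempt hit the leader.
    func (s *replicaSelector) fallback2Follower() bool {
    	state, ok := s.state.(*accessFollower)
    	if !ok || state.lastIdx != state.leaderIdx {
    		return false
    	}
    	s.state = &tryFollower{fallbackFromLeader: true, leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
    	return true
    }

    func main() {
    	s := &replicaSelector{state: &accessFollower{leaderIdx: 0, lastIdx: 0}}
    	// A ServerIsBusy reply from the leader triggers the transition exactly once.
    	fmt.Println(s.fallback2Follower()) // true
    	fmt.Println(s.fallback2Follower()) // false: already in tryFollower
    }

Because the type assertion fails once the state is tryFollower, a second ServerIsBusy cannot trigger another fallback for the same request.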
From add9f79f190a651f13091ad534b6bdf942cbdbd2 Mon Sep 17 00:00:00 2001
From: you06
Date: Thu, 27 Jul 2023 12:08:31 +0800
Subject: [PATCH 2/5] add comment

Signed-off-by: you06
---
 internal/locate/region_request.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index 8e3aca793..c8e97c8bf 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -371,6 +371,7 @@ func (state *accessKnownLeader) onNoLeader(selector *replicaSelector) {
 // the leader will be updated to replicas[0] and give it another chance.
 type tryFollower struct {
 	stateBase
+	// If the leader is unavailable but still holds the leadership, fallbackFromLeader is true and replica read is enabled.
 	fallbackFromLeader bool
 	leaderIdx          AccessIndex
 	lastIdx            AccessIndex
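The comment above refers to the replicaRead override that tryFollower.next applies through contextPatcher. For reference, a sketch of that optional-override pattern, with a toy struct standing in for the real kvrpcpb.Context; a nil pointer means "leave the field as the request built it":

    package main

    import "fmt"

    // kvContext keeps only the two fields the patcher touches.
    type kvContext struct {
    	StaleRead   bool
    	ReplicaRead bool
    }

    type contextPatcher struct {
    	staleRead   *bool
    	replicaRead *bool
    }

    func (p *contextPatcher) applyTo(ctx *kvContext) {
    	if p.staleRead != nil {
    		ctx.StaleRead = *p.staleRead
    	}
    	if p.replicaRead != nil {
    		ctx.ReplicaRead = *p.replicaRead
    	}
    }

    func main() {
    	ctx := &kvContext{StaleRead: true} // built as a stale read on the leader
    	replicaRead, staleRead := true, false
    	// After falling back from a busy leader, the retry is sent as a plain
    	// replica read: stale read off, replica read on.
    	p := &contextPatcher{staleRead: &staleRead, replicaRead: &replicaRead}
    	p.applyTo(ctx)
    	fmt.Printf("%+v\n", ctx) // &{StaleRead:false ReplicaRead:true}
    }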
From ebc24373888373c6f91b3de32486b0b011830d7a Mon Sep 17 00:00:00 2001
From: you06
Date: Thu, 27 Jul 2023 21:27:19 +0800
Subject: [PATCH 3/5] Update internal/locate/region_request.go

Co-authored-by: cfzjywxk
---
 internal/locate/region_request.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index c8e97c8bf..4abd19a36 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -407,7 +407,7 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
 		replicaRead := true
 		rpcCtx.contextPatcher.replicaRead = &replicaRead
 	}
-	return rpcCtx, err
+	return rpcCtx, nil
 }
 
 func (state *tryFollower) onSendSuccess(selector *replicaSelector) {
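The next patch makes the fallback label-aware: the first retry prefers a follower matching the request's store labels, and only then falls back to any untried, reachable follower. A runnable preview of that two-pass selection order, with invented field names in place of the real replica and store types:

    package main

    import "fmt"

    type replica struct {
    	labels    map[string]string
    	attempts  int
    	reachable bool
    }

    // pickFollower mirrors the two-pass order: prefer a reachable follower
    // matching the labels, then any reachable follower not yet attempted.
    // The leader index is always skipped.
    func pickFollower(replicas []*replica, leaderIdx, lastIdx int, labels map[string]string) int {
    	filter := func(ok func(*replica) bool) int {
    		for i := 0; i < len(replicas); i++ {
    			idx := (lastIdx + i) % len(replicas)
    			if idx == leaderIdx {
    				continue
    			}
    			if r := replicas[idx]; ok(r) && r.reachable {
    				return idx
    			}
    		}
    		return -1
    	}
    	matches := func(r *replica) bool {
    		for k, v := range labels {
    			if r.labels[k] != v {
    				return false
    			}
    		}
    		return true
    	}
    	if len(labels) > 0 {
    		if idx := filter(matches); idx >= 0 {
    			return idx
    		}
    		// Labels only take effect on the first pass.
    	}
    	return filter(func(r *replica) bool { return r.attempts == 0 })
    }

    func main() {
    	replicas := []*replica{
    		{reachable: true}, // leader, skipped
    		{labels: map[string]string{"zone": "z2"}, reachable: true},
    		{labels: map[string]string{"zone": "z1"}, reachable: true},
    	}
    	// Prefer the follower in zone z1 over the closer but mismatched one.
    	fmt.Println(pickFollower(replicas, 0, 0, map[string]string{"zone": "z1"})) // 2
    }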
From aac21b71ae3b45d1af6643c8301123d7a56d53b8 Mon Sep 17 00:00:00 2001
From: you06
Date: Fri, 28 Jul 2023 12:16:13 +0800
Subject: [PATCH 4/5] after fallback to replica read from leader, retry local follower first

Signed-off-by: you06
---
 internal/locate/region_request.go       | 50 ++++++++++++---
 internal/locate/region_request3_test.go | 84 ++++++++++++++++---------
 2 files changed, 94 insertions(+), 40 deletions(-)

diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index 4abd19a36..164795d61 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -375,24 +375,49 @@ type tryFollower struct {
 	fallbackFromLeader bool
 	leaderIdx          AccessIndex
 	lastIdx            AccessIndex
+	labels             []*metapb.StoreLabel
 }
 
 func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
 	var targetReplica *replica
-	// Search replica that is not attempted from the last accessed replica
-	for i := 1; i < len(selector.replicas); i++ {
-		idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas))
-		if idx == state.leaderIdx {
-			continue
+	filterReplicas := func(fn func(*replica) bool) (AccessIndex, *replica) {
+		for i := 0; i < len(selector.replicas); i++ {
+			idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas))
+			if idx == state.leaderIdx {
+				continue
+			}
+			selectReplica := selector.replicas[idx]
+			if fn(selectReplica) && selectReplica.store.getLivenessState() != unreachable {
+				return idx, selectReplica
+			}
+		}
+		return -1, nil
+	}
+
+	if len(state.labels) > 0 {
+		idx, selectReplica := filterReplicas(func(selectReplica *replica) bool {
+			return selectReplica.store.IsLabelsMatch(state.labels)
+		})
+		if selectReplica != nil && idx >= 0 {
+			state.lastIdx = idx
+			selector.targetIdx = idx
+			targetReplica = selectReplica
 		}
-		targetReplica = selector.replicas[idx]
-		// Each follower is only tried once
-		if !targetReplica.isExhausted(1) && targetReplica.store.getLivenessState() != unreachable {
+		// labels only take effect for first try.
+		state.labels = nil
+	}
+	if targetReplica == nil {
+		// Search replica that is not attempted from the last accessed replica
+		idx, selectReplica := filterReplicas(func(selectReplica *replica) bool {
+			return !selectReplica.isExhausted(1)
+		})
+		if selectReplica != nil && idx >= 0 {
 			state.lastIdx = idx
 			selector.targetIdx = idx
-			break
+			targetReplica = selectReplica
 		}
 	}
+
 	// If all followers are tried and fail, backoff and retry.
 	if selector.targetIdx < 0 {
 		metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
@@ -915,7 +940,12 @@ func (s *replicaSelector) fallback2Follower(ctx *RPCContext) bool {
 	if state.lastIdx != state.leaderIdx {
 		return false
 	}
-	s.state = &tryFollower{fallbackFromLeader: true, leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
+	s.state = &tryFollower{
+		fallbackFromLeader: true,
+		leaderIdx:          state.leaderIdx,
+		lastIdx:            state.leaderIdx,
+		labels:             state.option.labels,
+	}
 	return true
 }
 
diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index 4b63641a8..19b1624da 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -1109,10 +1109,24 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback2Follower() {
 			Value: strconv.FormatUint(leaderStore.StoreID(), 10),
 		},
 	}
+	var followerID *uint64
+	for _, storeID := range s.storeIDs {
+		if storeID != leaderStore.storeID {
+			followerID = &storeID
+			break
+		}
+	}
+	s.NotNil(followerID)
+	followerLabel := []*metapb.StoreLabel{
+		{
+			Key:   "id",
+			Value: strconv.FormatUint(*followerID, 10),
+		},
+	}
+
 	regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
 	s.Nil(err)
 	s.NotNil(regionLoc)
-	value := []byte("value")
 
 	dataIsNotReady := false
 	s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
 		select {
 		case <-ctx.Done():
 			return nil, errors.New("timeout")
 		default:
 		}
+		if dataIsNotReady && req.StaleRead {
+			dataIsNotReady = false
+			return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
+				DataIsNotReady: &errorpb.DataIsNotReady{},
+			}}}, nil
+		}
 		if addr == leaderStore.addr {
-			if dataIsNotReady && req.StaleRead {
-				dataIsNotReady = false
-				return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
-					DataIsNotReady: &errorpb.DataIsNotReady{},
-				}}}, nil
-			}
 			return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
 				ServerIsBusy: &errorpb.ServerIsBusy{},
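The reworked test drives one request through three answers: data-is-not-ready for the stale read, server-is-busy for the leader retry, and a real value from a follower once the request carries the replica-read flag. A toy replay of that flow, with plain strings standing in for the protocol's region errors rather than the tikvrpc interface:

    package main

    import "fmt"

    type store struct {
    	leader bool
    	busy   bool
    }

    func send(s store, staleRead, replicaRead bool) string {
    	switch {
    	case s.leader && s.busy && staleRead:
    		return "DataIsNotReady" // stale read on a lagging, busy leader
    	case s.leader && s.busy:
    		return "ServerIsBusy" // the leader-read retry is rejected too
    	case !s.leader && !replicaRead:
    		return "NotLeader" // followers reject plain leader reads
    	default:
    		return "value"
    	}
    }

    func main() {
    	leader := store{leader: true, busy: true}
    	follower := store{}
    	fmt.Println(send(leader, true, false))   // DataIsNotReady -> retry leader without stale read
    	fmt.Println(send(leader, false, false))  // ServerIsBusy   -> fall back to a follower
    	fmt.Println(send(follower, false, true)) // value          -> replica read succeeds
    }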
 			}}}, nil
 		}
@@ -1137,32 +1151,42 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback2Follower() {
 				NotLeader: &errorpb.NotLeader{},
 			}}}, nil
 		}
-		return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: value}}, nil
+		return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte(addr)}}, nil
 	}}
 
-	dataIsNotReady = true
-	// Round one: the leader returns data-is-not-ready to the stale read, then server-is-busy to the retry;
-	// round two: the leader returns server-is-busy directly.
-	for i := 0; i < 2; i++ {
-		req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil)
-		req.ReadReplicaScope = oracle.GlobalTxnScope
-		req.TxnScope = oracle.GlobalTxnScope
-		req.EnableStaleRead()
-		req.ReplicaReadType = kv.ReplicaReadMixed
-		var ops []StoreSelectorOption
-		ops = append(ops, WithMatchLabels(leaderLabel))
+	for _, localLeader := range []bool{true, false} {
+		dataIsNotReady = true
+		// Round one: the leader returns data-is-not-ready to the stale read, then server-is-busy to the retry;
+		// round two: the leader returns server-is-busy directly.
+		for i := 0; i < 2; i++ {
+			req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil)
+			req.ReadReplicaScope = oracle.GlobalTxnScope
+			req.TxnScope = oracle.GlobalTxnScope
+			req.EnableStaleRead()
+			req.ReplicaReadType = kv.ReplicaReadMixed
+			var ops []StoreSelectorOption
+			if localLeader {
+				ops = append(ops, WithMatchLabels(leaderLabel))
+			} else {
+				ops = append(ops, WithMatchLabels(followerLabel))
+			}
 
-		ctx, _ := context.WithTimeout(context.Background(), 10000*time.Second)
-		bo := retry.NewBackoffer(ctx, -1)
-		s.Nil(err)
-		resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
-		s.Nil(err)
+			ctx, _ := context.WithTimeout(context.Background(), 10000*time.Second)
+			bo := retry.NewBackoffer(ctx, -1)
+			s.Nil(err)
+			resp, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV, ops...)
+			s.Nil(err)
 
-		regionErr, err := resp.GetRegionError()
-		s.Nil(err)
-		s.Nil(regionErr)
-		getResp, ok := resp.Resp.(*kvrpcpb.GetResponse)
-		s.True(ok)
-		s.Equal(getResp.Value, value)
+			regionErr, err := resp.GetRegionError()
+			s.Nil(err)
+			s.Nil(regionErr)
+			getResp, ok := resp.Resp.(*kvrpcpb.GetResponse)
+			s.True(ok)
+			if localLeader {
+				s.NotEqual(getResp.Value, []byte("store"+leaderLabel[0].Value))
+			} else {
+				s.Equal(getResp.Value, []byte("store"+followerLabel[0].Value))
+			}
+		}
 	}
 }
From 7da51eb7fda1fd4d3b3b49d68a50121eecc65a45 Mon Sep 17 00:00:00 2001
From: you06
Date: Fri, 28 Jul 2023 17:08:43 +0800
Subject: [PATCH 5/5] address comment

Signed-off-by: you06
---
 internal/locate/region_request.go | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index 164795d61..0928e352e 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -379,7 +379,6 @@ type tryFollower struct {
 }
 
 func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
-	var targetReplica *replica
 	filterReplicas := func(fn func(*replica) bool) (AccessIndex, *replica) {
 		for i := 0; i < len(selector.replicas); i++ {
 			idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas))
@@ -401,12 +400,11 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
 		if selectReplica != nil && idx >= 0 {
 			state.lastIdx = idx
 			selector.targetIdx = idx
-			targetReplica = selectReplica
 		}
 		// labels only take effect for first try.
 		state.labels = nil
 	}
-	if targetReplica == nil {
+	if selector.targetIdx < 0 {
 		// Search replica that is not attempted from the last accessed replica
 		idx, selectReplica := filterReplicas(func(selectReplica *replica) bool {
 			return !selectReplica.isExhausted(1)
 		})
 		if selectReplica != nil && idx >= 0 {
 			state.lastIdx = idx
 			selector.targetIdx = idx
-			targetReplica = selectReplica
 		}
 	}
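The cleanup in patch 5 works because selector.targetIdx already encodes "nothing chosen yet" as a negative sentinel, so the parallel targetReplica pointer carried no extra information. A minimal illustration of the before/after, on plain slices rather than client-go types:

    package main

    import "fmt"

    // Before: a pointer mirrors the index and both must stay in sync.
    func pickBoth(xs []string, want string) (*string, int) {
    	var target *string
    	targetIdx := -1
    	for i := range xs {
    		if xs[i] == want {
    			target = &xs[i] // redundant: duplicates targetIdx
    			targetIdx = i
    			break
    		}
    	}
    	return target, targetIdx
    }

    // After: the -1 sentinel alone distinguishes "found" from "not found".
    func pickIndex(xs []string, want string) int {
    	targetIdx := -1
    	for i := range xs {
    		if xs[i] == want {
    			targetIdx = i
    			break
    		}
    	}
    	return targetIdx
    }

    func main() {
    	xs := []string{"a", "b", "c"}
    	_, i := pickBoth(xs, "b")
    	fmt.Println(i, pickIndex(xs, "b")) // 1 1
    }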