use tidb_kv_read_timeout as first kv request timeout #919

Merged
merged 9 commits into from Aug 11, 2023
Changes from 7 commits
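For context, the end-to-end flow this PR enables looks roughly like the sketch below. It is an illustration distilled from the tests added in internal/locate/region_request3_test.go, not an API introduced by the PR; the helper name sendWithFirstKVTimeout and the kvReadTimeout parameter are hypothetical stand-ins for the caller-supplied tidb_kv_read_timeout value, and the code is written as if it lived in the internal/locate package.

```go
// Sketch only: the first attempt uses the configurable tidb_kv_read_timeout. If every
// candidate replica hits a deadline exceeded error under that short timeout, the sender
// returns a fake region error and the caller retries with the default timeout after
// resetting MaxExecutionDurationMs (the tests below show this reset is required).
func sendWithFirstKVTimeout(
	sender *RegionRequestSender, bo *retry.Backoffer, req *tikvrpc.Request,
	region RegionVerID, kvReadTimeout time.Duration,
) (*tikvrpc.Response, error) {
	resp, _, _, err := sender.SendReqCtx(bo, req, region, kvReadTimeout, tikvrpc.TiKV)
	if err != nil {
		return nil, err
	}
	regionErr, err := resp.GetRegionError()
	if err != nil {
		return nil, err
	}
	if regionErr != nil && IsFakeRegionError(regionErr) {
		// All replicas timed out under the short first timeout: reset the
		// per-request deadline and fall back to the default timeout.
		req.Context.MaxExecutionDurationMs = 0
		resp, _, _, err = sender.SendReqCtx(bo, req, region, client.ReadTimeoutShort, tikvrpc.TiKV)
	}
	return resp, err
}
```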
1 change: 0 additions & 1 deletion examples/gcworker/gcworker.go
@@ -37,7 +37,6 @@ func main() {
panic(err)
}


sysSafepoint, err := client.GC(context.Background(), *safepoint, tikv.WithConcurrency(10))
if err != nil {
panic(err)
6 changes: 6 additions & 0 deletions internal/client/client_batch.go
@@ -347,6 +347,12 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
}

func (a *batchConn) getClientAndSend() {
if val, err := util.EvalFailpoint("mockBatchClientSendDelay"); err == nil {
if timeout, ok := val.(int); ok && timeout > 0 {
time.Sleep(time.Duration(timeout * int(time.Millisecond)))
}
}

// Choose a connection by round-robin.
var (
cli *batchCommandsClient
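The failpoint added above lets tests inject latency before a batch request is picked up for sending. A test might enable it roughly as in the sketch below; the "tikvclient/" failpoint path prefix and the return(100) argument syntax are assumptions based on how client-go failpoints are commonly enabled, not something defined in this PR.

```go
package client_test

import "github.com/pingcap/failpoint"

// enableBatchSendDelay delays batchConn.getClientAndSend by 100ms so a test can
// deterministically hit a short, configured request timeout.
func enableBatchSendDelay() error {
	// Assumed failpoint path; adjust if the project registers it under a different prefix.
	return failpoint.Enable("tikvclient/mockBatchClientSendDelay", "return(100)")
}
```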
77 changes: 68 additions & 9 deletions internal/locate/region_request.go
@@ -245,6 +245,8 @@ type replica struct {
peer *metapb.Peer
epoch uint32
attempts int
// deadlineErrUsingConfTimeout indicates that the replica has already been tried, but it received a deadline exceeded error.
deadlineErrUsingConfTimeout bool
}

func (r *replica) isEpochStale() bool {
@@ -377,7 +379,7 @@ func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *rep
}

func (state *accessKnownLeader) onNoLeader(selector *replicaSelector) {
selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromOnNotLeader: true}
}

// tryFollower is the state where we cannot access the known leader
@@ -391,36 +393,57 @@ type tryFollower struct {
stateBase
leaderIdx AccessIndex
lastIdx AccessIndex
// fromOnNotLeader indicates whether the state is changed from onNotLeader.
fromOnNotLeader bool
}

func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
var targetReplica *replica
hasDeadlineExceededErr := false
Contributor commented:
Ditto, we may need a way to distinguish whether a timeout error came from the input timeout value or the default timeout value; maybe abstract another error type for the configurable timeout error?
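One way to read that suggestion is a dedicated error type for deadline errors raised while the configurable timeout was in effect, so retry logic can tell them apart from the default-timeout case. The sketch below only illustrates the idea; the type name, package placement, and field are hypothetical and not part of this PR.

```go
package tikverr // hypothetical placement alongside client-go's other error types

import (
	"fmt"
	"time"
)

// ErrDeadlineExceededByConfTimeout would mark a deadline exceeded error that happened
// while the configurable tidb_kv_read_timeout was in effect, as opposed to the default
// client.ReadTimeoutShort.
type ErrDeadlineExceededByConfTimeout struct {
	ConfTimeout time.Duration // the configured timeout that was in effect
}

func (e *ErrDeadlineExceededByConfTimeout) Error() string {
	return fmt.Sprintf("deadline exceeded using configured kv read timeout %s", e.ConfTimeout)
}
```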

// Search replica that is not attempted from the last accessed replica
for i := 1; i < len(selector.replicas); i++ {
idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas))
targetReplica = selector.replicas[idx]
hasDeadlineExceededErr = hasDeadlineExceededErr || targetReplica.deadlineErrUsingConfTimeout
if idx == state.leaderIdx {
continue
}
targetReplica = selector.replicas[idx]
// Each follower is only tried once
if !targetReplica.isExhausted(1) && targetReplica.store.getLivenessState() != unreachable {
if !targetReplica.isExhausted(1) && targetReplica.store.getLivenessState() != unreachable && !targetReplica.deadlineErrUsingConfTimeout {
state.lastIdx = idx
selector.targetIdx = idx
break
}
}
// If all followers are tried and fail, backoff and retry.
if selector.targetIdx < 0 {
if hasDeadlineExceededErr {
// When a deadline exceeded error is met, do a fast retry without invalidating the region cache.
return nil, nil
}
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
selector.invalidateRegion()
return nil, nil
}
return selector.buildRPCContext(bo)
rpcCtx, err := selector.buildRPCContext(bo)
if err != nil || rpcCtx == nil {
return nil, err
}
// If the state is changed from onNotLeader, the `replicaRead` flag should not be set as leader read would still be used.
if !state.fromOnNotLeader {
replicaRead := selector.targetIdx != state.leaderIdx
rpcCtx.contextPatcher.replicaRead = &replicaRead
}
disableStaleRead := false
rpcCtx.contextPatcher.staleRead = &disableStaleRead
return rpcCtx, nil
}

func (state *tryFollower) onSendSuccess(selector *replicaSelector) {
if !selector.region.switchWorkLeaderToPeer(selector.targetReplica().peer) {
panic("the store must exist")
if state.fromOnNotLeader {
if !selector.region.switchWorkLeaderToPeer(selector.targetReplica().peer) {
panic("the store must exist")
}
}
}

@@ -617,6 +640,10 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
zap.Bool("leader-invalid", leaderInvalid),
zap.Any("labels", state.option.labels))
}
// If the leader was tried and received a deadline exceeded error, return nil to the upper layer to retry with the default timeout.
if leader.deadlineErrUsingConfTimeout {
return nil, nil
}
if leaderInvalid {
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
selector.invalidateRegion()
@@ -665,7 +692,7 @@ func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replic

func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool {
// The epoch is stale, the retry is exhausted, or the store is unreachable.
if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable {
if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable || replica.deadlineErrUsingConfTimeout {
return false
}
// The request can only be sent to the leader.
@@ -947,6 +974,16 @@ func (s *replicaSelector) onSendFailure(bo *retry.Backoffer, err error) {
s.state.onSendFailure(bo, s, err)
}

func (s *replicaSelector) onDeadlineExceeded() {
if target := s.targetReplica(); target != nil {
target.deadlineErrUsingConfTimeout = true
}
if accessLeader, ok := s.state.(*accessKnownLeader); ok {
// If the leader returns a deadline exceeded error, try to access a follower next time.
s.state = &tryFollower{leaderIdx: accessLeader.leaderIdx, lastIdx: accessLeader.leaderIdx}
}
}

func (s *replicaSelector) checkLiveness(bo *retry.Backoffer, accessReplica *replica) livenessState {
store := accessReplica.store
liveness := store.requestLiveness(bo, s.regionCache)
@@ -1608,7 +1645,7 @@ func (s *RegionRequestSender) sendReqToRegion(
return nil, false, err
}
}
if e := s.onSendFail(bo, rpcCtx, err); e != nil {
if e := s.onSendFail(bo, rpcCtx, req, err); e != nil {
return nil, false, err
}
return nil, true, nil
@@ -1638,7 +1675,7 @@ func (s *RegionRequestSender) releaseStoreToken(st *Store) {
logutil.BgLogger().Warn("release store token failed, count equals to 0")
}

func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, err error) error {
func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, err error) error {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("regionRequest.onSendFail", opentracing.ChildOf(span.Context()))
defer span1.Finish()
@@ -1649,6 +1686,11 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, e
return errors.WithStack(err)
} else if LoadShuttingDown() > 0 {
return errors.WithStack(tikverr.ErrTiDBShuttingDown)
} else if errors.Cause(err) == context.DeadlineExceeded && req.MaxExecutionDurationMs < uint64(client.ReadTimeoutShort.Milliseconds()) {
if s.replicaSelector != nil {
s.replicaSelector.onDeadlineExceeded()
return nil
}
}
if status.Code(errors.Cause(err)) == codes.Canceled {
select {
@@ -1740,6 +1782,9 @@ func regionErrorToLabel(e *errorpb.Error) string {
} else if e.GetEpochNotMatch() != nil {
return "epoch_not_match"
} else if e.GetServerIsBusy() != nil {
if strings.Contains(e.GetServerIsBusy().GetReason(), "deadline is exceeded") {
return "deadline_exceeded"
}
return "server_is_busy"
} else if e.GetStaleCommand() != nil {
return "stale_command"
@@ -1767,10 +1812,16 @@ func regionErrorToLabel(e *errorpb.Error) string {
return "flashback_not_prepared"
} else if e.GetIsWitness() != nil {
return "peer_is_witness"
} else if isDeadlineExceeded(e) {
Contributor commented:
Maybe we need to distinguish the default KvTimeout and ConfigurableKvTimeout errors.
@you06 What do you think?

return "deadline_exceeded"
}
return "unknown"
}

func isDeadlineExceeded(e *errorpb.Error) bool {
return strings.Contains(e.GetMessage(), "Deadline is exceeded")
}

func (s *RegionRequestSender) onRegionError(
bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, regionErr *errorpb.Error,
) (shouldRetry bool, err error) {
@@ -1918,6 +1969,10 @@ func (s *RegionRequestSender) onRegionError(
}

if serverIsBusy := regionErr.GetServerIsBusy(); serverIsBusy != nil {
if s.replicaSelector != nil && strings.Contains(serverIsBusy.GetReason(), "deadline is exceeded") {
s.replicaSelector.onDeadlineExceeded()
return true, nil
}
if s.replicaSelector != nil {
return s.replicaSelector.onServerIsBusy(bo, ctx, req, serverIsBusy)
}
@@ -2046,6 +2101,10 @@ func (s *RegionRequestSender) onRegionError(
return true, nil
}

if isDeadlineExceeded(regionErr) && s.replicaSelector != nil {
s.replicaSelector.onDeadlineExceeded()
}

logutil.Logger(bo.GetCtx()).Debug(
"tikv reports region failed",
zap.Stringer("regionErr", regionErr),
129 changes: 124 additions & 5 deletions internal/locate/region_request3_test.go
@@ -50,6 +50,7 @@ import (
"github.com/stretchr/testify/suite"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/internal/apicodec"
"github.com/tikv/client-go/v2/internal/client"
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/internal/retry"
"github.com/tikv/client-go/v2/kv"
@@ -711,7 +712,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
// Normal
bo := retry.NewBackoffer(context.Background(), -1)
sender := s.regionRequestSender
resp, _, err := sender.SendReq(bo, req, region.Region, time.Second)
resp, _, err := sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort)
s.Nil(err)
s.NotNil(resp)
s.True(bo.GetTotalBackoffTimes() == 0)
@@ -720,7 +721,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
bo = retry.NewBackoffer(context.Background(), -1)
s.cluster.ChangeLeader(s.regionID, s.peerIDs[1])
s.cluster.StopStore(s.storeIDs[0])
resp, _, err = sender.SendReq(bo, req, region.Region, time.Second)
resp, _, err = sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort)
s.Nil(err)
s.NotNil(resp)
s.Equal(sender.replicaSelector.targetIdx, AccessIndex(1))
@@ -729,8 +730,9 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
atomic.StoreUint32(&regionStore.stores[0].livenessState, uint32(reachable))

// Leader is updated because of send success, so no backoff.
reloadRegion()
bo = retry.NewBackoffer(context.Background(), -1)
resp, _, err = sender.SendReq(bo, req, region.Region, time.Second)
resp, _, err = sender.SendReq(bo, req, region.Region, client.ReadTimeoutShort)
s.Nil(err)
s.NotNil(resp)
s.Equal(sender.replicaSelector.targetIdx, AccessIndex(1))
@@ -1092,7 +1094,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown()
s.NotEqual(leaderAddr, "")
for i := 0; i < 10; i++ {
bo := retry.NewBackofferWithVars(context.Background(), 100, nil)
resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, client.ReadTimeoutShort, tikvrpc.TiKV)
s.Nil(err)
s.NotNil(resp)

@@ -1135,7 +1137,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown()

for i := 0; i < 100; i++ {
bo := retry.NewBackofferWithVars(context.Background(), 1, nil)
resp, _, retryTimes, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
resp, _, retryTimes, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, client.ReadTimeoutShort, tikvrpc.TiKV)
s.Nil(err)
s.NotNil(resp)
// Since all followers' stores are unreachable, the request will be sent to the leader, so the backoff times should be 0.
@@ -1199,3 +1201,120 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback() {
s.True(ok)
s.Equal(getResp.Value, value)
}

func (s *testRegionRequestToThreeStoresSuite) TestSendReqFirstTimeout() {
leaderAddr := ""
reqTargetAddrs := make(map[string]struct{})
s.regionRequestSender.RegionRequestRuntimeStats = NewRegionRequestRuntimeStats()
bo := retry.NewBackoffer(context.Background(), 10000)
mockRPCClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
reqTargetAddrs[addr] = struct{}{}
if req.Context.MaxExecutionDurationMs < 10 {
return nil, context.DeadlineExceeded
}
if addr != leaderAddr && !req.Context.ReplicaRead && !req.Context.StaleRead {
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}}}}, nil
}
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("value")}}, nil
}}
getLocFn := func() *KeyLocation {
loc, err := s.regionRequestSender.regionCache.LocateKey(bo, []byte("a"))
s.Nil(err)
region := s.regionRequestSender.regionCache.GetCachedRegionWithRLock(loc.Region)
leaderStore, _, _, _ := region.WorkStorePeer(region.getStore())
leaderAddr, err = s.regionRequestSender.regionCache.getStoreAddr(s.bo, region, leaderStore)
s.Nil(err)
return loc
}
resetStats := func() {
reqTargetAddrs = make(map[string]struct{})
s.regionRequestSender = NewRegionRequestSender(s.cache, mockRPCClient)
s.regionRequestSender.RegionRequestRuntimeStats = NewRegionRequestRuntimeStats()
}

resetStats()
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a")}, kvrpcpb.Context{})
loc := getLocFn()
resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Millisecond, tikvrpc.TiKV)
s.Nil(err)
regionErr, err := resp.GetRegionError()
s.Nil(err)
s.True(IsFakeRegionError(regionErr))
s.Equal(1, len(s.regionRequestSender.Stats))
s.Equal(int64(3), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 3 rpc
s.Equal(3, len(reqTargetAddrs)) // each rpc to a different store.
s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry.
// Note: MaxExecutionDurationMs must be reset before the retry.
resetStats()
req.Context.MaxExecutionDurationMs = 0
resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
s.Nil(err)
regionErr, err = resp.GetRegionError()
s.Nil(err)
s.Nil(regionErr)
s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
s.Equal(1, len(s.regionRequestSender.Stats))
s.Equal(int64(1), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 1 rpc
s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry.

// Test stale read.
resetStats()
req = tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a")}, kvrpcpb.Context{})
req.EnableStaleRead()
loc = getLocFn()
resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Millisecond, tikvrpc.TiKV)
s.Nil(err)
regionErr, err = resp.GetRegionError()
s.Nil(err)
s.True(IsFakeRegionError(regionErr))
s.Equal(1, len(s.regionRequestSender.Stats))
s.True(s.regionRequestSender.Stats[tikvrpc.CmdGet].Count >= 1)
s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry.
// Note: MaxExecutionDurationMs must be reset before the retry.
resetStats()
req.EnableStaleRead()
req.Context.MaxExecutionDurationMs = 0
resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
s.Nil(err)
regionErr, err = resp.GetRegionError()
s.Nil(err)
s.Nil(regionErr)
s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
s.Equal(1, len(s.regionRequestSender.Stats))
s.Equal(int64(1), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 1 rpc
s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry.

// Test different read types.
replicaReadTypes := []kv.ReplicaReadType{kv.ReplicaReadLeader, kv.ReplicaReadFollower, kv.ReplicaReadMixed}
for _, tp := range replicaReadTypes {
resetStats()
req = tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a")}, kvrpcpb.Context{})
req.ReplicaRead = tp.IsFollowerRead()
req.ReplicaReadType = tp
loc = getLocFn()
resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Millisecond, tikvrpc.TiKV)
s.Nil(err)
regionErr, err = resp.GetRegionError()
s.Nil(err)
s.True(IsFakeRegionError(regionErr))
//s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
s.Equal(1, len(s.regionRequestSender.Stats))
s.Equal(int64(3), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 3 rpc
s.Equal(3, len(reqTargetAddrs))
s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry.
// Note: MaxExecutionDurationMs must be reset before the retry.
resetStats()
req.ReplicaRead = tp.IsFollowerRead()
req.ReplicaReadType = tp
req.Context.MaxExecutionDurationMs = 0
resp, _, _, err = s.regionRequestSender.SendReqCtx(bo, req, loc.Region, time.Second, tikvrpc.TiKV)
s.Nil(err)
regionErr, err = resp.GetRegionError()
s.Nil(err)
s.Nil(regionErr)
s.Equal([]byte("value"), resp.Resp.(*kvrpcpb.GetResponse).Value)
s.Equal(1, len(s.regionRequestSender.Stats))
s.Equal(int64(1), s.regionRequestSender.Stats[tikvrpc.CmdGet].Count) // 1 rpc
s.Equal(0, bo.GetTotalBackoffTimes()) // no backoff since fast retry.
}
}