use tidb_kv_read_timeout as first kv request timeout (#919) (#948)
Signed-off-by: crazycs520 <crazycs520@gmail.com>
crazycs520 authored and cfzjywxk committed Aug 29, 2023
1 parent e5fe177 commit dfae543
Showing 4 changed files with 193 additions and 31 deletions.
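The change, in brief: when TiDB sets a per-request timeout from tidb_kv_read_timeout, the client uses that (shorter) value for the first attempt; if the attempt hits a deadline exceeded error, the failing replica is marked and the request is retried on another replica, or handed back to the upper layer to retry with the default timeout, instead of failing outright. The standalone Go sketch below illustrates that two-phase timeout idea under stated assumptions; the names (defaultReadTimeout, sendOnce, readWithConfiguredTimeout) are illustrative and not part of client-go.

package main

import (
	"errors"
	"fmt"
	"time"
)

// defaultReadTimeout stands in for client.ReadTimeoutShort; the value is illustrative.
const defaultReadTimeout = 30 * time.Second

var errDeadlineExceeded = errors.New("context deadline exceeded")

// sendOnce represents a single RPC attempt with the given timeout (hypothetical).
type sendOnce func(timeout time.Duration) error

// readWithConfiguredTimeout tries the user-configured timeout first. If that
// attempt times out and the configured value is shorter than the default, it
// retries with the default timeout, mirroring the fast-retry behavior this
// commit adds around onSendFail/onDeadlineExceeded.
func readWithConfiguredTimeout(send sendOnce, configured time.Duration) error {
	if err := send(configured); err != nil {
		if errors.Is(err, errDeadlineExceeded) && configured < defaultReadTimeout {
			// The short timeout, not the store, is the likely culprit: retry.
			return send(defaultReadTimeout)
		}
		return err
	}
	return nil
}

func main() {
	attempt := 0
	send := func(timeout time.Duration) error {
		attempt++
		fmt.Printf("attempt %d, timeout %v\n", attempt, timeout)
		if timeout < time.Second {
			return errDeadlineExceeded
		}
		return nil
	}
	fmt.Println(readWithConfiguredTimeout(send, 500*time.Millisecond)) // <nil> after a fast retry
}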
6 changes: 6 additions & 0 deletions internal/client/client_batch.go
@@ -347,6 +347,12 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
}

func (a *batchConn) getClientAndSend() {
if val, err := util.EvalFailpoint("mockBatchClientSendDelay"); err == nil {
if timeout, ok := val.(int); ok && timeout > 0 {
time.Sleep(time.Duration(timeout * int(time.Millisecond)))
}
}

// Choose a connection by round-robin.
var (
cli *batchCommandsClient
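The failpoint above lets tests delay the batch send path so a short, user-configured deadline reliably expires before the request leaves the client. A minimal sketch of the same idea without the failpoint machinery, assuming a package-level hook; the hook name is hypothetical and this is not the client-go failpoint API:

package batchclient

import "time"

// sendDelayHook plays the role of the mockBatchClientSendDelay failpoint:
// a test sets it to a positive duration to stall the send path.
var sendDelayHook time.Duration

// maybeDelaySend sleeps for the configured duration, if any, before a batch
// of commands is handed to a connection.
func maybeDelaySend() {
	if sendDelayHook > 0 {
		time.Sleep(sendDelayHook)
	}
}

In a test, setting sendDelayHook above the request's MaxExecutionDurationMs forces the DeadlineExceeded path that the region_request.go changes below handle.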
94 changes: 70 additions & 24 deletions internal/locate/region_request.go
@@ -240,6 +240,8 @@ type replica struct {
peer *metapb.Peer
epoch uint32
attempts int
// deadlineErrUsingConfTimeout indicates the replica was already tried, but it received a deadline exceeded error.
deadlineErrUsingConfTimeout bool
}

func (r *replica) isEpochStale() bool {
@@ -337,7 +339,7 @@ func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelec
// a request. So, before the new leader is elected, we should not send requests
// to the unreachable old leader to avoid unnecessary timeout.
if liveness != reachable || leader.isExhausted(maxReplicaAttempt) {
selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true}
return nil, stateChanged{}
}
selector.targetIdx = state.leaderIdx
@@ -352,15 +354,15 @@ func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *rep
return
}
if liveness != reachable || selector.targetReplica().isExhausted(maxReplicaAttempt) {
selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true}
}
if liveness != reachable {
selector.invalidateReplicaStore(selector.targetReplica(), cause)
}
}

func (state *accessKnownLeader) onNoLeader(selector *replicaSelector) {
selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx, fromAccessKnownLeader: true}
}

// tryFollower is the state where we cannot access the known leader
@@ -372,22 +374,24 @@ func (state *accessKnownLeader) onNoLeader(selector *replicaSelector) {
// the leader will be updated to replicas[0] and give it another chance.
type tryFollower struct {
stateBase
// if the leader is unavailable, but it still holds the leadership, fallbackFromLeader is true and replica read is enabled.
fallbackFromLeader bool
leaderIdx AccessIndex
lastIdx AccessIndex
labels []*metapb.StoreLabel
leaderIdx AccessIndex
lastIdx AccessIndex
labels []*metapb.StoreLabel
// fromAccessKnownLeader indicates whether the state is changed from `accessKnownLeader`.
fromAccessKnownLeader bool
}

func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
hasDeadlineExceededErr := false
filterReplicas := func(fn func(*replica) bool) (AccessIndex, *replica) {
for i := 0; i < len(selector.replicas); i++ {
idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas))
selectReplica := selector.replicas[idx]
hasDeadlineExceededErr = hasDeadlineExceededErr || selectReplica.deadlineErrUsingConfTimeout
if idx == state.leaderIdx {
continue
}
selectReplica := selector.replicas[idx]
if fn(selectReplica) && selectReplica.store.getLivenessState() != unreachable {
if fn(selectReplica) && selectReplica.store.getLivenessState() != unreachable && !selectReplica.deadlineErrUsingConfTimeout {
return idx, selectReplica
}
}
@@ -418,6 +422,10 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (

// If all followers are tried and fail, backoff and retry.
if selector.targetIdx < 0 {
if hasDeadlineExceededErr {
// When a deadline exceeded error was met, do a fast retry without invalidating the region cache.
return nil, nil
}
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
selector.invalidateRegion()
return nil, nil
@@ -426,17 +434,17 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
if err != nil || rpcCtx == nil {
return rpcCtx, err
}
if state.fallbackFromLeader {
staleRead := false
rpcCtx.contextPatcher.staleRead = &staleRead
if !state.fromAccessKnownLeader {
replicaRead := true
rpcCtx.contextPatcher.replicaRead = &replicaRead
}
staleRead := false
rpcCtx.contextPatcher.staleRead = &staleRead
return rpcCtx, nil
}

func (state *tryFollower) onSendSuccess(selector *replicaSelector) {
if !state.fallbackFromLeader {
if state.fromAccessKnownLeader {
peer := selector.targetReplica().peer
if !selector.region.switchWorkLeaderToPeer(peer) {
logutil.BgLogger().Warn("the store must exist",
@@ -626,16 +634,19 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
zap.Bool("leader-invalid", leaderInvalid),
zap.Any("labels", state.option.labels))
}
// If the leader was tried and received a deadline exceeded error, return nil to the upper layer to retry with the default timeout.
if leader.deadlineErrUsingConfTimeout {
return nil, nil
}
if leaderInvalid {
// In stale-read, the request will fall back to the leader after the local follower fails.
// If the leader is also unavailable, we can fall back to the follower and use the replica-read flag again;
// the remote follower has not been tried yet, and the local follower can retry without the stale-read flag.
if state.isStaleRead {
selector.state = &tryFollower{
fallbackFromLeader: true,
leaderIdx: state.leaderIdx,
lastIdx: state.leaderIdx,
labels: state.option.labels,
leaderIdx: state.leaderIdx,
lastIdx: state.leaderIdx,
labels: state.option.labels,
}
if leaderEpochStale {
selector.regionCache.scheduleReloadRegion(selector.region)
@@ -680,7 +691,8 @@ func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replic
}

func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool {
if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable {
// The epoch is stale, the retries are exhausted, the store is unreachable, or the replica already hit a deadline exceeded error.
if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable || replica.deadlineErrUsingConfTimeout {
return false
}
if state.option.leaderOnly && idx == state.leaderIdx {
@@ -887,6 +899,16 @@ func (s *replicaSelector) onSendFailure(bo *retry.Backoffer, err error) {
s.state.onSendFailure(bo, s, err)
}

func (s *replicaSelector) onDeadlineExceeded() {
if target := s.targetReplica(); target != nil {
target.deadlineErrUsingConfTimeout = true
}
if accessLeader, ok := s.state.(*accessKnownLeader); ok {
// If the leader returned a deadline exceeded error, try to access a follower next time.
s.state = &tryFollower{leaderIdx: accessLeader.leaderIdx, lastIdx: accessLeader.leaderIdx}
}
}

func (s *replicaSelector) checkLiveness(bo *retry.Backoffer, accessReplica *replica) livenessState {
store := accessReplica.store
liveness := store.requestLiveness(bo, s.regionCache)
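onDeadlineExceeded above marks the target replica and, when the known leader was being accessed, switches the selector to tryFollower. As the onSendFail change further below shows, this fallback only triggers when the request ran with a timeout shorter than the default read timeout, i.e. when tidb_kv_read_timeout was in effect. A small self-contained sketch of that guard; the function name is illustrative, the real check is inline in onSendFail:

package main

import (
	"fmt"
	"time"
)

// retriableDeadlineErr reports whether a DeadlineExceeded error should take
// the fast replica/timeout fallback: only requests whose MaxExecutionDurationMs
// is below the default read timeout qualify.
func retriableDeadlineErr(maxExecutionMs uint64, defaultTimeout time.Duration) bool {
	return maxExecutionMs < uint64(defaultTimeout.Milliseconds())
}

func main() {
	fmt.Println(retriableDeadlineErr(500, 30*time.Second))   // true: short first attempt, retry
	fmt.Println(retriableDeadlineErr(30000, 30*time.Second)) // false: already at the default timeout
}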
@@ -1487,7 +1509,7 @@ func (s *RegionRequestSender) sendReqToRegion(bo *retry.Backoffer, rpcCtx *RPCCo
return nil, false, err
}
}
if e := s.onSendFail(bo, rpcCtx, err); e != nil {
if e := s.onSendFail(bo, rpcCtx, req, err); e != nil {
return nil, false, err
}
return nil, true, nil
@@ -1517,7 +1539,7 @@ func (s *RegionRequestSender) releaseStoreToken(st *Store) {
logutil.BgLogger().Warn("release store token failed, count equals to 0")
}

func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, err error) error {
func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, err error) error {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("regionRequest.onSendFail", opentracing.ChildOf(span.Context()))
defer span1.Finish()
@@ -1528,6 +1550,11 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, e
return errors.WithStack(err)
} else if LoadShuttingDown() > 0 {
return errors.WithStack(tikverr.ErrTiDBShuttingDown)
} else if errors.Cause(err) == context.DeadlineExceeded && req.MaxExecutionDurationMs < uint64(client.ReadTimeoutShort.Milliseconds()) {
if s.replicaSelector != nil {
s.replicaSelector.onDeadlineExceeded()
return nil
}
}
if status.Code(errors.Cause(err)) == codes.Canceled {
select {
@@ -1601,6 +1628,9 @@ func regionErrorToLabel(e *errorpb.Error) string {
} else if e.GetEpochNotMatch() != nil {
return "epoch_not_match"
} else if e.GetServerIsBusy() != nil {
if strings.Contains(e.GetServerIsBusy().GetReason(), "deadline is exceeded") {
return "deadline_exceeded"
}
return "server_is_busy"
} else if e.GetStaleCommand() != nil {
return "stale_command"
@@ -1631,10 +1661,16 @@ func regionErrorToLabel(e *errorpb.Error) string {
// The `mismatch peer id` error does not have a specific error type, so we have to match the error message.
// TODO: add a specific error type for `mismatch peer id`.
return "mismatch_peer_id"
} else if isDeadlineExceeded(e) {
return "deadline_exceeded"
}
return "unknown"
}

func isDeadlineExceeded(e *errorpb.Error) bool {
return strings.Contains(e.GetMessage(), "Deadline is exceeded")
}

func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, regionErr *errorpb.Error) (shouldRetry bool, err error) {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("tikv.onRegionError", opentracing.ChildOf(span.Context()))
@@ -1728,8 +1764,13 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
return retry, err
}

if regionErr.GetServerIsBusy() != nil {
logutil.BgLogger().Warn("tikv reports `ServerIsBusy` retry later",
if serverIsBusy := regionErr.GetServerIsBusy(); serverIsBusy != nil {
if s.replicaSelector != nil && strings.Contains(serverIsBusy.GetReason(), "deadline is exceeded") {
s.replicaSelector.onDeadlineExceeded()
return true, nil
}
logutil.Logger(bo.GetCtx()).Warn(
"tikv reports `ServerIsBusy` retry later",
zap.String("reason", regionErr.GetServerIsBusy().GetReason()),
zap.Stringer("ctx", ctx))
if s.replicaSelector.canFallback2Follower() {
@@ -1849,7 +1890,12 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
return true, nil
}

logutil.BgLogger().Debug("tikv reports region failed",
if isDeadlineExceeded(regionErr) && s.replicaSelector != nil {
s.replicaSelector.onDeadlineExceeded()
}

logutil.Logger(bo.GetCtx()).Debug(
"tikv reports region failed",
zap.Stringer("regionErr", regionErr),
zap.Stringer("ctx", ctx))

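The metrics side of the change reports deadline-exceeded errors under their own label, whether they arrive as a ServerIsBusy reason or as a plain region error message. A toy classifier mirroring that logic, with a simplified stand-in for *errorpb.Error:

package main

import (
	"fmt"
	"strings"
)

// regionError is a simplified stand-in for *errorpb.Error; only the fields the
// classification needs are modeled.
type regionError struct {
	message          string
	serverBusyReason string
}

// label mirrors the regionErrorToLabel additions: a deadline-related
// ServerIsBusy reason or message is reported as "deadline_exceeded" so it can
// be told apart from a genuinely busy store.
func label(e regionError) string {
	if e.serverBusyReason != "" {
		if strings.Contains(e.serverBusyReason, "deadline is exceeded") {
			return "deadline_exceeded"
		}
		return "server_is_busy"
	}
	if strings.Contains(e.message, "Deadline is exceeded") {
		return "deadline_exceeded"
	}
	return "unknown"
}

func main() {
	fmt.Println(label(regionError{serverBusyReason: "deadline is exceeded"})) // deadline_exceeded
	fmt.Println(label(regionError{message: "Deadline is exceeded"}))          // deadline_exceeded
	fmt.Println(label(regionError{serverBusyReason: "scheduler is busy"}))    // server_is_busy
}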
