br: refactor br cli backoff logic #54644

Merged 5 commits on Nov 26, 2024
12 changes: 7 additions & 5 deletions br/pkg/backup/prepare_snap/env.go
@@ -176,15 +176,17 @@ func (c CliEnv) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endK

type RetryAndSplitRequestEnv struct {
Env
GetBackoffer func() utils.Backoffer
GetBackoffStrategy func() utils.BackoffStrategy
}

func (r RetryAndSplitRequestEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) {
rs := utils.ConstantBackoff(10 * time.Second)
bo := utils.Backoffer(rs)
if r.GetBackoffer != nil {
bo = r.GetBackoffer()
var bo utils.BackoffStrategy
if r.GetBackoffStrategy != nil {
bo = r.GetBackoffStrategy()
} else {
bo = utils.ConstantBackoff(10 * time.Second)
}

cli, err := utils.WithRetryV2(ctx, bo, func(ctx context.Context) (PrepareClient, error) {
cli, err := r.Env.ConnectToStore(ctx, storeID)
if err != nil {
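The env.go hunk above swaps the removed `GetBackoffer` hook for a `GetBackoffStrategy` factory and falls back to a constant 10-second backoff when no factory is injected. Below is a minimal, self-contained sketch of that pattern; the `BackoffStrategy` interface, its method names, and the `constantBackoff` type are assumptions for illustration, not the actual `br/pkg/utils` definitions.

```go
package main

import (
	"fmt"
	"time"
)

// BackoffStrategy is a stand-in for the interface this refactor centralizes on:
// it only has to answer "how long to wait before the next attempt".
type BackoffStrategy interface {
	NextBackoff(err error) time.Duration
	RemainingAttempts() int
}

// constantBackoff waits the same duration between every attempt.
type constantBackoff struct {
	interval time.Duration
	attempts int
}

func (c *constantBackoff) NextBackoff(error) time.Duration { c.attempts--; return c.interval }
func (c *constantBackoff) RemainingAttempts() int          { return c.attempts }

// env mirrors RetryAndSplitRequestEnv: an optional factory overrides the default strategy.
type env struct {
	GetBackoffStrategy func() BackoffStrategy
}

func (e env) strategy() BackoffStrategy {
	if e.GetBackoffStrategy != nil {
		return e.GetBackoffStrategy() // caller-supplied strategy, e.g. a tight one for tests
	}
	return &constantBackoff{interval: 10 * time.Second, attempts: 60} // default; attempt count is made up
}

func main() {
	fmt.Println(env{}.strategy().NextBackoff(nil)) // 10s default when nothing is injected
}
```

The prepare_test.go hunk below uses exactly this hook, injecting `utils.NewBackoffRetryAllErrorStrategy(2, 0, 0)` so the test retries only twice with no waiting.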
5 changes: 2 additions & 3 deletions br/pkg/backup/prepare_snap/prepare_test.go
@@ -428,9 +428,8 @@ func TestRetryEnv(t *testing.T) {
return nil
}
ms := RetryAndSplitRequestEnv{Env: tms}
ms.GetBackoffer = func() utils.Backoffer {
o := utils.InitialRetryState(2, 0, 0)
return &o
ms.GetBackoffStrategy = func() utils.BackoffStrategy {
return utils.NewBackoffRetryAllErrorStrategy(2, 0, 0)
}
prep := New(ms)
ctx := context.Background()
2 changes: 1 addition & 1 deletion br/pkg/backup/store.go
@@ -275,7 +275,7 @@ func startBackup(
}
return nil
})
}, utils.NewBackupSSTBackoffer())
}, utils.NewBackupSSTBackoffStrategy())
})
}
return eg.Wait()
2 changes: 1 addition & 1 deletion br/pkg/checkpoint/external_storage.go
@@ -85,7 +85,7 @@ func (s *externalCheckpointStorage) getTS(ctx context.Context) (int64, int64, er
}

return nil
}, utils.NewPDReqBackoffer())
}, utils.NewAggressivePDBackoffStrategy())

return p, l, errors.Trace(errRetry)
}
4 changes: 1 addition & 3 deletions br/pkg/checksum/executor.go
@@ -369,8 +369,6 @@ func (exec *Executor) Execute(
updateFn func(),
) (*tipb.ChecksumResponse, error) {
checksumResp := &tipb.ChecksumResponse{}
checksumBackoffer := utils.InitialRetryState(utils.ChecksumRetryTime,
utils.ChecksumWaitInterval, utils.ChecksumMaxWaitInterval)
for _, req := range exec.reqs {
// Pointer to SessionVars.Killed
// Killed is a flag to indicate that this query is killed.
@@ -397,7 +395,7 @@
return errors.Trace(err)
}
return nil
}, &checksumBackoffer)
}, utils.NewChecksumBackoffStrategy())
if err != nil {
return nil, errors.Trace(err)
}
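A side effect worth noting in the executor.go hunk: the old `checksumBackoffer` was built once before the request loop and shared by every checksum request, while `utils.NewChecksumBackoffStrategy()` now appears at the `WithRetry` call site, so each request seems to get its own retry budget. The toy snippet below (made-up `budget` type, not BR code) shows why that distinction matters.

```go
package main

import "fmt"

// budget is a toy retry budget: each next() call consumes one attempt.
type budget struct{ attempts int }

func (b *budget) next() bool { b.attempts--; return b.attempts >= 0 }

// retryLoop keeps "retrying" until the budget runs out and reports how many tries it got.
func retryLoop(b *budget) int {
	tries := 0
	for b.next() {
		tries++
	}
	return tries
}

func main() {
	// Shared budget: later requests only get whatever earlier requests left over.
	shared := &budget{attempts: 3}
	fmt.Println(retryLoop(shared), retryLoop(shared)) // 3 0

	// Fresh budget per request: every request gets its full retry allowance.
	fmt.Println(retryLoop(&budget{attempts: 3}), retryLoop(&budget{attempts: 3})) // 3 3
}
```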
4 changes: 2 additions & 2 deletions br/pkg/conn/conn.go
@@ -116,7 +116,7 @@ func GetAllTiKVStoresWithRetry(ctx context.Context,

return errors.Trace(err)
},
utils.NewPDReqBackoffer(),
utils.NewAggressivePDBackoffStrategy(),
)

return stores, errors.Trace(errRetry)
@@ -420,7 +420,7 @@ func (mgr *Mgr) GetConfigFromTiKV(ctx context.Context, cli *http.Client, fn func
return err
}
return nil
}, utils.NewPDReqBackoffer())
}, utils.NewAggressivePDBackoffStrategy())
if err != nil {
// if one store failed, break and return error
return err
10 changes: 5 additions & 5 deletions br/pkg/encryption/master_key/kms_backend.go
@@ -5,6 +5,7 @@ package encryption
import (
"context"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/encryptionpb"
@@ -55,12 +56,11 @@ func (k *KmsBackend) Decrypt(ctx context.Context, content *encryptionpb.Encrypte
return k.state.cached.encryptionBackend.DecryptContent(ctx, content)
}

// piggyback on NewDownloadSSTBackoffer, a refactor is ongoing to remove all the backoffers
// so user don't need to write a backoffer for every type
decryptedKey, err :=
utils.WithRetryV2(ctx, utils.NewDownloadSSTBackoffer(), func(ctx context.Context) ([]byte, error) {
return k.kmsProvider.DecryptDataKey(ctx, ciphertextKey)
})
utils.WithRetryV2(ctx, utils.NewBackoffRetryAllErrorStrategy(10, 500*time.Millisecond, 5*time.Second),
func(ctx context.Context) ([]byte, error) {
return k.kmsProvider.DecryptDataKey(ctx, ciphertextKey)
})
if err != nil {
return nil, errors.Annotate(err, "decrypt encrypted key failed")
}
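In kms_backend.go the decrypt path stops piggybacking on the SST-download backoffer and builds its own retry-all-errors strategy: 10 attempts, 500 ms base delay, 5 s cap. A self-contained sketch of that shape follows: a generic value-returning retry helper with capped exponential backoff. It is an illustration of the idea, not the `utils.WithRetryV2` implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// withRetry retries fn up to attempts times, sleeping with capped exponential
// backoff between attempts, and returns the last error if every attempt fails.
func withRetry[T any](ctx context.Context, attempts int, base, maxWait time.Duration,
	fn func(context.Context) (T, error)) (T, error) {
	var zero T
	var lastErr error
	wait := base
	for i := 0; i < attempts; i++ {
		v, err := fn(ctx)
		if err == nil {
			return v, nil
		}
		lastErr = err
		select {
		case <-ctx.Done():
			return zero, ctx.Err()
		case <-time.After(wait):
		}
		if wait *= 2; wait > maxWait {
			wait = maxWait
		}
	}
	return zero, lastErr
}

func main() {
	calls := 0
	key, err := withRetry(context.Background(), 10, time.Millisecond, 5*time.Millisecond,
		func(context.Context) ([]byte, error) {
			if calls++; calls < 3 {
				return nil, errors.New("kms temporarily unavailable") // simulated transient failure
			}
			return []byte("plaintext-data-key"), nil
		})
	fmt.Println(string(key), err, calls) // plaintext-data-key <nil> 3
}
```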
47 changes: 17 additions & 30 deletions br/pkg/restore/data/data.go
@@ -3,6 +3,7 @@ package data

import (
"context"
stdErr "errors"
"io"
"time"

@@ -62,40 +63,27 @@ type recoveryError struct {
atStage RecoveryStage
}

func FailedAt(err error) RecoveryStage {
if rerr, ok := err.(recoveryError); ok {
return rerr.atStage
func atStage(err error) RecoveryStage {
var recoveryErr recoveryError
if stdErr.As(err, &recoveryErr) {
return recoveryErr.atStage
}
return StageUnknown
}

type recoveryBackoffer struct {
state utils.RetryState
}

func newRecoveryBackoffer() *recoveryBackoffer {
return &recoveryBackoffer{
state: utils.InitialRetryState(16, 30*time.Second, 4*time.Minute),
}
}

func (bo *recoveryBackoffer) NextBackoff(err error) time.Duration {
s := FailedAt(err)
switch s {
func isRetryErr(err error) bool {
stage := atStage(err)
switch stage {
case StageCollectingMeta, StageMakingRecoveryPlan, StageResetPDAllocateID, StageRecovering:
log.Info("Recovery data retrying.", zap.Error(err), zap.Stringer("stage", s))
return bo.state.ExponentialBackoff()
log.Info("Recovery data retrying.", zap.Error(err), zap.Stringer("stage", stage))
return true
case StageFlashback:
log.Info("Giving up retry for flashback stage.", zap.Error(err), zap.Stringer("stage", s))
bo.state.GiveUp()
return 0
log.Info("Giving up retry for flashback stage.", zap.Error(err), zap.Stringer("stage", stage))
return false
default:
log.Warn("unknown stage of recovery for backoff.", zap.Int("val", int(stage)))
return false
}
log.Warn("unknown stage of backing off.", zap.Int("val", int(s)))
return bo.state.ExponentialBackoff()
}

func (bo *recoveryBackoffer) Attempt() int {
return bo.state.Attempt()
}

// RecoverData recover the tikv cluster
@@ -109,7 +97,7 @@ func RecoverData(ctx context.Context, resolveTS uint64, allStores []*metapb.Stor
// Roughly handle the case that some TiKVs are rebooted during making plan.
// Generally, retrying the whole procedure will be fine for most cases. But perhaps we can do finer-grained retry,
// say, we may reuse the recovery plan, and probably no need to rebase PD allocation ID once we have done it.
return utils.WithRetryV2(ctx, newRecoveryBackoffer(), func(ctx context.Context) (int, error) {
return utils.WithRetryV2(ctx, utils.NewRecoveryBackoffStrategy(isRetryErr), func(ctx context.Context) (int, error) {
return doRecoveryData(ctx, resolveTS, allStores, mgr, progress, restoreTS, concurrency)
})
}
@@ -395,7 +383,6 @@ func (recovery *Recovery) SpawnTiKVShutDownWatchers(ctx context.Context) {

// prepare the regions for flashing back the data; the purpose is to stop region service and put regions in flashback state
func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolveTS uint64, startTS uint64) (err error) {
retryState := utils.InitialRetryState(utils.FlashbackRetryTime, utils.FlashbackWaitInterval, utils.FlashbackMaxWaitInterval)
retryErr := utils.WithRetry(
ctx,
func() error {
@@ -416,7 +403,7 @@
}
log.Info("region flashback prepare complete", zap.Int("regions", runner.CompletedRegions()))
return nil
}, &retryState)
}, utils.NewFlashBackBackoffStrategy())

recovery.progress.Inc()
return retryErr
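The data.go hunk dissolves the hand-rolled `recoveryBackoffer` into a plain predicate: `isRetryErr` decides, per recovery stage, whether an error is worth retrying, and `utils.NewRecoveryBackoffStrategy(isRetryErr)` handles the waiting. Below is a sketch of a predicate-driven strategy of that flavor, reusing the parameters from the removed code (16 attempts, 30 s base, 4 min cap); the type and method names are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// predicateBackoff doubles its wait while retryable(err) is true and reports
// zero remaining attempts as soon as an error is classified as non-retryable.
type predicateBackoff struct {
	retryable func(error) bool
	wait      time.Duration
	maxWait   time.Duration
	remaining int
}

func (p *predicateBackoff) NextBackoff(err error) time.Duration {
	if !p.retryable(err) {
		p.remaining = 0 // give up immediately, e.g. a failure in the flashback stage
		return 0
	}
	p.remaining--
	d := p.wait
	if p.wait *= 2; p.wait > p.maxWait {
		p.wait = p.maxWait
	}
	return d
}

func (p *predicateBackoff) RemainingAttempts() int { return p.remaining }

var errFlashback = errors.New("failed at flashback stage")

func main() {
	bo := &predicateBackoff{
		retryable: func(err error) bool { return !errors.Is(err, errFlashback) },
		wait:      30 * time.Second,
		maxWait:   4 * time.Minute,
		remaining: 16,
	}
	fmt.Println(bo.NextBackoff(errors.New("collecting meta failed")), bo.RemainingAttempts()) // 30s 15
	fmt.Println(bo.NextBackoff(errFlashback), bo.RemainingAttempts())                         // 0s 0
}
```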
4 changes: 2 additions & 2 deletions br/pkg/restore/log_client/import.go
@@ -137,8 +137,8 @@ func (importer *LogFileImporter) ImportKVFiles(

// This RetryState will retry 45 times, about 10 min.
rs := utils.InitialRetryState(45, 100*time.Millisecond, 15*time.Second)
ctl := OverRegionsInRange(startKey, endKey, importer.metaClient, &rs)
err = ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
ctl := CreateRangeController(startKey, endKey, importer.metaClient, &rs)
err = ctl.ApplyFuncToRange(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
subfiles, errFilter := filterFilesByRegion(files, ranges, r)
if errFilter != nil {
return RPCResultFromError(errFilter)
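The context comment in import.go says this `RetryState` retries 45 times and takes about 10 minutes. Assuming the usual shape of `InitialRetryState(45, 100*time.Millisecond, 15*time.Second)` — a 100 ms wait that doubles up to a 15 s cap, which is an assumption rather than a verified reading of the implementation — the arithmetic checks out:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Sum 45 backoff intervals: start at 100 ms, double each time, cap at 15 s.
	wait, total := 100*time.Millisecond, time.Duration(0)
	for i := 0; i < 45; i++ {
		total += wait
		if wait *= 2; wait > 15*time.Second {
			wait = 15 * time.Second
		}
	}
	fmt.Println(total) // ≈ 9m40s, i.e. roughly the "about 10 min" from the comment
}
```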
79 changes: 32 additions & 47 deletions br/pkg/restore/log_client/import_retry.go
@@ -25,7 +25,10 @@ import (

type RegionFunc func(ctx context.Context, r *split.RegionInfo) RPCResult

type OverRegionsInRangeController struct {
// RangeController manages the execution of operations over a range of regions.
// It provides functionality to scan regions within a specified key range and
// apply a given function to each region, handling errors and retries automatically.
type RangeController struct {
start []byte
end []byte
metaClient split.SplitClient
@@ -34,60 +37,49 @@ type OverRegionsInRangeController struct {
rs *utils.RetryState
}

// OverRegionsInRange creates a controller that cloud be used to scan regions in a range and
// CreateRangeController creates a controller that could be used to scan regions in a range and
// apply a function over these regions.
// You can then call the `Run` method for applying some functions.
func OverRegionsInRange(start, end []byte, metaClient split.SplitClient, retryStatus *utils.RetryState) OverRegionsInRangeController {
func CreateRangeController(start, end []byte, metaClient split.SplitClient, retryStatus *utils.RetryState) RangeController {
// IMPORTANT: we record the start/end key with TimeStamp.
// but scanRegion will drop the TimeStamp and the end key is exclusive.
// If we do not use PrefixNextKey, we might scan fewer regions than we expected
// and finally cause data loss.
end = restoreutils.TruncateTS(end)
end = kv.PrefixNextKey(end)

return OverRegionsInRangeController{
return RangeController{
start: start,
end: end,
metaClient: metaClient,
rs: retryStatus,
}
}

func (o *OverRegionsInRangeController) onError(_ context.Context, result RPCResult, region *split.RegionInfo) {
func (o *RangeController) onError(_ context.Context, result RPCResult, region *split.RegionInfo) {
o.errors = multierr.Append(o.errors, errors.Annotatef(&result, "execute over region %v failed", region.Region))
// TODO: Maybe handle some of region errors like `epoch not match`?
}

func (o *OverRegionsInRangeController) tryFindLeader(ctx context.Context, region *split.RegionInfo) (*metapb.Peer, error) {
var leader *metapb.Peer
failed := false
leaderRs := utils.InitialRetryState(4, 5*time.Second, 10*time.Second)
err := utils.WithRetry(ctx, func() error {
func (o *RangeController) tryFindLeader(ctx context.Context, region *split.RegionInfo) (*metapb.Peer, error) {
backoffStrategy := utils.NewBackoffRetryAllErrorStrategy(4, 2*time.Second, 10*time.Second)
return utils.WithRetryV2(ctx, backoffStrategy, func(ctx context.Context) (*metapb.Peer, error) {
r, err := o.metaClient.GetRegionByID(ctx, region.Region.Id)
if err != nil {
return err
return nil, err
}
if !split.CheckRegionEpoch(r, region) {
failed = true
return nil
return nil, errors.Annotatef(berrors.ErrKVEpochNotMatch, "the current epoch of %s has changed", region)
}
if r.Leader != nil {
leader = r.Leader
return nil
return r.Leader, nil
}
return errors.Annotatef(berrors.ErrPDLeaderNotFound, "there is no leader for region %d", region.Region.Id)
}, &leaderRs)
if failed {
return nil, errors.Annotatef(berrors.ErrKVEpochNotMatch, "the current epoch of %s is changed", region)
}
if err != nil {
return nil, err
}
return leader, nil
return nil, errors.Annotatef(berrors.ErrPDLeaderNotFound, "there is no leader for region %d", region.Region.Id)
})
}
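Worth calling out in the `tryFindLeader` rewrite above: instead of `utils.WithRetry` mutating a captured `leader` variable plus a `failed` sentinel, `utils.WithRetryV2` lets the retried closure return the leader (or an ordinary error such as the epoch-mismatch annotation) directly. A self-contained sketch of that style, with a toy `retry` helper standing in for `WithRetryV2`:

```go
package main

import (
	"errors"
	"fmt"
)

var errEpochNotMatch = errors.New("region epoch not match")

// retry is a toy value-returning retry helper: it calls fn up to attempts
// times and returns the first successful value or the last error.
func retry[T any](attempts int, fn func() (T, error)) (T, error) {
	var v T
	var err error
	for i := 0; i < attempts; i++ {
		if v, err = fn(); err == nil {
			return v, nil
		}
	}
	return v, err
}

func main() {
	// The closure returns the leader directly; "no leader yet" and "epoch
	// changed" both flow out through the error value, no sentinel flags needed.
	calls := 0
	leader, err := retry(4, func() (string, error) {
		if calls++; calls < 3 {
			return "", errors.New("no leader for region 42")
		}
		return "store-1", nil
	})
	fmt.Println(leader, err, calls) // store-1 <nil> 3

	_, err = retry(4, func() (string, error) { return "", errEpochNotMatch })
	fmt.Println(errors.Is(err, errEpochNotMatch)) // true
}
```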

// handleInRegionError handles the error happens internal in the region. Update the region info, and perform a suitable backoff.
func (o *OverRegionsInRangeController) handleInRegionError(ctx context.Context, result RPCResult, region *split.RegionInfo) (cont bool) {
// handleRegionError handles errors that happen inside the region. It updates the region info and performs a suitable backoff.
func (o *RangeController) handleRegionError(ctx context.Context, result RPCResult, region *split.RegionInfo) (cont bool) {
if result.StoreError.GetServerIsBusy() != nil {
if strings.Contains(result.StoreError.GetMessage(), "memory is limited") {
sleepDuration := 15 * time.Second
@@ -126,35 +118,32 @@ func (o *OverRegionsInRangeController) handleInRegionError(ctx context.Context,
return true
}

func (o *OverRegionsInRangeController) prepareLogCtx(ctx context.Context) context.Context {
lctx := logutil.ContextWithField(
func (o *RangeController) prepareLogCtx(ctx context.Context) context.Context {
return logutil.ContextWithField(
ctx,
logutil.Key("startKey", o.start),
logutil.Key("endKey", o.end),
)
return lctx
}

// Run executes the `regionFunc` over the regions in `o.start` and `o.end`.
// It would retry the errors according to the `rpcResponse`.
func (o *OverRegionsInRangeController) Run(ctx context.Context, f RegionFunc) error {
return o.runOverRegions(o.prepareLogCtx(ctx), f)
}
// ApplyFuncToRange applies the `regionFunc` to all regions between `o.start` and `o.end`.
// It retries errors according to the `rpcResponse`.
func (o *RangeController) ApplyFuncToRange(ctx context.Context, f RegionFunc) error {
adjustedCtx := o.prepareLogCtx(ctx)

func (o *OverRegionsInRangeController) runOverRegions(ctx context.Context, f RegionFunc) error {
if !o.rs.ShouldRetry() {
return o.errors
}

// Scan regions covered by the file range
regionInfos, errScanRegion := split.PaginateScanRegion(
ctx, o.metaClient, o.start, o.end, split.ScanRegionPaginationLimit)
adjustedCtx, o.metaClient, o.start, o.end, split.ScanRegionPaginationLimit)
if errScanRegion != nil {
return errors.Trace(errScanRegion)
}

for _, region := range regionInfos {
cont, err := o.runInRegion(ctx, f, region)
cont, err := o.applyFuncToRegion(adjustedCtx, f, region)
if err != nil {
return err
}
@@ -165,8 +154,8 @@ func (o *OverRegionsInRangeController) runOverRegions(ctx context.Context, f Reg
return nil
}

// applyFuncToRegion executes the function in the region, and returns `cont = false` if there is no need to try the next region.
func (o *OverRegionsInRangeController) runInRegion(ctx context.Context, f RegionFunc, region *split.RegionInfo) (cont bool, err error) {
// applyFuncToRegion executes the function in the region, and returns `cont = false` if no need for trying for next region.
func (o *RangeController) applyFuncToRegion(ctx context.Context, f RegionFunc, region *split.RegionInfo) (cont bool, err error) {
if !o.rs.ShouldRetry() {
return false, o.errors
}
@@ -180,16 +169,16 @@ func (o *OverRegionsInRangeController) runInRegion(ctx context.Context, f Region
return false, o.errors
case StrategyFromThisRegion:
logutil.CL(ctx).Warn("retry for region", logutil.Region(region.Region), logutil.ShortError(&result))
if !o.handleInRegionError(ctx, result, region) {
return false, o.runOverRegions(ctx, f)
if !o.handleRegionError(ctx, result, region) {
return false, o.ApplyFuncToRange(ctx, f)
}
return o.runInRegion(ctx, f, region)
return o.applyFuncToRegion(ctx, f, region)
case StrategyFromStart:
logutil.CL(ctx).Warn("retry for execution over regions", logutil.ShortError(&result))
// TODO: make a backoffer that considers more of the error info,
// instead of ignoring the result and retrying.
time.Sleep(o.rs.ExponentialBackoff())
return false, o.runOverRegions(ctx, f)
return false, o.ApplyFuncToRange(ctx, f)
}
}
return true, nil
@@ -251,10 +240,6 @@ func (r *RPCResult) StrategyForRetryStoreError() RetryStrategy {
}

func (r *RPCResult) StrategyForRetryGoError() RetryStrategy {
if r.Err == nil {
return StrategyGiveUp
}

Review comment from @Tristan1900 (Contributor, Author), Nov 25, 2024: only get called when error is not nil

// we should unwrap the error or we cannot get the write gRPC status.
if gRPCErr, ok := status.FromError(errors.Cause(r.Err)); ok {
switch gRPCErr.Code() {
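The final hunk drops the redundant `r.Err == nil` guard in `StrategyForRetryGoError` (as the review comment notes, the method is only reached with a non-nil error) and keeps the unwrap-then-inspect-gRPC-status flow. The standalone sketch below illustrates that classification idea; the strategy names and the particular code-to-strategy mapping are made up for the example, but `google.golang.org/grpc/status` and `codes` are the real packages used in the diff.

```go
package main

import (
	"errors"
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

type retryStrategy int

const (
	strategyGiveUp retryStrategy = iota
	strategyFromThisRegion
	strategyFromStart
)

// classify assumes err != nil. It unwraps the error chain so status.FromError
// can see the underlying gRPC status, then maps status codes to a strategy.
func classify(err error) retryStrategy {
	for u := errors.Unwrap(err); u != nil; u = errors.Unwrap(u) {
		err = u
	}
	if s, ok := status.FromError(err); ok {
		switch s.Code() {
		case codes.Unavailable, codes.Aborted, codes.ResourceExhausted:
			return strategyFromThisRegion // transient: retry the same region
		case codes.Canceled, codes.DeadlineExceeded:
			return strategyGiveUp
		}
	}
	return strategyFromStart // unknown error: restart over the whole range
}

func main() {
	err := fmt.Errorf("send file: %w", status.Error(codes.Unavailable, "store is down"))
	fmt.Println(classify(err) == strategyFromThisRegion) // true
}
```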