Skip to content

Commit

Permalink
br: add retry for prepare flashback for backup cluster is empty and t…
Browse files Browse the repository at this point in the history
…here are only one region (#41059) (#41116)

close #41058
  • Loading branch information
ti-chi-bot authored Feb 15, 2023
1 parent f45b3a8 commit 1c0e1e5
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 19 deletions.
40 changes: 21 additions & 19 deletions br/pkg/restore/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,27 +305,29 @@ func (recovery *Recovery) WaitApply(ctx context.Context) (err error) {

// prepare the region for flashback the data, the purpose is to stop region service, put region in flashback state
func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolveTS uint64, startTS uint64) (err error) {
var totalRegions atomic.Uint64
totalRegions.Store(0)

handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
stats, err := ddl.SendPrepareFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), resolveTS, startTS, r)
totalRegions.Add(uint64(stats.CompletedRegions))
return stats, err
}

runner := rangetask.NewRangeTaskRunner("br-flashback-prepare-runner", recovery.mgr.GetStorage().(tikv.Storage), int(recovery.concurrency), handler)
// Run prepare flashback on the entire TiKV cluster. Empty keys means the range is unbounded.
err = runner.RunOnRange(ctx, []byte(""), []byte(""))
if err != nil {
log.Error("region flashback prepare get error")
return errors.Trace(err)
}
retryErr := utils.WithRetry(
ctx,
func() error {
handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
stats, err := ddl.SendPrepareFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), resolveTS, startTS, r)
return stats, err
}

recovery.totalFlashbackRegions = totalRegions.Load()
log.Info("region flashback prepare complete", zap.Int("regions", runner.CompletedRegions()))
runner := rangetask.NewRangeTaskRunner("br-flashback-prepare-runner", recovery.mgr.GetStorage().(tikv.Storage), int(recovery.concurrency), handler)
// Run prepare flashback on the entire TiKV cluster. Empty keys means the range is unbounded.
err = runner.RunOnRange(ctx, []byte(""), []byte(""))
if err != nil {
log.Warn("region flashback prepare get error")
return errors.Trace(err)
}
log.Info("region flashback prepare complete", zap.Int("regions", runner.CompletedRegions()))
return nil
},
utils.NewFlashBackBackoffer(),
)

return nil
recovery.progress.Inc()
return retryErr
}

// flashback the region data to version resolveTS
Expand Down
36 changes: 36 additions & 0 deletions br/pkg/utils/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ const (
resetTSRetryTimeExt = 600
resetTSWaitIntervalExt = 500 * time.Millisecond
resetTSMaxWaitIntervalExt = 300 * time.Second

// region heartbeat are 10 seconds by default, if some region has 2 heartbeat missing (15 seconds), it appear to be a network issue between PD and TiKV.
flashbackRetryTime = 3
flashbackWaitInterval = 3000 * time.Millisecond
flashbackMaxWaitInterval = 15 * time.Second
)

// RetryState is the mutable state needed for retrying.
Expand Down Expand Up @@ -204,3 +209,34 @@ func (bo *pdReqBackoffer) NextBackoff(err error) time.Duration {
func (bo *pdReqBackoffer) Attempt() int {
return bo.attempt
}

type flashbackBackoffer struct {
attempt int
delayTime time.Duration
maxDelayTime time.Duration
}

// NewBackoffer creates a new controller regulating a truncated exponential backoff.
func NewFlashBackBackoffer() Backoffer {
return &flashbackBackoffer{
attempt: flashbackRetryTime,
delayTime: flashbackWaitInterval,
maxDelayTime: flashbackMaxWaitInterval,
}
}

// retry 3 times when prepare flashback failure.
func (bo *flashbackBackoffer) NextBackoff(err error) time.Duration {
bo.delayTime = 2 * bo.delayTime
bo.attempt--
log.Warn("region may not ready to serve, retry it...", zap.Error(err))

if bo.delayTime > bo.maxDelayTime {
return bo.maxDelayTime
}
return bo.delayTime
}

func (bo *flashbackBackoffer) Attempt() int {
return bo.attempt
}

0 comments on commit 1c0e1e5

Please sign in to comment.