From 1c0e1e5bb49dab984929ad08b9fcfb79a7738cf1 Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Wed, 15 Feb 2023 15:40:01 +0800
Subject: [PATCH] br: add retry for prepare flashback when the backup cluster is empty and there is only one region (#41059) (#41116)

close pingcap/tidb#41058
---
 br/pkg/restore/data.go  | 40 +++++++++++++++++++++-------------------
 br/pkg/utils/backoff.go | 36 ++++++++++++++++++++++++++++++++++++
 2 files changed, 57 insertions(+), 19 deletions(-)

diff --git a/br/pkg/restore/data.go b/br/pkg/restore/data.go
index b4ed1c1144dd8..351798dbaa7ab 100644
--- a/br/pkg/restore/data.go
+++ b/br/pkg/restore/data.go
@@ -305,27 +305,29 @@ func (recovery *Recovery) WaitApply(ctx context.Context) (err error) {
 
 // prepare the region for flashback the data, the purpose is to stop region service, put region in flashback state
 func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolveTS uint64, startTS uint64) (err error) {
-	var totalRegions atomic.Uint64
-	totalRegions.Store(0)
-
-	handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
-		stats, err := ddl.SendPrepareFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), resolveTS, startTS, r)
-		totalRegions.Add(uint64(stats.CompletedRegions))
-		return stats, err
-	}
-
-	runner := rangetask.NewRangeTaskRunner("br-flashback-prepare-runner", recovery.mgr.GetStorage().(tikv.Storage), int(recovery.concurrency), handler)
-	// Run prepare flashback on the entire TiKV cluster. Empty keys means the range is unbounded.
-	err = runner.RunOnRange(ctx, []byte(""), []byte(""))
-	if err != nil {
-		log.Error("region flashback prepare get error")
-		return errors.Trace(err)
-	}
+	retryErr := utils.WithRetry(
+		ctx,
+		func() error {
+			handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
+				stats, err := ddl.SendPrepareFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), resolveTS, startTS, r)
+				return stats, err
+			}
 
-	recovery.totalFlashbackRegions = totalRegions.Load()
-	log.Info("region flashback prepare complete", zap.Int("regions", runner.CompletedRegions()))
+			runner := rangetask.NewRangeTaskRunner("br-flashback-prepare-runner", recovery.mgr.GetStorage().(tikv.Storage), int(recovery.concurrency), handler)
+			// Run prepare flashback on the entire TiKV cluster. Empty keys means the range is unbounded.
+			err = runner.RunOnRange(ctx, []byte(""), []byte(""))
+			if err != nil {
+				log.Warn("region flashback prepare get error")
+				return errors.Trace(err)
+			}
+			log.Info("region flashback prepare complete", zap.Int("regions", runner.CompletedRegions()))
+			return nil
+		},
+		utils.NewFlashBackBackoffer(),
+	)
 
-	return nil
+	recovery.progress.Inc()
+	return retryErr
 }
 
 // flashback the region data to version resolveTS
diff --git a/br/pkg/utils/backoff.go b/br/pkg/utils/backoff.go
index 5353c972d24ad..bff2490b56650 100644
--- a/br/pkg/utils/backoff.go
+++ b/br/pkg/utils/backoff.go
@@ -33,6 +33,11 @@ const (
 	resetTSRetryTimeExt       = 600
 	resetTSWaitIntervalExt    = 500 * time.Millisecond
 	resetTSMaxWaitIntervalExt = 300 * time.Second
+
+	// region heartbeats are sent every 10 seconds by default; if a region has 2 heartbeats missing (15 seconds), it appears to be a network issue between PD and TiKV.
+	flashbackRetryTime       = 3
+	flashbackWaitInterval    = 3000 * time.Millisecond
+	flashbackMaxWaitInterval = 15 * time.Second
 )
 
 // RetryState is the mutable state needed for retrying.
@@ -204,3 +209,34 @@ func (bo *pdReqBackoffer) NextBackoff(err error) time.Duration {
 func (bo *pdReqBackoffer) Attempt() int {
 	return bo.attempt
 }
+
+type flashbackBackoffer struct {
+	attempt      int
+	delayTime    time.Duration
+	maxDelayTime time.Duration
+}
+
+// NewFlashBackBackoffer creates a new controller regulating a truncated exponential backoff.
+func NewFlashBackBackoffer() Backoffer {
+	return &flashbackBackoffer{
+		attempt:      flashbackRetryTime,
+		delayTime:    flashbackWaitInterval,
+		maxDelayTime: flashbackMaxWaitInterval,
+	}
+}
+
+// retry 3 times when prepare flashback fails.
+func (bo *flashbackBackoffer) NextBackoff(err error) time.Duration {
+	bo.delayTime = 2 * bo.delayTime
+	bo.attempt--
+	log.Warn("region may not be ready to serve, retry it...", zap.Error(err))
+
+	if bo.delayTime > bo.maxDelayTime {
+		return bo.maxDelayTime
+	}
+	return bo.delayTime
+}
+
+func (bo *flashbackBackoffer) Attempt() int {
+	return bo.attempt
+}
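The data.go change above wraps the whole prepare-flashback range task in utils.WithRetry and feeds it the new flashbackBackoffer, so a cluster whose regions are not yet ready to serve (for example, a freshly restored cluster with a single empty region) gets a few more chances instead of failing immediately. Below is a minimal sketch of how a retry driver of that shape can consume the Backoffer implemented in this patch; withRetry and toyBackoffer are hypothetical stand-ins written for illustration, and the real utils.WithRetry in br/pkg/utils may differ in its signature and in how it aggregates errors.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Backoffer mirrors the interface that flashbackBackoffer satisfies in the
// patch: NextBackoff returns the next wait, Attempt the remaining retries.
type Backoffer interface {
	NextBackoff(err error) time.Duration
	Attempt() int
}

// withRetry is a hypothetical stand-in for utils.WithRetry: it keeps calling
// fn until it succeeds, the backoffer runs out of attempts, or ctx is done.
func withRetry(ctx context.Context, fn func() error, bo Backoffer) error {
	var lastErr error
	for bo.Attempt() > 0 {
		if lastErr = fn(); lastErr == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(bo.NextBackoff(lastErr)):
		}
	}
	return lastErr
}

// toyBackoffer waits a fixed 10ms and allows 3 attempts, standing in for
// the flashbackBackoffer added by this patch.
type toyBackoffer struct{ attempt int }

func (b *toyBackoffer) NextBackoff(error) time.Duration { b.attempt--; return 10 * time.Millisecond }
func (b *toyBackoffer) Attempt() int                    { return b.attempt }

func main() {
	calls := 0
	err := withRetry(context.Background(), func() error {
		calls++
		if calls < 3 {
			return errors.New("region not ready")
		}
		return nil
	}, &toyBackoffer{attempt: 3})
	fmt.Println(calls, err) // 3 <nil>: the third attempt succeeds
}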
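With the constants added in backoff.go (flashbackRetryTime = 3, flashbackWaitInterval = 3s, flashbackMaxWaitInterval = 15s), NextBackoff doubles the delay before returning it, so the waits handed back across the three retries are 6s, 12s, and then the 15s cap; the initial 3s interval itself is never slept. The following standalone copy of that doubling logic (logging stripped, values repeated only to illustrate the sequence) prints the resulting waits.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Same values as the constants introduced in br/pkg/utils/backoff.go.
	const (
		flashbackRetryTime       = 3
		flashbackWaitInterval    = 3000 * time.Millisecond
		flashbackMaxWaitInterval = 15 * time.Second
	)

	delay, attempt := flashbackWaitInterval, flashbackRetryTime
	for attempt > 0 {
		delay = 2 * delay // doubled before it is ever returned, so 3s is skipped
		attempt--
		wait := delay
		if wait > flashbackMaxWaitInterval {
			wait = flashbackMaxWaitInterval
		}
		fmt.Println(wait) // prints 6s, then 12s, then 15s
	}
}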