diff --git a/br/pkg/restore/data.go b/br/pkg/restore/data.go index 061c3114980ba..acbe595b59007 100644 --- a/br/pkg/restore/data.go +++ b/br/pkg/restore/data.go @@ -27,6 +27,75 @@ import ( "google.golang.org/grpc/backoff" ) +type RecoveryStage int + +const ( + StageUnknown RecoveryStage = iota + StageCollectingMeta + StageMakingRecoveryPlan + StageResetPDAllocateID + StageRecovering + StageFlashback +) + +func (s RecoveryStage) String() string { + switch s { + case StageCollectingMeta: + return "collecting meta" + case StageMakingRecoveryPlan: + return "making recovery plan" + case StageResetPDAllocateID: + return "resetting PD allocate ID" + case StageRecovering: + return "recovering" + case StageFlashback: + return "flashback" + default: + return "unknown" + } +} + +type recoveryError struct { + error + atStage RecoveryStage +} + +func FailedAt(err error) RecoveryStage { + if rerr, ok := err.(recoveryError); ok { + return rerr.atStage + } + return StageUnknown +} + +type recoveryBackoffer struct { + state utils.RetryState +} + +func newRecoveryBackoffer() *recoveryBackoffer { + return &recoveryBackoffer{ + state: utils.InitialRetryState(16, 30*time.Second, 4*time.Minute), + } +} + +func (bo *recoveryBackoffer) NextBackoff(err error) time.Duration { + s := FailedAt(err) + switch s { + case StageCollectingMeta, StageMakingRecoveryPlan, StageResetPDAllocateID, StageRecovering: + log.Info("Recovery data retrying.", zap.Error(err), zap.Stringer("stage", s)) + return bo.state.ExponentialBackoff() + case StageFlashback: + log.Info("Giving up retry for flashback stage.", zap.Error(err), zap.Stringer("stage", s)) + bo.state.GiveUp() + return 0 + } + log.Warn("unknown stage of backing off.", zap.Int("val", int(s))) + return bo.state.ExponentialBackoff() +} + +func (bo *recoveryBackoffer) Attempt() int { + return bo.state.Attempt() +} + // RecoverData recover the tikv cluster // 1. read all meta data from tikvs // 2. make recovery plan and then recovery max allocate ID firstly @@ -35,39 +104,52 @@ import ( // 5. prepare the flashback // 6. flashback to resolveTS func RecoverData(ctx context.Context, resolveTS uint64, allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, restoreTS uint64, concurrency uint32) (int, error) { + // Roughly handle the case that some TiKVs are rebooted during making plan. + // Generally, retry the whole procedure will be fine for most cases. But perhaps we can do finer-grained retry, + // say, we may reuse the recovery plan, and probably no need to rebase PD allocation ID once we have done it. + return utils.WithRetryV2(ctx, newRecoveryBackoffer(), func(ctx context.Context) (int, error) { + return doRecoveryData(ctx, resolveTS, allStores, mgr, progress, restoreTS, concurrency) + }) +} + +func doRecoveryData(ctx context.Context, resolveTS uint64, allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, restoreTS uint64, concurrency uint32) (int, error) { + var cancel context.CancelFunc + ctx, cancel = context.WithCancel(ctx) + defer cancel() + var recovery = NewRecovery(allStores, mgr, progress, concurrency) if err := recovery.ReadRegionMeta(ctx); err != nil { - return 0, errors.Trace(err) + return 0, recoveryError{error: err, atStage: StageCollectingMeta} } totalRegions := recovery.GetTotalRegions() if err := recovery.MakeRecoveryPlan(); err != nil { - return totalRegions, errors.Trace(err) + return totalRegions, recoveryError{error: err, atStage: StageMakingRecoveryPlan} } log.Info("recover the alloc id to pd", zap.Uint64("max alloc id", recovery.MaxAllocID)) if err := recovery.mgr.RecoverBaseAllocID(ctx, recovery.MaxAllocID); err != nil { - return totalRegions, errors.Trace(err) + return totalRegions, recoveryError{error: err, atStage: StageResetPDAllocateID} } // Once TiKV shuts down and reboot then, it may be left with no leader because of the recovery mode. // This wathcher will retrigger `RecoveryRegions` for those stores. recovery.SpawnTiKVShutDownWatchers(ctx) if err := recovery.RecoverRegions(ctx); err != nil { - return totalRegions, errors.Trace(err) + return totalRegions, recoveryError{error: err, atStage: StageRecovering} } if err := recovery.WaitApply(ctx); err != nil { - return totalRegions, errors.Trace(err) + return totalRegions, recoveryError{error: err, atStage: StageRecovering} } if err := recovery.PrepareFlashbackToVersion(ctx, resolveTS, restoreTS-1); err != nil { - return totalRegions, errors.Trace(err) + return totalRegions, recoveryError{error: err, atStage: StageFlashback} } if err := recovery.FlashbackToVersion(ctx, resolveTS, restoreTS); err != nil { - return totalRegions, errors.Trace(err) + return totalRegions, recoveryError{error: err, atStage: StageFlashback} } return totalRegions, nil diff --git a/br/pkg/utils/backoff.go b/br/pkg/utils/backoff.go index 02d21994118e6..368c11b7f0b6a 100644 --- a/br/pkg/utils/backoff.go +++ b/br/pkg/utils/backoff.go @@ -72,6 +72,10 @@ func (rs *RetryState) ExponentialBackoff() time.Duration { return backoff } +func (rs *RetryState) GiveUp() { + rs.retryTimes = rs.maxRetry +} + // InitialRetryState make the initial state for retrying. func InitialRetryState(maxRetryTimes int, initialBackoff, maxBackoff time.Duration) RetryState { return RetryState{ diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go index 0cdd33934a0d5..b4ab0437cf651 100644 --- a/br/pkg/utils/retry.go +++ b/br/pkg/utils/retry.go @@ -34,6 +34,8 @@ var retryableServerError = []string{ // RetryableFunc presents a retryable operation. type RetryableFunc func() error +type RetryableFuncV2[T any] func(context.Context) (T, error) + // Backoffer implements a backoff policy for retrying operations. type Backoffer interface { // NextBackoff returns a duration to wait before retrying again @@ -51,21 +53,37 @@ func WithRetry( retryableFunc RetryableFunc, backoffer Backoffer, ) error { + _, err := WithRetryV2[struct{}](ctx, backoffer, func(ctx context.Context) (struct{}, error) { + innerErr := retryableFunc() + return struct{}{}, innerErr + }) + return err +} + +// WithRetryV2 retries a given operation with a backoff policy. +// +// Returns the returned value if `retryableFunc` succeeded at least once. Otherwise, returns a +// multierr that containing all errors encountered. +// Comparing with `WithRetry`, this function reordered the argument order and supports catching the return value. +func WithRetryV2[T any]( + ctx context.Context, + backoffer Backoffer, + fn RetryableFuncV2[T], +) (T, error) { var allErrors error for backoffer.Attempt() > 0 { - err := retryableFunc() - if err != nil { - allErrors = multierr.Append(allErrors, err) - select { - case <-ctx.Done(): - return allErrors // nolint:wrapcheck - case <-time.After(backoffer.NextBackoff(err)): - } - } else { - return nil + res, err := fn(ctx) + if err == nil { + return res, nil + } + allErrors = multierr.Append(allErrors, err) + select { + case <-ctx.Done(): + return *new(T), allErrors + case <-time.After(backoffer.NextBackoff(err)): } } - return allErrors // nolint:wrapcheck + return *new(T), allErrors // nolint:wrapcheck } // MessageIsRetryableStorageError checks whether the message returning from TiKV is retryable ExternalStorageError.