diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go index caa5eae0bb082..466d934ebcab2 100644 --- a/br/pkg/task/backup.go +++ b/br/pkg/task/backup.go @@ -92,11 +92,12 @@ type BackupConfig struct { CompressionConfig // for ebs-based backup - FullBackupType FullBackupType `json:"full-backup-type" toml:"full-backup-type"` - VolumeFile string `json:"volume-file" toml:"volume-file"` - SkipAWS bool `json:"skip-aws" toml:"skip-aws"` - CloudAPIConcurrency uint `json:"cloud-api-concurrency" toml:"cloud-api-concurrency"` - ProgressFile string `json:"progress-file" toml:"progress-file"` + FullBackupType FullBackupType `json:"full-backup-type" toml:"full-backup-type"` + VolumeFile string `json:"volume-file" toml:"volume-file"` + SkipAWS bool `json:"skip-aws" toml:"skip-aws"` + CloudAPIConcurrency uint `json:"cloud-api-concurrency" toml:"cloud-api-concurrency"` + ProgressFile string `json:"progress-file" toml:"progress-file"` + SkipPauseGCAndScheduler bool `json:"skip-pause-gc-and-scheduler" toml:"skip-pause-gc-and-scheduler"` } // DefineBackupFlags defines common flags for the backup command. @@ -244,6 +245,10 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error { if err != nil { return errors.Trace(err) } + cfg.SkipPauseGCAndScheduler, err = flags.GetBool(flagOperatorPausedGCAndSchedulers) + if err != nil { + return errors.Trace(err) + } } cfg.ReplicaReadLabel, err = parseReplicaReadLabelFlag(flags) diff --git a/br/pkg/task/backup_ebs.go b/br/pkg/task/backup_ebs.go index 49ddcf3781d43..acbdc42065d10 100644 --- a/br/pkg/task/backup_ebs.go +++ b/br/pkg/task/backup_ebs.go @@ -9,6 +9,7 @@ import ( "encoding/json" "fmt" "io" + "os" "sort" "sync" "time" @@ -26,6 +27,7 @@ import ( "github.com/pingcap/tidb/br/pkg/conn/util" "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/metautil" + "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/summary" "github.com/pingcap/tidb/br/pkg/utils" @@ -55,12 +57,14 @@ func DefineBackupEBSFlags(flags *pflag.FlagSet) { flags.Bool(flagSkipAWS, false, "don't access to aws environment if set to true") flags.Uint(flagCloudAPIConcurrency, defaultCloudAPIConcurrency, "concurrency of calling cloud api") flags.String(flagProgressFile, "progress.txt", "the file name of progress file") + flags.Bool(flagOperatorPausedGCAndSchedulers, false, "if the GC and scheduler are paused by the `operator` command in another therad, set this so we can skip pausing GC and schedulers.") _ = flags.MarkHidden(flagFullBackupType) _ = flags.MarkHidden(flagBackupVolumeFile) _ = flags.MarkHidden(flagSkipAWS) _ = flags.MarkHidden(flagCloudAPIConcurrency) _ = flags.MarkHidden(flagProgressFile) + _ = flags.MarkHidden(flagOperatorPausedGCAndSchedulers) } // RunBackupEBS starts a backup task to backup volume vai EBS snapshot. @@ -131,24 +135,28 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error { } // Step.1.1 stop scheduler as much as possible. - log.Info("starting to remove some PD schedulers") - restoreFunc, e := mgr.RemoveAllPDSchedulers(ctx) - if e != nil { - return errors.Trace(err) - } - var scheduleRestored bool - defer func() { - if ctx.Err() != nil { - log.Warn("context canceled, doing clean work with background context") - ctx = context.Background() - } - if scheduleRestored { - return - } - if restoreE := restoreFunc(ctx); restoreE != nil { - log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) + log.Info("starting to remove some PD schedulers and pausing GC", zap.Bool("already-paused-by-operator", cfg.SkipPauseGCAndScheduler)) + var restoreFunc pdutil.UndoFunc + + if !cfg.SkipPauseGCAndScheduler { + var e error + restoreFunc, e = mgr.RemoveAllPDSchedulers(ctx) + if e != nil { + return errors.Trace(err) } - }() + defer func() { + if ctx.Err() != nil { + log.Warn("context canceled, doing clean work with background context") + ctx = context.Background() + } + if restoreFunc == nil { + return + } + if restoreE := restoreFunc(ctx); restoreE != nil { + log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) + } + }() + } if err := waitAllScheduleStoppedAndNoRegionHole(ctx, cfg.Config, mgr); err != nil { return errors.Trace(err) @@ -159,15 +167,17 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error { if err != nil { return errors.Trace(err) } - sp := utils.BRServiceSafePoint{ - BackupTS: resolvedTs, - TTL: utils.DefaultBRGCSafePointTTL, - ID: utils.MakeSafePointID(), - } - log.Info("safe point will be stuck during ebs backup", zap.Object("safePoint", sp)) - err = utils.StartServiceSafePointKeeper(ctx, mgr.GetPDClient(), sp) - if err != nil { - return errors.Trace(err) + if !cfg.SkipPauseGCAndScheduler { + sp := utils.BRServiceSafePoint{ + BackupTS: resolvedTs, + TTL: utils.DefaultBRGCSafePointTTL, + ID: utils.MakeSafePointID(), + } + log.Info("safe point will be stuck during ebs backup", zap.Object("safePoint", sp)) + err = utils.StartServiceSafePointKeeper(ctx, mgr.GetPDClient(), sp) + if err != nil { + return errors.Trace(err) + } } // Step.1.3 backup the key info to recover cluster. e.g. PD alloc_id/cluster_id @@ -210,7 +220,8 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error { if restoreE := restoreFunc(ctx); restoreE != nil { log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) } else { - scheduleRestored = true + // Clear the restore func, so we won't execute it many times. + restoreFunc = nil } log.Info("wait async snapshots finish") @@ -223,8 +234,13 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error { for i := 0; i < int(storeCount); i++ { progress.IncBy(100) totalSize = 1024 - log.Info("mock snapshot finished.", zap.Int("index", i)) - time.Sleep(800 * time.Millisecond) + timeToSleep := getMockSleepTime() + log.Info("mock snapshot finished.", zap.Int("index", i), zap.Duration("time-to-sleep", timeToSleep)) + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case <-time.After(timeToSleep): + } } } progress.Close() @@ -245,6 +261,19 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error { return nil } +func getMockSleepTime() time.Duration { + dft := 800 * time.Millisecond + v, ok := os.LookupEnv("br_ebs_backup_mocking_wait_snapshot_duration") + if !ok { + return dft + } + d, err := time.ParseDuration(v) + if err != nil { + return dft + } + return d +} + func waitAllScheduleStoppedAndNoRegionHole(ctx context.Context, cfg Config, mgr *conn.Mgr) error { allStores, err := conn.GetAllTiKVStoresWithRetry(ctx, mgr.GetPDClient(), util.SkipTiFlash) if err != nil { diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index 98b0810e8261e..cf832f2ecaf2a 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -78,9 +78,10 @@ const ( flagSkipCheckPath = "skip-check-path" flagDryRun = "dry-run" // TODO used for local test, should be removed later - flagSkipAWS = "skip-aws" - flagCloudAPIConcurrency = "cloud-api-concurrency" - flagWithSysTable = "with-sys-table" + flagSkipAWS = "skip-aws" + flagCloudAPIConcurrency = "cloud-api-concurrency" + flagWithSysTable = "with-sys-table" + flagOperatorPausedGCAndSchedulers = "operator-paused-gc-and-scheduler" defaultSwitchInterval = 5 * time.Minute defaultGRPCKeepaliveTime = 10 * time.Second