Skip to content

Commit

Permalink
log-backup: add flag for restore point (#39431)
Browse files Browse the repository at this point in the history
close #39379
  • Loading branch information
joccau authored Nov 29, 2022
1 parent 5b16086 commit d61b8c4
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 20 deletions.
1 change: 0 additions & 1 deletion br/cmd/br/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,5 @@ func newStreamRestoreCommand() *cobra.Command {
}
task.DefineFilterFlags(command, filterOutSysAndMemTables, true)
task.DefineStreamRestoreFlags(command)
command.Hidden = true
return command
}
50 changes: 31 additions & 19 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,19 @@ const (
// FlagStreamFullBackupStorage is used for log restore, represents the full backup storage.
FlagStreamFullBackupStorage = "full-backup-storage"
// FlagPiTRBatchCount and FlagPiTRBatchSize are used for restore log with batch method.
FlagPiTRBatchCount = "pitr-batch-count"
FlagPiTRBatchSize = "pitr-batch-size"

defaultPiTRBatchCount = 16
defaultPiTRBatchSize = 32 * 1024 * 1024
defaultRestoreConcurrency = 128
defaultRestoreStreamConcurrency = 128
maxRestoreBatchSizeLimit = 10240
defaultPDConcurrency = 1
defaultBatchFlushInterval = 16 * time.Second
defaultFlagDdlBatchSize = 128
resetSpeedLimitRetryTimes = 3
FlagPiTRBatchCount = "pitr-batch-count"
FlagPiTRBatchSize = "pitr-batch-size"
FlagPiTRConcurrency = "pitr-concurrency"

defaultPiTRBatchCount = 8
defaultPiTRBatchSize = 16 * 1024 * 1024
defaultRestoreConcurrency = 128
defaultPiTRConcurrency = 16
maxRestoreBatchSizeLimit = 10240
defaultPDConcurrency = 1
defaultBatchFlushInterval = 16 * time.Second
defaultFlagDdlBatchSize = 128
resetSpeedLimitRetryTimes = 3
)

const (
Expand Down Expand Up @@ -175,6 +176,7 @@ type RestoreConfig struct {
tiflashRecorder *tiflashrec.TiFlashRecorder `json:"-" toml:"-"`
PitrBatchCount uint32 `json:"pitr-batch-count" toml:"pitr-batch-count"`
PitrBatchSize uint32 `json:"pitr-batch-size" toml:"pitr-batch-size"`
PitrConcurrency uint32 `json:"-" toml:"-"`

// for ebs-based restore
FullBackupType FullBackupType `json:"full-backup-type" toml:"full-backup-type"`
Expand Down Expand Up @@ -206,10 +208,9 @@ func DefineStreamRestoreFlags(command *cobra.Command) {
"support TSO or datetime, e.g. '400036290571534337' or '2018-05-11 01:42:23+0800'")
command.Flags().String(FlagStreamFullBackupStorage, "", "specify the backup full storage. "+
"fill it if want restore full backup before restore log.")
command.Flags().Uint32(FlagPiTRBatchCount, defaultPiTRBatchCount, "")
command.Flags().Uint32(FlagPiTRBatchSize, defaultPiTRBatchSize, "")
_ = command.Flags().MarkHidden(FlagPiTRBatchCount)
_ = command.Flags().MarkHidden(FlagPiTRBatchSize)
command.Flags().Uint32(FlagPiTRBatchCount, defaultPiTRBatchCount, "specify the batch count to restore log.")
command.Flags().Uint32(FlagPiTRBatchSize, defaultPiTRBatchSize, "specify the batch size to retore log.")
command.Flags().Uint32(FlagPiTRConcurrency, defaultPiTRConcurrency, "specify the concurrency to restore log.")
}

// ParseStreamRestoreFlags parses the `restore stream` flags from the flag set.
Expand Down Expand Up @@ -244,6 +245,9 @@ func (cfg *RestoreConfig) ParseStreamRestoreFlags(flags *pflag.FlagSet) error {
if cfg.PitrBatchSize, err = flags.GetUint32(FlagPiTRBatchSize); err != nil {
return errors.Trace(err)
}
if cfg.PitrConcurrency, err = flags.GetUint32(FlagPiTRConcurrency); err != nil {
return errors.Trace(err)
}
return nil
}

Expand Down Expand Up @@ -370,10 +374,18 @@ func (cfg *RestoreConfig) Adjust() {
}

func (cfg *RestoreConfig) adjustRestoreConfigForStreamRestore() {
if cfg.Config.Concurrency == 0 {
log.Info("set restore kv files concurrency", zap.Int("concurrency", defaultRestoreStreamConcurrency))
cfg.Config.Concurrency = defaultRestoreStreamConcurrency
if cfg.PitrConcurrency == 0 {
cfg.PitrConcurrency = defaultPiTRConcurrency
}
if cfg.PitrBatchCount == 0 {
cfg.PitrBatchCount = defaultPiTRBatchCount
}
if cfg.PitrBatchSize == 0 {
cfg.PitrBatchSize = defaultPiTRBatchSize
}

log.Info("set restore kv files concurrency", zap.Int("concurrency", int(cfg.PitrConcurrency)))
cfg.Config.Concurrency = cfg.PitrConcurrency
}

func configureRestoreClient(ctx context.Context, client *restore.Client, cfg *RestoreConfig) error {
Expand Down
10 changes: 10 additions & 0 deletions br/pkg/task/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ func TestConfigureRestoreClient(t *testing.T) {
require.True(t, client.IsOnline())
}

func TestAdjustRestoreConfigForStreamRestore(t *testing.T) {
restoreCfg := RestoreConfig{}

restoreCfg.adjustRestoreConfigForStreamRestore()
require.Equal(t, restoreCfg.PitrBatchCount, uint32(defaultPiTRBatchCount))
require.Equal(t, restoreCfg.PitrBatchSize, uint32(defaultPiTRBatchSize))
require.Equal(t, restoreCfg.PitrConcurrency, uint32(defaultPiTRConcurrency))
require.Equal(t, restoreCfg.Concurrency, restoreCfg.PitrConcurrency)
}

func TestCheckRestoreDBAndTable(t *testing.T) {
cases := []struct {
cfgSchemas map[string]struct{}
Expand Down

0 comments on commit d61b8c4

Please sign in to comment.