Skip to content

Commit

Permalink
br: refine cmd stream restore to restore log
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer committed Apr 6, 2022
1 parent e92c8c4 commit 2a71e2d
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 114 deletions.
15 changes: 15 additions & 0 deletions br/cmd/br/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func NewRestoreCommand() *cobra.Command {
newDBRestoreCommand(),
newTableRestoreCommand(),
newRawRestoreCommand(),
newStreamRestoreCommand(),
)
task.DefineRestoreFlags(command.PersistentFlags())

Expand Down Expand Up @@ -141,3 +142,17 @@ func newRawRestoreCommand() *cobra.Command {
task.DefineRawRestoreFlags(command)
return command
}

func newStreamRestoreCommand() *cobra.Command {
command := &cobra.Command{
Use: "log",
Short: "restore log backups",
Args: cobra.NoArgs,
RunE: func(command *cobra.Command, _ []string) error {
return runRestoreCommand(command, task.LogRestoreCmd)
},
}
task.DefineFilterFlags(command, filterOutSysAndMemTables, false)
task.DefineStreamRestoreFlags(command)
return command
}
25 changes: 0 additions & 25 deletions br/cmd/br/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func NewStreamCommand() *cobra.Command {
//newStreamPauseCommand(),
//newStreamResumeCommand(),
newStreamStatusCommand(),
newStreamRestoreCommand(),
newStreamTruncateCommand(),
)

Expand Down Expand Up @@ -127,23 +126,6 @@ func newStreamStatusCommand() *cobra.Command {
return command
}

// TODO maybe we should use `br restore stream` rather than `br stream restore`
// because the restore and stream task has no common flags.
func newStreamRestoreCommand() *cobra.Command {
command := &cobra.Command{
Use: "restore",
Short: "restore a stream backups",
Args: cobra.NoArgs,
RunE: func(command *cobra.Command, _ []string) error {
return streamCommand(command, task.StreamRestore)
},
}
task.DefineRestoreFlags(command.PersistentFlags())
task.DefineFilterFlags(command, filterOutSysAndMemTables, false)
task.DefineStreamRestoreFlags(command.Flags())
return command
}

func newStreamTruncateCommand() *cobra.Command {
command := &cobra.Command{
Use: "truncate",
Expand Down Expand Up @@ -172,13 +154,6 @@ func streamCommand(command *cobra.Command, cmdName string) error {
}

switch cmdName {
case task.StreamRestore:
if err = cfg.RestoreConfig.ParseFromFlags(command.Flags()); err != nil {
return errors.Trace(err)
}
if err = cfg.ParseStreamRestoreFromFlags(command.Flags()); err != nil {
return errors.Trace(err)
}
case task.StreamTruncate:
if err = cfg.ParseStreamTruncateFromFlags(command.Flags()); err != nil {
return errors.Trace(err)
Expand Down
11 changes: 11 additions & 0 deletions br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/txnlock"
pd "github.com/tikv/pd/client"
Expand Down Expand Up @@ -446,3 +447,13 @@ func (mgr *Mgr) Close() {

mgr.PdController.Close()
}

// GetTS gets current ts from pd.
func (mgr *Mgr) GetTS(ctx context.Context) (uint64, error) {
p, l, err := mgr.GetPDClient().GetTS(ctx)
if err != nil {
return 0, errors.Trace(err)
}

return oracle.ComposeTS(p, l), nil
}
8 changes: 4 additions & 4 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ type Client struct {
// restoreTs is used for kv file restore.
// TiKV will filter the key space larger than this ts.
restoreTs uint64
// currentTS is used for rewrite meta kv when restore stream.
// currentTs is used for rewrite meta kv when restore stream.
// Can not use `restoreTS` directly, because schema created in `full backup` maybe is new than `restoreTS`.
currentTS uint64
currentTs uint64

storage storage.ExternalStorage
}
Expand Down Expand Up @@ -278,7 +278,7 @@ func (rc *Client) SetRestoreTs(ts uint64) {
}

func (rc *Client) SetCurrentTS(ts uint64) {
rc.currentTS = ts
rc.currentTs = ts
}

func (rc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBackend, opts *storage.ExternalStorageOptions) error {
Expand Down Expand Up @@ -1728,7 +1728,7 @@ func (rc *Client) InitSchemasReplaceForDDL(
}()...)
}

return stream.NewSchemasReplace(dbMap, rc.currentTS, tableFilter, rc.GenGlobalID, rc.GenGlobalIDs), nil
return stream.NewSchemasReplace(dbMap, rc.currentTs, tableFilter, rc.GenGlobalID, rc.GenGlobalIDs), nil
}

// RestoreMetaKVFiles tries to restore files about meta kv-event from stream-backup.
Expand Down
47 changes: 46 additions & 1 deletion br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package task

import (
"context"
"github.com/spf13/cobra"
"time"

"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -44,6 +45,11 @@ const (
// current only support STRICT or IGNORE, the default is STRICT according to tidb.
FlagWithPlacementPolicy = "with-tidb-placement-mode"

// FlagStreamRestoreTS is used for log restore.
FlagStreamRestoreTS = "restore-ts"
// FlagStreamFullBackupStorage is used for log restore, represents the full backup storage.
FlagStreamFullBackupStorage = "full-backup-storage"

defaultRestoreConcurrency = 128
maxRestoreBatchSizeLimit = 10240
defaultPDConcurrency = 1
Expand All @@ -55,6 +61,7 @@ const (
FullRestoreCmd = "Full Restore"
DBRestoreCmd = "DataBase Restore"
TableRestoreCmd = "Table Restore"
LogRestoreCmd = "Log Restore"
RawRestoreCmd = "Raw Restore"
)

Expand Down Expand Up @@ -132,6 +139,12 @@ type RestoreConfig struct {
DdlBatchSize uint `json:"ddl-batch-size" toml:"ddl-batch-size"`

WithPlacementPolicy string `json:"with-tidb-placement-mode" toml:"with-tidb-placement-mode"`

// FullBackupStorage is used to find the maps between table name and table id during restoration.
// if not specified. we cannot apply kv directly.
FullBackupStorage string `json:"full-backup-storage" toml:"full-backup-storage"`

RestoreTS uint64 `json:"restore-ts" toml:"restore-ts"`
}

// DefineRestoreFlags defines common flags for the restore tidb command.
Expand All @@ -144,6 +157,14 @@ func DefineRestoreFlags(flags *pflag.FlagSet) {
DefineRestoreCommonFlags(flags)
}

// DefineStreamRestoreFlags defines for the restore log command.
func DefineStreamRestoreFlags(command *cobra.Command) {
command.Flags().String(FlagStreamRestoreTS, "", "the point of restore, used for log restore.\n"+
"support TSO or datetime, e.g. '400036290571534337' or '2018-05-11 01:42:23'")
command.Flags().String(FlagStreamFullBackupStorage, "", "specify the backup full storage. "+
"fill it if want restore full backup before restore log.")
}

// ParseFromFlags parses the restore-related flags from the flag set.
func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error {
var err error
Expand Down Expand Up @@ -180,6 +201,16 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error {
if err != nil {
return errors.Annotatef(err, "failed to get flag %s", FlagWithPlacementPolicy)
}
tsString, err := flags.GetString(FlagStreamRestoreTS)
if err != nil {
return errors.Trace(err)
}
if cfg.RestoreTS, err = ParseTSString(tsString); err != nil {
return errors.Trace(err)
}
if cfg.FullBackupStorage, err = flags.GetString(FlagStreamFullBackupStorage); err != nil {
return errors.Trace(err)
}
return nil
}

Expand Down Expand Up @@ -208,6 +239,12 @@ func (cfg *RestoreConfig) adjustRestoreConfig() {
}
}

func (cfg *RestoreConfig) adjustRestoreConfigForStreamRestore() {
if cfg.Config.Concurrency == 0 {
cfg.Config.Concurrency = 32
}
}

func configureRestoreClient(ctx context.Context, client *restore.Client, cfg *RestoreConfig) error {
client.SetRateLimit(cfg.RateLimit)
client.SetCrypter(&cfg.CipherInfo)
Expand Down Expand Up @@ -269,10 +306,18 @@ func isFullRestore(cmdName string) bool {
return cmdName == FullRestoreCmd
}

func isStreamRestore(cmdName string) bool {
return cmdName == LogRestoreCmd
}

// RunRestore starts a restore task inside the current goroutine.
func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error {
cfg.adjustRestoreConfig()
if isStreamRestore(cmdName) {
cfg.adjustRestoreConfigForStreamRestore()
return RunStreamRestore(c, g, cmdName, cfg)
}

cfg.adjustRestoreConfig()
defer summary.Summary(cmdName)
ctx, cancel := context.WithCancel(c)
defer cancel()
Expand Down
Loading

0 comments on commit 2a71e2d

Please sign in to comment.