Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: refine cmd stream restore to restore log #33734

Merged
merged 10 commits into from
Apr 14, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions br/cmd/br/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func NewRestoreCommand() *cobra.Command {
newDBRestoreCommand(),
newTableRestoreCommand(),
newRawRestoreCommand(),
newStreamRestoreCommand(),
)
task.DefineRestoreFlags(command.PersistentFlags())

Expand Down Expand Up @@ -141,3 +142,17 @@ func newRawRestoreCommand() *cobra.Command {
task.DefineRawRestoreFlags(command)
return command
}

func newStreamRestoreCommand() *cobra.Command {
command := &cobra.Command{
Use: "log",
Short: "restore log backups",
Args: cobra.NoArgs,
RunE: func(command *cobra.Command, _ []string) error {
return runRestoreCommand(command, task.LogRestoreCmd)
},
}
task.DefineFilterFlags(command, filterOutSysAndMemTables, false)
task.DefineStreamRestoreFlags(command)
return command
}
32 changes: 6 additions & 26 deletions br/cmd/br/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,19 @@ func NewStreamCommand() *cobra.Command {
return nil
},
}

command.AddCommand(
newStreamStartCommand(),
newStreamStopCommand(),
//newStreamPauseCommand(),
//newStreamResumeCommand(),
newStreamStatusCommand(),
newStreamRestoreCommand(),
newStreamTruncateCommand(),
)
command.SetHelpFunc(func(command *cobra.Command, strings []string) {
task.HiddenFlagsForStream(command.Parent().PersistentFlags())
command.Parent().HelpFunc()(command, strings)
})

return command
}
Expand All @@ -65,7 +69,7 @@ func newStreamStartCommand() *cobra.Command {

task.DefineStreamCommonFlags(command.Flags())
task.DefineFilterFlags(command, acceptAllTables, true)
task.DefineStreamStartFlags(command.PersistentFlags())
task.DefineStreamStartFlags(command.Flags())
return command
}

Expand Down Expand Up @@ -127,23 +131,6 @@ func newStreamStatusCommand() *cobra.Command {
return command
}

// TODO maybe we should use `br restore stream` rather than `br stream restore`
// because the restore and stream task has no common flags.
func newStreamRestoreCommand() *cobra.Command {
command := &cobra.Command{
Use: "restore",
Short: "restore a stream backups",
Args: cobra.NoArgs,
RunE: func(command *cobra.Command, _ []string) error {
return streamCommand(command, task.StreamRestore)
},
}
task.DefineRestoreFlags(command.PersistentFlags())
task.DefineFilterFlags(command, filterOutSysAndMemTables, false)
task.DefineStreamRestoreFlags(command.Flags())
return command
}

func newStreamTruncateCommand() *cobra.Command {
command := &cobra.Command{
Use: "truncate",
Expand Down Expand Up @@ -172,13 +159,6 @@ func streamCommand(command *cobra.Command, cmdName string) error {
}

switch cmdName {
case task.StreamRestore:
if err = cfg.RestoreConfig.ParseFromFlags(command.Flags()); err != nil {
return errors.Trace(err)
}
if err = cfg.ParseStreamRestoreFromFlags(command.Flags()); err != nil {
return errors.Trace(err)
}
case task.StreamTruncate:
if err = cfg.ParseStreamTruncateFromFlags(command.Flags()); err != nil {
return errors.Trace(err)
Expand Down
11 changes: 11 additions & 0 deletions br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/txnlock"
pd "github.com/tikv/pd/client"
Expand Down Expand Up @@ -446,3 +447,13 @@ func (mgr *Mgr) Close() {

mgr.PdController.Close()
}

// GetTS gets current ts from pd.
func (mgr *Mgr) GetTS(ctx context.Context) (uint64, error) {
p, l, err := mgr.GetPDClient().GetTS(ctx)
if err != nil {
return 0, errors.Trace(err)
}

return oracle.ComposeTS(p, l), nil
}
8 changes: 4 additions & 4 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ type Client struct {
// restoreTs is used for kv file restore.
// TiKV will filter the key space larger than this ts.
restoreTs uint64
// currentTS is used for rewrite meta kv when restore stream.
// currentTs is used for rewrite meta kv when restore stream.
// Can not use `restoreTS` directly, because schema created in `full backup` maybe is new than `restoreTS`.
currentTS uint64
currentTs uint64

storage storage.ExternalStorage
}
Expand Down Expand Up @@ -278,7 +278,7 @@ func (rc *Client) SetRestoreTs(ts uint64) {
}

func (rc *Client) SetCurrentTS(ts uint64) {
rc.currentTS = ts
rc.currentTs = ts
}

func (rc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBackend, opts *storage.ExternalStorageOptions) error {
Expand Down Expand Up @@ -1728,7 +1728,7 @@ func (rc *Client) InitSchemasReplaceForDDL(
}()...)
}

return stream.NewSchemasReplace(dbMap, rc.currentTS, tableFilter, rc.GenGlobalID, rc.GenGlobalIDs), nil
return stream.NewSchemasReplace(dbMap, rc.currentTs, tableFilter, rc.GenGlobalID, rc.GenGlobalIDs), nil
}

// RestoreMetaKVFiles tries to restore files about meta kv-event from stream-backup.
Expand Down
7 changes: 7 additions & 0 deletions br/pkg/storage/azblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ func defineAzblobFlags(flags *pflag.FlagSet) {
flags.String(azblobAccountKey, "", "Specify the account key for azblob")
}

func hiddenAzblobFlags(flags *pflag.FlagSet) {
_ = flags.MarkHidden(azblobEndpointOption)
_ = flags.MarkHidden(azblobAccessTierOption)
_ = flags.MarkHidden(azblobAccountName)
_ = flags.MarkHidden(azblobAccountKey)
}

func (options *AzblobBackendOptions) parseFromFlags(flags *pflag.FlagSet) error {
var err error
options.Endpoint, err = flags.GetString(azblobEndpointOption)
Expand Down
6 changes: 6 additions & 0 deletions br/pkg/storage/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ func DefineFlags(flags *pflag.FlagSet) {
defineAzblobFlags(flags)
}

// HiddenFlagsForStream hidden flags for stream cmd.
func HiddenFlagsForStream(flags *pflag.FlagSet) {
hiddenGCSFlags(flags)
hiddenAzblobFlags(flags)
}

// ParseFromFlags obtains the backend options from the flag set.
func (options *BackendOptions) ParseFromFlags(flags *pflag.FlagSet) error {
if err := options.S3.parseFromFlags(flags); err != nil {
Expand Down
7 changes: 7 additions & 0 deletions br/pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ func defineGCSFlags(flags *pflag.FlagSet) {
flags.String(gcsCredentialsFile, "", "(experimental) Set the GCS credentials file path")
}

func hiddenGCSFlags(flags *pflag.FlagSet) {
_ = flags.MarkHidden(gcsEndpointOption)
_ = flags.MarkHidden(gcsStorageClassOption)
_ = flags.MarkHidden(gcsPredefinedACL)
_ = flags.MarkHidden(gcsCredentialsFile)
}

func (options *GCSBackendOptions) parseFromFlags(flags *pflag.FlagSet) error {
var err error
options.Endpoint, err = flags.GetString(gcsEndpointOption)
Expand Down
15 changes: 15 additions & 0 deletions br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,21 @@ func DefineCommonFlags(flags *pflag.FlagSet) {
storage.DefineFlags(flags)
}

// HiddenFlagsForStream temporary hidden flags that stream cmd not support.
func HiddenFlagsForStream(flags *pflag.FlagSet) {
_ = flags.MarkHidden(flagChecksum)
_ = flags.MarkHidden(flagChecksumConcurrency)
_ = flags.MarkHidden(flagRateLimit)
_ = flags.MarkHidden(flagRateLimitUnit)
_ = flags.MarkHidden(flagRemoveTiFlash)
_ = flags.MarkHidden(flagCipherType)
_ = flags.MarkHidden(flagCipherKey)
_ = flags.MarkHidden(flagCipherKeyFile)
_ = flags.MarkHidden(flagSwitchModeInterval)

storage.HiddenFlagsForStream(flags)
}

// DefineDatabaseFlags defines the required --db flag for `db` subcommand.
func DefineDatabaseFlags(command *cobra.Command) {
command.Flags().String(flagDatabase, "", "database name")
Expand Down
47 changes: 46 additions & 1 deletion br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/config"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand All @@ -44,6 +45,11 @@ const (
// current only support STRICT or IGNORE, the default is STRICT according to tidb.
FlagWithPlacementPolicy = "with-tidb-placement-mode"

// FlagStreamRestoreTS is used for log restore.
FlagStreamRestoreTS = "restore-ts"
// FlagStreamFullBackupStorage is used for log restore, represents the full backup storage.
FlagStreamFullBackupStorage = "full-backup-storage"

defaultRestoreConcurrency = 128
maxRestoreBatchSizeLimit = 10240
defaultPDConcurrency = 1
Expand All @@ -55,6 +61,7 @@ const (
FullRestoreCmd = "Full Restore"
DBRestoreCmd = "DataBase Restore"
TableRestoreCmd = "Table Restore"
LogRestoreCmd = "Log Restore"
RawRestoreCmd = "Raw Restore"
)

Expand Down Expand Up @@ -132,6 +139,12 @@ type RestoreConfig struct {
DdlBatchSize uint `json:"ddl-batch-size" toml:"ddl-batch-size"`

WithPlacementPolicy string `json:"with-tidb-placement-mode" toml:"with-tidb-placement-mode"`

// FullBackupStorage is used to run `restore full` before `restore log`.
// if it is empty, directly take restoring log justly.
FullBackupStorage string `json:"full-backup-storage" toml:"full-backup-storage"`

RestoreTS uint64 `json:"restore-ts" toml:"restore-ts"`
}

// DefineRestoreFlags defines common flags for the restore tidb command.
Expand All @@ -144,6 +157,14 @@ func DefineRestoreFlags(flags *pflag.FlagSet) {
DefineRestoreCommonFlags(flags)
}

// DefineStreamRestoreFlags defines for the restore log command.
func DefineStreamRestoreFlags(command *cobra.Command) {
command.Flags().String(FlagStreamRestoreTS, "", "the point of restore, used for log restore.\n"+
"support TSO or datetime, e.g. '400036290571534337' or '2018-05-11 01:42:23'")
command.Flags().String(FlagStreamFullBackupStorage, "", "specify the backup full storage. "+
"fill it if want restore full backup before restore log.")
}

// ParseFromFlags parses the restore-related flags from the flag set.
func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error {
var err error
Expand Down Expand Up @@ -180,6 +201,16 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error {
if err != nil {
return errors.Annotatef(err, "failed to get flag %s", FlagWithPlacementPolicy)
}
tsString, err := flags.GetString(FlagStreamRestoreTS)
if err != nil {
return errors.Trace(err)
}
if cfg.RestoreTS, err = ParseTSString(tsString); err != nil {
return errors.Trace(err)
}
if cfg.FullBackupStorage, err = flags.GetString(FlagStreamFullBackupStorage); err != nil {
return errors.Trace(err)
}
return nil
}

Expand Down Expand Up @@ -208,6 +239,12 @@ func (cfg *RestoreConfig) adjustRestoreConfig() {
}
}

func (cfg *RestoreConfig) adjustRestoreConfigForStreamRestore() {
if cfg.Config.Concurrency == 0 {
cfg.Config.Concurrency = 32
}
}

func configureRestoreClient(ctx context.Context, client *restore.Client, cfg *RestoreConfig) error {
client.SetRateLimit(cfg.RateLimit)
client.SetCrypter(&cfg.CipherInfo)
Expand Down Expand Up @@ -269,10 +306,18 @@ func isFullRestore(cmdName string) bool {
return cmdName == FullRestoreCmd
}

func isStreamRestore(cmdName string) bool {
return cmdName == LogRestoreCmd
}

// RunRestore starts a restore task inside the current goroutine.
func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error {
cfg.adjustRestoreConfig()
if isStreamRestore(cmdName) {
cfg.adjustRestoreConfigForStreamRestore()
return RunStreamRestore(c, g, cmdName, cfg)
}

cfg.adjustRestoreConfig()
defer summary.Summary(cmdName)
ctx, cancel := context.WithCancel(c)
defer cancel()
Expand Down
Loading