Skip to content

Commit

Permalink
br/operator: added some commands for managing migrations (#56857)
Browse files Browse the repository at this point in the history
close #56760
  • Loading branch information
YuJuncen authored Nov 4, 2024
1 parent 3651427 commit 1de6f3e
Show file tree
Hide file tree
Showing 7 changed files with 426 additions and 1 deletion.
57 changes: 57 additions & 0 deletions br/cmd/br/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ func newOperatorCommand() *cobra.Command {
cmd.AddCommand(newPrepareForSnapshotBackupCommand(
"prepare-for-snapshot-backup",
"pause gc, schedulers and importing until the program exits, for snapshot backup."))
cmd.AddCommand(newBase64ifyCommand())
cmd.AddCommand(newListMigrationsCommand())
cmd.AddCommand(newMigrateToCommand())
return cmd
}

Expand All @@ -52,3 +55,57 @@ func newPrepareForSnapshotBackupCommand(use string, short string) *cobra.Command
operator.DefineFlagsForPrepareSnapBackup(cmd.Flags())
return cmd
}

func newBase64ifyCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "base64ify [-r] -s <storage>",
Short: "generate base64 for a storage. this may be passed to `tikv-ctl compact-log-backup`.",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
cfg := operator.Base64ifyConfig{}
if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
return err
}
ctx := GetDefaultContext()
return operator.Base64ify(ctx, cfg)
},
}
operator.DefineFlagsForBase64ifyConfig(cmd.Flags())
return cmd
}

func newListMigrationsCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "list-migrations",
Short: "list all migrations",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
cfg := operator.ListMigrationConfig{}
if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
return err
}
ctx := GetDefaultContext()
return operator.RunListMigrations(ctx, cfg)
},
}
operator.DefineFlagsForListMigrationConfig(cmd.Flags())
return cmd
}

func newMigrateToCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "migrate-to",
Short: "migrate to a specific version",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
cfg := operator.MigrateToConfig{}
if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
return err
}
ctx := GetDefaultContext()
return operator.RunMigrateTo(ctx, cfg)
},
}
operator.DefineFlagsForMigrateToConfig(cmd.Flags())
return cmd
}
10 changes: 9 additions & 1 deletion br/pkg/task/operator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,28 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "operator",
srcs = [
"cmd.go",
"base64ify.go",
"config.go",
"list_migration.go",
"migrate_to.go",
"prepare_snap.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/task/operator",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/backup/prepare_snap",
"//br/pkg/errors",
"//br/pkg/glue",
"//br/pkg/logutil",
"//br/pkg/pdutil",
"//br/pkg/storage",
"//br/pkg/stream",
"//br/pkg/task",
"//br/pkg/utils",
"@com_github_fatih_color//:color",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_log//:log",
"@com_github_spf13_pflag//:pflag",
"@com_github_tikv_client_go_v2//tikv",
Expand Down
38 changes: 38 additions & 0 deletions br/pkg/task/operator/base64ify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package operator

import (
"context"
"encoding/base64"
"fmt"
"os"

"github.com/fatih/color"
"github.com/pingcap/tidb/br/pkg/storage"
)

func Base64ify(ctx context.Context, cfg Base64ifyConfig) error {
return runEncode(ctx, cfg) // Assuming runEncode will be similarly modified to accept Base64ifyConfig
}

func runEncode(ctx context.Context, cfg Base64ifyConfig) error {
s, err := storage.ParseBackend(cfg.StorageURI, &cfg.BackendOptions)
if err != nil {
return err
}
if cfg.LoadCerd {
_, err := storage.New(ctx, s, &storage.ExternalStorageOptions{
SendCredentials: true,
})
if err != nil {
return err
}
fmt.Fprintln(os.Stderr, color.HiRedString("Credientials are encoded to the base64 string. DON'T share this with untrusted people!"))
}

sBytes, err := s.Marshal()
if err != nil {
return err
}
fmt.Println(base64.StdEncoding.EncodeToString(sBytes))
return nil
}
138 changes: 138 additions & 0 deletions br/pkg/task/operator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ package operator
import (
"time"

"github.com/pingcap/errors"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/task"
"github.com/spf13/pflag"
)
Expand Down Expand Up @@ -42,3 +45,138 @@ func (cfg *PauseGcConfig) ParseFromFlags(flags *pflag.FlagSet) error {

return nil
}

type Base64ifyConfig struct {
storage.BackendOptions
StorageURI string
LoadCerd bool
}

func DefineFlagsForBase64ifyConfig(flags *pflag.FlagSet) {
storage.DefineFlags(flags)
flags.StringP("storage", "s", "", "The external storage input.")
flags.Bool("load-creds", false, "whether loading the credientials from current environment and marshal them to the base64 string. [!]")
}

func (cfg *Base64ifyConfig) ParseFromFlags(flags *pflag.FlagSet) error {
var err error
err = cfg.BackendOptions.ParseFromFlags(flags)
if err != nil {
return err
}
cfg.StorageURI, err = flags.GetString("storage")
if err != nil {
return err
}
cfg.LoadCerd, err = flags.GetBool("load-creds")
if err != nil {
return err
}
return nil
}

type ListMigrationConfig struct {
storage.BackendOptions
StorageURI string
JSONOutput bool
}

func DefineFlagsForListMigrationConfig(flags *pflag.FlagSet) {
storage.DefineFlags(flags)
flags.StringP("storage", "s", "", "the external storage input.")
flags.Bool("json", false, "output the result in json format.")
}

func (cfg *ListMigrationConfig) ParseFromFlags(flags *pflag.FlagSet) error {
var err error
err = cfg.BackendOptions.ParseFromFlags(flags)
if err != nil {
return err
}
cfg.StorageURI, err = flags.GetString("storage")
if err != nil {
return err
}
cfg.JSONOutput, err = flags.GetBool("json")
if err != nil {
return err
}
return nil
}

type MigrateToConfig struct {
storage.BackendOptions
StorageURI string
Recent bool
MigrateTo int
Base bool

Yes bool
DryRun bool
}

const (
flagStorage = "storage"
flagRecent = "recent"
flagTo = "to"
flagBase = "base"
flagYes = "yes"
flagDryRun = "dry-run"
)

func DefineFlagsForMigrateToConfig(flags *pflag.FlagSet) {
storage.DefineFlags(flags)
flags.StringP(flagStorage, "s", "", "the external storage input.")
flags.Bool(flagRecent, true, "migrate to the most recent migration and BASE.")
flags.Int(flagTo, 0, "migrate all migrations from the BASE to the specified sequence number.")
flags.Bool(flagBase, false, "don't merge any migrations, just retry run pending operations in BASE.")
flags.BoolP(flagYes, "y", false, "skip all effect estimating and confirming. execute directly.")
flags.Bool(flagDryRun, false, "do not actually perform the migration, just print the effect.")
}

func (cfg *MigrateToConfig) ParseFromFlags(flags *pflag.FlagSet) error {
var err error
err = cfg.BackendOptions.ParseFromFlags(flags)
if err != nil {
return err
}
cfg.StorageURI, err = flags.GetString(flagStorage)
if err != nil {
return err
}
cfg.Recent, err = flags.GetBool(flagRecent)
if err != nil {
return err
}
cfg.MigrateTo, err = flags.GetInt(flagTo)
if err != nil {
return err
}
cfg.Base, err = flags.GetBool(flagBase)
if err != nil {
return err
}
cfg.Yes, err = flags.GetBool(flagYes)
if err != nil {
return err
}
cfg.DryRun, err = flags.GetBool(flagDryRun)
if err != nil {
return err
}
return nil
}

func (cfg *MigrateToConfig) Verify() error {
if cfg.Recent && cfg.MigrateTo != 0 {
return errors.Annotatef(berrors.ErrInvalidArgument,
"the --%s and --%s flag cannot be used at the same time",
flagRecent, flagTo)
}
if cfg.Base && (cfg.Recent || cfg.MigrateTo != 0) {
return errors.Annotatef(berrors.ErrInvalidArgument,
"the --%s and ( --%s or --%s ) flag cannot be used at the same time",
flagBase, flagTo, flagRecent)
}
return nil
}
53 changes: 53 additions & 0 deletions br/pkg/task/operator/list_migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package operator

import (
"context"
"encoding/json"
"fmt"
"os"

"github.com/fatih/color"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/stream"
)

// statusOK make a string like <green>●</green> <bold>{message}</bold>
func statusOK(message string) string {
return color.GreenString("●") + color.New(color.Bold).Sprintf(" %s", message)
}

func RunListMigrations(ctx context.Context, cfg ListMigrationConfig) error {
backend, err := storage.ParseBackend(cfg.StorageURI, &cfg.BackendOptions)
if err != nil {
return err
}
st, err := storage.Create(ctx, backend, false)
if err != nil {
return err
}
ext := stream.MigerationExtension(st)
migs, err := ext.Load(ctx)
if err != nil {
return err
}
if cfg.JSONOutput {
if err := json.NewEncoder(os.Stdout).Encode(migs); err != nil {
return err
}
} else {
console := glue.ConsoleOperations{ConsoleGlue: glue.StdIOGlue{}}
console.Println(statusOK(fmt.Sprintf("Total %d Migrations.", len(migs.Layers)+1)))
console.Printf("> BASE <\n")
tbl := console.CreateTable()
stream.AddMigrationToTable(migs.Base, tbl)
tbl.Print()
for _, t := range migs.Layers {
console.Printf("> %08d <\n", t.SeqNum)
tbl := console.CreateTable()
stream.AddMigrationToTable(&t.Content, tbl)
tbl.Print()
}
}
return nil
}
Loading

0 comments on commit 1de6f3e

Please sign in to comment.