From d20a564b66f6297d0ee8a20b440f095d496d402e Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 23 May 2023 16:29:38 +0800 Subject: [PATCH] br/cmd: implement the `operator pause-gc-and-schedulers` command (#43562) (#43925) ref pingcap/tidb#43559 --- br/cmd/br/BUILD.bazel | 2 + br/cmd/br/main.go | 1 + br/cmd/br/operator.go | 49 +++++++++++++++ br/pkg/pdutil/pd.go | 3 +- br/pkg/task/common.go | 9 +++ br/pkg/task/operator/BUILD.bazel | 21 +++++++ br/pkg/task/operator/cmd.go | 103 +++++++++++++++++++++++++++++++ br/pkg/task/operator/config.go | 41 ++++++++++++ 8 files changed, 228 insertions(+), 1 deletion(-) create mode 100644 br/cmd/br/operator.go create mode 100644 br/pkg/task/operator/BUILD.bazel create mode 100644 br/pkg/task/operator/cmd.go create mode 100644 br/pkg/task/operator/config.go diff --git a/br/cmd/br/BUILD.bazel b/br/cmd/br/BUILD.bazel index 7202ee66a0419..e2921d867640b 100644 --- a/br/cmd/br/BUILD.bazel +++ b/br/cmd/br/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "cmd.go", "debug.go", "main.go", + "operator.go", "restore.go", "stream.go", ], @@ -26,6 +27,7 @@ go_library( "//br/pkg/streamhelper/config", "//br/pkg/summary", "//br/pkg/task", + "//br/pkg/task/operator", "//br/pkg/trace", "//br/pkg/utils", "//br/pkg/version/build", diff --git a/br/cmd/br/main.go b/br/cmd/br/main.go index 29944fa9e2691..d70d9425e0653 100644 --- a/br/cmd/br/main.go +++ b/br/cmd/br/main.go @@ -49,6 +49,7 @@ func main() { NewBackupCommand(), NewRestoreCommand(), NewStreamCommand(), + newOpeartorCommand(), ) // Outputs cmd.Print to stdout. rootCmd.SetOut(os.Stdout) diff --git a/br/cmd/br/operator.go b/br/cmd/br/operator.go new file mode 100644 index 0000000000000..57ed59b224d06 --- /dev/null +++ b/br/cmd/br/operator.go @@ -0,0 +1,49 @@ +// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0. + +package main + +import ( + "github.com/pingcap/errors" + "github.com/pingcap/tidb/br/pkg/task" + "github.com/pingcap/tidb/br/pkg/task/operator" + "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/br/pkg/version/build" + "github.com/spf13/cobra" +) + +func newOpeartorCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "operator ", + Short: "utilities for operators like tidb-operator.", + PersistentPreRunE: func(c *cobra.Command, args []string) error { + if err := Init(c); err != nil { + return errors.Trace(err) + } + build.LogInfo(build.BR) + utils.LogEnvVariables() + task.LogArguments(c) + return nil + }, + Hidden: true, + } + cmd.AddCommand(newPauseGcCommand()) + return cmd +} + +func newPauseGcCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "pause-gc-and-schedulers", + Short: "pause gc and schedulers to the ts until the program exits.", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + cfg := operator.PauseGcConfig{} + if err := cfg.ParseFromFlags(cmd.Flags()); err != nil { + return err + } + ctx := GetDefaultContext() + return operator.PauseGCAndScheduler(ctx, &cfg) + }, + } + operator.DefineFlagsForPauseGcConfig(cmd.Flags()) + return cmd +} diff --git a/br/pkg/pdutil/pd.go b/br/pkg/pdutil/pd.go index f10425dbf203a..31a51f4a88581 100644 --- a/br/pkg/pdutil/pd.go +++ b/br/pkg/pdutil/pd.go @@ -254,7 +254,8 @@ func NewPdController( } } if failure != nil { - return nil, errors.Annotatef(berrors.ErrPDUpdateFailed, "pd address (%s) not available, please check network", pdAddrs) + return nil, errors.Annotatef(berrors.ErrPDUpdateFailed, + "pd address (%s) not available, error is %s, please check network", pdAddrs, failure) } version := parseVersion(versionBytes) diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index 57e498f1205c4..3158d3790d23f 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -139,6 +139,15 @@ func (tls *TLSConfig) ToTLSConfig() (*tls.Config, error) { return tlsConfig, nil } +// Convert the TLS config to the PD security option. +func (tls *TLSConfig) ToPDSecurityOption() pd.SecurityOption { + securityOption := pd.SecurityOption{} + securityOption.CAPath = tls.CA + securityOption.CertPath = tls.Cert + securityOption.KeyPath = tls.Key + return securityOption +} + // ParseFromFlags parses the TLS config from the flag set. func (tls *TLSConfig) ParseFromFlags(flags *pflag.FlagSet) (err error) { tls.CA, tls.Cert, tls.Key, err = ParseTLSTripleFromFlags(flags) diff --git a/br/pkg/task/operator/BUILD.bazel b/br/pkg/task/operator/BUILD.bazel new file mode 100644 index 0000000000000..a291d68df5b12 --- /dev/null +++ b/br/pkg/task/operator/BUILD.bazel @@ -0,0 +1,21 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "operator", + srcs = [ + "cmd.go", + "config.go", + ], + importpath = "github.com/pingcap/tidb/br/pkg/task/operator", + visibility = ["//visibility:public"], + deps = [ + "//br/pkg/logutil", + "//br/pkg/pdutil", + "//br/pkg/task", + "//br/pkg/utils", + "@com_github_pingcap_log//:log", + "@com_github_spf13_pflag//:pflag", + "@org_golang_x_sync//errgroup", + "@org_uber_go_zap//:zap", + ], +) diff --git a/br/pkg/task/operator/cmd.go b/br/pkg/task/operator/cmd.go new file mode 100644 index 0000000000000..95c922b1c19cf --- /dev/null +++ b/br/pkg/task/operator/cmd.go @@ -0,0 +1,103 @@ +// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0. + +package operator + +import ( + "context" + "crypto/tls" + "strings" + "time" + + "github.com/pingcap/log" + "github.com/pingcap/tidb/br/pkg/logutil" + "github.com/pingcap/tidb/br/pkg/pdutil" + "github.com/pingcap/tidb/br/pkg/task" + "github.com/pingcap/tidb/br/pkg/utils" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error) { + pdAddrs := strings.Join(cfg.PD, ",") + var tc *tls.Config + if cfg.TLS.IsEnabled() { + var err error + tc, err = cfg.TLS.ToTLSConfig() + if err != nil { + return nil, err + } + } + mgr, err := pdutil.NewPdController(ctx, pdAddrs, tc, cfg.TLS.ToPDSecurityOption()) + if err != nil { + return nil, err + } + return mgr, nil +} + +func cleanUpWith(f func(ctx context.Context)) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + f(ctx) +} + +// PauseGCAndScheduler blocks the current goroutine and pause the GC safepoint and remove the scheduler by the config. +// This function will block until the context being canceled. +func PauseGCAndScheduler(ctx context.Context, cfg *PauseGcConfig) error { + mgr, err := dialPD(ctx, &cfg.Config) + if err != nil { + return err + } + + eg, ectx := errgroup.WithContext(ctx) + + eg.Go(func() error { return pauseGCKeeper(ectx, cfg, mgr) }) + eg.Go(func() error { return pauseSchedulerKeeper(ectx, mgr) }) + + return eg.Wait() +} + +func pauseGCKeeper(ctx context.Context, cfg *PauseGcConfig, ctl *pdutil.PdController) error { + // Note: should we remove the service safepoint as soon as this exits? + sp := utils.BRServiceSafePoint{ + ID: utils.MakeSafePointID(), + TTL: int64(cfg.TTL.Seconds()), + BackupTS: cfg.SafePoint, + } + if sp.BackupTS == 0 { + rts, err := ctl.GetMinResolvedTS(ctx) + if err != nil { + return err + } + log.Info("No service safepoint provided, using the minimal resolved TS.", zap.Uint64("min-resolved-ts", rts)) + sp.BackupTS = rts + } + err := utils.StartServiceSafePointKeeper(ctx, ctl.GetPDClient(), sp) + if err != nil { + return err + } + log.Info("GC is paused.", zap.Object("safepoint", sp)) + // Note: in fact we can directly return here. + // But the name `keeper` implies once the function exits, + // the GC should be resume, so let's block here. + <-ctx.Done() + return nil +} + +func pauseSchedulerKeeper(ctx context.Context, ctl *pdutil.PdController) error { + undo, err := ctl.RemoveAllPDSchedulers(ctx) + if undo != nil { + defer cleanUpWith(func(ctx context.Context) { + if err := undo(ctx); err != nil { + log.Warn("failed to restore pd scheduler.", logutil.ShortError(err)) + } + }) + } + if err != nil { + return err + } + log.Info("Schedulers are paused.") + // Wait until the context canceled. + // So we can properly do the clean up work. + <-ctx.Done() + return nil +} diff --git a/br/pkg/task/operator/config.go b/br/pkg/task/operator/config.go new file mode 100644 index 0000000000000..eb7e12a49af56 --- /dev/null +++ b/br/pkg/task/operator/config.go @@ -0,0 +1,41 @@ +// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0. + +package operator + +import ( + "time" + + "github.com/pingcap/tidb/br/pkg/task" + "github.com/spf13/pflag" +) + +type PauseGcConfig struct { + task.Config + + SafePoint uint64 `json:"safepoint" yaml:"safepoint"` + TTL time.Duration `json:"ttl" yaml:"ttl"` +} + +func DefineFlagsForPauseGcConfig(f *pflag.FlagSet) { + _ = f.DurationP("ttl", "i", 5*time.Minute, "The time-to-live of the safepoint.") + _ = f.Uint64P("safepoint", "t", 0, "The GC safepoint to be kept.") +} + +// ParseFromFlags fills the config via the flags. +func (cfg *PauseGcConfig) ParseFromFlags(flags *pflag.FlagSet) error { + if err := cfg.Config.ParseFromFlags(flags); err != nil { + return err + } + + var err error + cfg.SafePoint, err = flags.GetUint64("safepoint") + if err != nil { + return err + } + cfg.TTL, err = flags.GetDuration("ttl") + if err != nil { + return err + } + + return nil +}