From 9a2f7c96c7611e122b885632500761d1c3746f5d Mon Sep 17 00:00:00 2001 From: hillium Date: Fri, 5 May 2023 17:08:09 +0800 Subject: [PATCH 1/6] cherry-pick Signed-off-by: hillium --- br/cmd/br/BUILD.bazel | 2 ++ br/cmd/br/main.go | 1 + br/cmd/br/operator.go | 47 +++++++++++++++++++++++++++ br/pkg/pdutil/pd.go | 2 +- br/pkg/task/common.go | 8 +++++ br/pkg/task/operator/BUILD.bazel | 19 +++++++++++ br/pkg/task/operator/cmd.go | 56 ++++++++++++++++++++++++++++++++ br/pkg/task/operator/config.go | 30 +++++++++++++++++ 8 files changed, 164 insertions(+), 1 deletion(-) create mode 100644 br/cmd/br/operator.go create mode 100644 br/pkg/task/operator/BUILD.bazel create mode 100644 br/pkg/task/operator/cmd.go create mode 100644 br/pkg/task/operator/config.go diff --git a/br/cmd/br/BUILD.bazel b/br/cmd/br/BUILD.bazel index a95909dc84f03..1c20553569792 100644 --- a/br/cmd/br/BUILD.bazel +++ b/br/cmd/br/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "cmd.go", "debug.go", "main.go", + "operator.go", "restore.go", "stream.go", ], @@ -26,6 +27,7 @@ go_library( "//br/pkg/streamhelper/config", "//br/pkg/summary", "//br/pkg/task", + "//br/pkg/task/operator", "//br/pkg/trace", "//br/pkg/utils", "//br/pkg/version/build", diff --git a/br/cmd/br/main.go b/br/cmd/br/main.go index 29944fa9e2691..d70d9425e0653 100644 --- a/br/cmd/br/main.go +++ b/br/cmd/br/main.go @@ -49,6 +49,7 @@ func main() { NewBackupCommand(), NewRestoreCommand(), NewStreamCommand(), + newOpeartorCommand(), ) // Outputs cmd.Print to stdout. rootCmd.SetOut(os.Stdout) diff --git a/br/cmd/br/operator.go b/br/cmd/br/operator.go new file mode 100644 index 0000000000000..7967921c375cf --- /dev/null +++ b/br/cmd/br/operator.go @@ -0,0 +1,47 @@ +// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0. + +package main + +import ( + "github.com/pingcap/errors" + "github.com/pingcap/tidb/br/pkg/task" + "github.com/pingcap/tidb/br/pkg/task/operator" + "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/br/pkg/version/build" + "github.com/spf13/cobra" +) + +func newOpeartorCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "operator ", + Short: "utilities for operators like tidb-operator.", + PersistentPreRunE: func(c *cobra.Command, args []string) error { + if err := Init(c); err != nil { + return errors.Trace(err) + } + build.LogInfo(build.BR) + utils.LogEnvVariables() + task.LogArguments(c) + return nil + }, + Hidden: true, + } + cmd.AddCommand(newPauseGcCommand()) + return cmd +} + +func newPauseGcCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "pause-gc", + Short: "pause gc to the ts until the program exits.", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + cfg := operator.PauseGcConfig{} + cfg.ParseFromFlags(cmd.Flags()) + ctx := GetDefaultContext() + return operator.PauseGC(ctx, &cfg) + }, + } + operator.DefineFlagsForPauseGcConfig(cmd.Flags()) + return cmd +} diff --git a/br/pkg/pdutil/pd.go b/br/pkg/pdutil/pd.go index c0546a1801843..14dac11b19e79 100644 --- a/br/pkg/pdutil/pd.go +++ b/br/pkg/pdutil/pd.go @@ -277,7 +277,7 @@ func NewPdController( } if failure != nil { return nil, errors.Annotatef(berrors.ErrPDUpdateFailed, - "pd address (%s) not available, please check network", pdAddrs) + "pd address (%s) not available, error is %s, please check network", pdAddrs, failure) } version := parseVersion(versionBytes) diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index 3e1fa25d72840..54dc131b2e9d4 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -138,6 +138,14 @@ func (tls *TLSConfig) ToTLSConfig() (*tls.Config, error) { return tlsConfig, nil } +func (tls *TLSConfig) ToPDSecurityOption() pd.SecurityOption { + securityOption := pd.SecurityOption{} + securityOption.CAPath = tls.CA + securityOption.CertPath = tls.Cert + securityOption.KeyPath = tls.Key + return securityOption +} + // ParseFromFlags parses the TLS config from the flag set. func (tls *TLSConfig) ParseFromFlags(flags *pflag.FlagSet) (err error) { tls.CA, tls.Cert, tls.Key, err = ParseTLSTripleFromFlags(flags) diff --git a/br/pkg/task/operator/BUILD.bazel b/br/pkg/task/operator/BUILD.bazel new file mode 100644 index 0000000000000..c2f1575403aec --- /dev/null +++ b/br/pkg/task/operator/BUILD.bazel @@ -0,0 +1,19 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "operator", + srcs = [ + "cmd.go", + "config.go", + ], + importpath = "github.com/pingcap/tidb/br/pkg/task/operator", + visibility = ["//visibility:public"], + deps = [ + "//br/pkg/pdutil", + "//br/pkg/task", + "//br/pkg/utils", + "@com_github_pingcap_log//:log", + "@com_github_spf13_pflag//:pflag", + "@org_uber_go_zap//:zap", + ], +) diff --git a/br/pkg/task/operator/cmd.go b/br/pkg/task/operator/cmd.go new file mode 100644 index 0000000000000..b191124d5a69a --- /dev/null +++ b/br/pkg/task/operator/cmd.go @@ -0,0 +1,56 @@ +package operator + +import ( + "context" + "crypto/tls" + "strings" + + "github.com/pingcap/log" + "github.com/pingcap/tidb/br/pkg/pdutil" + "github.com/pingcap/tidb/br/pkg/task" + "github.com/pingcap/tidb/br/pkg/utils" + "go.uber.org/zap" +) + +func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error) { + pdAddrs := strings.Join(cfg.PD, ",") + var tc *tls.Config + if cfg.TLS.IsEnabled() { + var err error + tc, err = cfg.TLS.ToTLSConfig() + if err != nil { + return nil, err + } + } + mgr, err := pdutil.NewPdController(ctx, pdAddrs, tc, cfg.TLS.ToPDSecurityOption()) + if err != nil { + return nil, err + } + return mgr, nil +} + +func PauseGC(ctx context.Context, cfg *PauseGcConfig) error { + mgr, err := dialPD(ctx, &cfg.Config) + if err != nil { + return err + } + sp := utils.BRServiceSafePoint{ + ID: utils.MakeSafePointID(), + TTL: int64(cfg.TTL.Seconds()), + BackupTS: cfg.SafePoint, + } + if sp.BackupTS == 0 { + rts, err := mgr.GetMinResolvedTS(ctx) + if err != nil { + return err + } + log.Info("No service safepoint provided, using the minimal resolved TS.", zap.Uint64("min-resolved-ts", rts)) + sp.BackupTS = rts + } + err = utils.StartServiceSafePointKeeper(ctx, mgr.GetPDClient(), sp) + if err != nil { + return err + } + <-ctx.Done() + return nil +} diff --git a/br/pkg/task/operator/config.go b/br/pkg/task/operator/config.go new file mode 100644 index 0000000000000..68762160ca5f5 --- /dev/null +++ b/br/pkg/task/operator/config.go @@ -0,0 +1,30 @@ +// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0. + +package operator + +import ( + "time" + + "github.com/pingcap/tidb/br/pkg/task" + "github.com/pingcap/tidb/br/pkg/utils" + "github.com/spf13/pflag" +) + +type PauseGcConfig struct { + task.Config + + SafePoint uint64 `json:"safepoint" yaml:"safepoint"` + TTL time.Duration `json:"ttl" yaml:"ttl"` +} + +func DefineFlagsForPauseGcConfig(f *pflag.FlagSet) { + _ = f.DurationP("ttl", "i", 5*time.Minute, "The time-to-live of the safepoint.") + _ = f.Uint64P("safepoint", "t", 0, "The GC safepoint to be kept.") +} + +func (cfg *PauseGcConfig) ParseFromFlags(flags *pflag.FlagSet) { + cfg.Config.ParseFromFlags(flags) + + cfg.SafePoint = utils.Must(flags.GetUint64("safepoint")) + cfg.TTL = utils.Must(flags.GetDuration("ttl")) +} From e9bda5882b566891bcb8a0fde929cc829ad616ad Mon Sep 17 00:00:00 2001 From: hillium Date: Fri, 5 May 2023 17:39:29 +0800 Subject: [PATCH 2/6] make clippy happy Signed-off-by: hillium --- br/cmd/br/operator.go | 4 +++- br/pkg/task/common.go | 1 + br/pkg/task/operator/cmd.go | 1 + br/pkg/task/operator/config.go | 23 +++++++++++++++++------ 4 files changed, 22 insertions(+), 7 deletions(-) diff --git a/br/cmd/br/operator.go b/br/cmd/br/operator.go index 7967921c375cf..cfd48d85ae393 100644 --- a/br/cmd/br/operator.go +++ b/br/cmd/br/operator.go @@ -37,7 +37,9 @@ func newPauseGcCommand() *cobra.Command { Args: cobra.NoArgs, RunE: func(cmd *cobra.Command, args []string) error { cfg := operator.PauseGcConfig{} - cfg.ParseFromFlags(cmd.Flags()) + if err := cfg.ParseFromFlags(cmd.Flags()); err != nil { + return err + } ctx := GetDefaultContext() return operator.PauseGC(ctx, &cfg) }, diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index 54dc131b2e9d4..98b0810e8261e 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -138,6 +138,7 @@ func (tls *TLSConfig) ToTLSConfig() (*tls.Config, error) { return tlsConfig, nil } +// Convert the TLS config to the PD security option. func (tls *TLSConfig) ToPDSecurityOption() pd.SecurityOption { securityOption := pd.SecurityOption{} securityOption.CAPath = tls.CA diff --git a/br/pkg/task/operator/cmd.go b/br/pkg/task/operator/cmd.go index b191124d5a69a..961b919073c9a 100644 --- a/br/pkg/task/operator/cmd.go +++ b/br/pkg/task/operator/cmd.go @@ -29,6 +29,7 @@ func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error) return mgr, nil } +// PauseGC blocks the current goroutine and pause the GC safepoint by the config. func PauseGC(ctx context.Context, cfg *PauseGcConfig) error { mgr, err := dialPD(ctx, &cfg.Config) if err != nil { diff --git a/br/pkg/task/operator/config.go b/br/pkg/task/operator/config.go index 68762160ca5f5..eb7e12a49af56 100644 --- a/br/pkg/task/operator/config.go +++ b/br/pkg/task/operator/config.go @@ -6,7 +6,6 @@ import ( "time" "github.com/pingcap/tidb/br/pkg/task" - "github.com/pingcap/tidb/br/pkg/utils" "github.com/spf13/pflag" ) @@ -22,9 +21,21 @@ func DefineFlagsForPauseGcConfig(f *pflag.FlagSet) { _ = f.Uint64P("safepoint", "t", 0, "The GC safepoint to be kept.") } -func (cfg *PauseGcConfig) ParseFromFlags(flags *pflag.FlagSet) { - cfg.Config.ParseFromFlags(flags) - - cfg.SafePoint = utils.Must(flags.GetUint64("safepoint")) - cfg.TTL = utils.Must(flags.GetDuration("ttl")) +// ParseFromFlags fills the config via the flags. +func (cfg *PauseGcConfig) ParseFromFlags(flags *pflag.FlagSet) error { + if err := cfg.Config.ParseFromFlags(flags); err != nil { + return err + } + + var err error + cfg.SafePoint, err = flags.GetUint64("safepoint") + if err != nil { + return err + } + cfg.TTL, err = flags.GetDuration("ttl") + if err != nil { + return err + } + + return nil } From 02c58ade4768e6600e543a1fcd1e6c9c08f7c2a7 Mon Sep 17 00:00:00 2001 From: hillium Date: Sat, 6 May 2023 17:33:46 +0800 Subject: [PATCH 3/6] fix lint Signed-off-by: hillium --- br/cmd/br/operator.go | 2 +- br/pkg/task/operator/BUILD.bazel | 3 ++ br/pkg/task/operator/cmd.go | 52 +++++++++++++++++++++++++++++--- 3 files changed, 52 insertions(+), 5 deletions(-) diff --git a/br/cmd/br/operator.go b/br/cmd/br/operator.go index cfd48d85ae393..d05c9cde9ca3f 100644 --- a/br/cmd/br/operator.go +++ b/br/cmd/br/operator.go @@ -41,7 +41,7 @@ func newPauseGcCommand() *cobra.Command { return err } ctx := GetDefaultContext() - return operator.PauseGC(ctx, &cfg) + return operator.PauseGCAndScheduler(ctx, &cfg) }, } operator.DefineFlagsForPauseGcConfig(cmd.Flags()) diff --git a/br/pkg/task/operator/BUILD.bazel b/br/pkg/task/operator/BUILD.bazel index c2f1575403aec..a48a35c60b343 100644 --- a/br/pkg/task/operator/BUILD.bazel +++ b/br/pkg/task/operator/BUILD.bazel @@ -9,11 +9,14 @@ go_library( importpath = "github.com/pingcap/tidb/br/pkg/task/operator", visibility = ["//visibility:public"], deps = [ + "//br/pkg/logutil", "//br/pkg/pdutil", "//br/pkg/task", "//br/pkg/utils", + "//util", "@com_github_pingcap_log//:log", "@com_github_spf13_pflag//:pflag", + "@org_golang_x_sync//errgroup", "@org_uber_go_zap//:zap", ], ) diff --git a/br/pkg/task/operator/cmd.go b/br/pkg/task/operator/cmd.go index 961b919073c9a..e356a0ac8f98c 100644 --- a/br/pkg/task/operator/cmd.go +++ b/br/pkg/task/operator/cmd.go @@ -4,12 +4,15 @@ import ( "context" "crypto/tls" "strings" + "time" "github.com/pingcap/log" + "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/task" "github.com/pingcap/tidb/br/pkg/utils" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error) { @@ -29,29 +32,70 @@ func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error) return mgr, nil } -// PauseGC blocks the current goroutine and pause the GC safepoint by the config. -func PauseGC(ctx context.Context, cfg *PauseGcConfig) error { +func cleanUpWith(f func(ctx context.Context)) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + f(ctx) +} + +// PauseGCAndScheduler blocks the current goroutine and pause the GC safepoint and remove the scheduler by the config. +// This function will block until the context being canceled. +func PauseGCAndScheduler(ctx context.Context, cfg *PauseGcConfig) error { mgr, err := dialPD(ctx, &cfg.Config) if err != nil { return err } + + eg, ectx := errgroup.WithContext(ctx) + + eg.Go(func() error { return pauseGCKeeper(ectx, cfg, mgr) }) + eg.Go(func() error { return pauseSchedulerKeeper(ectx, mgr) }) + + return eg.Wait() +} + +func pauseGCKeeper(ctx context.Context, cfg *PauseGcConfig, ctl *pdutil.PdController) error { + // Note: should we remove the service safepoint as soon as this exits? sp := utils.BRServiceSafePoint{ ID: utils.MakeSafePointID(), TTL: int64(cfg.TTL.Seconds()), BackupTS: cfg.SafePoint, } if sp.BackupTS == 0 { - rts, err := mgr.GetMinResolvedTS(ctx) + rts, err := ctl.GetMinResolvedTS(ctx) if err != nil { return err } log.Info("No service safepoint provided, using the minimal resolved TS.", zap.Uint64("min-resolved-ts", rts)) sp.BackupTS = rts } - err = utils.StartServiceSafePointKeeper(ctx, mgr.GetPDClient(), sp) + err := utils.StartServiceSafePointKeeper(ctx, ctl.GetPDClient(), sp) + if err != nil { + return err + } + log.Info("GC is paused.", zap.String("ID", sp.ID), zap.Uint64("at", sp.BackupTS)) + // Note: in fact we can directly return here. + // But the name `keeper` implies once the function exits, + // the GC should be resume, so let's block here. + <-ctx.Done() + return nil +} + +func pauseSchedulerKeeper(ctx context.Context, ctl *pdutil.PdController) error { + undo, err := ctl.RemoveAllPDSchedulers(ctx) + if undo != nil { + defer cleanUpWith(func(ctx context.Context) { + if err := undo(ctx); err != nil { + log.Warn("failed to restore pd scheduler.", logutil.ShortError(err)) + } + }) + } if err != nil { return err } + log.Info("Schedulers are paused.") + // Wait until the context canceled. + // So we can properly do the clean up work. <-ctx.Done() return nil } From aee73ee6c90bd8cda539f962cbd804d6315004e1 Mon Sep 17 00:00:00 2001 From: hillium Date: Sat, 6 May 2023 17:37:46 +0800 Subject: [PATCH 4/6] rename the command to pause-gc-and-schedulers Signed-off-by: hillium --- br/cmd/br/operator.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/br/cmd/br/operator.go b/br/cmd/br/operator.go index d05c9cde9ca3f..57ed59b224d06 100644 --- a/br/cmd/br/operator.go +++ b/br/cmd/br/operator.go @@ -32,8 +32,8 @@ func newOpeartorCommand() *cobra.Command { func newPauseGcCommand() *cobra.Command { cmd := &cobra.Command{ - Use: "pause-gc", - Short: "pause gc to the ts until the program exits.", + Use: "pause-gc-and-schedulers", + Short: "pause gc and schedulers to the ts until the program exits.", Args: cobra.NoArgs, RunE: func(cmd *cobra.Command, args []string) error { cfg := operator.PauseGcConfig{} From e484d5c186fff72aefe5928272c16fb770779016 Mon Sep 17 00:00:00 2001 From: hillium Date: Mon, 8 May 2023 14:17:18 +0800 Subject: [PATCH 5/6] make bazel_prepare Signed-off-by: hillium --- br/pkg/task/operator/BUILD.bazel | 1 - 1 file changed, 1 deletion(-) diff --git a/br/pkg/task/operator/BUILD.bazel b/br/pkg/task/operator/BUILD.bazel index a48a35c60b343..a291d68df5b12 100644 --- a/br/pkg/task/operator/BUILD.bazel +++ b/br/pkg/task/operator/BUILD.bazel @@ -13,7 +13,6 @@ go_library( "//br/pkg/pdutil", "//br/pkg/task", "//br/pkg/utils", - "//util", "@com_github_pingcap_log//:log", "@com_github_spf13_pflag//:pflag", "@org_golang_x_sync//errgroup", From fca816045afa3df1c98f06beada2b8f6ebc0cb56 Mon Sep 17 00:00:00 2001 From: hillium Date: Tue, 9 May 2023 11:21:19 +0800 Subject: [PATCH 6/6] address comments Signed-off-by: hillium --- br/pkg/task/operator/cmd.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/br/pkg/task/operator/cmd.go b/br/pkg/task/operator/cmd.go index e356a0ac8f98c..95c922b1c19cf 100644 --- a/br/pkg/task/operator/cmd.go +++ b/br/pkg/task/operator/cmd.go @@ -1,3 +1,5 @@ +// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0. + package operator import ( @@ -73,7 +75,7 @@ func pauseGCKeeper(ctx context.Context, cfg *PauseGcConfig, ctl *pdutil.PdContro if err != nil { return err } - log.Info("GC is paused.", zap.String("ID", sp.ID), zap.Uint64("at", sp.BackupTS)) + log.Info("GC is paused.", zap.Object("safepoint", sp)) // Note: in fact we can directly return here. // But the name `keeper` implies once the function exits, // the GC should be resume, so let's block here.