Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br/cmd: implement the operator pause-gc-and-schedulers command #43562

Merged
merged 7 commits into from
May 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions br/cmd/br/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
"cmd.go",
"debug.go",
"main.go",
"operator.go",
"restore.go",
"stream.go",
],
Expand All @@ -26,6 +27,7 @@ go_library(
"//br/pkg/streamhelper/config",
"//br/pkg/summary",
"//br/pkg/task",
"//br/pkg/task/operator",
"//br/pkg/trace",
"//br/pkg/utils",
"//br/pkg/version/build",
Expand Down
1 change: 1 addition & 0 deletions br/cmd/br/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func main() {
NewBackupCommand(),
NewRestoreCommand(),
NewStreamCommand(),
newOpeartorCommand(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
newOpeartorCommand(),
NewOpeartorCommand(),

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this shouldn't be a public function. 🤔️

)
// Outputs cmd.Print to stdout.
rootCmd.SetOut(os.Stdout)
Expand Down
49 changes: 49 additions & 0 deletions br/cmd/br/operator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0.

package main

import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/task"
"github.com/pingcap/tidb/br/pkg/task/operator"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version/build"
"github.com/spf13/cobra"
)

// newOpeartorCommand builds the hidden `operator` parent command, which groups
// the utilities meant for operators like tidb-operator.
func newOpeartorCommand() *cobra.Command {
	// Shared pre-run hook for every subcommand: run Init first, then call the
	// build, environment and argument logging helpers.
	preRun := func(c *cobra.Command, _ []string) error {
		err := Init(c)
		if err != nil {
			return errors.Trace(err)
		}
		build.LogInfo(build.BR)
		utils.LogEnvVariables()
		task.LogArguments(c)
		return nil
	}

	operatorCmd := &cobra.Command{
		Use:               "operator <subcommand>",
		Short:             "utilities for operators like tidb-operator.",
		PersistentPreRunE: preRun,
		// Hidden keeps the command out of the generated help output.
		Hidden: true,
	}
	operatorCmd.AddCommand(newPauseGcCommand())
	return operatorCmd
}

// newPauseGcCommand builds the `pause-gc-and-schedulers` subcommand. It takes
// no positional arguments; its flags are registered by
// operator.DefineFlagsForPauseGcConfig and parsed into an
// operator.PauseGcConfig before operator.PauseGCAndScheduler is invoked.
func newPauseGcCommand() *cobra.Command {
	run := func(c *cobra.Command, _ []string) error {
		var cfg operator.PauseGcConfig
		err := cfg.ParseFromFlags(c.Flags())
		if err != nil {
			return err
		}
		return operator.PauseGCAndScheduler(GetDefaultContext(), &cfg)
	}

	pauseCmd := &cobra.Command{
		Use:   "pause-gc-and-schedulers",
		Short: "pause gc and schedulers to the ts until the program exits.",
		Args:  cobra.NoArgs,
		RunE:  run,
	}
	operator.DefineFlagsForPauseGcConfig(pauseCmd.Flags())
	return pauseCmd
}
2 changes: 1 addition & 1 deletion br/pkg/pdutil/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func NewPdController(
}
if failure != nil {
return nil, errors.Annotatef(berrors.ErrPDUpdateFailed,
"pd address (%s) not available, please check network", pdAddrs)
"pd address (%s) not available, error is %s, please check network", pdAddrs, failure)
}

version := parseVersion(versionBytes)
Expand Down
9 changes: 9 additions & 0 deletions br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,15 @@ func (tls *TLSConfig) ToTLSConfig() (*tls.Config, error) {
return tlsConfig, nil
}

// ToPDSecurityOption converts the TLS config to the PD security option by
// copying the CA, certificate and key paths into the matching fields.
func (tls *TLSConfig) ToPDSecurityOption() pd.SecurityOption {
	return pd.SecurityOption{
		CAPath:   tls.CA,
		CertPath: tls.Cert,
		KeyPath:  tls.Key,
	}
}

// ParseFromFlags parses the TLS config from the flag set.
func (tls *TLSConfig) ParseFromFlags(flags *pflag.FlagSet) (err error) {
tls.CA, tls.Cert, tls.Key, err = ParseTLSTripleFromFlags(flags)
Expand Down
21 changes: 21 additions & 0 deletions br/pkg/task/operator/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

# Go library target for the br/pkg/task/operator package.
go_library(
name = "operator",
# Source files compiled into this package.
srcs = [
"cmd.go",
"config.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/task/operator",
visibility = ["//visibility:public"],
# In-repo packages first, then external repositories.
deps = [
"//br/pkg/logutil",
"//br/pkg/pdutil",
"//br/pkg/task",
"//br/pkg/utils",
"@com_github_pingcap_log//:log",
"@com_github_spf13_pflag//:pflag",
"@org_golang_x_sync//errgroup",
"@org_uber_go_zap//:zap",
],
)
103 changes: 103 additions & 0 deletions br/pkg/task/operator/cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0.

package operator
3pointer marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"crypto/tls"
"strings"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/task"
"github.com/pingcap/tidb/br/pkg/utils"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

// dialPD creates a PD controller for the PD addresses listed in cfg.
//
// The addresses are joined with commas before being handed to
// pdutil.NewPdController. A *tls.Config is built only when TLS is enabled in
// cfg; otherwise a nil TLS config is passed. Any error from building the TLS
// config or from creating the controller is returned unchanged.
func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error) {
	var tlsConf *tls.Config
	if cfg.TLS.IsEnabled() {
		conf, err := cfg.TLS.ToTLSConfig()
		if err != nil {
			return nil, err
		}
		tlsConf = conf
	}

	addrs := strings.Join(cfg.PD, ",")
	ctl, err := pdutil.NewPdController(ctx, addrs, tlsConf, cfg.TLS.ToPDSecurityOption())
	if err != nil {
		return nil, err
	}
	return ctl, nil
}

// cleanUpWith runs the clean-up callback f with a fresh context derived from
// context.Background, so the callback can still make progress after the
// caller's own context has been canceled. The context passed to f expires
// after 30 seconds and is released once f returns.
func cleanUpWith(f func(ctx context.Context)) {
	// Upper bound on how long a single clean-up step may take.
	const cleanUpTimeout = 30 * time.Second

	ctx, cancel := context.WithTimeout(context.Background(), cleanUpTimeout)
	defer cancel()
	f(ctx)
}

// PauseGCAndScheduler pauses the GC safepoint and removes the PD schedulers
// according to cfg, blocking the current goroutine while doing so.
//
// It dials PD, then runs pauseGCKeeper and pauseSchedulerKeeper concurrently
// in an errgroup and waits for both. The call returns once the context is
// canceled or one of the keepers fails; the first non-nil keeper error (or the
// dial error) is returned.
func PauseGCAndScheduler(ctx context.Context, cfg *PauseGcConfig) error {
	pdCtl, err := dialPD(ctx, &cfg.Config)
	if err != nil {
		return err
	}

	// groupCtx is canceled as soon as either keeper returns an error, which
	// unblocks the other keeper as well.
	group, groupCtx := errgroup.WithContext(ctx)
	group.Go(func() error {
		return pauseGCKeeper(groupCtx, cfg, pdCtl)
	})
	group.Go(func() error {
		return pauseSchedulerKeeper(groupCtx, pdCtl)
	})
	return group.Wait()
}

func pauseGCKeeper(ctx context.Context, cfg *PauseGcConfig, ctl *pdutil.PdController) error {
// Note: should we remove the service safepoint as soon as this exits?
sp := utils.BRServiceSafePoint{
ID: utils.MakeSafePointID(),
TTL: int64(cfg.TTL.Seconds()),
3pointer marked this conversation as resolved.
Show resolved Hide resolved
BackupTS: cfg.SafePoint,
}
if sp.BackupTS == 0 {
rts, err := ctl.GetMinResolvedTS(ctx)
if err != nil {
return err
}
log.Info("No service safepoint provided, using the minimal resolved TS.", zap.Uint64("min-resolved-ts", rts))
sp.BackupTS = rts
}
err := utils.StartServiceSafePointKeeper(ctx, ctl.GetPDClient(), sp)
if err != nil {
return err
}
log.Info("GC is paused.", zap.Object("safepoint", sp))
// Note: in fact we can directly return here.
// But the name `keeper` implies once the function exits,
// the GC should be resume, so let's block here.
<-ctx.Done()
return nil
}

// pauseSchedulerKeeper removes all PD schedulers through ctl, blocks until ctx
// is done, and restores the schedulers on the way out.
//
// The error from removing the schedulers is returned as-is. A failure while
// restoring them is only logged as a warning.
func pauseSchedulerKeeper(ctx context.Context, ctl *pdutil.PdController) error {
	restore, err := ctl.RemoveAllPDSchedulers(ctx)
	// Register the restore step before looking at err, so that a non-nil
	// restore callback is always run, even when the removal reported an error.
	if restore != nil {
		defer cleanUpWith(func(cleanCtx context.Context) {
			restoreErr := restore(cleanCtx)
			if restoreErr != nil {
				log.Warn("failed to restore pd scheduler.", logutil.ShortError(restoreErr))
			}
		})
	}
	if err != nil {
		return err
	}

	log.Info("Schedulers are paused.")
	// Block until cancellation; the deferred restore then runs with its own
	// clean-up context instead of the already-canceled ctx.
	<-ctx.Done()
	return nil
}
41 changes: 41 additions & 0 deletions br/pkg/task/operator/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0.

package operator

import (
"time"

"github.com/pingcap/tidb/br/pkg/task"
"github.com/spf13/pflag"
)

// PauseGcConfig is the configuration of the pause-gc-and-schedulers command.
type PauseGcConfig struct {
// Config is the embedded common task configuration; it is filled by its own
// ParseFromFlags before the fields below are read.
task.Config

// SafePoint is the GC safepoint to be kept, read from the "safepoint" flag.
// Zero (the flag default) means the minimal resolved TS is used instead.
SafePoint uint64 `json:"safepoint" yaml:"safepoint"`
// TTL is the time-to-live of the safepoint, read from the "ttl" flag.
TTL time.Duration `json:"ttl" yaml:"ttl"`
}

// DefineFlagsForPauseGcConfig registers the flags that
// PauseGcConfig.ParseFromFlags reads on the flag set f:
//
//   - --ttl / -i: the time-to-live of the safepoint (default 5 minutes);
//   - --safepoint / -t: the GC safepoint to be kept (default 0).
func DefineFlagsForPauseGcConfig(f *pflag.FlagSet) {
	const defaultTTL = 5 * time.Minute

	// The returned value pointers are not needed: the values are fetched by
	// name when the config is parsed.
	f.DurationP("ttl", "i", defaultTTL, "The time-to-live of the safepoint.")
	f.Uint64P("safepoint", "t", 0, "The GC safepoint to be kept.")
}

// ParseFromFlags fills the config from the flag set: first the embedded
// task.Config, then the "safepoint" and "ttl" flags. It stops at and returns
// the first error encountered.
func (cfg *PauseGcConfig) ParseFromFlags(flags *pflag.FlagSet) error {
	err := cfg.Config.ParseFromFlags(flags)
	if err != nil {
		return err
	}

	if cfg.SafePoint, err = flags.GetUint64("safepoint"); err != nil {
		return err
	}
	if cfg.TTL, err = flags.GetDuration("ttl"); err != nil {
		return err
	}
	return nil
}