Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
conn,task: add config about keepalive (#545)
Browse files Browse the repository at this point in the history
* conn,task: add config about keepalive

* *: fix CI

* task: hide grpc lifetime flags

* *: fix a typo

* task: add adjust to grpc keepalive variables
  • Loading branch information
YuJuncen authored Sep 30, 2020
1 parent a2a7d22 commit 5b7df4a
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 16 deletions.
2 changes: 1 addition & 1 deletion cmd/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func setPDConfigCommand() *cobra.Command {
return err
}

mgr, err := task.NewMgr(ctx, tidbGlue, cfg.PD, cfg.TLS, cfg.CheckRequirements)
mgr, err := task.NewMgr(ctx, tidbGlue, cfg.PD, cfg.TLS, task.GetKeepalive(&cfg), cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down
10 changes: 4 additions & 6 deletions pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Mgr struct {
mu sync.Mutex
clis map[uint64]*grpc.ClientConn
}
keepalive keepalive.ClientParameters
ownsStorage bool
}

Expand Down Expand Up @@ -109,6 +110,7 @@ func NewMgr(
storage tikv.Storage,
tlsConf *tls.Config,
securityOption pd.SecurityOption,
keepalive keepalive.ClientParameters,
storeBehavior StoreBehavior,
checkRequirements bool,
) (*Mgr, error) {
Expand Down Expand Up @@ -159,6 +161,7 @@ func NewMgr(
ownsStorage: g.OwnsStorage(),
}
mgr.grpcClis.clis = make(map[uint64]*grpc.ClientConn)
mgr.keepalive = keepalive
return mgr, nil
}

Expand All @@ -172,8 +175,6 @@ func (mgr *Mgr) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.Cl
opt = grpc.WithTransportCredentials(credentials.NewTLS(mgr.tlsConf))
}
ctx, cancel := context.WithTimeout(ctx, dialTimeout)
keepAlive := 10
keepAliveTimeout := 3
bfConf := backoff.DefaultConfig
bfConf.MaxDelay = time.Second * 3
addr := store.GetPeerAddress()
Expand All @@ -186,10 +187,7 @@ func (mgr *Mgr) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.Cl
opt,
grpc.WithBlock(),
grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(keepAlive) * time.Second,
Timeout: time.Duration(keepAliveTimeout) * time.Second,
}),
grpc.WithKeepaliveParams(mgr.keepalive),
)
cancel()
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func parseCompressionFlags(flags *pflag.FlagSet) (*CompressionConfig, error) {
// we should set proper value in this function.
// so that both binary and TiDB will use same default value.
func (cfg *BackupConfig) adjustBackupConfig() {
cfg.adjust()
if cfg.Config.Concurrency == 0 {
cfg.Config.Concurrency = defaultBackupConcurrency
}
Expand Down Expand Up @@ -176,7 +177,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
if err != nil {
return err
}
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/task/backup_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ func (cfg *RawKvConfig) ParseBackupConfigFromFlags(flags *pflag.FlagSet) error {

// RunBackupRaw starts a backup task inside the current goroutine.
func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConfig) error {
cfg.adjust()

defer summary.Summary(cmdName)
ctx, cancel := context.WithCancel(c)
defer cancel()
Expand All @@ -126,7 +128,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
if err != nil {
return err
}
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down
52 changes: 49 additions & 3 deletions pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/pkg/transport"
"go.uber.org/zap"
"google.golang.org/grpc/keepalive"

"github.com/pingcap/br/pkg/conn"
berrors "github.com/pingcap/br/pkg/errors"
Expand Down Expand Up @@ -55,8 +56,14 @@ const (
flagRemoveTiFlash = "remove-tiflash"
flagCheckRequirement = "check-requirements"
flagSwitchModeInterval = "switch-mode-interval"

defaultSwitchInterval = 5 * time.Minute
// flagGrpcKeepaliveTime is the interval of pinging the server.
flagGrpcKeepaliveTime = "grpc-keepalive-time"
// flagGrpcKeepaliveTimeout is the max time a grpc conn can keep idel before killed.
flagGrpcKeepaliveTimeout = "grpc-keepalive-timeout"

defaultSwitchInterval = 5 * time.Minute
defaultGRPCKeepaliveTime = 10 * time.Second
defaultGRPCKeepaliveTimeout = 3 * time.Second
)

// TLSConfig is the common configuration for TLS connection.
Expand Down Expand Up @@ -113,6 +120,11 @@ type Config struct {
TableFilter filter.Filter `json:"-" toml:"-"`
CheckRequirements bool `json:"check-requirements" toml:"check-requirements"`
SwitchModeInterval time.Duration `json:"switch-mode-interval" toml:"switch-mode-interval"`

// GrpcKeepaliveTime is the interval of pinging the server.
GRPCKeepaliveTime time.Duration `json:"grpc-keepalive-time" toml:"grpc-keepalive-time"`
// GrpcKeepaliveTimeout is the max time a grpc conn can keep idel before killed.
GRPCKeepaliveTimeout time.Duration `json:"grpc-keepalive-timeout" toml:"grpc-keepalive-timeout"`
}

// DefineCommonFlags defines the flags common to all BRIE commands.
Expand Down Expand Up @@ -143,6 +155,12 @@ func DefineCommonFlags(flags *pflag.FlagSet) {
flags.Bool(flagCheckRequirement, true,
"Whether start version check before execute command")
flags.Duration(flagSwitchModeInterval, defaultSwitchInterval, "maintain import mode on TiKV during restore")
flags.Duration(flagGrpcKeepaliveTime, defaultGRPCKeepaliveTime,
"the interval of pinging gRPC peer, must keep the same value with TiKV and PD")
flags.Duration(flagGrpcKeepaliveTimeout, defaultGRPCKeepaliveTimeout,
"the max time a gRPC connection can keep idle before killed, must keep the same value with TiKV and PD")
_ = flags.MarkHidden(flagGrpcKeepaliveTime)
_ = flags.MarkHidden(flagGrpcKeepaliveTimeout)

storage.DefineFlags(flags)
}
Expand Down Expand Up @@ -267,6 +285,14 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
if err != nil {
return errors.Trace(err)
}
cfg.GRPCKeepaliveTime, err = flags.GetDuration(flagGrpcKeepaliveTime)
if err != nil {
return errors.Trace(err)
}
cfg.GRPCKeepaliveTimeout, err = flags.GetDuration(flagGrpcKeepaliveTimeout)
if err != nil {
return errors.Trace(err)
}

if cfg.SwitchModeInterval <= 0 {
return errors.Annotatef(berrors.ErrInvalidArgument, "--switch-mode-interval must be positive, %s is not allowed", cfg.SwitchModeInterval)
Expand All @@ -282,6 +308,7 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
func NewMgr(ctx context.Context,
g glue.Glue, pds []string,
tlsConfig TLSConfig,
keepalive keepalive.ClientParameters,
checkRequirements bool) (*conn.Mgr, error) {
var (
tlsConf *tls.Config
Expand Down Expand Up @@ -312,7 +339,7 @@ func NewMgr(ctx context.Context,
// Is it necessary to remove `StoreBehavior`?
return conn.NewMgr(ctx, g,
pdAddress, store.(tikv.Storage),
tlsConf, securityOption,
tlsConf, securityOption, keepalive,
conn.SkipTiFlash, checkRequirements)
}

Expand Down Expand Up @@ -378,3 +405,22 @@ func LogArguments(cmd *cobra.Command) {
})
log.Info("arguments", fields...)
}

// GetKeepalive get the keepalive info from the config.
func GetKeepalive(cfg *Config) keepalive.ClientParameters {
return keepalive.ClientParameters{
Time: cfg.GRPCKeepaliveTime,
Timeout: cfg.GRPCKeepaliveTimeout,
}
}

// adjust adjusts the abnormal config value in the current config.
// useful when not starting BR from CLI (e.g. from BRIE in SQL).
func (cfg *Config) adjust() {
if cfg.GRPCKeepaliveTime == 0 {
cfg.GRPCKeepaliveTime = defaultGRPCKeepaliveTime
}
if cfg.GRPCKeepaliveTimeout == 0 {
cfg.GRPCKeepaliveTimeout = defaultGRPCKeepaliveTimeout
}
}
6 changes: 4 additions & 2 deletions pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error {
// we should set proper value in this function.
// so that both binary and TiDB will use same default value.
func (cfg *RestoreConfig) adjustRestoreConfig() {
cfg.adjust()

if cfg.Config.Concurrency == 0 {
cfg.Config.Concurrency = defaultRestoreConcurrency
}
Expand All @@ -94,7 +96,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
ctx, cancel := context.WithCancel(c)
defer cancel()

mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down Expand Up @@ -370,7 +372,7 @@ func RunRestoreTiflashReplica(c context.Context, g glue.Glue, cmdName string, cf
ctx, cancel := context.WithCancel(c)
defer cancel()

mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/task/restore_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ func (cfg *LogRestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error {
// we should set proper value in this function.
// so that both binary and TiDB will use same default value.
func (cfg *LogRestoreConfig) adjustRestoreConfig() {
cfg.adjust()

if cfg.Config.Concurrency == 0 {
cfg.Config.Concurrency = defaultRestoreConcurrency
}
Expand All @@ -97,7 +99,7 @@ func RunLogRestore(c context.Context, g glue.Glue, cfg *LogRestoreConfig) error
ctx, cancel := context.WithCancel(c)
defer cancel()

mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/task/restore_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ func (cfg *RestoreRawConfig) ParseFromFlags(flags *pflag.FlagSet) error {
}

func (cfg *RestoreRawConfig) adjust() {
cfg.Config.adjust()

if cfg.Concurrency == 0 {
cfg.Concurrency = defaultRestoreConcurrency
}
Expand All @@ -60,7 +62,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR
ctx, cancel := context.WithCancel(c)
defer cancel()

mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down

0 comments on commit 5b7df4a

Please sign in to comment.