Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

conn,task: add config about keepalive #545

Merged
merged 6 commits into from
Sep 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func setPDConfigCommand() *cobra.Command {
return err
}

mgr, err := task.NewMgr(ctx, tidbGlue, cfg.PD, cfg.TLS, cfg.CheckRequirements)
mgr, err := task.NewMgr(ctx, tidbGlue, cfg.PD, cfg.TLS, task.GetKeepalive(&cfg), cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down
10 changes: 4 additions & 6 deletions pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Mgr struct {
mu sync.Mutex
clis map[uint64]*grpc.ClientConn
}
keepalive keepalive.ClientParameters
ownsStorage bool
}

Expand Down Expand Up @@ -109,6 +110,7 @@ func NewMgr(
storage tikv.Storage,
tlsConf *tls.Config,
securityOption pd.SecurityOption,
keepalive keepalive.ClientParameters,
storeBehavior StoreBehavior,
checkRequirements bool,
) (*Mgr, error) {
Expand Down Expand Up @@ -159,6 +161,7 @@ func NewMgr(
ownsStorage: g.OwnsStorage(),
}
mgr.grpcClis.clis = make(map[uint64]*grpc.ClientConn)
mgr.keepalive = keepalive
return mgr, nil
}

Expand All @@ -172,8 +175,6 @@ func (mgr *Mgr) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.Cl
opt = grpc.WithTransportCredentials(credentials.NewTLS(mgr.tlsConf))
}
ctx, cancel := context.WithTimeout(ctx, dialTimeout)
keepAlive := 10
keepAliveTimeout := 3
bfConf := backoff.DefaultConfig
bfConf.MaxDelay = time.Second * 3
addr := store.GetPeerAddress()
Expand All @@ -186,10 +187,7 @@ func (mgr *Mgr) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.Cl
opt,
grpc.WithBlock(),
grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(keepAlive) * time.Second,
Timeout: time.Duration(keepAliveTimeout) * time.Second,
}),
grpc.WithKeepaliveParams(mgr.keepalive),
)
cancel()
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func parseCompressionFlags(flags *pflag.FlagSet) (*CompressionConfig, error) {
// we should set proper value in this function.
// so that both binary and TiDB will use same default value.
func (cfg *BackupConfig) adjustBackupConfig() {
cfg.adjust()
if cfg.Config.Concurrency == 0 {
cfg.Config.Concurrency = defaultBackupConcurrency
}
Expand Down Expand Up @@ -176,7 +177,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
if err != nil {
return err
}
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/task/backup_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ func (cfg *RawKvConfig) ParseBackupConfigFromFlags(flags *pflag.FlagSet) error {

// RunBackupRaw starts a backup task inside the current goroutine.
func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConfig) error {
cfg.adjust()

defer summary.Summary(cmdName)
ctx, cancel := context.WithCancel(c)
defer cancel()
Expand All @@ -126,7 +128,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
if err != nil {
return err
}
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down
52 changes: 49 additions & 3 deletions pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/pkg/transport"
"go.uber.org/zap"
"google.golang.org/grpc/keepalive"

"github.com/pingcap/br/pkg/conn"
berrors "github.com/pingcap/br/pkg/errors"
Expand Down Expand Up @@ -55,8 +56,14 @@ const (
flagRemoveTiFlash = "remove-tiflash"
flagCheckRequirement = "check-requirements"
flagSwitchModeInterval = "switch-mode-interval"

defaultSwitchInterval = 5 * time.Minute
// flagGrpcKeepaliveTime is the interval of pinging the server.
flagGrpcKeepaliveTime = "grpc-keepalive-time"
// flagGrpcKeepaliveTimeout is the max time a grpc conn can keep idel before killed.
kennytm marked this conversation as resolved.
Show resolved Hide resolved
flagGrpcKeepaliveTimeout = "grpc-keepalive-timeout"

defaultSwitchInterval = 5 * time.Minute
defaultGRPCKeepaliveTime = 10 * time.Second
defaultGRPCKeepaliveTimeout = 3 * time.Second
)

// TLSConfig is the common configuration for TLS connection.
Expand Down Expand Up @@ -113,6 +120,11 @@ type Config struct {
TableFilter filter.Filter `json:"-" toml:"-"`
CheckRequirements bool `json:"check-requirements" toml:"check-requirements"`
SwitchModeInterval time.Duration `json:"switch-mode-interval" toml:"switch-mode-interval"`

// GrpcKeepaliveTime is the interval of pinging the server.
GRPCKeepaliveTime time.Duration `json:"grpc-keepalive-time" toml:"grpc-keepalive-time"`
// GrpcKeepaliveTimeout is the max time a grpc conn can keep idel before killed.
GRPCKeepaliveTimeout time.Duration `json:"grpc-keepalive-timeout" toml:"grpc-keepalive-timeout"`
}

// DefineCommonFlags defines the flags common to all BRIE commands.
Expand Down Expand Up @@ -143,6 +155,12 @@ func DefineCommonFlags(flags *pflag.FlagSet) {
flags.Bool(flagCheckRequirement, true,
"Whether start version check before execute command")
flags.Duration(flagSwitchModeInterval, defaultSwitchInterval, "maintain import mode on TiKV during restore")
flags.Duration(flagGrpcKeepaliveTime, defaultGRPCKeepaliveTime,
"the interval of pinging gRPC peer, must keep the same value with TiKV and PD")
flags.Duration(flagGrpcKeepaliveTimeout, defaultGRPCKeepaliveTimeout,
"the max time a gRPC connection can keep idle before killed, must keep the same value with TiKV and PD")
_ = flags.MarkHidden(flagGrpcKeepaliveTime)
_ = flags.MarkHidden(flagGrpcKeepaliveTimeout)

storage.DefineFlags(flags)
}
Expand Down Expand Up @@ -267,6 +285,14 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
if err != nil {
return errors.Trace(err)
}
cfg.GRPCKeepaliveTime, err = flags.GetDuration(flagGrpcKeepaliveTime)
if err != nil {
return errors.Trace(err)
}
cfg.GRPCKeepaliveTimeout, err = flags.GetDuration(flagGrpcKeepaliveTimeout)
if err != nil {
return errors.Trace(err)
}

if cfg.SwitchModeInterval <= 0 {
return errors.Annotatef(berrors.ErrInvalidArgument, "--switch-mode-interval must be positive, %s is not allowed", cfg.SwitchModeInterval)
Expand All @@ -282,6 +308,7 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
func NewMgr(ctx context.Context,
g glue.Glue, pds []string,
tlsConfig TLSConfig,
keepalive keepalive.ClientParameters,
checkRequirements bool) (*conn.Mgr, error) {
var (
tlsConf *tls.Config
Expand Down Expand Up @@ -312,7 +339,7 @@ func NewMgr(ctx context.Context,
// Is it necessary to remove `StoreBehavior`?
return conn.NewMgr(ctx, g,
pdAddress, store.(tikv.Storage),
tlsConf, securityOption,
tlsConf, securityOption, keepalive,
conn.SkipTiFlash, checkRequirements)
}

Expand Down Expand Up @@ -378,3 +405,22 @@ func LogArguments(cmd *cobra.Command) {
})
log.Info("arguments", fields...)
}

// GetKeepalive get the keepalive info from the config.
func GetKeepalive(cfg *Config) keepalive.ClientParameters {
return keepalive.ClientParameters{
Time: cfg.GRPCKeepaliveTime,
Timeout: cfg.GRPCKeepaliveTimeout,
}
}

// adjust adjusts the abnormal config value in the current config.
// useful when not starting BR from CLI (e.g. from BRIE in SQL).
func (cfg *Config) adjust() {
if cfg.GRPCKeepaliveTime == 0 {
cfg.GRPCKeepaliveTime = defaultGRPCKeepaliveTime
}
if cfg.GRPCKeepaliveTimeout == 0 {
cfg.GRPCKeepaliveTimeout = defaultGRPCKeepaliveTimeout
}
}
6 changes: 4 additions & 2 deletions pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error {
// we should set proper value in this function.
// so that both binary and TiDB will use same default value.
func (cfg *RestoreConfig) adjustRestoreConfig() {
cfg.adjust()

if cfg.Config.Concurrency == 0 {
cfg.Config.Concurrency = defaultRestoreConcurrency
}
Expand All @@ -94,7 +96,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
ctx, cancel := context.WithCancel(c)
defer cancel()

mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down Expand Up @@ -370,7 +372,7 @@ func RunRestoreTiflashReplica(c context.Context, g glue.Glue, cmdName string, cf
ctx, cancel := context.WithCancel(c)
defer cancel()

mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/task/restore_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ func (cfg *LogRestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error {
// we should set proper value in this function.
// so that both binary and TiDB will use same default value.
func (cfg *LogRestoreConfig) adjustRestoreConfig() {
cfg.adjust()

if cfg.Config.Concurrency == 0 {
cfg.Config.Concurrency = defaultRestoreConcurrency
}
Expand All @@ -97,7 +99,7 @@ func RunLogRestore(c context.Context, g glue.Glue, cfg *LogRestoreConfig) error
ctx, cancel := context.WithCancel(c)
defer cancel()

mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/task/restore_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ func (cfg *RestoreRawConfig) ParseFromFlags(flags *pflag.FlagSet) error {
}

func (cfg *RestoreRawConfig) adjust() {
cfg.Config.adjust()

if cfg.Concurrency == 0 {
cfg.Concurrency = defaultRestoreConcurrency
}
Expand All @@ -60,7 +62,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR
ctx, cancel := context.WithCancel(c)
defer cancel()

mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down