Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

conn,task: add config about keepalive #545

Merged
merged 6 commits into from
Sep 30, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func setPDConfigCommand() *cobra.Command {
return err
}

mgr, err := task.NewMgr(ctx, tidbGlue, cfg.PD, cfg.TLS, cfg.CheckRequirements)
mgr, err := task.NewMgr(ctx, tidbGlue, cfg.PD, cfg.TLS, task.GetKeepalive(&cfg), cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down
10 changes: 4 additions & 6 deletions pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Mgr struct {
mu sync.Mutex
clis map[uint64]*grpc.ClientConn
}
keepalive keepalive.ClientParameters
ownsStorage bool
}

Expand Down Expand Up @@ -109,6 +110,7 @@ func NewMgr(
storage tikv.Storage,
tlsConf *tls.Config,
securityOption pd.SecurityOption,
keepalive keepalive.ClientParameters,
storeBehavior StoreBehavior,
checkRequirements bool,
) (*Mgr, error) {
Expand Down Expand Up @@ -159,6 +161,7 @@ func NewMgr(
ownsStorage: g.OwnsStorage(),
}
mgr.grpcClis.clis = make(map[uint64]*grpc.ClientConn)
mgr.keepalive = keepalive
return mgr, nil
}

Expand All @@ -172,8 +175,6 @@ func (mgr *Mgr) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.Cl
opt = grpc.WithTransportCredentials(credentials.NewTLS(mgr.tlsConf))
}
ctx, cancel := context.WithTimeout(ctx, dialTimeout)
keepAlive := 10
keepAliveTimeout := 3
bfConf := backoff.DefaultConfig
bfConf.MaxDelay = time.Second * 3
addr := store.GetPeerAddress()
Expand All @@ -186,10 +187,7 @@ func (mgr *Mgr) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.Cl
opt,
grpc.WithBlock(),
grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(keepAlive) * time.Second,
Timeout: time.Duration(keepAliveTimeout) * time.Second,
}),
grpc.WithKeepaliveParams(mgr.keepalive),
)
cancel()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
if err != nil {
return err
}
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/task/backup_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
if err != nil {
return err
}
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down
39 changes: 36 additions & 3 deletions pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/pkg/transport"
"go.uber.org/zap"
"google.golang.org/grpc/keepalive"

"github.com/pingcap/br/pkg/conn"
berrors "github.com/pingcap/br/pkg/errors"
Expand Down Expand Up @@ -55,8 +56,14 @@ const (
flagRemoveTiFlash = "remove-tiflash"
flagCheckRequirement = "check-requirements"
flagSwitchModeInterval = "switch-mode-interval"

defaultSwitchInterval = 5 * time.Minute
// flagGrpcKeepaliveTime is the interval of pinging the server.
flagGrpcKeepaliveTime = "grpc-keepalive-time"
// flagGrpcKeepaliveTimeout is the max time a grpc conn can keep idel before killed.
kennytm marked this conversation as resolved.
Show resolved Hide resolved
flagGrpcKeepaliveTimeout = "grpc-keepalive-timeout"

defaultSwitchInterval = 5 * time.Minute
defaultGRPCKeepaliveTime = 10 * time.Second
defaultGRPCKeepaliveTimeout = 3 * time.Second
)

// TLSConfig is the common configuration for TLS connection.
Expand Down Expand Up @@ -113,6 +120,11 @@ type Config struct {
TableFilter filter.Filter `json:"-" toml:"-"`
CheckRequirements bool `json:"check-requirements" toml:"check-requirements"`
SwitchModeInterval time.Duration `json:"switch-mode-interval" toml:"switch-mode-interval"`

// GrpcKeepaliveTime is the interval of pinging the server.
GRPCKeepaliveTime time.Duration `json:"grpc-keepalive-time" toml:"grpc-keepalive-time"`
// GrpcKeepaliveTimeout is the max time a grpc conn can keep idel before killed.
GRPCKeepaliveTimeout time.Duration `json:"grpc-keepalive-timeout" toml:"grpc-keepalive-timeout"`
}

// DefineCommonFlags defines the flags common to all BRIE commands.
Expand Down Expand Up @@ -143,6 +155,10 @@ func DefineCommonFlags(flags *pflag.FlagSet) {
flags.Bool(flagCheckRequirement, true,
"Whether start version check before execute command")
flags.Duration(flagSwitchModeInterval, defaultSwitchInterval, "maintain import mode on TiKV during restore")
flags.Duration(flagGrpcKeepaliveTime, defaultGRPCKeepaliveTime,
"the interval of pinging the server, must keep same value with TiKV and PD")
kennytm marked this conversation as resolved.
Show resolved Hide resolved
flags.Duration(flagGrpcKeepaliveTimeout, defaultGRPCKeepaliveTimeout,
"the max time a grpc conn can keep idel before killed, must keep same value with TiKV and PD")
kennytm marked this conversation as resolved.
Show resolved Hide resolved

storage.DefineFlags(flags)
}
Expand Down Expand Up @@ -267,6 +283,14 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
if err != nil {
return errors.Trace(err)
}
cfg.GRPCKeepaliveTime, err = flags.GetDuration(flagGrpcKeepaliveTime)
if err != nil {
return errors.Trace(err)
}
cfg.GRPCKeepaliveTimeout, err = flags.GetDuration(flagGrpcKeepaliveTimeout)
if err != nil {
return errors.Trace(err)
}

if cfg.SwitchModeInterval <= 0 {
return errors.Annotatef(berrors.ErrInvalidArgument, "--switch-mode-interval must be positive, %s is not allowed", cfg.SwitchModeInterval)
Expand All @@ -282,6 +306,7 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
func NewMgr(ctx context.Context,
g glue.Glue, pds []string,
tlsConfig TLSConfig,
keepalive keepalive.ClientParameters,
checkRequirements bool) (*conn.Mgr, error) {
var (
tlsConf *tls.Config
Expand Down Expand Up @@ -312,7 +337,7 @@ func NewMgr(ctx context.Context,
// Is it necessary to remove `StoreBehavior`?
return conn.NewMgr(ctx, g,
pdAddress, store.(tikv.Storage),
tlsConf, securityOption,
tlsConf, securityOption, keepalive,
conn.SkipTiFlash, checkRequirements)
}

Expand Down Expand Up @@ -378,3 +403,11 @@ func LogArguments(cmd *cobra.Command) {
})
log.Info("arguments", fields...)
}

// GetKeepalive get the keepalive info from the config.
func GetKeepalive(cfg *Config) keepalive.ClientParameters {
return keepalive.ClientParameters{
Time: cfg.GRPCKeepaliveTime,
Timeout: cfg.GRPCKeepaliveTimeout,
}
}
4 changes: 2 additions & 2 deletions pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
ctx, cancel := context.WithCancel(c)
defer cancel()

mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down Expand Up @@ -370,7 +370,7 @@ func RunRestoreTiflashReplica(c context.Context, g glue.Glue, cmdName string, cf
ctx, cancel := context.WithCancel(c)
defer cancel()

mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/task/restore_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func RunLogRestore(c context.Context, g glue.Glue, cfg *LogRestoreConfig) error
ctx, cancel := context.WithCancel(c)
defer cancel()

mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/task/restore_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR
ctx, cancel := context.WithCancel(c)
defer cancel()

mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements)
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements)
if err != nil {
return err
}
Expand Down