From c9ff09b5a5e07285b546cf83fd3d45a4930e9e82 Mon Sep 17 00:00:00 2001 From: hillium Date: Wed, 30 Sep 2020 10:32:38 +0800 Subject: [PATCH 1/5] conn,task: add config about keepalive --- pkg/conn/conn.go | 10 ++++------ pkg/task/backup.go | 2 +- pkg/task/backup_raw.go | 2 +- pkg/task/common.go | 39 ++++++++++++++++++++++++++++++++++++--- pkg/task/restore.go | 4 ++-- pkg/task/restore_log.go | 2 +- pkg/task/restore_raw.go | 2 +- 7 files changed, 46 insertions(+), 15 deletions(-) diff --git a/pkg/conn/conn.go b/pkg/conn/conn.go index 66c3673f3..02e3ac580 100644 --- a/pkg/conn/conn.go +++ b/pkg/conn/conn.go @@ -44,6 +44,7 @@ type Mgr struct { mu sync.Mutex clis map[uint64]*grpc.ClientConn } + keepalive keepalive.ClientParameters ownsStorage bool } @@ -109,6 +110,7 @@ func NewMgr( storage tikv.Storage, tlsConf *tls.Config, securityOption pd.SecurityOption, + keepalive keepalive.ClientParameters, storeBehavior StoreBehavior, checkRequirements bool, ) (*Mgr, error) { @@ -159,6 +161,7 @@ func NewMgr( ownsStorage: g.OwnsStorage(), } mgr.grpcClis.clis = make(map[uint64]*grpc.ClientConn) + mgr.keepalive = keepalive return mgr, nil } @@ -172,8 +175,6 @@ func (mgr *Mgr) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.Cl opt = grpc.WithTransportCredentials(credentials.NewTLS(mgr.tlsConf)) } ctx, cancel := context.WithTimeout(ctx, dialTimeout) - keepAlive := 10 - keepAliveTimeout := 3 bfConf := backoff.DefaultConfig bfConf.MaxDelay = time.Second * 3 addr := store.GetPeerAddress() @@ -186,10 +187,7 @@ func (mgr *Mgr) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.Cl opt, grpc.WithBlock(), grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}), - grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: time.Duration(keepAlive) * time.Second, - Timeout: time.Duration(keepAliveTimeout) * time.Second, - }), + grpc.WithKeepaliveParams(mgr.keepalive), ) cancel() if err != nil { diff --git a/pkg/task/backup.go b/pkg/task/backup.go index 30fd8659f..c9c9f2206 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -176,7 +176,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig if err != nil { return err } - mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements) + mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements) if err != nil { return err } diff --git a/pkg/task/backup_raw.go b/pkg/task/backup_raw.go index 4a643bb59..73c5e26d5 100644 --- a/pkg/task/backup_raw.go +++ b/pkg/task/backup_raw.go @@ -126,7 +126,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf if err != nil { return err } - mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements) + mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements) if err != nil { return err } diff --git a/pkg/task/common.go b/pkg/task/common.go index ab701676b..88655aba2 100644 --- a/pkg/task/common.go +++ b/pkg/task/common.go @@ -21,6 +21,7 @@ import ( pd "github.com/tikv/pd/client" "go.etcd.io/etcd/pkg/transport" "go.uber.org/zap" + "google.golang.org/grpc/keepalive" "github.com/pingcap/br/pkg/conn" berrors "github.com/pingcap/br/pkg/errors" @@ -55,8 +56,14 @@ const ( flagRemoveTiFlash = "remove-tiflash" flagCheckRequirement = "check-requirements" flagSwitchModeInterval = "switch-mode-interval" - - defaultSwitchInterval = 5 * time.Minute + // flagGrpcKeepaliveTime is the interval of pinging the server. + flagGrpcKeepaliveTime = "grpc-keepalive-time" + // flagGrpcKeepaliveTimeout is the max time a grpc conn can keep idel before killed. + flagGrpcKeepaliveTimeout = "grpc-keepalive-timeout" + + defaultSwitchInterval = 5 * time.Minute + defaultGRPCKeepaliveTime = 10 * time.Second + defaultGRPCKeepaliveTimeout = 3 * time.Second ) // TLSConfig is the common configuration for TLS connection. @@ -99,6 +106,11 @@ type Config struct { // LogProgress is true means the progress bar is printed to the log instead of stdout. LogProgress bool `json:"log-progress" toml:"log-progress"` + // GrpcKeepaliveTime is the interval of pinging the server. + GRPCKeepaliveTime time.Duration `json:"grpc-keepalive-time" toml:"grpc-keepalive-time"` + // GrpcKeepaliveTimeout is the max time a grpc conn can keep idel before killed. + GRPCKeepaliveTimeout time.Duration `json:"grpc-keepalive-timeout" toml:"grpc-keepalive-timeout"` + // CaseSensitive should not be used. // // Deprecated: This field is kept only to satisfy the cyclic dependency with TiDB. This field @@ -143,6 +155,10 @@ func DefineCommonFlags(flags *pflag.FlagSet) { flags.Bool(flagCheckRequirement, true, "Whether start version check before execute command") flags.Duration(flagSwitchModeInterval, defaultSwitchInterval, "maintain import mode on TiKV during restore") + flags.Duration(flagGrpcKeepaliveTime, defaultGRPCKeepaliveTime, + "the interval of pinging the server, must keep same value with TiKV and PD") + flags.Duration(flagGrpcKeepaliveTimeout, defaultGRPCKeepaliveTimeout, + "the max time a grpc conn can keep idel before killed, must keep same value with TiKV and PD") storage.DefineFlags(flags) } @@ -267,6 +283,14 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error { if err != nil { return errors.Trace(err) } + cfg.GRPCKeepaliveTime, err = flags.GetDuration(flagGrpcKeepaliveTime) + if err != nil { + return errors.Trace(err) + } + cfg.GRPCKeepaliveTimeout, err = flags.GetDuration(flagGrpcKeepaliveTimeout) + if err != nil { + return errors.Trace(err) + } if cfg.SwitchModeInterval <= 0 { return errors.Annotatef(berrors.ErrInvalidArgument, "--switch-mode-interval must be positive, %s is not allowed", cfg.SwitchModeInterval) @@ -282,6 +306,7 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error { func NewMgr(ctx context.Context, g glue.Glue, pds []string, tlsConfig TLSConfig, + keepalive keepalive.ClientParameters, checkRequirements bool) (*conn.Mgr, error) { var ( tlsConf *tls.Config @@ -312,7 +337,7 @@ func NewMgr(ctx context.Context, // Is it necessary to remove `StoreBehavior`? return conn.NewMgr(ctx, g, pdAddress, store.(tikv.Storage), - tlsConf, securityOption, + tlsConf, securityOption, keepalive, conn.SkipTiFlash, checkRequirements) } @@ -378,3 +403,11 @@ func LogArguments(cmd *cobra.Command) { }) log.Info("arguments", fields...) } + +// GetKeepalive get the keepalive info from the config +func GetKeepalive(cfg *Config) keepalive.ClientParameters { + return keepalive.ClientParameters{ + Time: cfg.GRPCKeepaliveTime, + Timeout: cfg.GRPCKeepaliveTimeout, + } +} diff --git a/pkg/task/restore.go b/pkg/task/restore.go index a20abe1ff..faa845281 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -94,7 +94,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf ctx, cancel := context.WithCancel(c) defer cancel() - mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements) + mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements) if err != nil { return err } @@ -370,7 +370,7 @@ func RunRestoreTiflashReplica(c context.Context, g glue.Glue, cmdName string, cf ctx, cancel := context.WithCancel(c) defer cancel() - mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements) + mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements) if err != nil { return err } diff --git a/pkg/task/restore_log.go b/pkg/task/restore_log.go index 986d9a48a..00acc4491 100644 --- a/pkg/task/restore_log.go +++ b/pkg/task/restore_log.go @@ -97,7 +97,7 @@ func RunLogRestore(c context.Context, g glue.Glue, cfg *LogRestoreConfig) error ctx, cancel := context.WithCancel(c) defer cancel() - mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements) + mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements) if err != nil { return err } diff --git a/pkg/task/restore_raw.go b/pkg/task/restore_raw.go index 31c88b66b..b7137d9f9 100644 --- a/pkg/task/restore_raw.go +++ b/pkg/task/restore_raw.go @@ -60,7 +60,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR ctx, cancel := context.WithCancel(c) defer cancel() - mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, cfg.CheckRequirements) + mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements) if err != nil { return err } From d8d0d4c3794c47b575897b35bbfeb0fde49aaa8e Mon Sep 17 00:00:00 2001 From: hillium Date: Wed, 30 Sep 2020 11:54:54 +0800 Subject: [PATCH 2/5] *: fix CI --- cmd/validate.go | 2 +- pkg/task/common.go | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/cmd/validate.go b/cmd/validate.go index 29cefe26b..440841ca8 100644 --- a/cmd/validate.go +++ b/cmd/validate.go @@ -341,7 +341,7 @@ func setPDConfigCommand() *cobra.Command { return err } - mgr, err := task.NewMgr(ctx, tidbGlue, cfg.PD, cfg.TLS, cfg.CheckRequirements) + mgr, err := task.NewMgr(ctx, tidbGlue, cfg.PD, cfg.TLS, task.GetKeepalive(&cfg), cfg.CheckRequirements) if err != nil { return err } diff --git a/pkg/task/common.go b/pkg/task/common.go index 88655aba2..420bfb62f 100644 --- a/pkg/task/common.go +++ b/pkg/task/common.go @@ -106,11 +106,6 @@ type Config struct { // LogProgress is true means the progress bar is printed to the log instead of stdout. LogProgress bool `json:"log-progress" toml:"log-progress"` - // GrpcKeepaliveTime is the interval of pinging the server. - GRPCKeepaliveTime time.Duration `json:"grpc-keepalive-time" toml:"grpc-keepalive-time"` - // GrpcKeepaliveTimeout is the max time a grpc conn can keep idel before killed. - GRPCKeepaliveTimeout time.Duration `json:"grpc-keepalive-timeout" toml:"grpc-keepalive-timeout"` - // CaseSensitive should not be used. // // Deprecated: This field is kept only to satisfy the cyclic dependency with TiDB. This field @@ -125,6 +120,11 @@ type Config struct { TableFilter filter.Filter `json:"-" toml:"-"` CheckRequirements bool `json:"check-requirements" toml:"check-requirements"` SwitchModeInterval time.Duration `json:"switch-mode-interval" toml:"switch-mode-interval"` + + // GrpcKeepaliveTime is the interval of pinging the server. + GRPCKeepaliveTime time.Duration `json:"grpc-keepalive-time" toml:"grpc-keepalive-time"` + // GrpcKeepaliveTimeout is the max time a grpc conn can keep idel before killed. + GRPCKeepaliveTimeout time.Duration `json:"grpc-keepalive-timeout" toml:"grpc-keepalive-timeout"` } // DefineCommonFlags defines the flags common to all BRIE commands. @@ -404,7 +404,7 @@ func LogArguments(cmd *cobra.Command) { log.Info("arguments", fields...) } -// GetKeepalive get the keepalive info from the config +// GetKeepalive get the keepalive info from the config. func GetKeepalive(cfg *Config) keepalive.ClientParameters { return keepalive.ClientParameters{ Time: cfg.GRPCKeepaliveTime, From 7325d7f75bf214d8aaed4a59fc941912fbef426a Mon Sep 17 00:00:00 2001 From: hillium Date: Wed, 30 Sep 2020 13:23:41 +0800 Subject: [PATCH 3/5] task: hide grpc lifetime flags --- pkg/task/common.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/task/common.go b/pkg/task/common.go index 420bfb62f..d4cc01295 100644 --- a/pkg/task/common.go +++ b/pkg/task/common.go @@ -159,6 +159,8 @@ func DefineCommonFlags(flags *pflag.FlagSet) { "the interval of pinging the server, must keep same value with TiKV and PD") flags.Duration(flagGrpcKeepaliveTimeout, defaultGRPCKeepaliveTimeout, "the max time a grpc conn can keep idel before killed, must keep same value with TiKV and PD") + _ = flags.MarkHidden(flagGrpcKeepaliveTime) + _ = flags.MarkHidden(flagGrpcKeepaliveTimeout) storage.DefineFlags(flags) } From af787289d6571a7327251b5a7a31d2eb025f83ec Mon Sep 17 00:00:00 2001 From: hillium Date: Wed, 30 Sep 2020 13:30:20 +0800 Subject: [PATCH 4/5] *: fix a typo --- pkg/task/common.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/task/common.go b/pkg/task/common.go index d4cc01295..9ffd4db3a 100644 --- a/pkg/task/common.go +++ b/pkg/task/common.go @@ -156,9 +156,9 @@ func DefineCommonFlags(flags *pflag.FlagSet) { "Whether start version check before execute command") flags.Duration(flagSwitchModeInterval, defaultSwitchInterval, "maintain import mode on TiKV during restore") flags.Duration(flagGrpcKeepaliveTime, defaultGRPCKeepaliveTime, - "the interval of pinging the server, must keep same value with TiKV and PD") + "the interval of pinging gRPC peer, must keep the same value with TiKV and PD") flags.Duration(flagGrpcKeepaliveTimeout, defaultGRPCKeepaliveTimeout, - "the max time a grpc conn can keep idel before killed, must keep same value with TiKV and PD") + "the max time a gRPC connection can keep idle before killed, must keep the same value with TiKV and PD") _ = flags.MarkHidden(flagGrpcKeepaliveTime) _ = flags.MarkHidden(flagGrpcKeepaliveTimeout) From 6feee8172bb61d635e1a2737ae54e67d3d02e3b6 Mon Sep 17 00:00:00 2001 From: hillium Date: Wed, 30 Sep 2020 15:29:45 +0800 Subject: [PATCH 5/5] task: add adjust to grpc keepalive variables --- pkg/task/backup.go | 1 + pkg/task/backup_raw.go | 2 ++ pkg/task/common.go | 11 +++++++++++ pkg/task/restore.go | 2 ++ pkg/task/restore_log.go | 2 ++ pkg/task/restore_raw.go | 2 ++ 6 files changed, 20 insertions(+) diff --git a/pkg/task/backup.go b/pkg/task/backup.go index c9c9f2206..deb85f385 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -148,6 +148,7 @@ func parseCompressionFlags(flags *pflag.FlagSet) (*CompressionConfig, error) { // we should set proper value in this function. // so that both binary and TiDB will use same default value. func (cfg *BackupConfig) adjustBackupConfig() { + cfg.adjust() if cfg.Config.Concurrency == 0 { cfg.Config.Concurrency = defaultBackupConcurrency } diff --git a/pkg/task/backup_raw.go b/pkg/task/backup_raw.go index 73c5e26d5..0047fadca 100644 --- a/pkg/task/backup_raw.go +++ b/pkg/task/backup_raw.go @@ -118,6 +118,8 @@ func (cfg *RawKvConfig) ParseBackupConfigFromFlags(flags *pflag.FlagSet) error { // RunBackupRaw starts a backup task inside the current goroutine. func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConfig) error { + cfg.adjust() + defer summary.Summary(cmdName) ctx, cancel := context.WithCancel(c) defer cancel() diff --git a/pkg/task/common.go b/pkg/task/common.go index 9ffd4db3a..ff800e2ce 100644 --- a/pkg/task/common.go +++ b/pkg/task/common.go @@ -413,3 +413,14 @@ func GetKeepalive(cfg *Config) keepalive.ClientParameters { Timeout: cfg.GRPCKeepaliveTimeout, } } + +// adjust adjusts the abnormal config value in the current config. +// useful when not starting BR from CLI (e.g. from BRIE in SQL). +func (cfg *Config) adjust() { + if cfg.GRPCKeepaliveTime == 0 { + cfg.GRPCKeepaliveTime = defaultGRPCKeepaliveTime + } + if cfg.GRPCKeepaliveTimeout == 0 { + cfg.GRPCKeepaliveTimeout = defaultGRPCKeepaliveTimeout + } +} diff --git a/pkg/task/restore.go b/pkg/task/restore.go index faa845281..afbf521c7 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -78,6 +78,8 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error { // we should set proper value in this function. // so that both binary and TiDB will use same default value. func (cfg *RestoreConfig) adjustRestoreConfig() { + cfg.adjust() + if cfg.Config.Concurrency == 0 { cfg.Config.Concurrency = defaultRestoreConcurrency } diff --git a/pkg/task/restore_log.go b/pkg/task/restore_log.go index 00acc4491..839a60a7d 100644 --- a/pkg/task/restore_log.go +++ b/pkg/task/restore_log.go @@ -72,6 +72,8 @@ func (cfg *LogRestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error { // we should set proper value in this function. // so that both binary and TiDB will use same default value. func (cfg *LogRestoreConfig) adjustRestoreConfig() { + cfg.adjust() + if cfg.Config.Concurrency == 0 { cfg.Config.Concurrency = defaultRestoreConcurrency } diff --git a/pkg/task/restore_raw.go b/pkg/task/restore_raw.go index b7137d9f9..0aabb01a4 100644 --- a/pkg/task/restore_raw.go +++ b/pkg/task/restore_raw.go @@ -47,6 +47,8 @@ func (cfg *RestoreRawConfig) ParseFromFlags(flags *pflag.FlagSet) error { } func (cfg *RestoreRawConfig) adjust() { + cfg.Config.adjust() + if cfg.Concurrency == 0 { cfg.Concurrency = defaultRestoreConcurrency }