From 972421e438a7adf3bde85c68af16f134fb91b68b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?xinfan=2Ewu=28=E5=90=B4=E6=AD=86=E5=B8=86=29?= Date: Sun, 16 Jun 2024 17:20:26 +0800 Subject: [PATCH 1/8] refact:rename the variable name in TCP reconnect --- transport/client.go | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/transport/client.go b/transport/client.go index 4b17f94..49d0497 100644 --- a/transport/client.go +++ b/transport/client.go @@ -41,10 +41,10 @@ import ( ) const ( - reconnectInterval = 3e8 // 300ms - connectInterval = 5e8 // 500ms - connectTimeout = 3e9 - maxTimes = 10 + defaultReconnectInterval = 3e8 // 300ms + connectInterval = 5e8 // 500ms + connectTimeout = 3e9 + defaultMaxReconnectAttempts = 10 ) var ( @@ -423,13 +423,13 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) { // a for-loop connect to make sure the connection pool is valid func (c *client) reConnect() { var ( - num, max, times, interval int - maxDuration int64 + sessionNum, maxReconnectAttempts, reconnectAttempts, reconnectInterval int + maxReconnectInterval int64 ) - max = c.number - interval = c.reconnectInterval - if interval == 0 { - interval = reconnectInterval + maxReconnectAttempts = c.number + reconnectInterval = c.reconnectInterval + if reconnectInterval == 0 { + reconnectInterval = defaultReconnectInterval } for { if c.IsClosed() { @@ -437,19 +437,19 @@ func (c *client) reConnect() { break } - num = c.sessionNum() - if max <= num || max < times { + sessionNum = c.sessionNum() + if maxReconnectAttempts <= sessionNum || maxReconnectAttempts < reconnectAttempts { //Exit when the number of connection pools is sufficient or the reconnection times exceeds the connections numbers. break } c.connect() - times++ - if times > maxTimes { - maxDuration = int64(maxTimes) * int64(interval) + reconnectAttempts++ + if reconnectAttempts > defaultMaxReconnectAttempts { + maxReconnectInterval = int64(defaultMaxReconnectAttempts) * int64(reconnectInterval) } else { - maxDuration = int64(times) * int64(interval) + maxReconnectInterval = int64(reconnectAttempts) * int64(reconnectInterval) } - <-gxtime.After(time.Duration(maxDuration)) + <-gxtime.After(time.Duration(maxReconnectInterval)) } } From 38deaf2e59453e23f8ce21483989b9a9818325ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?xinfan=2Ewu=28=E5=90=B4=E6=AD=86=E5=B8=86=29?= Date: Sun, 16 Jun 2024 17:22:49 +0800 Subject: [PATCH 2/8] refact:rename the variable name in TCP reconnect --- transport/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transport/client.go b/transport/client.go index 49d0497..454e749 100644 --- a/transport/client.go +++ b/transport/client.go @@ -439,7 +439,7 @@ func (c *client) reConnect() { sessionNum = c.sessionNum() if maxReconnectAttempts <= sessionNum || maxReconnectAttempts < reconnectAttempts { - //Exit when the number of connection pools is sufficient or the reconnection times exceeds the connections numbers. + //exit reconnect when the number of connection pools is sufficient or the current reconnection attempts exceeds the max reconnection attempts. break } c.connect() From 8529f81af5378675ea8067a2d6cc68cf39248093 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?xinfan=2Ewu=28=E5=90=B4=E6=AD=86=E5=B8=86=29?= Date: Tue, 18 Jun 2024 21:08:15 +0800 Subject: [PATCH 3/8] improvement:add new client attribute maxReconnectAttempts. Co-authored-by: AlexStocks --- transport/client.go | 19 ++++++++++++------- transport/options.go | 17 +++++++++++++---- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/transport/client.go b/transport/client.go index 454e749..31df0ba 100644 --- a/transport/client.go +++ b/transport/client.go @@ -44,7 +44,8 @@ const ( defaultReconnectInterval = 3e8 // 300ms connectInterval = 5e8 // 500ms connectTimeout = 3e9 - defaultMaxReconnectAttempts = 10 + defaultMaxReconnectAttempts = 100 + maxBackOffTimes = 10 ) var ( @@ -423,14 +424,18 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) { // a for-loop connect to make sure the connection pool is valid func (c *client) reConnect() { var ( - sessionNum, maxReconnectAttempts, reconnectAttempts, reconnectInterval int - maxReconnectInterval int64 + sessionNum, maxReconnectAttempts, reconnectAttempts, reconnectInterval, connectionPoolSize int + maxReconnectInterval int64 ) - maxReconnectAttempts = c.number reconnectInterval = c.reconnectInterval if reconnectInterval == 0 { reconnectInterval = defaultReconnectInterval } + maxReconnectAttempts = c.maxReconnectAttempts + if maxReconnectAttempts == 0 { + maxReconnectAttempts = defaultMaxReconnectAttempts + } + connectionPoolSize = c.number for { if c.IsClosed() { log.Warnf("client{peer:%s} goroutine exit now.", c.addr) @@ -438,14 +443,14 @@ func (c *client) reConnect() { } sessionNum = c.sessionNum() - if maxReconnectAttempts <= sessionNum || maxReconnectAttempts < reconnectAttempts { + if connectionPoolSize <= sessionNum || maxReconnectAttempts < reconnectAttempts { //exit reconnect when the number of connection pools is sufficient or the current reconnection attempts exceeds the max reconnection attempts. break } c.connect() reconnectAttempts++ - if reconnectAttempts > defaultMaxReconnectAttempts { - maxReconnectInterval = int64(defaultMaxReconnectAttempts) * int64(reconnectInterval) + if reconnectAttempts > maxBackOffTimes { + maxReconnectInterval = int64(maxBackOffTimes) * int64(reconnectInterval) } else { maxReconnectInterval = int64(reconnectAttempts) * int64(reconnectInterval) } diff --git a/transport/options.go b/transport/options.go index d376e0c..d8a30e4 100644 --- a/transport/options.go +++ b/transport/options.go @@ -100,10 +100,10 @@ func WithServerTlsConfigBuilder(tlsConfigBuilder TlsConfigBuilder) ServerOption type ClientOption func(*ClientOptions) type ClientOptions struct { - addr string - number int - reconnectInterval int // reConnect Interval - + addr string + number int + reconnectInterval int // reConnect Interval + maxReconnectAttempts int // reconnectAttempts // tls sslEnabled bool tlsConfigBuilder TlsConfigBuilder @@ -168,3 +168,12 @@ func WithClientTlsConfigBuilder(tlsConfigBuilder TlsConfigBuilder) ClientOption o.tlsConfigBuilder = tlsConfigBuilder } } + +// WithReconnectInterval @maxReconnectAttempts is max reconnect attempts. +func WithReconnectAttempts(maxReconnectAttempts int) ClientOption { + return func(o *ClientOptions) { + if 0 < maxReconnectAttempts { + o.maxReconnectAttempts = maxReconnectAttempts + } + } +} From 5a0606135a1aaeee9e8501a05736dc4a16fda5f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?xinfan=2Ewu=28=E5=90=B4=E6=AD=86=E5=B8=86=29?= Date: Tue, 18 Jun 2024 21:13:11 +0800 Subject: [PATCH 4/8] refact:update annotation --- transport/options.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/transport/options.go b/transport/options.go index d8a30e4..f42b693 100644 --- a/transport/options.go +++ b/transport/options.go @@ -103,7 +103,7 @@ type ClientOptions struct { addr string number int reconnectInterval int // reConnect Interval - maxReconnectAttempts int // reconnectAttempts + maxReconnectAttempts int // max reconnect attempts // tls sslEnabled bool tlsConfigBuilder TlsConfigBuilder @@ -169,7 +169,7 @@ func WithClientTlsConfigBuilder(tlsConfigBuilder TlsConfigBuilder) ClientOption } } -// WithReconnectInterval @maxReconnectAttempts is max reconnect attempts. +// WithReconnectAttempts @maxReconnectAttempts is max reconnect attempts. func WithReconnectAttempts(maxReconnectAttempts int) ClientOption { return func(o *ClientOptions) { if 0 < maxReconnectAttempts { From 5bd38697c4216a8a15641409106805ed22325fba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?xinfan=2Ewu=28=E5=90=B4=E6=AD=86=E5=B8=86=29?= Date: Wed, 26 Jun 2024 19:28:02 +0800 Subject: [PATCH 5/8] change the initialization method of some variables in the reConnect(), adjust the defaultMaxReconnectAttempts from 100 to 50, and add the client's specification of the maxReconnectAttempts in the unit test --- transport/client.go | 14 +++++++------- transport/client_test.go | 1 + 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/transport/client.go b/transport/client.go index 31df0ba..08d079c 100644 --- a/transport/client.go +++ b/transport/client.go @@ -44,7 +44,7 @@ const ( defaultReconnectInterval = 3e8 // 300ms connectInterval = 5e8 // 500ms connectTimeout = 3e9 - defaultMaxReconnectAttempts = 100 + defaultMaxReconnectAttempts = 50 maxBackOffTimes = 10 ) @@ -424,18 +424,18 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) { // a for-loop connect to make sure the connection pool is valid func (c *client) reConnect() { var ( - sessionNum, maxReconnectAttempts, reconnectAttempts, reconnectInterval, connectionPoolSize int - maxReconnectInterval int64 + sessionNum, reconnectAttempts int + maxReconnectInterval int64 ) - reconnectInterval = c.reconnectInterval + reconnectInterval := c.reconnectInterval if reconnectInterval == 0 { reconnectInterval = defaultReconnectInterval } - maxReconnectAttempts = c.maxReconnectAttempts + maxReconnectAttempts := c.maxReconnectAttempts if maxReconnectAttempts == 0 { maxReconnectAttempts = defaultMaxReconnectAttempts } - connectionPoolSize = c.number + connPoolSize := c.number for { if c.IsClosed() { log.Warnf("client{peer:%s} goroutine exit now.", c.addr) @@ -443,7 +443,7 @@ func (c *client) reConnect() { } sessionNum = c.sessionNum() - if connectionPoolSize <= sessionNum || maxReconnectAttempts < reconnectAttempts { + if connPoolSize <= sessionNum || maxReconnectAttempts < reconnectAttempts { //exit reconnect when the number of connection pools is sufficient or the current reconnection attempts exceeds the max reconnection attempts. break } diff --git a/transport/client_test.go b/transport/client_test.go index 9f5aa87..d9d0e8c 100644 --- a/transport/client_test.go +++ b/transport/client_test.go @@ -115,6 +115,7 @@ func TestTCPClient(t *testing.T) { WithServerAddress(addr.String()), WithReconnectInterval(5e8), WithConnectionNumber(1), + WithReconnectAttempts(10), ) assert.NotNil(t, clt) assert.True(t, clt.ID() > 0) From 3f884995bee826487557fa2b4e54e95cf32ef3d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?xinfan=2Ewu=28=E5=90=B4=E6=AD=86=E5=B8=86=29?= Date: Thu, 4 Jul 2024 17:13:59 +0800 Subject: [PATCH 6/8] improve:add error handling in SetReadDeadline and SetWriteDeadline and simplify the maxReconnectInterval calculation --- transport/client.go | 15 ++++++++------- transport/session.go | 8 ++++++-- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/transport/client.go b/transport/client.go index 08d079c..fc63a44 100644 --- a/transport/client.go +++ b/transport/client.go @@ -22,6 +22,7 @@ import ( "crypto/x509" "encoding/pem" "fmt" + "math" "net" "os" "strings" @@ -208,14 +209,18 @@ func (c *client) dialUDP() Session { } // check connection alive by write/read action - conn.SetWriteDeadline(time.Now().Add(1e9)) + if err := conn.SetWriteDeadline(time.Now().Add(1e9)); err != nil { + log.Warnf("failed to set write deadline: %+v", err) + } if length, err = conn.Write(connectPingPackage[:]); err != nil { conn.Close() log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, perrors.WithStack(err)) <-gxtime.After(connectInterval) continue } - conn.SetReadDeadline(time.Now().Add(1e9)) + if err := conn.SetReadDeadline(time.Now().Add(1e9)); err != nil { + log.Warnf("failed to set read deadline: %+v", err) + } length, err = conn.Read(buf) if netErr, ok := perrors.Cause(err).(net.Error); ok && netErr.Timeout() { err = nil @@ -449,11 +454,7 @@ func (c *client) reConnect() { } c.connect() reconnectAttempts++ - if reconnectAttempts > maxBackOffTimes { - maxReconnectInterval = int64(maxBackOffTimes) * int64(reconnectInterval) - } else { - maxReconnectInterval = int64(reconnectAttempts) * int64(reconnectInterval) - } + maxReconnectInterval = int64(math.Min(float64(reconnectAttempts), float64(maxBackOffTimes))) * int64(reconnectInterval) <-gxtime.After(time.Duration(maxReconnectInterval)) } } diff --git a/transport/session.go b/transport/session.go index b760b7c..deee1a0 100644 --- a/transport/session.go +++ b/transport/session.go @@ -860,8 +860,12 @@ func (s *session) stop() { // let read/Write timeout asap now := time.Now() if conn := s.Conn(); conn != nil { - conn.SetReadDeadline(now.Add(s.ReadTimeout())) - conn.SetWriteDeadline(now.Add(s.WriteTimeout())) + if err := conn.SetReadDeadline(now.Add(s.ReadTimeout())); err != nil { + log.Warnf("failed to set read deadline: %+v", err) + } + if err := conn.SetWriteDeadline(now.Add(s.WriteTimeout())); err != nil { + log.Warnf("failed to set write deadline: %+v", err) + } } close(s.done) clt, cltFound := s.GetAttribute(sessionClientKey).(*client) From 12033ace432c43dac597db75418489724add0896 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?xinfan=2Ewu=28=E5=90=B4=E6=AD=86=E5=B8=86=29?= Date: Thu, 4 Jul 2024 19:22:10 +0800 Subject: [PATCH 7/8] improve:add TcpMaxReconnectAttempts field in GettySessionParam which indicates the max attempts in TCP reconnect behavior --- examples/echo/tcp-echo/client/app/config.go | 33 +++++++++++---------- examples/echo/tcp-echo/client/app/main.go | 2 +- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/examples/echo/tcp-echo/client/app/config.go b/examples/echo/tcp-echo/client/app/config.go index c49d77f..0b2adc5 100644 --- a/examples/echo/tcp-echo/client/app/config.go +++ b/examples/echo/tcp-echo/client/app/config.go @@ -42,22 +42,23 @@ var conf *Config type ( GettySessionParam struct { - CompressEncoding bool `default:"false" yaml:"compress_encoding" json:"compress_encoding,omitempty"` - TcpNoDelay bool `default:"true" yaml:"tcp_no_delay" json:"tcp_no_delay,omitempty"` - TcpKeepAlive bool `default:"true" yaml:"tcp_keep_alive" json:"tcp_keep_alive,omitempty"` - KeepAlivePeriod string `default:"180s" yaml:"keep_alive_period" json:"keep_alive_period,omitempty"` - keepAlivePeriod time.Duration - TcpRBufSize int `default:"262144" yaml:"tcp_r_buf_size" json:"tcp_r_buf_size,omitempty"` - TcpWBufSize int `default:"65536" yaml:"tcp_w_buf_size" json:"tcp_w_buf_size,omitempty"` - PkgWQSize int `default:"1024" yaml:"pkg_wq_size" json:"pkg_wq_size,omitempty"` - TcpReadTimeout string `default:"1s" yaml:"tcp_read_timeout" json:"tcp_read_timeout,omitempty"` - tcpReadTimeout time.Duration - TcpWriteTimeout string `default:"5s" yaml:"tcp_write_timeout" json:"tcp_write_timeout,omitempty"` - tcpWriteTimeout time.Duration - WaitTimeout string `default:"7s" yaml:"wait_timeout" json:"wait_timeout,omitempty"` - waitTimeout time.Duration - MaxMsgLen int `default:"1024" yaml:"max_msg_len" json:"max_msg_len,omitempty"` - SessionName string `default:"echo-client" yaml:"session_name" json:"session_name,omitempty"` + CompressEncoding bool `default:"false" yaml:"compress_encoding" json:"compress_encoding,omitempty"` + TcpNoDelay bool `default:"true" yaml:"tcp_no_delay" json:"tcp_no_delay,omitempty"` + TcpKeepAlive bool `default:"true" yaml:"tcp_keep_alive" json:"tcp_keep_alive,omitempty"` + KeepAlivePeriod string `default:"180s" yaml:"keep_alive_period" json:"keep_alive_period,omitempty"` + keepAlivePeriod time.Duration + TcpRBufSize int `default:"262144" yaml:"tcp_r_buf_size" json:"tcp_r_buf_size,omitempty"` + TcpWBufSize int `default:"65536" yaml:"tcp_w_buf_size" json:"tcp_w_buf_size,omitempty"` + PkgWQSize int `default:"1024" yaml:"pkg_wq_size" json:"pkg_wq_size,omitempty"` + TcpReadTimeout string `default:"1s" yaml:"tcp_read_timeout" json:"tcp_read_timeout,omitempty"` + tcpReadTimeout time.Duration + TcpWriteTimeout string `default:"5s" yaml:"tcp_write_timeout" json:"tcp_write_timeout,omitempty"` + tcpWriteTimeout time.Duration + WaitTimeout string `default:"7s" yaml:"wait_timeout" json:"wait_timeout,omitempty"` + waitTimeout time.Duration + MaxMsgLen int `default:"1024" yaml:"max_msg_len" json:"max_msg_len,omitempty"` + SessionName string `default:"echo-client" yaml:"session_name" json:"session_name,omitempty"` + TcpMaxReconnectAttempts int `default:"10" yaml:"tcp_max_retry_attempts" json:"tcp_max_retry_attempts,omitempty"` } // Config holds supported types by the multiconfig package diff --git a/examples/echo/tcp-echo/client/app/main.go b/examples/echo/tcp-echo/client/app/main.go index 916a9d8..2df3b39 100644 --- a/examples/echo/tcp-echo/client/app/main.go +++ b/examples/echo/tcp-echo/client/app/main.go @@ -124,7 +124,7 @@ func newSession(session getty.Session) error { func initClient() { clientOpts := []getty.ClientOption{getty.WithServerAddress(gxnet.HostAddress(conf.ServerHost, conf.ServerPort))} clientOpts = append(clientOpts, getty.WithClientTaskPool(taskPool)) - + clientOpts = append(clientOpts, getty.WithReconnectAttempts(conf.GettySessionParam.TcpMaxReconnectAttempts)) if conf.ConnectionNum != 0 { clientOpts = append(clientOpts, getty.WithConnectionNumber(conf.ConnectionNum)) } From bace946df10452f6b0753d9ac5e4fdfc7e5006c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?xinfan=2Ewu=28=E5=90=B4=E6=AD=86=E5=B8=86=29?= Date: Thu, 4 Jul 2024 19:25:02 +0800 Subject: [PATCH 8/8] change TcpMaxReconnectAttempts json tag --- examples/echo/tcp-echo/client/app/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/echo/tcp-echo/client/app/config.go b/examples/echo/tcp-echo/client/app/config.go index 0b2adc5..259d359 100644 --- a/examples/echo/tcp-echo/client/app/config.go +++ b/examples/echo/tcp-echo/client/app/config.go @@ -58,7 +58,7 @@ type ( waitTimeout time.Duration MaxMsgLen int `default:"1024" yaml:"max_msg_len" json:"max_msg_len,omitempty"` SessionName string `default:"echo-client" yaml:"session_name" json:"session_name,omitempty"` - TcpMaxReconnectAttempts int `default:"10" yaml:"tcp_max_retry_attempts" json:"tcp_max_retry_attempts,omitempty"` + TcpMaxReconnectAttempts int `default:"10" yaml:"tcp_max_reconnect_attempts" json:"tcp_max_reconnect_attempts,omitempty"` } // Config holds supported types by the multiconfig package