From e7a37303abbd67aa1d109d72fd7041aa347a739b Mon Sep 17 00:00:00 2001 From: Mostafa Date: Mon, 11 Dec 2023 14:41:49 +0800 Subject: [PATCH 1/7] fix(network): refining the connection limit --- config/config.go | 4 +-- config/example_config.toml | 10 +++---- network/config.go | 12 ++++++--- network/config_test.go | 22 +++++++++++++++ network/gater.go | 24 ++++++++--------- network/gater_test.go | 10 +++++++ network/network.go | 17 +++++++++--- network/network_test.go | 1 - network/peermgr.go | 4 +-- network/utils.go | 55 +++++++++++++++++--------------------- tests/main_test.go | 1 - 11 files changed, 96 insertions(+), 64 deletions(-) diff --git a/config/config.go b/config/config.go index b1e7cad29..20a2a1548 100644 --- a/config/config.go +++ b/config/config.go @@ -107,8 +107,7 @@ func DefaultConfigTestnet() *Config { "/ip4/139.162.153.10/tcp/4002/p2p/12D3KooWNR79jqHVVNhNVrqnDbxbJJze4VjbEsBjZhz6mkvinHAN", "/ip4/188.121.102.178/tcp/4002/p2p/12D3KooWCRHn8vjrKNBEQcut8uVCYX5q77RKidPaE6iMK31qEVHb", } - conf.Network.MinConns = 16 - conf.Network.MaxConns = 32 + conf.Network.MaxConns = 64 conf.Network.EnableNATService = false conf.Network.EnableUPnP = false conf.Network.EnableRelay = true @@ -133,7 +132,6 @@ func DefaultConfigLocalnet() *Config { conf.Network.EnableNATService = false conf.Network.EnableUPnP = false conf.Network.BootstrapAddrStrings = []string{} - conf.Network.MinConns = 0 conf.Network.MaxConns = 0 conf.Network.NetworkName = "pactus-localnet" conf.Network.DefaultPort = 21666 diff --git a/config/example_config.toml b/config/example_config.toml index 5bf955b27..5d144df86 100644 --- a/config/example_config.toml +++ b/config/example_config.toml @@ -36,13 +36,9 @@ # These addresses are used by the Pactus node to discover and connect to other peers on the network. ## bootstrap_addrs = [] - # `min_connections` is the minimum number of connections that the Pactus node should maintain. - # Default is `16`. - ## min_connections = 16 - - # `max_connections` is the maximum number of connections that the Pactus node should maintain. - # Default is `32`. - ## max_connections = 32 + # `max_connections` is the maximum number of connections that the Pactus node maintains. + # Default is `64`. + ## max_connections = 64 # `enable_nat_service` provides a service to other peer for determining their reachability status. # Default is `false`. diff --git a/network/config.go b/network/config.go index 8ecea1c03..d88d31b79 100644 --- a/network/config.go +++ b/network/config.go @@ -13,7 +13,6 @@ type Config struct { ListenAddrStrings []string `toml:"listen_addrs"` RelayAddrStrings []string `toml:"relay_addrs"` BootstrapAddrStrings []string `toml:"bootstrap_addrs"` - MinConns int `toml:"min_connections"` MaxConns int `toml:"max_connections"` EnableNATService bool `toml:"enable_nat_service"` EnableUPnP bool `toml:"enable_upnp"` @@ -36,8 +35,7 @@ func DefaultConfig() *Config { ListenAddrStrings: []string{}, RelayAddrStrings: []string{}, BootstrapAddrStrings: []string{}, - MinConns: 16, - MaxConns: 32, + MaxConns: 64, EnableNATService: false, EnableUPnP: false, EnableRelay: false, @@ -115,3 +113,11 @@ func (conf *Config) IsBootstrapper(pid lp2pcore.PeerID) bool { return false } + +func (conf *Config) ScaledMaxConns() int { + return util.LogScale(conf.MaxConns) +} + +func (conf *Config) ScaledMinConns() int { + return conf.ScaledMaxConns() / 4 +} diff --git a/network/config_test.go b/network/config_test.go index f17be158b..6a1264ed2 100644 --- a/network/config_test.go +++ b/network/config_test.go @@ -125,3 +125,25 @@ func TestIsBootstrapper(t *testing.T) { assert.True(t, conf.IsBootstrapper(pid2)) assert.True(t, conf.IsBootstrapper(pid3)) } + +func TestScaledConns(t *testing.T) { + tests := []struct { + config Config + expectedMax int + expectedMin int + }{ + {Config{MaxConns: 1}, 1, 0}, + {Config{MaxConns: 8}, 8, 2}, + {Config{MaxConns: 30}, 32, 8}, + {Config{MaxConns: 1000}, 1024, 256}, + } + + for _, test := range tests { + resultMax := test.config.ScaledMaxConns() + resultMin := test.config.ScaledMinConns() + if resultMax != test.expectedMax || resultMin != test.expectedMin { + t.Errorf("For MaxConns %d, NormedMaxConns() returned %d (expected %d), NormedMinConns() returned %d (expected %d)", + test.config.MaxConns, resultMax, test.expectedMax, resultMin, test.expectedMin) + } + } +} diff --git a/network/gater.go b/network/gater.go index e05648118..df19d2f5e 100644 --- a/network/gater.go +++ b/network/gater.go @@ -16,10 +16,10 @@ var _ lp2pconnmgr.ConnectionGater = &ConnectionGater{} type ConnectionGater struct { lk sync.RWMutex - filters *multiaddr.Filters - peerMgr *peerMgr - maxConn int - logger *logger.SubLogger + filters *multiaddr.Filters + peerMgr *peerMgr + ConnsLimit int + logger *logger.SubLogger } func NewConnectionGater(conf *Config, log *logger.SubLogger) (*ConnectionGater, error) { @@ -30,9 +30,9 @@ func NewConnectionGater(conf *Config, log *logger.SubLogger) (*ConnectionGater, } return &ConnectionGater{ - filters: filters, - maxConn: conf.MaxConns, - logger: log, + filters: filters, + ConnsLimit: conf.ScaledMaxConns() * 2, + logger: log, }, nil } @@ -43,19 +43,19 @@ func (g *ConnectionGater) SetPeerManager(peerMgr *peerMgr) { g.peerMgr = peerMgr } -func (g *ConnectionGater) hasMaxConnections() bool { +func (g *ConnectionGater) onConnectionLimit() bool { if g.peerMgr == nil { return false } - return g.peerMgr.NumOfConnected() > g.maxConn + return g.peerMgr.NumOfConnected() > g.ConnsLimit } func (g *ConnectionGater) InterceptPeerDial(pid lp2ppeer.ID) bool { g.lk.RLock() defer g.lk.RUnlock() - if g.hasMaxConnections() { + if g.onConnectionLimit() { g.logger.Debug("InterceptPeerDial rejected: many connections", "pid", pid) return false } @@ -67,7 +67,7 @@ func (g *ConnectionGater) InterceptAddrDial(pid lp2ppeer.ID, ma multiaddr.Multia g.lk.RLock() defer g.lk.RUnlock() - if g.hasMaxConnections() { + if g.onConnectionLimit() { g.logger.Debug("InterceptAddrDial rejected: many connections", "pid", pid, "ma", ma.String()) return false } @@ -85,7 +85,7 @@ func (g *ConnectionGater) InterceptAccept(cma lp2pnetwork.ConnMultiaddrs) bool { g.lk.RLock() defer g.lk.RUnlock() - if g.hasMaxConnections() { + if g.onConnectionLimit() { g.logger.Debug("InterceptAccept rejected: many connections") return false } diff --git a/network/gater_test.go b/network/gater_test.go index 645af4b0f..ab55a0b64 100644 --- a/network/gater_test.go +++ b/network/gater_test.go @@ -62,6 +62,7 @@ func TestMaxConnection(t *testing.T) { ts := testsuite.NewTestSuite(t) conf := testConfig() conf.MaxConns = 1 + assert.Equal(t, conf.ScaledMaxConns(), 1) net := makeTestNetwork(t, conf, nil) maPrivate := multiaddr.StringCast("/ip4/127.0.0.1/tcp/1234") @@ -75,6 +76,15 @@ func TestMaxConnection(t *testing.T) { net.peerMgr.AddPeer(ts.RandPeerID(), multiaddr.StringCast("/ip4/3.3.3.3/tcp/1234"), lp2pnetwork.DirInbound) + assert.True(t, net.connGater.InterceptPeerDial(pid)) + assert.True(t, net.connGater.InterceptAddrDial(pid, maPrivate)) + assert.True(t, net.connGater.InterceptAddrDial(pid, maPublic)) + assert.True(t, net.connGater.InterceptAccept(cmaPrivate)) + assert.True(t, net.connGater.InterceptAccept(cmaPublic)) + + net.peerMgr.AddPeer(ts.RandPeerID(), + multiaddr.StringCast("/ip4/4.4.4.4/tcp/1234"), lp2pnetwork.DirInbound) + assert.False(t, net.connGater.InterceptPeerDial(pid)) assert.False(t, net.connGater.InterceptAddrDial(pid, maPrivate)) assert.False(t, net.connGater.InterceptAddrDial(pid, maPublic)) diff --git a/network/network.go b/network/network.go index d15d1b910..88e4d55b6 100644 --- a/network/network.go +++ b/network/network.go @@ -112,23 +112,32 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo opts = append(opts, lp2p.DisableMetrics()) } - limit := MakeScalingLimitConfig(conf.MinConns, conf.MaxConns) + defLimit := lp2prcmgr.DefaultLimits.AutoScale() + limit := BuildConcreteLimitConfig(conf.MaxConns) resMgr, err := lp2prcmgr.NewResourceManager( - lp2prcmgr.NewFixedLimiter(limit.AutoScale()), + lp2prcmgr.NewFixedLimiter(limit), rcMgrOpt..., ) if err != nil { return nil, LibP2PError{Err: err} } + log.Info("resource manager created", "limit", defLimit) + // https://github.com/libp2p/go-libp2p/issues/2616 + // The connection manager doesn't reject any connections. + // It just triggers a pruning run once the high watermark is reached (or surpassed). + + lowWM := conf.ScaledMinConns() // Low Watermark + highWM := conf.ScaledMaxConns() // High Watermark + highWM -= (highWM - lowWM) / 2 connMgr, err := lp2pconnmgr.NewConnManager( - conf.MinConns, // Low Watermark - conf.MaxConns, // High Watermark + lowWM, highWM, lp2pconnmgr.WithGracePeriod(time.Minute), ) if err != nil { return nil, LibP2PError{Err: err} } + log.Info("connection manager created", "lowWM", lowWM, "highWM", highWM) opts = append(opts, lp2p.Identity(networkKey), diff --git a/network/network_test.go b/network/network_test.go index fd6e8a285..61b0c550a 100644 --- a/network/network_test.go +++ b/network/network_test.go @@ -68,7 +68,6 @@ func testConfig() *Config { ListenAddrStrings: []string{}, NetworkKey: util.TempFilePath(), BootstrapAddrStrings: []string{}, - MinConns: 4, MaxConns: 8, EnableNATService: false, EnableUPnP: false, diff --git a/network/peermgr.go b/network/peermgr.go index 7b290d0fd..fb8b786ac 100644 --- a/network/peermgr.go +++ b/network/peermgr.go @@ -39,8 +39,8 @@ func newPeerMgr(ctx context.Context, h lp2phost.Host, b := &peerMgr{ ctx: ctx, bootstrapAddrs: conf.BootstrapAddrInfos(), - minConns: conf.MinConns, - maxConns: conf.MaxConns, + minConns: conf.ScaledMaxConns(), + maxConns: conf.ScaledMinConns(), peers: make(map[lp2ppeer.ID]*peerInfo), host: h, logger: log, diff --git a/network/utils.go b/network/utils.go index 184c200f0..6f87be3cb 100644 --- a/network/utils.go +++ b/network/utils.go @@ -13,7 +13,6 @@ import ( lp2prcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" "github.com/multiformats/go-multiaddr" "github.com/pactus-project/pactus/crypto/hash" - "github.com/pactus-project/pactus/util" "github.com/pactus-project/pactus/util/logger" ) @@ -131,36 +130,30 @@ func SubnetsToFilters(subnets []*net.IPNet, action multiaddr.Action) *multiaddr. return filters } -func MakeScalingLimitConfig(minConns, maxConns int) lp2prcmgr.ScalingLimitConfig { - limit := lp2prcmgr.DefaultLimits - - limit.SystemBaseLimit.ConnsOutbound = util.LogScale(maxConns / 2) - limit.SystemBaseLimit.ConnsInbound = util.LogScale(maxConns / 2) - limit.SystemBaseLimit.Conns = util.LogScale(maxConns) - limit.SystemBaseLimit.StreamsOutbound = util.LogScale(maxConns / 2) - limit.SystemBaseLimit.StreamsInbound = util.LogScale(maxConns / 2) - limit.SystemBaseLimit.Streams = util.LogScale(maxConns) - - limit.ServiceLimitIncrease.ConnsOutbound = util.LogScale(minConns / 2) - limit.ServiceLimitIncrease.ConnsInbound = util.LogScale(minConns / 2) - limit.ServiceLimitIncrease.Conns = util.LogScale(minConns) - limit.ServiceLimitIncrease.StreamsOutbound = util.LogScale(minConns / 2) - limit.ServiceLimitIncrease.StreamsInbound = util.LogScale(minConns / 2) - limit.ServiceLimitIncrease.Streams = util.LogScale(minConns) - - limit.TransientBaseLimit.ConnsOutbound = util.LogScale(maxConns / 2) - limit.TransientBaseLimit.ConnsInbound = util.LogScale(maxConns / 2) - limit.TransientBaseLimit.Conns = util.LogScale(maxConns) - limit.TransientBaseLimit.StreamsOutbound = util.LogScale(maxConns / 2) - limit.TransientBaseLimit.StreamsInbound = util.LogScale(maxConns / 2) - limit.TransientBaseLimit.Streams = util.LogScale(maxConns) - - limit.TransientLimitIncrease.ConnsInbound = util.LogScale(minConns / 2) - limit.TransientLimitIncrease.Conns = util.LogScale(minConns) - limit.TransientLimitIncrease.StreamsInbound = util.LogScale(minConns / 2) - limit.TransientLimitIncrease.Streams = util.LogScale(minConns) - - return limit +func BuildConcreteLimitConfig(maxConns int) lp2prcmgr.ConcreteLimitConfig { + changes := lp2prcmgr.PartialLimitConfig{} + + updateResourceLimits := func(limit *lp2prcmgr.ResourceLimits, maxConns, coefficient int) { + maxConnVal := lp2prcmgr.LimitVal(maxConns * coefficient) + + limit.ConnsOutbound = maxConnVal + limit.ConnsInbound = maxConnVal + limit.Conns = maxConnVal * 2 + limit.StreamsOutbound = maxConnVal * 8 + limit.StreamsInbound = maxConnVal * 8 + limit.Streams = maxConnVal * 16 + } + + updateResourceLimits(&changes.System, maxConns, 1) + updateResourceLimits(&changes.ServiceDefault, maxConns, 1) + updateResourceLimits(&changes.ProtocolDefault, maxConns, 1) + updateResourceLimits(&changes.ProtocolPeerDefault, maxConns, 1) + updateResourceLimits(&changes.Transient, maxConns, 1) + + defaultLimitConfig := lp2prcmgr.DefaultLimits.AutoScale() + changedLimitConfig := changes.Build(defaultLimitConfig) + + return changedLimitConfig } func MessageIDFunc(m *lp2pspb.Message) string { diff --git a/tests/main_test.go b/tests/main_test.go index 6f48ba35d..f33ed244f 100644 --- a/tests/main_test.go +++ b/tests/main_test.go @@ -86,7 +86,6 @@ func TestMain(m *testing.M) { tConfigs[i].Network.NetworkName = "test" tConfigs[i].Network.ListenAddrStrings = []string{"/ip4/127.0.0.1/tcp/0", "/ip4/127.0.0.1/udp/0/quic-v1"} tConfigs[i].Network.BootstrapAddrStrings = []string{} - tConfigs[i].Network.MinConns = 4 tConfigs[i].Network.MaxConns = 8 tConfigs[i].HTTP.Enable = false tConfigs[i].GRPC.Enable = false From 5c9b137773c32cd10fde6b6896c5144f344b0404 Mon Sep 17 00:00:00 2001 From: Mostafa Date: Mon, 11 Dec 2023 21:35:29 +0800 Subject: [PATCH 2/7] fix: updating connsLimit for connection gater --- network/gater.go | 9 ++++++--- network/network.go | 2 -- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/network/gater.go b/network/gater.go index df19d2f5e..320d0f708 100644 --- a/network/gater.go +++ b/network/gater.go @@ -18,7 +18,7 @@ type ConnectionGater struct { filters *multiaddr.Filters peerMgr *peerMgr - ConnsLimit int + connsLimit int logger *logger.SubLogger } @@ -29,9 +29,12 @@ func NewConnectionGater(conf *Config, log *logger.SubLogger) (*ConnectionGater, filters = SubnetsToFilters(privateSubnets, multiaddr.ActionDeny) } + connsLimit := conf.ScaledMaxConns() + connsLimit += (conf.ScaledMaxConns() - conf.ScaledMinConns()) / 2 + log.Info("connection gater created", "connsLimit", connsLimit) return &ConnectionGater{ filters: filters, - ConnsLimit: conf.ScaledMaxConns() * 2, + connsLimit: connsLimit, logger: log, }, nil } @@ -48,7 +51,7 @@ func (g *ConnectionGater) onConnectionLimit() bool { return false } - return g.peerMgr.NumOfConnected() > g.ConnsLimit + return g.peerMgr.NumOfConnected() > g.connsLimit } func (g *ConnectionGater) InterceptPeerDial(pid lp2ppeer.ID) bool { diff --git a/network/network.go b/network/network.go index 88e4d55b6..12fcc4587 100644 --- a/network/network.go +++ b/network/network.go @@ -112,7 +112,6 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo opts = append(opts, lp2p.DisableMetrics()) } - defLimit := lp2prcmgr.DefaultLimits.AutoScale() limit := BuildConcreteLimitConfig(conf.MaxConns) resMgr, err := lp2prcmgr.NewResourceManager( lp2prcmgr.NewFixedLimiter(limit), @@ -121,7 +120,6 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo if err != nil { return nil, LibP2PError{Err: err} } - log.Info("resource manager created", "limit", defLimit) // https://github.com/libp2p/go-libp2p/issues/2616 // The connection manager doesn't reject any connections. From 27029588029e053f43957ba1e51c6d08ac1d65d1 Mon Sep 17 00:00:00 2001 From: Mostafa Date: Mon, 11 Dec 2023 21:38:38 +0800 Subject: [PATCH 3/7] chore: updating relay log --- network/network.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/network.go b/network/network.go index 12fcc4587..ec5a4aaff 100644 --- a/network/network.go +++ b/network/network.go @@ -161,7 +161,7 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo } if conf.EnableRelay { - log.Info("relay enabled", "relay addrs", conf.RelayAddrStrings) + log.Info("relay enabled", "addrInfos", conf.RelayAddrInfos()) opts = append(opts, lp2p.EnableRelay(), lp2p.EnableAutoRelayWithStaticRelays(conf.RelayAddrInfos()), From 24b93c308f4be60db06a6e21d7df91f874f3643c Mon Sep 17 00:00:00 2001 From: Mostafa Date: Mon, 11 Dec 2023 21:48:27 +0800 Subject: [PATCH 4/7] test: updating connection gater test --- network/gater_test.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/network/gater_test.go b/network/gater_test.go index ab55a0b64..981a866f1 100644 --- a/network/gater_test.go +++ b/network/gater_test.go @@ -61,8 +61,9 @@ func TestDenyPrivate(t *testing.T) { func TestMaxConnection(t *testing.T) { ts := testsuite.NewTestSuite(t) conf := testConfig() - conf.MaxConns = 1 - assert.Equal(t, conf.ScaledMaxConns(), 1) + conf.MaxConns = 4 + assert.Equal(t, conf.ScaledMinConns(), 1) + assert.Equal(t, conf.ScaledMaxConns(), 4) net := makeTestNetwork(t, conf, nil) maPrivate := multiaddr.StringCast("/ip4/127.0.0.1/tcp/1234") @@ -72,9 +73,15 @@ func TestMaxConnection(t *testing.T) { pid := ts.RandPeerID() net.peerMgr.AddPeer(ts.RandPeerID(), - multiaddr.StringCast("/ip4/2.2.2.2/tcp/1234"), lp2pnetwork.DirInbound) + multiaddr.StringCast("/ip4/1.1.1.1/tcp/1234"), lp2pnetwork.DirInbound) + net.peerMgr.AddPeer(ts.RandPeerID(), + multiaddr.StringCast("/ip4/2.2.2.2/tcp/1234"), lp2pnetwork.DirOutbound) net.peerMgr.AddPeer(ts.RandPeerID(), multiaddr.StringCast("/ip4/3.3.3.3/tcp/1234"), lp2pnetwork.DirInbound) + net.peerMgr.AddPeer(ts.RandPeerID(), + multiaddr.StringCast("/ip4/4.4.4.4/tcp/1234"), lp2pnetwork.DirInbound) + net.peerMgr.AddPeer(ts.RandPeerID(), + multiaddr.StringCast("/ip4/5.5.5.5/tcp/1234"), lp2pnetwork.DirInbound) assert.True(t, net.connGater.InterceptPeerDial(pid)) assert.True(t, net.connGater.InterceptAddrDial(pid, maPrivate)) @@ -83,7 +90,7 @@ func TestMaxConnection(t *testing.T) { assert.True(t, net.connGater.InterceptAccept(cmaPublic)) net.peerMgr.AddPeer(ts.RandPeerID(), - multiaddr.StringCast("/ip4/4.4.4.4/tcp/1234"), lp2pnetwork.DirInbound) + multiaddr.StringCast("/ip4/6.6.6.6/tcp/1234"), lp2pnetwork.DirInbound) assert.False(t, net.connGater.InterceptPeerDial(pid)) assert.False(t, net.connGater.InterceptAddrDial(pid, maPrivate)) From d970675ec3787ff6374ed0d5bac7fb5c1e3b5486 Mon Sep 17 00:00:00 2001 From: b00f Date: Mon, 11 Dec 2023 22:43:57 +0800 Subject: [PATCH 5/7] Update network/peermgr.go Co-authored-by: Kay --- network/peermgr.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/peermgr.go b/network/peermgr.go index fb8b786ac..a04c00a18 100644 --- a/network/peermgr.go +++ b/network/peermgr.go @@ -39,7 +39,7 @@ func newPeerMgr(ctx context.Context, h lp2phost.Host, b := &peerMgr{ ctx: ctx, bootstrapAddrs: conf.BootstrapAddrInfos(), - minConns: conf.ScaledMaxConns(), + minConns: conf.ScaledMinConns(), maxConns: conf.ScaledMinConns(), peers: make(map[lp2ppeer.ID]*peerInfo), host: h, From d32688d34b6cedfc336ee8bb958dd97fc668d212 Mon Sep 17 00:00:00 2001 From: b00f Date: Mon, 11 Dec 2023 22:44:03 +0800 Subject: [PATCH 6/7] Update network/peermgr.go Co-authored-by: Kay --- network/peermgr.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/peermgr.go b/network/peermgr.go index a04c00a18..68834678e 100644 --- a/network/peermgr.go +++ b/network/peermgr.go @@ -40,7 +40,7 @@ func newPeerMgr(ctx context.Context, h lp2phost.Host, ctx: ctx, bootstrapAddrs: conf.BootstrapAddrInfos(), minConns: conf.ScaledMinConns(), - maxConns: conf.ScaledMinConns(), + maxConns: conf.ScaledMaxConns(), peers: make(map[lp2ppeer.ID]*peerInfo), host: h, logger: log, From 7dcbcd38df803ead3117fc9b00834bd7d8e3dc7e Mon Sep 17 00:00:00 2001 From: Mostafa Date: Mon, 11 Dec 2023 23:35:00 +0800 Subject: [PATCH 7/7] fix: defining connection threshold --- network/config.go | 4 ++++ network/config_test.go | 30 ++++++++++++++++++++---------- network/gater.go | 3 +-- network/gater_test.go | 23 +++++++++-------------- network/network.go | 5 ++--- network/utils.go | 12 ++++++------ 6 files changed, 42 insertions(+), 35 deletions(-) diff --git a/network/config.go b/network/config.go index d88d31b79..20b624906 100644 --- a/network/config.go +++ b/network/config.go @@ -121,3 +121,7 @@ func (conf *Config) ScaledMaxConns() int { func (conf *Config) ScaledMinConns() int { return conf.ScaledMaxConns() / 4 } + +func (conf *Config) ConnsThreshold() int { + return conf.ScaledMaxConns() / 8 +} diff --git a/network/config_test.go b/network/config_test.go index 6a1264ed2..b53ac50e7 100644 --- a/network/config_test.go +++ b/network/config_test.go @@ -128,22 +128,32 @@ func TestIsBootstrapper(t *testing.T) { func TestScaledConns(t *testing.T) { tests := []struct { - config Config - expectedMax int - expectedMin int + config Config + expectedMax int + expectedMin int + expectedThreshold int }{ - {Config{MaxConns: 1}, 1, 0}, - {Config{MaxConns: 8}, 8, 2}, - {Config{MaxConns: 30}, 32, 8}, - {Config{MaxConns: 1000}, 1024, 256}, + {Config{MaxConns: 1}, 1, 0, 0}, + {Config{MaxConns: 8}, 8, 2, 1}, + {Config{MaxConns: 30}, 32, 8, 4}, + {Config{MaxConns: 1000}, 1024, 256, 128}, } for _, test := range tests { resultMax := test.config.ScaledMaxConns() resultMin := test.config.ScaledMinConns() - if resultMax != test.expectedMax || resultMin != test.expectedMin { - t.Errorf("For MaxConns %d, NormedMaxConns() returned %d (expected %d), NormedMinConns() returned %d (expected %d)", - test.config.MaxConns, resultMax, test.expectedMax, resultMin, test.expectedMin) + resultThreshold := test.config.ConnsThreshold() + if resultMax != test.expectedMax || + resultMin != test.expectedMin || + resultThreshold != test.expectedThreshold { + t.Errorf("For MaxConns %d, "+ + "NormedMaxConns() returned %d (expected %d), "+ + "NormedMinConns() returned %d (expected %d), "+ + "ConnsThreshold() returned %d (expected %d)", + test.config.MaxConns, + resultMax, test.expectedMax, + resultMin, test.expectedMin, + resultThreshold, test.expectedThreshold) } } } diff --git a/network/gater.go b/network/gater.go index 320d0f708..ae32b59dc 100644 --- a/network/gater.go +++ b/network/gater.go @@ -29,8 +29,7 @@ func NewConnectionGater(conf *Config, log *logger.SubLogger) (*ConnectionGater, filters = SubnetsToFilters(privateSubnets, multiaddr.ActionDeny) } - connsLimit := conf.ScaledMaxConns() - connsLimit += (conf.ScaledMaxConns() - conf.ScaledMinConns()) / 2 + connsLimit := conf.ScaledMaxConns() + conf.ConnsThreshold() log.Info("connection gater created", "connsLimit", connsLimit) return &ConnectionGater{ filters: filters, diff --git a/network/gater_test.go b/network/gater_test.go index 981a866f1..4fb25523f 100644 --- a/network/gater_test.go +++ b/network/gater_test.go @@ -61,9 +61,10 @@ func TestDenyPrivate(t *testing.T) { func TestMaxConnection(t *testing.T) { ts := testsuite.NewTestSuite(t) conf := testConfig() - conf.MaxConns = 4 - assert.Equal(t, conf.ScaledMinConns(), 1) - assert.Equal(t, conf.ScaledMaxConns(), 4) + conf.MaxConns = 8 + assert.Equal(t, conf.ScaledMinConns(), 2) + assert.Equal(t, conf.ScaledMaxConns(), 8) + assert.Equal(t, conf.ConnsThreshold(), 1) net := makeTestNetwork(t, conf, nil) maPrivate := multiaddr.StringCast("/ip4/127.0.0.1/tcp/1234") @@ -72,16 +73,10 @@ func TestMaxConnection(t *testing.T) { cmaPublic := &mockConnMultiaddrs{remote: maPublic} pid := ts.RandPeerID() - net.peerMgr.AddPeer(ts.RandPeerID(), - multiaddr.StringCast("/ip4/1.1.1.1/tcp/1234"), lp2pnetwork.DirInbound) - net.peerMgr.AddPeer(ts.RandPeerID(), - multiaddr.StringCast("/ip4/2.2.2.2/tcp/1234"), lp2pnetwork.DirOutbound) - net.peerMgr.AddPeer(ts.RandPeerID(), - multiaddr.StringCast("/ip4/3.3.3.3/tcp/1234"), lp2pnetwork.DirInbound) - net.peerMgr.AddPeer(ts.RandPeerID(), - multiaddr.StringCast("/ip4/4.4.4.4/tcp/1234"), lp2pnetwork.DirInbound) - net.peerMgr.AddPeer(ts.RandPeerID(), - multiaddr.StringCast("/ip4/5.5.5.5/tcp/1234"), lp2pnetwork.DirInbound) + for i := 0; i < 9; i++ { + net.peerMgr.AddPeer(ts.RandPeerID(), + multiaddr.StringCast("/ip4/1.1.1.1/tcp/1234"), lp2pnetwork.DirInbound) + } assert.True(t, net.connGater.InterceptPeerDial(pid)) assert.True(t, net.connGater.InterceptAddrDial(pid, maPrivate)) @@ -90,7 +85,7 @@ func TestMaxConnection(t *testing.T) { assert.True(t, net.connGater.InterceptAccept(cmaPublic)) net.peerMgr.AddPeer(ts.RandPeerID(), - multiaddr.StringCast("/ip4/6.6.6.6/tcp/1234"), lp2pnetwork.DirInbound) + multiaddr.StringCast("/ip4/1.1.1.1/tcp/1234"), lp2pnetwork.DirInbound) assert.False(t, net.connGater.InterceptPeerDial(pid)) assert.False(t, net.connGater.InterceptAddrDial(pid, maPrivate)) diff --git a/network/network.go b/network/network.go index ec5a4aaff..834aff6d3 100644 --- a/network/network.go +++ b/network/network.go @@ -125,9 +125,8 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo // The connection manager doesn't reject any connections. // It just triggers a pruning run once the high watermark is reached (or surpassed). - lowWM := conf.ScaledMinConns() // Low Watermark - highWM := conf.ScaledMaxConns() // High Watermark - highWM -= (highWM - lowWM) / 2 + lowWM := conf.ScaledMinConns() // Low Watermark + highWM := conf.ScaledMaxConns() - conf.ConnsThreshold() // High Watermark connMgr, err := lp2pconnmgr.NewConnManager( lowWM, highWM, lp2pconnmgr.WithGracePeriod(time.Minute), diff --git a/network/utils.go b/network/utils.go index 6f87be3cb..d9fff03ec 100644 --- a/network/utils.go +++ b/network/utils.go @@ -136,12 +136,12 @@ func BuildConcreteLimitConfig(maxConns int) lp2prcmgr.ConcreteLimitConfig { updateResourceLimits := func(limit *lp2prcmgr.ResourceLimits, maxConns, coefficient int) { maxConnVal := lp2prcmgr.LimitVal(maxConns * coefficient) - limit.ConnsOutbound = maxConnVal - limit.ConnsInbound = maxConnVal - limit.Conns = maxConnVal * 2 - limit.StreamsOutbound = maxConnVal * 8 - limit.StreamsInbound = maxConnVal * 8 - limit.Streams = maxConnVal * 16 + limit.ConnsOutbound = maxConnVal / 2 + limit.ConnsInbound = maxConnVal / 2 + limit.Conns = maxConnVal + limit.StreamsOutbound = maxConnVal * 4 + limit.StreamsInbound = maxConnVal * 4 + limit.Streams = maxConnVal * 8 } updateResourceLimits(&changes.System, maxConns, 1)