fix(network): refining the connection limit #849

Merged with 8 commits on Dec 11, 2023

4 changes: 1 addition & 3 deletions config/config.go
@@ -107,8 +107,7 @@ func DefaultConfigTestnet() *Config {
"/ip4/139.162.153.10/tcp/4002/p2p/12D3KooWNR79jqHVVNhNVrqnDbxbJJze4VjbEsBjZhz6mkvinHAN",
"/ip4/188.121.102.178/tcp/4002/p2p/12D3KooWCRHn8vjrKNBEQcut8uVCYX5q77RKidPaE6iMK31qEVHb",
}
conf.Network.MinConns = 16
conf.Network.MaxConns = 32
conf.Network.MaxConns = 64
conf.Network.EnableNATService = false
conf.Network.EnableUPnP = false
conf.Network.EnableRelay = true
@@ -133,7 +132,6 @@ func DefaultConfigLocalnet() *Config {
conf.Network.EnableNATService = false
conf.Network.EnableUPnP = false
conf.Network.BootstrapAddrStrings = []string{}
conf.Network.MinConns = 0
conf.Network.MaxConns = 0
conf.Network.NetworkName = "pactus-localnet"
conf.Network.DefaultPort = 21666
10 changes: 3 additions & 7 deletions config/example_config.toml
@@ -36,13 +36,9 @@
# These addresses are used by the Pactus node to discover and connect to other peers on the network.
## bootstrap_addrs = []

# `min_connections` is the minimum number of connections that the Pactus node should maintain.
# Default is `16`.
## min_connections = 16

# `max_connections` is the maximum number of connections that the Pactus node should maintain.
# Default is `32`.
## max_connections = 32
# `max_connections` is the maximum number of connections that the Pactus node maintains.
# Default is `64`.
## max_connections = 64

# `enable_nat_service` provides a service to other peers to determine their reachability status.
# Default is `false`.
16 changes: 13 additions & 3 deletions network/config.go
@@ -13,7 +13,6 @@ type Config struct {
ListenAddrStrings []string `toml:"listen_addrs"`
RelayAddrStrings []string `toml:"relay_addrs"`
BootstrapAddrStrings []string `toml:"bootstrap_addrs"`
MinConns int `toml:"min_connections"`
MaxConns int `toml:"max_connections"`
EnableNATService bool `toml:"enable_nat_service"`
EnableUPnP bool `toml:"enable_upnp"`
@@ -36,8 +35,7 @@ func DefaultConfig() *Config {
ListenAddrStrings: []string{},
RelayAddrStrings: []string{},
BootstrapAddrStrings: []string{},
MinConns: 16,
MaxConns: 32,
MaxConns: 64,
EnableNATService: false,
EnableUPnP: false,
EnableRelay: false,
@@ -115,3 +113,15 @@ func (conf *Config) IsBootstrapper(pid lp2pcore.PeerID) bool {

return false
}

func (conf *Config) ScaledMaxConns() int {
return util.LogScale(conf.MaxConns)
}

func (conf *Config) ScaledMinConns() int {
return conf.ScaledMaxConns() / 4
}

func (conf *Config) ConnsThreshold() int {
return conf.ScaledMaxConns() / 8
}
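
The new helpers replace the removed MinConns field: everything is now derived from MaxConns. Below is a minimal sketch (not part of the diff) of how the values work out, assuming util.LogScale rounds its input up to the nearest power of two (the behavior implied by the test expectations in config_test.go below):

package main

import "fmt"

// logScale stands in for util.LogScale; assumed here to round up to the
// nearest power of two, consistent with the TestScaledConns cases below.
func logScale(n int) int {
	p := 1
	for p < n {
		p *= 2
	}
	return p
}

func main() {
	for _, maxConns := range []int{1, 8, 30, 64, 1000} {
		scaledMax := logScale(maxConns) // ScaledMaxConns()
		scaledMin := scaledMax / 4      // ScaledMinConns()
		threshold := scaledMax / 8      // ConnsThreshold()
		fmt.Println(maxConns, scaledMax, scaledMin, threshold)
	}
}

For the new default MaxConns = 64 this yields 64, 16, and 8 respectively.
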
32 changes: 32 additions & 0 deletions network/config_test.go
@@ -125,3 +125,35 @@ func TestIsBootstrapper(t *testing.T) {
assert.True(t, conf.IsBootstrapper(pid2))
assert.True(t, conf.IsBootstrapper(pid3))
}

func TestScaledConns(t *testing.T) {
tests := []struct {
config Config
expectedMax int
expectedMin int
expectedThreshold int
}{
{Config{MaxConns: 1}, 1, 0, 0},
{Config{MaxConns: 8}, 8, 2, 1},
{Config{MaxConns: 30}, 32, 8, 4},
{Config{MaxConns: 1000}, 1024, 256, 128},
}

for _, test := range tests {
resultMax := test.config.ScaledMaxConns()
resultMin := test.config.ScaledMinConns()
resultThreshold := test.config.ConnsThreshold()
if resultMax != test.expectedMax ||
resultMin != test.expectedMin ||
resultThreshold != test.expectedThreshold {
t.Errorf("For MaxConns %d, "+
"NormedMaxConns() returned %d (expected %d), "+
"NormedMinConns() returned %d (expected %d), "+
"ConnsThreshold() returned %d (expected %d)",
test.config.MaxConns,
resultMax, test.expectedMax,
resultMin, test.expectedMin,
resultThreshold, test.expectedThreshold)
}
}
}
26 changes: 14 additions & 12 deletions network/gater.go
@@ -16,10 +16,10 @@ var _ lp2pconnmgr.ConnectionGater = &ConnectionGater{}
type ConnectionGater struct {
lk sync.RWMutex

filters *multiaddr.Filters
peerMgr *peerMgr
maxConn int
logger *logger.SubLogger
filters *multiaddr.Filters
peerMgr *peerMgr
connsLimit int
logger *logger.SubLogger
}

func NewConnectionGater(conf *Config, log *logger.SubLogger) (*ConnectionGater, error) {
@@ -29,10 +29,12 @@ func NewConnectionGater(conf *Config, log *logger.SubLogger) (*ConnectionGater,
filters = SubnetsToFilters(privateSubnets, multiaddr.ActionDeny)
}

connsLimit := conf.ScaledMaxConns() + conf.ConnsThreshold()
log.Info("connection gater created", "connsLimit", connsLimit)
return &ConnectionGater{
filters: filters,
maxConn: conf.MaxConns,
logger: log,
filters: filters,
connsLimit: connsLimit,
logger: log,
}, nil
}

@@ -43,19 +45,19 @@ func (g *ConnectionGater) SetPeerManager(peerMgr *peerMgr) {
g.peerMgr = peerMgr
}

func (g *ConnectionGater) hasMaxConnections() bool {
func (g *ConnectionGater) onConnectionLimit() bool {
if g.peerMgr == nil {
return false
}

return g.peerMgr.NumOfConnected() > g.maxConn
return g.peerMgr.NumOfConnected() > g.connsLimit
}

func (g *ConnectionGater) InterceptPeerDial(pid lp2ppeer.ID) bool {
g.lk.RLock()
defer g.lk.RUnlock()

if g.hasMaxConnections() {
if g.onConnectionLimit() {
g.logger.Debug("InterceptPeerDial rejected: many connections", "pid", pid)
return false
}
@@ -67,7 +69,7 @@ func (g *ConnectionGater) InterceptAddrDial(pid lp2ppeer.ID, ma multiaddr.Multia
g.lk.RLock()
defer g.lk.RUnlock()

if g.hasMaxConnections() {
if g.onConnectionLimit() {
g.logger.Debug("InterceptAddrDial rejected: many connections", "pid", pid, "ma", ma.String())
return false
}
@@ -85,7 +87,7 @@ func (g *ConnectionGater) InterceptAccept(cma lp2pnetwork.ConnMultiaddrs) bool {
g.lk.RLock()
defer g.lk.RUnlock()

if g.hasMaxConnections() {
if g.onConnectionLimit() {
g.logger.Debug("InterceptAccept rejected: many connections")
return false
}
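
With the default MaxConns = 64, the gater's cut-off is ScaledMaxConns() + ConnsThreshold() = 72, and onConnectionLimit() only fires once the connected-peer count exceeds that value. A short sketch (the values are derived from the config helpers above, not hard-coded in this PR):

package main

import "fmt"

func main() {
	// Derived for MaxConns = 64 (the new default):
	scaledMax := 64 // ScaledMaxConns()
	threshold := 8  // ConnsThreshold() = ScaledMaxConns() / 8

	connsLimit := scaledMax + threshold // 72, logged as "connsLimit" by the gater

	// Dials and inbound accepts are rejected only when the number of connected
	// peers exceeds the limit, i.e. from the 73rd connection onward.
	for _, connected := range []int{72, 73} {
		fmt.Printf("connected=%d reject=%v\n", connected, connected > connsLimit)
	}
}
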
20 changes: 16 additions & 4 deletions network/gater_test.go
@@ -61,7 +61,10 @@ func TestDenyPrivate(t *testing.T) {
func TestMaxConnection(t *testing.T) {
ts := testsuite.NewTestSuite(t)
conf := testConfig()
conf.MaxConns = 1
conf.MaxConns = 8
assert.Equal(t, conf.ScaledMinConns(), 2)
assert.Equal(t, conf.ScaledMaxConns(), 8)
assert.Equal(t, conf.ConnsThreshold(), 1)
net := makeTestNetwork(t, conf, nil)

maPrivate := multiaddr.StringCast("/ip4/127.0.0.1/tcp/1234")
@@ -70,10 +73,19 @@
cmaPublic := &mockConnMultiaddrs{remote: maPublic}
pid := ts.RandPeerID()

for i := 0; i < 9; i++ {
net.peerMgr.AddPeer(ts.RandPeerID(),
multiaddr.StringCast("/ip4/1.1.1.1/tcp/1234"), lp2pnetwork.DirInbound)
}

assert.True(t, net.connGater.InterceptPeerDial(pid))
assert.True(t, net.connGater.InterceptAddrDial(pid, maPrivate))
assert.True(t, net.connGater.InterceptAddrDial(pid, maPublic))
assert.True(t, net.connGater.InterceptAccept(cmaPrivate))
assert.True(t, net.connGater.InterceptAccept(cmaPublic))

net.peerMgr.AddPeer(ts.RandPeerID(),
multiaddr.StringCast("/ip4/2.2.2.2/tcp/1234"), lp2pnetwork.DirInbound)
net.peerMgr.AddPeer(ts.RandPeerID(),
multiaddr.StringCast("/ip4/3.3.3.3/tcp/1234"), lp2pnetwork.DirInbound)
multiaddr.StringCast("/ip4/1.1.1.1/tcp/1234"), lp2pnetwork.DirInbound)

assert.False(t, net.connGater.InterceptPeerDial(pid))
assert.False(t, net.connGater.InterceptAddrDial(pid, maPrivate))
16 changes: 11 additions & 5 deletions network/network.go
@@ -112,23 +112,29 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo
opts = append(opts, lp2p.DisableMetrics())
}

limit := MakeScalingLimitConfig(conf.MinConns, conf.MaxConns)
limit := BuildConcreteLimitConfig(conf.MaxConns)
resMgr, err := lp2prcmgr.NewResourceManager(
lp2prcmgr.NewFixedLimiter(limit.AutoScale()),
lp2prcmgr.NewFixedLimiter(limit),
rcMgrOpt...,
)
if err != nil {
return nil, LibP2PError{Err: err}
}

// https://github.com/libp2p/go-libp2p/issues/2616
// The connection manager doesn't reject any connections.
// It just triggers a pruning run once the high watermark is reached (or surpassed).

lowWM := conf.ScaledMinConns() // Low Watermark
highWM := conf.ScaledMaxConns() - conf.ConnsThreshold() // High Watermark
connMgr, err := lp2pconnmgr.NewConnManager(
conf.MinConns, // Low Watermark
conf.MaxConns, // High Watermark
lowWM, highWM,
lp2pconnmgr.WithGracePeriod(time.Minute),
)
if err != nil {
return nil, LibP2PError{Err: err}
}
log.Info("connection manager created", "lowWM", lowWM, "highWM", highWM)

opts = append(opts,
lp2p.Identity(networkKey),
@@ -154,7 +160,7 @@ func newNetwork(conf *Config, log *logger.SubLogger, opts []lp2p.Option) (*netwo
}

if conf.EnableRelay {
log.Info("relay enabled", "relay addrs", conf.RelayAddrStrings)
log.Info("relay enabled", "addrInfos", conf.RelayAddrInfos())
opts = append(opts,
lp2p.EnableRelay(),
lp2p.EnableAutoRelayWithStaticRelays(conf.RelayAddrInfos()),
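
Taken together, for the default MaxConns = 64 the connection manager prunes gracefully between the watermarks while the gater only hard-refuses connections above a higher ceiling. A sketch of the derived thresholds (assuming the helper semantics introduced in config.go):

package main

import "fmt"

func main() {
	// Derived for MaxConns = 64:
	scaledMax := 64 // ScaledMaxConns()
	scaledMin := 16 // ScaledMinConns() = scaledMax / 4
	threshold := 8  // ConnsThreshold() = scaledMax / 8

	lowWM := scaledMin                  // conn manager prunes down to 16 peers
	highWM := scaledMax - threshold     // pruning is triggered at 56 peers
	gaterLimit := scaledMax + threshold // gater refuses new connections above 72 peers

	fmt.Println("lowWM:", lowWM, "highWM:", highWM, "gaterLimit:", gaterLimit)
}
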
1 change: 0 additions & 1 deletion network/network_test.go
@@ -68,7 +68,6 @@ func testConfig() *Config {
ListenAddrStrings: []string{},
NetworkKey: util.TempFilePath(),
BootstrapAddrStrings: []string{},
MinConns: 4,
MaxConns: 8,
EnableNATService: false,
EnableUPnP: false,
4 changes: 2 additions & 2 deletions network/peermgr.go
@@ -39,8 +39,8 @@ func newPeerMgr(ctx context.Context, h lp2phost.Host,
b := &peerMgr{
ctx: ctx,
bootstrapAddrs: conf.BootstrapAddrInfos(),
minConns: conf.MinConns,
maxConns: conf.MaxConns,
minConns: conf.ScaledMinConns(),
maxConns: conf.ScaledMaxConns(),
peers: make(map[lp2ppeer.ID]*peerInfo),
host: h,
logger: log,
55 changes: 24 additions & 31 deletions network/utils.go
@@ -13,7 +13,6 @@ import (
lp2prcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/multiformats/go-multiaddr"
"github.com/pactus-project/pactus/crypto/hash"
"github.com/pactus-project/pactus/util"
"github.com/pactus-project/pactus/util/logger"
)

@@ -131,36 +130,30 @@ func SubnetsToFilters(subnets []*net.IPNet, action multiaddr.Action) *multiaddr.
return filters
}

func MakeScalingLimitConfig(minConns, maxConns int) lp2prcmgr.ScalingLimitConfig {
limit := lp2prcmgr.DefaultLimits

limit.SystemBaseLimit.ConnsOutbound = util.LogScale(maxConns / 2)
limit.SystemBaseLimit.ConnsInbound = util.LogScale(maxConns / 2)
limit.SystemBaseLimit.Conns = util.LogScale(maxConns)
limit.SystemBaseLimit.StreamsOutbound = util.LogScale(maxConns / 2)
limit.SystemBaseLimit.StreamsInbound = util.LogScale(maxConns / 2)
limit.SystemBaseLimit.Streams = util.LogScale(maxConns)

limit.ServiceLimitIncrease.ConnsOutbound = util.LogScale(minConns / 2)
limit.ServiceLimitIncrease.ConnsInbound = util.LogScale(minConns / 2)
limit.ServiceLimitIncrease.Conns = util.LogScale(minConns)
limit.ServiceLimitIncrease.StreamsOutbound = util.LogScale(minConns / 2)
limit.ServiceLimitIncrease.StreamsInbound = util.LogScale(minConns / 2)
limit.ServiceLimitIncrease.Streams = util.LogScale(minConns)

limit.TransientBaseLimit.ConnsOutbound = util.LogScale(maxConns / 2)
limit.TransientBaseLimit.ConnsInbound = util.LogScale(maxConns / 2)
limit.TransientBaseLimit.Conns = util.LogScale(maxConns)
limit.TransientBaseLimit.StreamsOutbound = util.LogScale(maxConns / 2)
limit.TransientBaseLimit.StreamsInbound = util.LogScale(maxConns / 2)
limit.TransientBaseLimit.Streams = util.LogScale(maxConns)

limit.TransientLimitIncrease.ConnsInbound = util.LogScale(minConns / 2)
limit.TransientLimitIncrease.Conns = util.LogScale(minConns)
limit.TransientLimitIncrease.StreamsInbound = util.LogScale(minConns / 2)
limit.TransientLimitIncrease.Streams = util.LogScale(minConns)

return limit
func BuildConcreteLimitConfig(maxConns int) lp2prcmgr.ConcreteLimitConfig {
changes := lp2prcmgr.PartialLimitConfig{}

updateResourceLimits := func(limit *lp2prcmgr.ResourceLimits, maxConns, coefficient int) {
maxConnVal := lp2prcmgr.LimitVal(maxConns * coefficient)

limit.ConnsOutbound = maxConnVal / 2
limit.ConnsInbound = maxConnVal / 2
limit.Conns = maxConnVal
limit.StreamsOutbound = maxConnVal * 4
limit.StreamsInbound = maxConnVal * 4
limit.Streams = maxConnVal * 8
}

updateResourceLimits(&changes.System, maxConns, 1)
updateResourceLimits(&changes.ServiceDefault, maxConns, 1)
updateResourceLimits(&changes.ProtocolDefault, maxConns, 1)
updateResourceLimits(&changes.ProtocolPeerDefault, maxConns, 1)
updateResourceLimits(&changes.Transient, maxConns, 1)

defaultLimitConfig := lp2prcmgr.DefaultLimits.AutoScale()
changedLimitConfig := changes.Build(defaultLimitConfig)

return changedLimitConfig
}
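
A minimal usage sketch for the new limiter, mirroring the call site in network.go (the go-libp2p calls are the ones used in the diff; the standalone wiring here is only illustrative):

package main

import (
	"fmt"

	lp2prcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"

	"github.com/pactus-project/pactus/network"
)

func main() {
	// Build concrete limits scaled from MaxConns and hand them to a fixed limiter.
	limits := network.BuildConcreteLimitConfig(64)

	resMgr, err := lp2prcmgr.NewResourceManager(lp2prcmgr.NewFixedLimiter(limits))
	if err != nil {
		panic(err)
	}
	defer resMgr.Close()

	fmt.Println("resource manager ready")
}
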

func MessageIDFunc(m *lp2pspb.Message) string {
1 change: 0 additions & 1 deletion tests/main_test.go
@@ -86,7 +86,6 @@ func TestMain(m *testing.M) {
tConfigs[i].Network.NetworkName = "test"
tConfigs[i].Network.ListenAddrStrings = []string{"/ip4/127.0.0.1/tcp/0", "/ip4/127.0.0.1/udp/0/quic-v1"}
tConfigs[i].Network.BootstrapAddrStrings = []string{}
tConfigs[i].Network.MinConns = 4
tConfigs[i].Network.MaxConns = 8
tConfigs[i].HTTP.Enable = false
tConfigs[i].GRPC.Enable = false