diff --git a/p2p/net/connmgr/connmgr.go b/p2p/net/connmgr/connmgr.go index fc406bf8b8..56ca27ec59 100644 --- a/p2p/net/connmgr/connmgr.go +++ b/p2p/net/connmgr/connmgr.go @@ -7,14 +7,21 @@ import ( "time" logging "github.com/ipfs/go-log" - ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr" + "github.com/libp2p/go-libp2p-interface-connmgr" inet "github.com/libp2p/go-libp2p-net" - peer "github.com/libp2p/go-libp2p-peer" + "github.com/libp2p/go-libp2p-peer" ma "github.com/multiformats/go-multiaddr" ) var log = logging.Logger("connmgr") +// BasicConnMgr is a ConnManager that trims connections whenever the count exceeds the +// high watermark. New connections are given a grace period before they're subject +// to trimming. Trims are automatically run on demand, only if the time from the +// previous trim is higher than 10 seconds. Furthermore, trims can be explicitly +// requested through the public interface of this struct (see TrimOpenConns). + +// See configuration parameters in NewConnManager. type BasicConnMgr struct { highWater int lowWater int @@ -31,6 +38,12 @@ type BasicConnMgr struct { var _ ifconnmgr.ConnManager = (*BasicConnMgr)(nil) +// NewConnManager creates a new BasicConnMgr with the provided params: +// * lo and hi are watermarks governing the number of connections that'll be maintained. +// When the peer count exceeds the 'high watermark', as many peers will be pruned (and +// their connections terminated) until 'low watermark' peers remain. +// * grace is the amount of time a newly opened connection is given before it becomes +// subject to pruning. func NewConnManager(low, hi int, grace time.Duration) *BasicConnMgr { return &BasicConnMgr{ highWater: hi, @@ -40,15 +53,20 @@ func NewConnManager(low, hi int, grace time.Duration) *BasicConnMgr { } } +// peerInfo stores metadata for a given peer. type peerInfo struct { - tags map[string]int - value int + tags map[string]int // value for each tag + value int // cached sum of all tag values - conns map[inet.Conn]time.Time + conns map[inet.Conn]time.Time // start time of each connection - firstSeen time.Time + firstSeen time.Time // timestamp when we began tracking this peer. } +// TrimOpenConns closes the connections of as many peers as needed to make the peer count +// equal the low watermark. Peers are sorted in ascending order based on their total value, +// pruning those peers with the lowest scores first, as long as they are not within their +// grace period. func (cm *BasicConnMgr) TrimOpenConns(ctx context.Context) { defer log.EventBegin(ctx, "connCleanup").Done() for _, c := range cm.getConnsToClose(ctx) { @@ -58,9 +76,12 @@ func (cm *BasicConnMgr) TrimOpenConns(ctx context.Context) { } } +// getConnsToClose runs the heuristics described in TrimOpenConns and returns the +// connections to close. func (cm *BasicConnMgr) getConnsToClose(ctx context.Context) []inet.Conn { cm.lk.Lock() defer cm.lk.Unlock() + if cm.lowWater == 0 || cm.highWater == 0 { // disabled return nil @@ -79,6 +100,7 @@ func (cm *BasicConnMgr) getConnsToClose(ctx context.Context) []inet.Conn { infos = append(infos, inf) } + // Sort peers according to their value. sort.Slice(infos, func(i, j int) bool { return infos[i].value < infos[j].value }) @@ -143,6 +165,7 @@ func (cm *BasicConnMgr) TagPeer(p peer.ID, tag string, val int) { return } + // Update the total value of the peer. pi.value += (val - pi.tags[tag]) pi.tags[tag] = val } @@ -157,18 +180,30 @@ func (cm *BasicConnMgr) UntagPeer(p peer.ID, tag string) { return } + // Update the total value of the peer. pi.value -= pi.tags[tag] delete(pi.tags, tag) } +// CMInfo holds the configuration for BasicConnMgr, as well as status data. type CMInfo struct { - LowWater int - HighWater int - LastTrim time.Time + // The low watermark, as described in NewConnManager. + LowWater int + + // The high watermark, as described in NewConnManager. + HighWater int + + // The timestamp when the last trim was triggered. + LastTrim time.Time + + // The configured grace period, as described in NewConnManager. GracePeriod time.Duration - ConnCount int + + // The current connection count. + ConnCount int } +// GetInfo returns the configuration and status data for this connection manager. func (cm *BasicConnMgr) GetInfo() CMInfo { cm.lk.Lock() defer cm.lk.Unlock() @@ -182,6 +217,9 @@ func (cm *BasicConnMgr) GetInfo() CMInfo { } } +// Notifee returns a sink through which Notifiers can inform the BasicConnMgr when +// events occur. Currently, the notifee only reacts upon connection events +// {Connected, Disconnected}. func (cm *BasicConnMgr) Notifee() inet.Notifiee { return (*cmNotifee)(cm) } @@ -192,6 +230,9 @@ func (nn *cmNotifee) cm() *BasicConnMgr { return (*BasicConnMgr)(nn) } +// Connected is called by notifiers to inform that a new connection has been established. +// The notifee updates the BasicConnMgr to start tracking the connection. If the new connection +// count exceeds the high watermark, a trim may be triggered. func (nn *cmNotifee) Connected(n inet.Network, c inet.Conn) { cm := nn.cm() @@ -224,6 +265,8 @@ func (nn *cmNotifee) Connected(n inet.Network, c inet.Conn) { } } +// Disconnected is called by notifiers to inform that an existing connection has been closed or terminated. +// The notifee updates the BasicConnMgr accordingly to stop tracking the connection, and performs housekeeping. func (nn *cmNotifee) Disconnected(n inet.Network, c inet.Conn) { cm := nn.cm() @@ -249,7 +292,14 @@ func (nn *cmNotifee) Disconnected(n inet.Network, c inet.Conn) { } } -func (nn *cmNotifee) Listen(n inet.Network, addr ma.Multiaddr) {} +// Listen is no-op in this implementation. +func (nn *cmNotifee) Listen(n inet.Network, addr ma.Multiaddr) {} + +// ListenClose is no-op in this implementation. func (nn *cmNotifee) ListenClose(n inet.Network, addr ma.Multiaddr) {} -func (nn *cmNotifee) OpenedStream(inet.Network, inet.Stream) {} -func (nn *cmNotifee) ClosedStream(inet.Network, inet.Stream) {} + +// OpenedStream is no-op in this implementation. +func (nn *cmNotifee) OpenedStream(inet.Network, inet.Stream) {} + +// ClosedStream is no-op in this implementation. +func (nn *cmNotifee) ClosedStream(inet.Network, inet.Stream) {}