Skip to content

Commit

Permalink
Merge pull request #21 from raulk/feat/docs
Browse files Browse the repository at this point in the history
Add docs to BasicConnMgr
  • Loading branch information
Stebalien committed Aug 28, 2018
2 parents 88f2b70 + 02481e7 commit c5de9b3
Showing 1 changed file with 63 additions and 13 deletions.
76 changes: 63 additions & 13 deletions p2p/net/connmgr/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,21 @@ import (
"time"

logging "github.com/ipfs/go-log"
ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
"github.com/libp2p/go-libp2p-interface-connmgr"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
)

var log = logging.Logger("connmgr")

// BasicConnMgr is a ConnManager that trims connections whenever the count exceeds the
// high watermark. New connections are given a grace period before they're subject
// to trimming. Trims are automatically run on demand, only if the time from the
// previous trim is higher than 10 seconds. Furthermore, trims can be explicitly
// requested through the public interface of this struct (see TrimOpenConns).

// See configuration parameters in NewConnManager.
type BasicConnMgr struct {
highWater int
lowWater int
Expand All @@ -31,6 +38,12 @@ type BasicConnMgr struct {

var _ ifconnmgr.ConnManager = (*BasicConnMgr)(nil)

// NewConnManager creates a new BasicConnMgr with the provided params:
// * lo and hi are watermarks governing the number of connections that'll be maintained.
// When the peer count exceeds the 'high watermark', as many peers will be pruned (and
// their connections terminated) until 'low watermark' peers remain.
// * grace is the amount of time a newly opened connection is given before it becomes
// subject to pruning.
func NewConnManager(low, hi int, grace time.Duration) *BasicConnMgr {
return &BasicConnMgr{
highWater: hi,
Expand All @@ -40,15 +53,20 @@ func NewConnManager(low, hi int, grace time.Duration) *BasicConnMgr {
}
}

// peerInfo stores metadata for a given peer.
type peerInfo struct {
tags map[string]int
value int
tags map[string]int // value for each tag
value int // cached sum of all tag values

conns map[inet.Conn]time.Time
conns map[inet.Conn]time.Time // start time of each connection

firstSeen time.Time
firstSeen time.Time // timestamp when we began tracking this peer.
}

// TrimOpenConns closes the connections of as many peers as needed to make the peer count
// equal the low watermark. Peers are sorted in ascending order based on their total value,
// pruning those peers with the lowest scores first, as long as they are not within their
// grace period.
func (cm *BasicConnMgr) TrimOpenConns(ctx context.Context) {
defer log.EventBegin(ctx, "connCleanup").Done()
for _, c := range cm.getConnsToClose(ctx) {
Expand All @@ -58,9 +76,12 @@ func (cm *BasicConnMgr) TrimOpenConns(ctx context.Context) {
}
}

// getConnsToClose runs the heuristics described in TrimOpenConns and returns the
// connections to close.
func (cm *BasicConnMgr) getConnsToClose(ctx context.Context) []inet.Conn {
cm.lk.Lock()
defer cm.lk.Unlock()

if cm.lowWater == 0 || cm.highWater == 0 {
// disabled
return nil
Expand All @@ -79,6 +100,7 @@ func (cm *BasicConnMgr) getConnsToClose(ctx context.Context) []inet.Conn {
infos = append(infos, inf)
}

// Sort peers according to their value.
sort.Slice(infos, func(i, j int) bool {
return infos[i].value < infos[j].value
})
Expand Down Expand Up @@ -143,6 +165,7 @@ func (cm *BasicConnMgr) TagPeer(p peer.ID, tag string, val int) {
return
}

// Update the total value of the peer.
pi.value += (val - pi.tags[tag])
pi.tags[tag] = val
}
Expand All @@ -157,18 +180,30 @@ func (cm *BasicConnMgr) UntagPeer(p peer.ID, tag string) {
return
}

// Update the total value of the peer.
pi.value -= pi.tags[tag]
delete(pi.tags, tag)
}

// CMInfo holds the configuration for BasicConnMgr, as well as status data.
type CMInfo struct {
LowWater int
HighWater int
LastTrim time.Time
// The low watermark, as described in NewConnManager.
LowWater int

// The high watermark, as described in NewConnManager.
HighWater int

// The timestamp when the last trim was triggered.
LastTrim time.Time

// The configured grace period, as described in NewConnManager.
GracePeriod time.Duration
ConnCount int

// The current connection count.
ConnCount int
}

// GetInfo returns the configuration and status data for this connection manager.
func (cm *BasicConnMgr) GetInfo() CMInfo {
cm.lk.Lock()
defer cm.lk.Unlock()
Expand All @@ -182,6 +217,9 @@ func (cm *BasicConnMgr) GetInfo() CMInfo {
}
}

// Notifee returns a sink through which Notifiers can inform the BasicConnMgr when
// events occur. Currently, the notifee only reacts upon connection events
// {Connected, Disconnected}.
func (cm *BasicConnMgr) Notifee() inet.Notifiee {
return (*cmNotifee)(cm)
}
Expand All @@ -192,6 +230,9 @@ func (nn *cmNotifee) cm() *BasicConnMgr {
return (*BasicConnMgr)(nn)
}

// Connected is called by notifiers to inform that a new connection has been established.
// The notifee updates the BasicConnMgr to start tracking the connection. If the new connection
// count exceeds the high watermark, a trim may be triggered.
func (nn *cmNotifee) Connected(n inet.Network, c inet.Conn) {
cm := nn.cm()

Expand Down Expand Up @@ -224,6 +265,8 @@ func (nn *cmNotifee) Connected(n inet.Network, c inet.Conn) {
}
}

// Disconnected is called by notifiers to inform that an existing connection has been closed or terminated.
// The notifee updates the BasicConnMgr accordingly to stop tracking the connection, and performs housekeeping.
func (nn *cmNotifee) Disconnected(n inet.Network, c inet.Conn) {
cm := nn.cm()

Expand All @@ -249,7 +292,14 @@ func (nn *cmNotifee) Disconnected(n inet.Network, c inet.Conn) {
}
}

func (nn *cmNotifee) Listen(n inet.Network, addr ma.Multiaddr) {}
// Listen is no-op in this implementation.
func (nn *cmNotifee) Listen(n inet.Network, addr ma.Multiaddr) {}

// ListenClose is no-op in this implementation.
func (nn *cmNotifee) ListenClose(n inet.Network, addr ma.Multiaddr) {}
func (nn *cmNotifee) OpenedStream(inet.Network, inet.Stream) {}
func (nn *cmNotifee) ClosedStream(inet.Network, inet.Stream) {}

// OpenedStream is no-op in this implementation.
func (nn *cmNotifee) OpenedStream(inet.Network, inet.Stream) {}

// ClosedStream is no-op in this implementation.
func (nn *cmNotifee) ClosedStream(inet.Network, inet.Stream) {}

0 comments on commit c5de9b3

Please sign in to comment.