feat(p2p): Seed connectivity tuning options (max-incoming-connection-time, incoming-connection-window) (#532)

* feat(p2p): max-incoming-connection-time

* test(e2e): Support p2p max conn in e2e tests

* test(e2e): configure seeds to drop incoming conns after 5s

* test(p2p): improve incoming peer conn time test

* feat(p2p): expose disconnect-cooldown-period as config option

* chore(p2p): code style improvements

* fix(p2p): apply disconnect cooldown to incoming connections

* test(e2e): add timestamp to e2e test output

* feat(config): add incoming-connection-window, remove disconnect-cooldown-period

* chore(p2p): add logger to peermanager

* refactor(p2p): add delay to waker

* fix(p2p): peer dial hangs when cannot connect to seed

* fix(p2p): don't use DisconnectCooldownPeriod for incoming conns

* chore(p2p): Add some logs to p2p TryDialNext()

* fix(p2p): race condition in peermanager

* chore(p2p): apply peer review feedback
lklimek authored Dec 20, 2022
1 parent 00572f8 commit 4c7237a
Showing 15 changed files with 400 additions and 80 deletions.
18 changes: 18 additions & 0 deletions config/config.go
@@ -668,6 +668,16 @@ type P2PConfig struct { //nolint: maligned
// attempts per IP address.
MaxIncomingConnectionAttempts uint `mapstructure:"max-incoming-connection-attempts"`

// MaxIncomingConnectionTime limits the maximum duration after which an incoming peer will be evicted.
// Defaults to 0 which disables this mechanism.
// Used on seed nodes to evict peers and make space for others.
MaxIncomingConnectionTime time.Duration `mapstructure:"max-incoming-connection-time"`

// IncomingConnectionWindow describes how often an IP address
// can attempt to create a new connection. Defaults to 10
// milliseconds, and cannot be less than 1 millisecond.
IncomingConnectionWindow time.Duration `mapstructure:"incoming-connection-window"`

// Comma separated list of peer IDs to keep private (will not be gossiped to
// other peers)
PrivatePeerIDs string `mapstructure:"private-peer-ids"`
@@ -703,6 +713,8 @@ func DefaultP2PConfig() *P2PConfig {
MaxConnections: 64,
MaxOutgoingConnections: 12,
MaxIncomingConnectionAttempts: 100,
MaxIncomingConnectionTime: 0,
IncomingConnectionWindow: 10 * time.Millisecond,
FlushThrottleTimeout: 100 * time.Millisecond,
// The MTU (Maximum Transmission Unit) for Ethernet is 1500 bytes.
// The IP header and the TCP header take up 20 bytes each at least (unless
@@ -736,6 +748,12 @@ func (cfg *P2PConfig) ValidateBasic() error {
if cfg.MaxOutgoingConnections > cfg.MaxConnections {
return errors.New("max-outgoing-connections cannot be larger than max-connections")
}
if cfg.MaxIncomingConnectionTime < 0 {
return errors.New("max-incoming-connection-time can't be negative")
}
if cfg.IncomingConnectionWindow < 1*time.Millisecond {
return errors.New("incoming-connection-window must be set to at least 1ms")
}
return nil
}

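For illustration, a minimal sketch of how a seed-node operator might set the new options programmatically. It uses only fields and functions visible in this diff; the import path is an assumption:

	package main

	import (
		"log"
		"time"

		"github.com/tendermint/tendermint/config" // assumed import path
	)

	func main() {
		cfg := config.DefaultP2PConfig()
		// Evict incoming peers after 5 minutes to make room for new ones.
		cfg.MaxIncomingConnectionTime = 5 * time.Minute
		// Allow each IP address one new connection attempt per 100ms.
		cfg.IncomingConnectionWindow = 100 * time.Millisecond

		// ValidateBasic rejects a negative max-incoming-connection-time
		// and an incoming-connection-window below 1ms.
		if err := cfg.ValidateBasic(); err != nil {
			log.Fatal(err)
		}
	}
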
10 changes: 10 additions & 0 deletions config/toml.go
@@ -328,6 +328,16 @@ max-outgoing-connections = {{ .P2P.MaxOutgoingConnections }}
# Rate limits the number of incoming connection attempts per IP address.
max-incoming-connection-attempts = {{ .P2P.MaxIncomingConnectionAttempts }}
# Limits the maximum duration after which an incoming peer will be evicted.
# Defaults to 0 which disables this mechanism.
# Used on seed nodes to evict peers and make space for others.
max-incoming-connection-time = "{{ .P2P.MaxIncomingConnectionTime }}"
# incoming-connection-window describes how often an IP address
# can attempt to create a new connection. Defaults to 10
# milliseconds, and cannot be less than 1 millisecond.
incoming-connection-window = "{{ .P2P.IncomingConnectionWindow }}"
# Comma separated list of peer IDs to keep private (will not be gossiped to other peers)
# Warning: IPs will be exposed at /net_info, for more information https://github.com/tendermint/tendermint/issues/3055
private-peer-ids = "{{ .P2P.PrivatePeerIDs }}"
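
With the defaults above, the rendered config.toml section comes out roughly as follows (Go's time.Duration prints 0 as "0s" and 10 * time.Millisecond as "10ms", and the template quotes both values):

	# Limits the maximum duration after which an incoming peer will be evicted.
	# Defaults to 0 which disables this mechanism.
	# Used on seed nodes to evict peers and make space for others.
	max-incoming-connection-time = "0s"
	# incoming-connection-window describes how often an IP address
	# can attempt to create a new connection. Defaults to 10
	# milliseconds, and cannot be less than 1 millisecond.
	incoming-connection-window = "10ms"
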
28 changes: 28 additions & 0 deletions internal/libs/sync/waker.go
@@ -1,10 +1,17 @@
package sync

import (
"sync"
"time"
)

// Waker is used to wake up a sleeper when some event occurs. It debounces
// multiple wakeup calls occurring between each sleep, and wakeups are
// non-blocking to avoid having to coordinate goroutines.
type Waker struct {
wakeCh chan struct{}
mtx sync.Mutex
timers []*time.Timer
}

// NewWaker creates a new Waker.
@@ -28,3 +35,24 @@ func (w *Waker) Wake() {
default:
}
}

// WakeAfter wakes up the sleeper after some delay.
func (w *Waker) WakeAfter(delay time.Duration) {
w.mtx.Lock()
defer w.mtx.Unlock()

w.timers = append(w.timers, time.AfterFunc(delay, w.Wake))
}

// Close closes the waker and cleans up its resources
func (w *Waker) Close() error {
w.mtx.Lock()
defer w.mtx.Unlock()

for _, timer := range w.timers {
if timer != nil {
timer.Stop()
}
}
return nil
}
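
A quick usage sketch of the extended Waker. Note that internal/libs/sync is an internal package, so this is illustrative only; Sleep is the pre-existing method that returns the channel sleepers block on:

	package main

	import (
		"fmt"
		"time"

		tmsync "github.com/tendermint/tendermint/internal/libs/sync"
	)

	func main() {
		w := tmsync.NewWaker()
		defer w.Close() // stops any timers registered by WakeAfter

		w.Wake()                           // immediate, non-blocking wake-up
		w.WakeAfter(50 * time.Millisecond) // deferred wake-up via time.AfterFunc

		<-w.Sleep() // unblocks on the first wake-up
		fmt.Println("woken")
	}
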
149 changes: 124 additions & 25 deletions internal/p2p/peermanager.go
@@ -9,13 +9,14 @@ import (
"sort"
"time"

sync "github.com/sasha-s/go-deadlock"

"github.com/gogo/protobuf/proto"
"github.com/google/orderedcode"
"github.com/rs/zerolog"
sync "github.com/sasha-s/go-deadlock"
dbm "github.com/tendermint/tm-db"

tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/libs/log"
p2pproto "github.com/tendermint/tendermint/proto/tendermint/p2p"
"github.com/tendermint/tendermint/types"
)
@@ -42,7 +43,8 @@ const (
type peerConnectionDirection int

const (
peerConnectionIncoming peerConnectionDirection = iota + 1
peerConnectionNone peerConnectionDirection = iota
peerConnectionIncoming
peerConnectionOutgoing
)

@@ -136,6 +138,11 @@ type PeerManagerOptions struct {
// the connection and evict a lower-scored peer.
MaxConnectedUpgrade uint16

// MaxIncomingConnectionTime limits the maximum duration after which an incoming peer will be evicted.
// Defaults to 0 which disables this mechanism.
// Used on seed nodes to evict peers and make space for others.
MaxIncomingConnectionTime time.Duration

// MinRetryTime is the minimum time to wait between retries. Retry times
// double for each retry, up to MaxRetryTime. 0 disables retries.
MinRetryTime time.Duration
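
The retryDelay helper referenced throughout this diff is not shown here. A hypothetical sketch consistent with the behavior documented above (delays double per failure from MinRetryTime up to MaxRetryTime; 0 disables retries) might look like:

	// Hypothetical sketch only; the real retryDelay also takes a
	// persistent-peer flag, as the calls further down show. retryNever
	// is the sentinel the surrounding code compares against.
	func retryDelay(failures uint32, minTime, maxTime time.Duration) time.Duration {
		if minTime == 0 {
			return retryNever // retries disabled
		}
		delay := minTime << failures // double per failure
		if delay <= 0 || delay > maxTime {
			delay = maxTime // cap, guarding against shift overflow
		}
		return delay
	}
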
@@ -298,6 +305,7 @@ type PeerManager struct {
rand *rand.Rand
dialWaker *tmsync.Waker // wakes up DialNext() on relevant peer changes
evictWaker *tmsync.Waker // wakes up EvictNext() on relevant peer changes
logger log.Logger

mtx sync.Mutex
store *peerStore
@@ -329,9 +337,10 @@ func NewPeerManager(selfID types.NodeID, peerDB dbm.DB, options PeerManagerOptio
peerManager := &PeerManager{
selfID: selfID,
options: options,
rand: rand.New(rand.NewSource(time.Now().UnixNano())), // nolint:gosec
rand: rand.New(rand.NewSource(time.Now().UnixNano())), //nolint:gosec
dialWaker: tmsync.NewWaker(),
evictWaker: tmsync.NewWaker(),
logger: log.NewNopLogger(),
metrics: NopMetrics(),

store: store,
@@ -357,6 +366,18 @@ func NewPeerManager(selfID types.NodeID, peerDB dbm.DB, options PeerManagerOptio
return peerManager, nil
}

// SetLogger sets a logger for the PeerManager
func (m *PeerManager) SetLogger(logger log.Logger) {
m.logger = logger
}

// Close closes the peer manager and frees up all resources
func (m *PeerManager) Close() error {
m.evictWaker.Close()
m.dialWaker.Close()
return nil
}

// configurePeers configures peers in the peer store with ephemeral runtime
// configuration, e.g. PersistentPeers. It also removes ourself, if we're in the
// peer store. The caller must hold the mutex lock.
@@ -524,11 +545,17 @@ func (m *PeerManager) HasDialedMaxPeers() bool {
// becomes available. The caller must call Dialed() or DialFailed() for the
// returned peer.
func (m *PeerManager) DialNext(ctx context.Context) (NodeAddress, error) {
for {
for counter := uint32(0); ; counter++ {
if address := m.TryDialNext(); (address != NodeAddress{}) {
return address, nil
}

// If we have zero peers connected, we need to schedule a retry.
// This can happen, for example, when every candidate is still within its retry delay.
if m.numDialingOrConnected() == 0 {
m.scheduleDial(ctx, m.retryDelay(counter+1, false))
}

select {
case <-m.dialWaker.Sleep():
continue
@@ -548,29 +575,36 @@ func (m *PeerManager) TryDialNext() NodeAddress {
// MaxConnectedUpgrade allows us to probe additional peers that have a
// higher score than any other peers, and if successful evict it.
if m.options.MaxConnected > 0 && len(m.connected)+len(m.dialing) >= int(m.options.MaxConnected)+int(m.options.MaxConnectedUpgrade) {
m.logger.Trace("max connected reached, skipping dial attempt")
return NodeAddress{}
}

cinfo := m.getConnectedInfo()
if m.options.MaxOutgoingConnections > 0 && cinfo.outgoing >= m.options.MaxOutgoingConnections {
m.logger.Trace("max outgoing connections reached, skipping dial attempt")
return NodeAddress{}
}

for _, peer := range m.store.Ranked() {
if m.dialing[peer.ID] || m.isConnected(peer.ID) {
m.logger.Trace("peer dialing or connected, skipping", "peer", peer)
continue
}

if !peer.LastDisconnected.IsZero() && time.Since(peer.LastDisconnected) < m.options.DisconnectCooldownPeriod {
m.logger.Trace("peer within disconnect cooldown period, skipping", "peer", peer, "cooldown_period", m.options.DisconnectCooldownPeriod)
continue
}

for _, addressInfo := range peer.AddressInfo {
if time.Since(addressInfo.LastDialFailure) < m.retryDelay(addressInfo.DialFailures, peer.Persistent) {
delay := m.retryDelay(addressInfo.DialFailures, peer.Persistent)
if time.Since(addressInfo.LastDialFailure) < delay {
m.logger.Trace("not dialing peer due to retry delay", "peer", peer, "delay", delay, "last_failure", addressInfo.LastDialFailure)
continue
}

if id, ok := m.store.Resolve(addressInfo.Address); ok && (m.isConnected(id) || m.dialing[id]) {
m.logger.Trace("peer address already dialing", "peer", peer, "address", addressInfo.Address.String())
continue
}

@@ -583,6 +617,12 @@ func (m *PeerManager) TryDialNext() NodeAddress {
// peer (since they're ordered by score via peerStore.Ranked).
if m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected) {
upgradeFromPeer := m.findUpgradeCandidate(peer.ID, peer.Score())
m.logger.Trace("max connected reached, checking upgrade candidate",
"peer", peer,
"max_connected", m.options.MaxConnected,
"connected", len(m.connected),
"upgrade_candidate", upgradeFromPeer,
)
if upgradeFromPeer == "" {
return NodeAddress{}
}
@@ -626,27 +666,19 @@ func (m *PeerManager) DialFailed(ctx context.Context, address NodeAddress) error
return err
}

// We spawn a goroutine that notifies DialNext() again when the retry
// timeout has elapsed, so that we can consider dialing it again. We
// calculate the retry delay outside the goroutine, since it must hold
// the mutex lock.
if d := m.retryDelay(addressInfo.DialFailures, peer.Persistent); d != 0 && d != retryNever {
go func() {
// Use an explicit timer with deferred cleanup instead of
// time.After(), to avoid leaking goroutines on PeerManager.Close().
timer := time.NewTimer(d)
defer timer.Stop()
select {
case <-timer.C:
m.dialWaker.Wake()
case <-ctx.Done():
}
}()
delay := m.retryDelay(addressInfo.DialFailures, peer.Persistent)
m.scheduleDial(ctx, delay)

return nil
}

// scheduleDial schedules a wake-up of the dialer after the given delay
func (m *PeerManager) scheduleDial(ctx context.Context, delay time.Duration) {
if delay > 0 && delay != retryNever {
m.dialWaker.WakeAfter(delay)
} else {
m.dialWaker.Wake()
}

return nil
}

// Dialed marks a peer as successfully dialed. Any further connections will be
@@ -791,6 +823,9 @@ func (m *PeerManager) Accepted(peerID types.NodeID, peerOpts ...func(*peerInfo))
if upgradeFromPeer != "" {
m.evict[upgradeFromPeer] = true
}

evictPeerAfterTimeout(m, peerID, peerConnectionIncoming, m.options.MaxIncomingConnectionTime)

m.evictWaker.Wake()
return nil
}
@@ -1052,7 +1087,7 @@ func (m *PeerManager) Advertise(peerID types.NodeID, limit uint16) []NodeAddress
// 10% of the time we'll randomly insert a "losing"
// peer.

// nolint:gosec // G404: Use of weak random number generator
//nolint:gosec // G404: Use of weak random number generator
if numAddresses <= int(limit) || rand.Intn((meanAbsScore*2)+1) <= scores[peer.ID]+1 || rand.Intn((idx+1)*10) <= idx+1 {
addresses = append(addresses, addressInfo.Address)
addedLastIteration = true
@@ -1609,6 +1644,37 @@ func (p *peerInfo) Validate() error {
return nil
}

func (p *peerInfo) IsZero() bool {
return p == nil || len(p.ID) == 0
}

func (p *peerInfo) MarshalZerologObject(e *zerolog.Event) {
if p == nil {
return
}

e.Str("node_id", string(p.ID))
if len(p.ProTxHash) != 0 {
e.Str("protxhash", p.ProTxHash.ShortString())
}
e.Time("last_connected", p.LastConnected)
e.Time("last_disconnected", p.LastDisconnected)
if p.Persistent {
e.Bool("persistent", p.Persistent)
}
e.Int64("height", p.Height)
if p.FixedScore != 0 {
e.Int16("fixed_score", int16(p.FixedScore))
}
if p.MutableScore != 0 {
e.Int64("mutable_score", p.MutableScore)
}
if p.Inactive {
e.Bool("inactive", p.Inactive)
}
e.Int16("score", int16(p.Score()))
}
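
Because peerInfo now implements zerolog's LogObjectMarshaler, a peer can be logged as a single structured field. A sketch from inside the p2p package (values are made up for illustration):

	p := &peerInfo{ID: types.NodeID("a1b2c3"), Persistent: true, Height: 1024}
	zl := zerolog.New(os.Stderr)
	zl.Info().Object("peer", p).Msg("peer state")
	// Emits something like:
	// {"level":"info","peer":{"node_id":"a1b2c3","persistent":true,"height":1024,"score":...},"message":"peer state"}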

// peerAddressInfo contains information and statistics about a peer address.
type peerAddressInfo struct {
Address NodeAddress
@@ -1712,6 +1778,20 @@ func (m *PeerManager) UpdatePeerInfo(nodeID types.NodeID, modifier func(peerInfo
return m.store.Set(peer)
}

// getPeer loads and returns a peer from the store, together with its current connection direction, if any
func (m *PeerManager) getPeer(peerID types.NodeID) (peerInfo, peerConnectionDirection) {
m.mtx.Lock()
defer m.mtx.Unlock()

p, ok := m.store.Get(peerID)
if !ok {
return peerInfo{}, peerConnectionNone
}

connType := m.connected[peerID]
return p, connType
}

// IsDialingOrConnected returns true if dialing to a peer at the moment or already connected otherwise false
func (m *PeerManager) IsDialingOrConnected(nodeID types.NodeID) bool {
m.mtx.Lock()
@@ -1720,9 +1800,28 @@ func (m *PeerManager) IsDialingOrConnected(nodeID types.NodeID) bool {
return m.dialing[nodeID] || ok
}

func (m *PeerManager) numDialingOrConnected() int {
m.mtx.Lock()
defer m.mtx.Unlock()
return len(m.connected) + len(m.dialing)
}

// SetProTxHashToPeerInfo returns a modifier that sets the given proTxHash on peerInfo.ProTxHash so the value is kept in the store
func SetProTxHashToPeerInfo(proTxHash types.ProTxHash) func(info *peerInfo) {
return func(info *peerInfo) {
info.ProTxHash = proTxHash.Copy()
}
}

// evictPeerAfterTimeout schedules eviction of an incoming peer once the timeout expires, unless the peer is persistent or has since reconnected.
func evictPeerAfterTimeout(m *PeerManager, peerID types.NodeID, direction peerConnectionDirection, timeout time.Duration) {
if timeout > 0 {
time.AfterFunc(timeout, func() {
olderThan := time.Now().Add(-timeout)
p, connType := m.getPeer(peerID)
if !p.IsZero() && connType == direction && !p.Persistent && p.LastConnected.Before(olderThan) {
m.EvictPeer(peerID)
}
})
}
}
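
Putting the pieces together, a rough sketch of seed-style wiring that mirrors the 5-second eviction used in the e2e tests (selfID and logger are assumed to exist; the constructor follows the signature shown in the hunks above):

	opts := p2p.PeerManagerOptions{
		// Drop incoming peers after 5s to make room for others,
		// as the e2e seed nodes are configured to do.
		MaxIncomingConnectionTime: 5 * time.Second,
	}
	peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), opts)
	if err != nil {
		return err
	}
	defer peerManager.Close()
	peerManager.SetLogger(logger.With("module", "peermanager"))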