feat(p2p): Seed connectivity tuning options: max-incoming-connection-time, incoming-connection-window #532

Merged · 16 commits · Dec 20, 2022
Changes from all commits
18 changes: 18 additions & 0 deletions config/config.go
@@ -668,6 +668,16 @@ type P2PConfig struct { //nolint: maligned
// attempts per IP address.
MaxIncomingConnectionAttempts uint `mapstructure:"max-incoming-connection-attempts"`

// MaxIncomingConnectionTime limits the maximum duration after which an incoming peer will be evicted.
// Defaults to 0, which disables this mechanism.
// Used on seed nodes to evict peers and make space for others.
MaxIncomingConnectionTime time.Duration `mapstructure:"max-incoming-connection-time"`

// IncomingConnectionWindow describes how often an IP address
// can attempt to create a new connection. Defaults to 10
// milliseconds, and cannot be less than 1 millisecond.
IncomingConnectionWindow time.Duration `mapstructure:"incoming-connection-window"`

// Comma separated list of peer IDs to keep private (will not be gossiped to
// other peers)
PrivatePeerIDs string `mapstructure:"private-peer-ids"`
@@ -703,6 +713,8 @@ func DefaultP2PConfig() *P2PConfig {
MaxConnections: 64,
MaxOutgoingConnections: 12,
MaxIncomingConnectionAttempts: 100,
MaxIncomingConnectionTime: 0,
IncomingConnectionWindow: 10 * time.Millisecond,
FlushThrottleTimeout: 100 * time.Millisecond,
// The MTU (Maximum Transmission Unit) for Ethernet is 1500 bytes.
// The IP header and the TCP header take up 20 bytes each at least (unless
@@ -736,6 +748,12 @@ func (cfg *P2PConfig) ValidateBasic() error {
if cfg.MaxOutgoingConnections > cfg.MaxConnections {
return errors.New("max-outgoing-connections cannot be larger than max-connections")
}
if cfg.MaxIncomingConnectionTime < 0 {
return errors.New("max-incoming-connection-time can't be negative")
}
if cfg.IncomingConnectionWindow < 1*time.Millisecond {
return errors.New("incoming-connection-window must be set to at least 1ms")
}
return nil
}
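As a usage sketch (operator-side code, not part of this diff; the values and the import path are illustrative), the new knobs can be set on a default config and then checked by ValidateBasic:

package main

import (
	"log"
	"time"

	"github.com/tendermint/tendermint/config"
)

func main() {
	cfg := config.DefaultP2PConfig()
	cfg.MaxIncomingConnectionTime = 30 * time.Minute     // evict each incoming peer after 30 minutes
	cfg.IncomingConnectionWindow = 50 * time.Millisecond // at most one attempt per IP per 50ms
	if err := cfg.ValidateBasic(); err != nil {
		log.Fatal(err) // e.g. "incoming-connection-window must be set to at least 1ms"
	}
}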

10 changes: 10 additions & 0 deletions config/toml.go
@@ -328,6 +328,16 @@ max-outgoing-connections = {{ .P2P.MaxOutgoingConnections }}
# Rate limits the number of incoming connection attempts per IP address.
max-incoming-connection-attempts = {{ .P2P.MaxIncomingConnectionAttempts }}

# Limits the maximum duration after which an incoming peer will be evicted.
# Defaults to 0, which disables this mechanism.
# Used on seed nodes to evict peers and make space for others.
max-incoming-connection-time = "{{ .P2P.MaxIncomingConnectionTime }}"

# incoming-connection-window describes how often an IP address
# can attempt to create a new connection. Defaults to 10
# milliseconds, and cannot be less than 1 millisecond.
incoming-connection-window = "{{ .P2P.IncomingConnectionWindow }}"

# Comma separated list of peer IDs to keep private (will not be gossiped to other peers)
# Warning: IPs will be exposed at /net_info, for more information https://github.com/tendermint/tendermint/issues/3055
private-peer-ids = "{{ .P2P.PrivatePeerIDs }}"
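For reference, the defaults above serialize into the generated config.toml as Go duration strings (a sketch of the rendered output, not part of the template):

max-incoming-connection-time = "0s"
incoming-connection-window = "10ms"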
28 changes: 28 additions & 0 deletions internal/libs/sync/waker.go
@@ -1,10 +1,17 @@
package sync

import (
"sync"
"time"
)

// Waker is used to wake up a sleeper when some event occurs. It debounces
// multiple wakeup calls occurring between each sleep, and wakeups are
// non-blocking to avoid having to coordinate goroutines.
type Waker struct {
wakeCh chan struct{}
mtx sync.Mutex
timers []*time.Timer
}

// NewWaker creates a new Waker.
@@ -28,3 +35,24 @@ func (w *Waker) Wake() {
default:
}
}

// WakeAfter wakes up the sleeper after some delay.
func (w *Waker) WakeAfter(delay time.Duration) {
w.mtx.Lock()
defer w.mtx.Unlock()

w.timers = append(w.timers, time.AfterFunc(delay, w.Wake))
}

// Close closes the waker and cleans up its resources
func (w *Waker) Close() error {
w.mtx.Lock()
defer w.mtx.Unlock()

for _, timer := range w.timers {
if timer != nil {
timer.Stop()
}
}
return nil
}
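A minimal usage sketch of the new API (illustrative only; internal packages are not importable outside the repository, and Sleep()/Wake() are the pre-existing methods used by the peer manager below):

package main

import (
	"fmt"
	"time"

	tmsync "github.com/tendermint/tendermint/internal/libs/sync"
)

func main() {
	w := tmsync.NewWaker()
	defer w.Close() // stops any timers registered by WakeAfter

	// Schedule two wakeups; calls occurring between sleeps are
	// debounced into a single non-blocking send on the channel
	// returned by Sleep().
	w.WakeAfter(10 * time.Millisecond)
	w.WakeAfter(20 * time.Millisecond)

	<-w.Sleep()
	fmt.Println("woken")
}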
149 changes: 124 additions & 25 deletions internal/p2p/peermanager.go
@@ -9,13 +9,14 @@ import (
"sort"
"time"

sync "github.com/sasha-s/go-deadlock"

"github.com/gogo/protobuf/proto"
"github.com/google/orderedcode"
"github.com/rs/zerolog"
sync "github.com/sasha-s/go-deadlock"
dbm "github.com/tendermint/tm-db"

tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/libs/log"
p2pproto "github.com/tendermint/tendermint/proto/tendermint/p2p"
"github.com/tendermint/tendermint/types"
)
@@ -42,7 +43,8 @@ const (
type peerConnectionDirection int

const (
peerConnectionIncoming peerConnectionDirection = iota + 1
peerConnectionNone peerConnectionDirection = iota
peerConnectionIncoming
peerConnectionOutgoing
)
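Note: peerConnectionNone now takes the zero value of peerConnectionDirection (previously the iota started at peerConnectionIncoming via iota + 1), so a lookup like m.connected[peerID] for an unknown peer yields peerConnectionNone; the new getPeer() helper further down relies on exactly this.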

@@ -136,6 +138,11 @@ type PeerManagerOptions struct {
// the connection and evict a lower-scored peer.
MaxConnectedUpgrade uint16

// MaxIncomingConnectionTime limits the maximum duration after which an incoming peer will be evicted.
// Defaults to 0, which disables this mechanism.
// Used on seed nodes to evict peers and make space for others.
MaxIncomingConnectionTime time.Duration

// MinRetryTime is the minimum time to wait between retries. Retry times
// double for each retry, up to MaxRetryTime. 0 disables retries.
MinRetryTime time.Duration
Expand Down Expand Up @@ -298,6 +305,7 @@ type PeerManager struct {
rand *rand.Rand
dialWaker *tmsync.Waker // wakes up DialNext() on relevant peer changes
evictWaker *tmsync.Waker // wakes up EvictNext() on relevant peer changes
logger log.Logger

mtx sync.Mutex
store *peerStore
@@ -329,9 +337,10 @@ func NewPeerManager(selfID types.NodeID, peerDB dbm.DB, options PeerManagerOptio
peerManager := &PeerManager{
selfID: selfID,
options: options,
rand: rand.New(rand.NewSource(time.Now().UnixNano())), // nolint:gosec
rand: rand.New(rand.NewSource(time.Now().UnixNano())), //nolint:gosec
dialWaker: tmsync.NewWaker(),
evictWaker: tmsync.NewWaker(),
logger: log.NewNopLogger(),
metrics: NopMetrics(),

store: store,
Expand All @@ -357,6 +366,18 @@ func NewPeerManager(selfID types.NodeID, peerDB dbm.DB, options PeerManagerOptio
return peerManager, nil
}

// SetLogger sets a logger for the PeerManager
func (m *PeerManager) SetLogger(logger log.Logger) {
m.logger = logger
}

// Close closes the peer manager and frees up all resources
func (m *PeerManager) Close() error {
m.evictWaker.Close()
m.dialWaker.Close()
return nil
}

// configurePeers configures peers in the peer store with ephemeral runtime
// configuration, e.g. PersistentPeers. It also removes ourselves, if we're in the
// peer store. The caller must hold the mutex lock.
@@ -524,11 +545,17 @@ func (m *PeerManager) HasDialedMaxPeers() bool {
// becomes available. The caller must call Dialed() or DialFailed() for the
// returned peer.
func (m *PeerManager) DialNext(ctx context.Context) (NodeAddress, error) {
for {
for counter := uint32(0); ; counter++ {
if address := m.TryDialNext(); (address != NodeAddress{}) {
return address, nil
}

// If we have zero peers connected, we need to schedule a retry.
// This can happen, for example, when a retry delay has not yet elapsed.
if m.numDialingOrConnected() == 0 {
m.scheduleDial(ctx, m.retryDelay(counter+1, false))
}

select {
case <-m.dialWaker.Sleep():
continue
@@ -548,29 +575,36 @@ func (m *PeerManager) TryDialNext() NodeAddress {
// MaxConnectedUpgrade allows us to probe additional peers that have a
// higher score than any other peers, and if successful evict it.
if m.options.MaxConnected > 0 && len(m.connected)+len(m.dialing) >= int(m.options.MaxConnected)+int(m.options.MaxConnectedUpgrade) {
m.logger.Trace("max connected reached, skipping dial attempt")
return NodeAddress{}
}

cinfo := m.getConnectedInfo()
if m.options.MaxOutgoingConnections > 0 && cinfo.outgoing >= m.options.MaxOutgoingConnections {
m.logger.Trace("max outgoing connections reached, skipping dial attempt")
return NodeAddress{}
}

for _, peer := range m.store.Ranked() {
if m.dialing[peer.ID] || m.isConnected(peer.ID) {
m.logger.Trace("peer dialing or connected, skipping", "peer", peer)
continue
}

if !peer.LastDisconnected.IsZero() && time.Since(peer.LastDisconnected) < m.options.DisconnectCooldownPeriod {
m.logger.Trace("peer within disconnect cooldown period, skipping", "peer", peer, "cooldown_period", m.options.DisconnectCooldownPeriod)
continue
}

for _, addressInfo := range peer.AddressInfo {
if time.Since(addressInfo.LastDialFailure) < m.retryDelay(addressInfo.DialFailures, peer.Persistent) {
delay := m.retryDelay(addressInfo.DialFailures, peer.Persistent)
if time.Since(addressInfo.LastDialFailure) < delay {
m.logger.Trace("not dialing peer due to retry delay", "peer", peer, "delay", delay, "last_failure", addressInfo.LastDialFailure)
continue
}

if id, ok := m.store.Resolve(addressInfo.Address); ok && (m.isConnected(id) || m.dialing[id]) {
m.logger.Trace("peer address already dialing", "peer", peer, "address", addressInfo.Address.String())
continue
}

Expand All @@ -583,6 +617,12 @@ func (m *PeerManager) TryDialNext() NodeAddress {
// peer (since they're ordered by score via peerStore.Ranked).
if m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected) {
upgradeFromPeer := m.findUpgradeCandidate(peer.ID, peer.Score())
m.logger.Trace("max connected reached, checking upgrade candidate",
"peer", peer,
"max_connected", m.options.MaxConnected,
"connected", len(m.connected),
"upgrade_candidate", upgradeFromPeer,
)
if upgradeFromPeer == "" {
return NodeAddress{}
}
@@ -626,27 +666,19 @@ func (m *PeerManager) DialFailed(ctx context.Context, address NodeAddress) error
return err
}

// We spawn a goroutine that notifies DialNext() again when the retry
// timeout has elapsed, so that we can consider dialing it again. We
// calculate the retry delay outside the goroutine, since it must hold
// the mutex lock.
if d := m.retryDelay(addressInfo.DialFailures, peer.Persistent); d != 0 && d != retryNever {
go func() {
// Use an explicit timer with deferred cleanup instead of
// time.After(), to avoid leaking goroutines on PeerManager.Close().
timer := time.NewTimer(d)
defer timer.Stop()
select {
case <-timer.C:
m.dialWaker.Wake()
case <-ctx.Done():
}
}()
delay := m.retryDelay(addressInfo.DialFailures, peer.Persistent)
m.scheduleDial(ctx, delay)

return nil
}

// scheduleDial wakes the dial routine after the given delay, or immediately if the delay is zero or retryNever.
func (m *PeerManager) scheduleDial(ctx context.Context, delay time.Duration) {
if delay > 0 && delay != retryNever {
m.dialWaker.WakeAfter(delay)
} else {
m.dialWaker.Wake()
}

}
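Design note: this replaces the goroutine-per-failure pattern removed above. The pending wakeup now lives as a time.Timer inside the Waker, so it is stopped centrally by Waker.Close() (invoked from the new PeerManager.Close()) instead of lingering until ctx cancellation.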

// Dialed marks a peer as successfully dialed. Any further connections will be
@@ -791,6 +823,9 @@ func (m *PeerManager) Accepted(peerID types.NodeID, peerOpts ...func(*peerInfo))
if upgradeFromPeer != "" {
m.evict[upgradeFromPeer] = true
}

evictPeerAfterTimeout(m, peerID, peerConnectionIncoming, m.options.MaxIncomingConnectionTime)

m.evictWaker.Wake()
return nil
}
@@ -1052,7 +1087,7 @@ func (m *PeerManager) Advertise(peerID types.NodeID, limit uint16) []NodeAddress
// 10% of the time we'll randomly insert a "losing"
// peer.

// nolint:gosec // G404: Use of weak random number generator
//nolint:gosec // G404: Use of weak random number generator
if numAddresses <= int(limit) || rand.Intn((meanAbsScore*2)+1) <= scores[peer.ID]+1 || rand.Intn((idx+1)*10) <= idx+1 {
addresses = append(addresses, addressInfo.Address)
addedLastIteration = true
@@ -1609,6 +1644,37 @@ func (p *peerInfo) Validate() error {
return nil
}

func (p *peerInfo) IsZero() bool {
return p == nil || len(p.ID) == 0
}

func (p *peerInfo) MarshalZerologObject(e *zerolog.Event) {
if p == nil {
return
}

e.Str("node_id", string(p.ID))
if len(p.ProTxHash) != 0 {
e.Str("protxhash", p.ProTxHash.ShortString())
}
e.Time("last_connected", p.LastConnected)
e.Time("last_disconnected", p.LastDisconnected)
if p.Persistent {
e.Bool("persistent", p.Persistent)
}
e.Int64("height", p.Height)
if p.FixedScore != 0 {
e.Int16("fixed_score", int16(p.FixedScore))
}
if p.MutableScore != 0 {
e.Int64("mutable_score", p.MutableScore)
}
if p.Inactive {
e.Bool("inactive", p.Inactive)
}
e.Int16("score", int16(p.Score()))
}
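Because peerInfo now implements zerolog's LogObjectMarshaler, the Trace calls above can pass the whole struct as a key/value pair (e.g. "peer", peer); assuming the log.Logger is zerolog-backed, as the new zerolog import suggests, the peer is then rendered as a structured object with zero-valued optional fields omitted.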

// peerAddressInfo contains information and statistics about a peer address.
type peerAddressInfo struct {
Address NodeAddress
@@ -1712,6 +1778,20 @@ func (m *PeerManager) UpdatePeerInfo(nodeID types.NodeID, modifier func(peerInfo
return m.store.Set(peer)
}

// getPeer loads and returns a peer from the store, together with its current connection direction, if any
func (m *PeerManager) getPeer(peerID types.NodeID) (peerInfo, peerConnectionDirection) {
m.mtx.Lock()
defer m.mtx.Unlock()

p, ok := m.store.Get(peerID)
if !ok {
return peerInfo{}, peerConnectionNone
}

connType := m.connected[peerID]
return p, connType
}

// IsDialingOrConnected returns true if we are currently dialing or already connected to the given peer; otherwise false
func (m *PeerManager) IsDialingOrConnected(nodeID types.NodeID) bool {
m.mtx.Lock()
Expand All @@ -1720,9 +1800,28 @@ func (m *PeerManager) IsDialingOrConnected(nodeID types.NodeID) bool {
return m.dialing[nodeID] || ok
}

func (m *PeerManager) numDialingOrConnected() int {
m.mtx.Lock()
defer m.mtx.Unlock()
return len(m.connected) + len(m.dialing)
}

// SetProTxHashToPeerInfo returns a modifier that sets the given proTxHash on peerInfo.ProTxHash, so the value is kept in the store
func SetProTxHashToPeerInfo(proTxHash types.ProTxHash) func(info *peerInfo) {
return func(info *peerInfo) {
info.ProTxHash = proTxHash.Copy()
}
}

// evictPeerAfterTimeout evicts a peer connected in the given direction once the timeout expires,
// unless the peer has since disconnected or reconnected, or is persistent.
func evictPeerAfterTimeout(m *PeerManager, peerID types.NodeID, direction peerConnectionDirection, timeout time.Duration) {
if timeout > 0 {
time.AfterFunc(timeout, func() {
olderThan := time.Now().Add(-timeout)
p, connType := m.getPeer(peerID)
if !p.IsZero() && connType == direction && !p.Persistent && p.LastConnected.Before(olderThan) {
m.EvictPeer(peerID)
}
})
}
}
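Putting it together for a seed node, a hypothetical wiring sketch (newSeedPeerManager and the 30-minute value are illustrative, and internal packages are only importable from within the repository):

package seed

import (
	"time"

	dbm "github.com/tendermint/tm-db"

	"github.com/tendermint/tendermint/internal/p2p"
	"github.com/tendermint/tendermint/types"
)

// newSeedPeerManager enables timed eviction of inbound peers so a seed
// node keeps cycling through fresh peers; Accepted() schedules the
// eviction via evictPeerAfterTimeout.
func newSeedPeerManager(selfID types.NodeID, peerDB dbm.DB) (*p2p.PeerManager, error) {
	return p2p.NewPeerManager(selfID, peerDB, p2p.PeerManagerOptions{
		MaxIncomingConnectionTime: 30 * time.Minute, // 0 disables eviction
	})
}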