diff --git a/config/config.go b/config/config.go
index 3a48dedc64..702f3e54dd 100644
--- a/config/config.go
+++ b/config/config.go
@@ -668,6 +668,16 @@ type P2PConfig struct { //nolint: maligned
 	// attempts per IP address.
 	MaxIncomingConnectionAttempts uint `mapstructure:"max-incoming-connection-attempts"`
 
+	// MaxIncomingConnectionTime limits the maximum duration after which an incoming peer will be evicted.
+	// Defaults to 0, which disables this mechanism.
+	// Used on seed nodes to evict peers and make space for others.
+	MaxIncomingConnectionTime time.Duration `mapstructure:"max-incoming-connection-time"`
+
+	// IncomingConnectionWindow describes how often an IP address
+	// can attempt to create a new connection. Defaults to 10
+	// milliseconds, and cannot be less than 1 millisecond.
+	IncomingConnectionWindow time.Duration `mapstructure:"incoming-connection-window"`
+
 	// Comma separated list of peer IDs to keep private (will not be gossiped to
 	// other peers)
 	PrivatePeerIDs string `mapstructure:"private-peer-ids"`
@@ -703,6 +713,8 @@ func DefaultP2PConfig() *P2PConfig {
 		MaxConnections:                64,
 		MaxOutgoingConnections:        12,
 		MaxIncomingConnectionAttempts: 100,
+		MaxIncomingConnectionTime:     0,
+		IncomingConnectionWindow:      10 * time.Millisecond,
 		FlushThrottleTimeout:          100 * time.Millisecond,
 		// The MTU (Maximum Transmission Unit) for Ethernet is 1500 bytes.
 		// The IP header and the TCP header take up 20 bytes each at least (unless
@@ -736,6 +748,12 @@ func (cfg *P2PConfig) ValidateBasic() error {
 	if cfg.MaxOutgoingConnections > cfg.MaxConnections {
 		return errors.New("max-outgoing-connections cannot be larger than max-connections")
 	}
+	if cfg.MaxIncomingConnectionTime < 0 {
+		return errors.New("max-incoming-connection-time can't be negative")
+	}
+	if cfg.IncomingConnectionWindow < 1*time.Millisecond {
+		return errors.New("incoming-connection-window must be set to at least 1ms")
+	}
 	return nil
 }
diff --git a/config/toml.go b/config/toml.go
index bb7518c8a7..a7e318be2b 100644
--- a/config/toml.go
+++ b/config/toml.go
@@ -328,6 +328,16 @@ max-outgoing-connections = {{ .P2P.MaxOutgoingConnections }}
 # Rate limits the number of incoming connection attempts per IP address.
 max-incoming-connection-attempts = {{ .P2P.MaxIncomingConnectionAttempts }}
 
+# Limits the maximum duration after which an incoming peer will be evicted.
+# Defaults to 0, which disables this mechanism.
+# Used on seed nodes to evict peers and make space for others.
+max-incoming-connection-time = "{{ .P2P.MaxIncomingConnectionTime }}"
+
+# incoming-connection-window describes how often an IP address
+# can attempt to create a new connection. Defaults to 10
+# milliseconds, and cannot be less than 1 millisecond.
+incoming-connection-window = "{{ .P2P.IncomingConnectionWindow }}"
+
 # Comma separated list of peer IDs to keep private (will not be gossiped to other peers)
 # Warning: IPs will be exposed at /net_info, for more information https://github.com/tendermint/tendermint/issues/3055
 private-peer-ids = "{{ .P2P.PrivatePeerIDs }}"
diff --git a/internal/libs/sync/waker.go b/internal/libs/sync/waker.go
index 0aff3ddf83..4aa78dc64c 100644
--- a/internal/libs/sync/waker.go
+++ b/internal/libs/sync/waker.go
@@ -1,10 +1,17 @@
 package sync
 
+import (
+    "sync"
+    "time"
+)
+
 // Waker is used to wake up a sleeper when some event occurs. It debounces
 // multiple wakeup calls occurring between each sleep, and wakeups are
 // non-blocking to avoid having to coordinate goroutines.
 type Waker struct {
     wakeCh chan struct{}
+    mtx    sync.Mutex
+    timers []*time.Timer
 }
 
 // NewWaker creates a new Waker.
@@ -28,3 +35,24 @@ func (w *Waker) Wake() {
     default:
     }
 }
+
+// WakeAfter wakes up the sleeper after some delay.
+func (w *Waker) WakeAfter(delay time.Duration) {
+    w.mtx.Lock()
+    defer w.mtx.Unlock()
+
+    w.timers = append(w.timers, time.AfterFunc(delay, w.Wake))
+}
+
+// Close closes the waker and cleans up its resources.
+func (w *Waker) Close() error {
+    w.mtx.Lock()
+    defer w.mtx.Unlock()
+
+    for _, timer := range w.timers {
+        if timer != nil {
+            timer.Stop()
+        }
+    }
+    return nil
+}
diff --git a/internal/p2p/peermanager.go b/internal/p2p/peermanager.go
index 4e4865a43c..91e5a3f7f6 100644
--- a/internal/p2p/peermanager.go
+++ b/internal/p2p/peermanager.go
@@ -9,13 +9,14 @@ import (
     "sort"
     "time"
 
-    sync "github.com/sasha-s/go-deadlock"
-
     "github.com/gogo/protobuf/proto"
     "github.com/google/orderedcode"
+    "github.com/rs/zerolog"
+    sync "github.com/sasha-s/go-deadlock"
     dbm "github.com/tendermint/tm-db"
 
     tmsync "github.com/tendermint/tendermint/internal/libs/sync"
+    "github.com/tendermint/tendermint/libs/log"
     p2pproto "github.com/tendermint/tendermint/proto/tendermint/p2p"
     "github.com/tendermint/tendermint/types"
 )
@@ -42,7 +43,8 @@ const (
 type peerConnectionDirection int
 
 const (
-    peerConnectionIncoming peerConnectionDirection = iota + 1
+    peerConnectionNone peerConnectionDirection = iota
+    peerConnectionIncoming
     peerConnectionOutgoing
 )
 
@@ -136,6 +138,11 @@ type PeerManagerOptions struct {
     // the connection and evict a lower-scored peer.
     MaxConnectedUpgrade uint16
 
+    // MaxIncomingConnectionTime limits the maximum duration after which an incoming peer will be evicted.
+    // Defaults to 0, which disables this mechanism.
+    // Used on seed nodes to evict peers and make space for others.
+    MaxIncomingConnectionTime time.Duration
+
     // MinRetryTime is the minimum time to wait between retries. Retry times
     // double for each retry, up to MaxRetryTime. 0 disables retries.
     MinRetryTime time.Duration
@@ -298,6 +305,7 @@ type PeerManager struct {
     rand       *rand.Rand
     dialWaker  *tmsync.Waker // wakes up DialNext() on relevant peer changes
     evictWaker *tmsync.Waker // wakes up EvictNext() on relevant peer changes
+    logger     log.Logger
 
     mtx    sync.Mutex
     store  *peerStore
@@ -329,9 +337,10 @@ func NewPeerManager(selfID types.NodeID, peerDB dbm.DB, options PeerManagerOptio
     peerManager := &PeerManager{
         selfID:     selfID,
         options:    options,
-        rand:       rand.New(rand.NewSource(time.Now().UnixNano())), // nolint:gosec
+        rand:       rand.New(rand.NewSource(time.Now().UnixNano())), //nolint:gosec
         dialWaker:  tmsync.NewWaker(),
         evictWaker: tmsync.NewWaker(),
+        logger:     log.NewNopLogger(),
         metrics:    NopMetrics(),
 
         store:      store,
@@ -357,6 +366,18 @@ func NewPeerManager(selfID types.NodeID, peerDB dbm.DB, options PeerManagerOptio
     return peerManager, nil
 }
 
+// SetLogger sets a logger for the PeerManager.
+func (m *PeerManager) SetLogger(logger log.Logger) {
+    m.logger = logger
+}
+
+// Close closes the peer manager and frees up all resources.
+func (m *PeerManager) Close() error {
+    m.evictWaker.Close()
+    m.dialWaker.Close()
+    return nil
+}
+
 // configurePeers configures peers in the peer store with ephemeral runtime
 // configuration, e.g. PersistentPeers. It also removes ourself, if we're in the
 // peer store. The caller must hold the mutex lock.
@@ -524,11 +545,17 @@ func (m *PeerManager) HasDialedMaxPeers() bool {
 // becomes available. The caller must call Dialed() or DialFailed() for the
 // returned peer.
 func (m *PeerManager) DialNext(ctx context.Context) (NodeAddress, error) {
-    for {
+    for counter := uint32(0); ; counter++ {
         if address := m.TryDialNext(); (address != NodeAddress{}) {
             return address, nil
         }
 
+        // If we have zero peers connected, we need to schedule a retry.
+        // This can happen, for example, when some retry delay has not yet elapsed.
+        if m.numDialingOrConnected() == 0 {
+            m.scheduleDial(ctx, m.retryDelay(counter+1, false))
+        }
+
         select {
         case <-m.dialWaker.Sleep():
             continue
@@ -548,29 +575,36 @@ func (m *PeerManager) TryDialNext() NodeAddress {
     // MaxConnectedUpgrade allows us to probe additional peers that have a
     // higher score than any other peers, and if successful evict it.
     if m.options.MaxConnected > 0 && len(m.connected)+len(m.dialing) >= int(m.options.MaxConnected)+int(m.options.MaxConnectedUpgrade) {
+        m.logger.Trace("max connected reached, skipping dial attempt")
         return NodeAddress{}
     }
 
     cinfo := m.getConnectedInfo()
     if m.options.MaxOutgoingConnections > 0 && cinfo.outgoing >= m.options.MaxOutgoingConnections {
+        m.logger.Trace("max outgoing connections reached, skipping dial attempt")
         return NodeAddress{}
     }
 
     for _, peer := range m.store.Ranked() {
         if m.dialing[peer.ID] || m.isConnected(peer.ID) {
+            m.logger.Trace("peer dialing or connected, skipping", "peer", peer)
             continue
         }
 
         if !peer.LastDisconnected.IsZero() && time.Since(peer.LastDisconnected) < m.options.DisconnectCooldownPeriod {
+            m.logger.Trace("peer within disconnect cooldown period, skipping", "peer", peer, "cooldown_period", m.options.DisconnectCooldownPeriod)
             continue
         }
 
         for _, addressInfo := range peer.AddressInfo {
-            if time.Since(addressInfo.LastDialFailure) < m.retryDelay(addressInfo.DialFailures, peer.Persistent) {
+            delay := m.retryDelay(addressInfo.DialFailures, peer.Persistent)
+            if time.Since(addressInfo.LastDialFailure) < delay {
+                m.logger.Trace("not dialing peer due to retry delay", "peer", peer, "delay", delay, "last_failure", addressInfo.LastDialFailure)
                 continue
             }
 
             if id, ok := m.store.Resolve(addressInfo.Address); ok && (m.isConnected(id) || m.dialing[id]) {
+                m.logger.Trace("peer address already dialing", "peer", peer, "address", addressInfo.Address.String())
                 continue
             }
@@ -583,6 +617,12 @@ func (m *PeerManager) TryDialNext() NodeAddress {
             // peer (since they're ordered by score via peerStore.Ranked).
             if m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected) {
                 upgradeFromPeer := m.findUpgradeCandidate(peer.ID, peer.Score())
+                m.logger.Trace("max connected reached, checking upgrade candidate",
+                    "peer", peer,
+                    "max_connected", m.options.MaxConnected,
+                    "connected", len(m.connected),
+                    "upgrade_candidate", upgradeFromPeer,
+                )
                 if upgradeFromPeer == "" {
                     return NodeAddress{}
                 }
@@ -626,27 +666,19 @@ func (m *PeerManager) DialFailed(ctx context.Context, address NodeAddress) error
         return err
     }
 
-    // We spawn a goroutine that notifies DialNext() again when the retry
-    // timeout has elapsed, so that we can consider dialing it again. We
-    // calculate the retry delay outside the goroutine, since it must hold
-    // the mutex lock.
-    if d := m.retryDelay(addressInfo.DialFailures, peer.Persistent); d != 0 && d != retryNever {
-        go func() {
-            // Use an explicit timer with deferred cleanup instead of
-            // time.After(), to avoid leaking goroutines on PeerManager.Close().
-            timer := time.NewTimer(d)
-            defer timer.Stop()
-            select {
-            case <-timer.C:
-                m.dialWaker.Wake()
-            case <-ctx.Done():
-            }
-        }()
+    delay := m.retryDelay(addressInfo.DialFailures, peer.Persistent)
+    m.scheduleDial(ctx, delay)
+
+    return nil
+}
+
+// scheduleDial schedules a dial attempt to run after the given delay.
+func (m *PeerManager) scheduleDial(ctx context.Context, delay time.Duration) {
+    if delay > 0 && delay != retryNever {
+        m.dialWaker.WakeAfter(delay)
     } else {
         m.dialWaker.Wake()
     }
-
-    return nil
 }
 
 // Dialed marks a peer as successfully dialed. Any further connections will be
@@ -791,6 +823,9 @@ func (m *PeerManager) Accepted(peerID types.NodeID, peerOpts ...func(*peerInfo))
     if upgradeFromPeer != "" {
         m.evict[upgradeFromPeer] = true
     }
+
+    evictPeerAfterTimeout(m, peerID, peerConnectionIncoming, m.options.MaxIncomingConnectionTime)
+
     m.evictWaker.Wake()
     return nil
 }
@@ -1052,7 +1087,7 @@ func (m *PeerManager) Advertise(peerID types.NodeID, limit uint16) []NodeAddress
             // 10% of the time we'll randomly insert a "loosing"
             // peer.
-            // nolint:gosec // G404: Use of weak random number generator
+            //nolint:gosec // G404: Use of weak random number generator
             if numAddresses <= int(limit) || rand.Intn((meanAbsScore*2)+1) <= scores[peer.ID]+1 || rand.Intn((idx+1)*10) <= idx+1 {
                 addresses = append(addresses, addressInfo.Address)
                 addedLastIteration = true
@@ -1609,6 +1644,37 @@ func (p *peerInfo) Validate() error {
     return nil
 }
 
+func (p *peerInfo) IsZero() bool {
+    return p == nil || len(p.ID) == 0
+}
+
+func (p *peerInfo) MarshalZerologObject(e *zerolog.Event) {
+    if p == nil {
+        return
+    }
+
+    e.Str("node_id", string(p.ID))
+    if len(p.ProTxHash) != 0 {
+        e.Str("protxhash", p.ProTxHash.ShortString())
+    }
+    e.Time("last_connected", p.LastConnected)
+    e.Time("last_disconnected", p.LastDisconnected)
+    if p.Persistent {
+        e.Bool("persistent", p.Persistent)
+    }
+    e.Int64("height", p.Height)
+    if p.FixedScore != 0 {
+        e.Int16("fixed_score", int16(p.FixedScore))
+    }
+    if p.MutableScore != 0 {
+        e.Int64("mutable_score", p.MutableScore)
+    }
+    if p.Inactive {
+        e.Bool("inactive", p.Inactive)
+    }
+    e.Int16("score", int16(p.Score()))
+}
+
 // peerAddressInfo contains information and statistics about a peer address.
 type peerAddressInfo struct {
     Address          NodeAddress
@@ -1712,6 +1778,20 @@ func (m *PeerManager) UpdatePeerInfo(nodeID types.NodeID, modifier func(peerInfo
     return m.store.Set(peer)
 }
 
+// getPeer loads and returns a peer from the store, together with the last connection direction, if any.
+func (m *PeerManager) getPeer(peerID types.NodeID) (peerInfo, peerConnectionDirection) {
+    m.mtx.Lock()
+    defer m.mtx.Unlock()
+
+    p, ok := m.store.Get(peerID)
+    if !ok {
+        return peerInfo{}, peerConnectionNone
+    }
+
+    connType := m.connected[peerID]
+    return p, connType
+}
+
 // IsDialingOrConnected returns true if dialing to a peer at the moment or already connected otherwise false
 func (m *PeerManager) IsDialingOrConnected(nodeID types.NodeID) bool {
     m.mtx.Lock()
@@ -1720,9 +1800,28 @@ func (m *PeerManager) IsDialingOrConnected(nodeID types.NodeID) bool {
     return m.dialing[nodeID] || ok
 }
 
+func (m *PeerManager) numDialingOrConnected() int {
+    m.mtx.Lock()
+    defer m.mtx.Unlock()
+    return len(m.connected) + len(m.dialing)
+}
+
 // SetProTxHashToPeerInfo sets a proTxHash in peerInfo.proTxHash to keep this value in a store
 func SetProTxHashToPeerInfo(proTxHash types.ProTxHash) func(info *peerInfo) {
     return func(info *peerInfo) {
         info.ProTxHash = proTxHash.Copy()
     }
 }
+
+// evictPeerAfterTimeout schedules eviction of an incoming peer once the timeout expires.
+func evictPeerAfterTimeout(m *PeerManager, peerID types.NodeID, direction peerConnectionDirection, timeout time.Duration) {
+    if timeout > 0 {
+        time.AfterFunc(timeout, func() {
+            olderThan := time.Now().Add(-timeout)
+            p, connType := m.getPeer(peerID)
+            if !p.IsZero() && connType == direction && !p.Persistent && p.LastConnected.Before(olderThan) {
+                m.EvictPeer(peerID)
+            }
+        })
+    }
+}
diff --git a/internal/p2p/peermanager_test.go b/internal/p2p/peermanager_test.go
index 3e72c333b2..7f8291c194 100644
--- a/internal/p2p/peermanager_test.go
+++ b/internal/p2p/peermanager_test.go
@@ -1261,6 +1261,58 @@ func TestPeerManager_Accepted_UpgradeDialing(t *testing.T) {
     require.Error(t, peerManager.Dialed(b))
 }
 
+// TestPeerManager_Accepted_Timeout ensures that an incoming peer will be evicted after `MaxIncomingConnectionTime`
+func TestPeerManager_Accepted_Timeout(t *testing.T) {
+    ctx := context.Background()
+
+    // FIXME: maxIncomingTime might require tuning on low-resource runners (e.g. GitHub).
+    // Feel free to increase it a bit if it fails - it should not affect the test logic.
+    const maxIncomingTime = 10 * time.Millisecond
+    const processingTime = maxIncomingTime / 10
+
+    address := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
+
+    peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
+        MaxIncomingConnectionTime: maxIncomingTime,
+        DisconnectCooldownPeriod:  1 * time.Nanosecond,
+    })
+
+    require.NoError(t, err)
+
+    // Accepting a connection from a known peer should work.
+    added, err := peerManager.Add(address)
+    require.NoError(t, err)
+    require.True(t, added)
+    require.NoError(t, peerManager.Accepted(address.NodeID))
+
+    // Initially, no peers are marked for eviction
+    evict, err := peerManager.TryEvictNext()
+    assert.NoError(t, err)
+    assert.Zero(t, evict, "No peer should be evicted")
+
+    // After 1/2 of the time, we disconnect and reconnect
+    time.Sleep(maxIncomingTime / 2)
+    evict, err = peerManager.TryEvictNext()
+    assert.NoError(t, err)
+    assert.Zero(t, evict, "No peer should be evicted")
+
+    peerManager.Disconnected(ctx, address.NodeID)
+    time.Sleep(processingTime)
+    require.NoError(t, peerManager.Accepted(address.NodeID))
+
+    // After another 1/2 of the time, we still don't expect the peer to be evicted
+    time.Sleep(maxIncomingTime / 2)
+    evict, err = peerManager.TryEvictNext()
+    assert.NoError(t, err)
+    assert.Zero(t, evict, "Second peer connection was evicted after timeout starting at first connection")
+
+    // But an additional 1/2 of the time, plus some processing time, should evict the peer
+    time.Sleep(maxIncomingTime/2 + processingTime)
+    evict, err = peerManager.TryEvictNext()
+    assert.NoError(t, err)
+    assert.Equal(t, address.NodeID, evict, "Peer should be evicted after the timeout")
+}
+
 func TestPeerManager_Ready(t *testing.T) {
     a := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("a", 40))}
     b := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("b", 40))}
diff --git a/internal/p2p/pex/reactor_test.go b/internal/p2p/pex/reactor_test.go
index 9257a317b4..f92e1344bc 100644
--- a/internal/p2p/pex/reactor_test.go
+++ b/internal/p2p/pex/reactor_test.go
@@ -301,6 +301,7 @@ func setupSingle(ctx context.Context, t *testing.T) *singleTestReactor {
     peerUpdates := p2p.NewPeerUpdates(peerCh, chBuf)
     peerManager, err := p2p.NewPeerManager(nodeID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
     require.NoError(t, err)
+    defer peerManager.Close()
 
     chCreator := func(context.Context, *p2p.ChannelDescriptor) (p2p.Channel, error) {
         return pexCh, nil
diff --git a/node/node.go b/node/node.go
index d8583f292b..9d0ad06704 100644
--- a/node/node.go
+++ b/node/node.go
@@ -225,7 +225,13 @@ func makeNode(
     weAreOnlyValidator := onlyValidatorIsUs(state, proTxHash)
 
-    peerManager, peerCloser, err := createPeerManager(cfg, dbProvider, nodeKey.ID, nodeMetrics.p2p)
+    peerManager, peerCloser, err := createPeerManager(
+        cfg,
+        dbProvider,
+        nodeKey.ID,
+        nodeMetrics.p2p,
+        logger.With("module", "peermanager"),
+    )
     closers = append(closers, peerCloser)
     if err != nil {
         return nil, combineCloseError(
@@ -390,7 +396,12 @@ func makeNode(
         nodeMetrics.consensus.BlockSyncing.Set(1)
     }
 
-    node.services = append(node.services, pex.NewReactor(logger, peerManager, node.router.OpenChannel, peerManager.Subscribe))
+    node.services = append(node.services, pex.NewReactor(
+        logger.With("module", "pex"),
+        peerManager,
+        node.router.OpenChannel,
+        peerManager.Subscribe),
+    )
 
     // Set up state sync reactor, and schedule a sync if requested.
     // FIXME The way we do phased startups (e.g. replay -> block sync -> consensus) is very messy,
@@ -775,9 +786,10 @@ func loadStateFromDBOrGenesisDocProvider(stateStore sm.Store, genDoc *types.Gene
 func getRouterConfig(conf *config.Config, appClient abciclient.Client) p2p.RouterOptions {
     opts := p2p.RouterOptions{
-        QueueType:        conf.P2P.QueueType,
-        HandshakeTimeout: conf.P2P.HandshakeTimeout,
-        DialTimeout:      conf.P2P.DialTimeout,
+        QueueType:                conf.P2P.QueueType,
+        HandshakeTimeout:         conf.P2P.HandshakeTimeout,
+        DialTimeout:              conf.P2P.DialTimeout,
+        IncomingConnectionWindow: conf.P2P.IncomingConnectionWindow,
     }
 
     if conf.FilterPeers && appClient != nil {
diff --git a/node/seed.go b/node/seed.go
index 194bf4dbcb..6446ceb760 100644
--- a/node/seed.go
+++ b/node/seed.go
@@ -62,7 +62,7 @@ func makeSeedNode(
     // Setup Transport and Switch.
     p2pMetrics := p2p.PrometheusMetrics(cfg.Instrumentation.Namespace, "chain_id", genDoc.ChainID)
 
-    peerManager, closer, err := createPeerManager(cfg, dbProvider, nodeKey.ID, p2pMetrics)
+    peerManager, closer, err := createPeerManager(cfg, dbProvider, nodeKey.ID, p2pMetrics, logger)
     if err != nil {
         return nil, combineCloseError(
             fmt.Errorf("failed to create peer manager: %w", err),
diff --git a/node/setup.go b/node/setup.go
index 8bf57ed0f8..4f0019471d 100644
--- a/node/setup.go
+++ b/node/setup.go
@@ -197,6 +197,7 @@ func createPeerManager(
     dbProvider config.DBProvider,
     nodeID types.NodeID,
     metrics *p2p.Metrics,
+    logger log.Logger,
 ) (*p2p.PeerManager, closer, error) {
 
     selfAddr, err := p2p.ParseNodeAddress(nodeID.AddressString(cfg.P2P.ExternalAddress))
@@ -229,18 +230,19 @@ func createPeerManager(
     maxUpgradeConns := uint16(4)
 
     options := p2p.PeerManagerOptions{
-        SelfAddress:              selfAddr,
-        MaxConnected:             maxConns,
-        MaxOutgoingConnections:   maxOutgoingConns,
-        MaxConnectedUpgrade:      maxUpgradeConns,
-        DisconnectCooldownPeriod: 2 * time.Second,
-        MaxPeers:                 maxUpgradeConns + 4*maxConns,
-        MinRetryTime:             250 * time.Millisecond,
-        MaxRetryTime:             30 * time.Minute,
-        MaxRetryTimePersistent:   5 * time.Minute,
-        RetryTimeJitter:          5 * time.Second,
-        PrivatePeers:             privatePeerIDs,
-        Metrics:                  metrics,
+        SelfAddress:               selfAddr,
+        MaxConnected:              maxConns,
+        MaxOutgoingConnections:    maxOutgoingConns,
+        MaxIncomingConnectionTime: cfg.P2P.MaxIncomingConnectionTime,
+        MaxConnectedUpgrade:       maxUpgradeConns,
+        DisconnectCooldownPeriod:  2 * time.Second,
+        MaxPeers:                  maxUpgradeConns + 4*maxConns,
+        MinRetryTime:              250 * time.Millisecond,
+        MaxRetryTime:              30 * time.Minute,
+        MaxRetryTimePersistent:    5 * time.Minute,
+        RetryTimeJitter:           5 * time.Second,
+        PrivatePeers:              privatePeerIDs,
+        Metrics:                   metrics,
     }
 
     peers := []p2p.NodeAddress{}
@@ -271,14 +273,18 @@ func createPeerManager(
     if err != nil {
         return nil, peerDB.Close, fmt.Errorf("failed to create peer manager: %w", err)
     }
-
+    peerManager.SetLogger(logger)
+
+    closer := func() error {
+        peerManager.Close()
+        return peerDB.Close()
+    }
     for _, peer := range peers {
         if _, err := peerManager.Add(peer); err != nil {
-            return nil, peerDB.Close, fmt.Errorf("failed to add peer %q: %w", peer, err)
+            return nil, closer, fmt.Errorf("failed to add peer %q: %w", peer, err)
         }
     }
 
-    return peerManager, peerDB.Close, nil
+    return peerManager, closer, nil
 }
 
 func createRouter(
diff --git a/test/e2e/networks/dashcore.toml b/test/e2e/networks/dashcore.toml
index dec82fab55..9a9cad0552 100644
--- a/test/e2e/networks/dashcore.toml
+++ b/test/e2e/networks/dashcore.toml
@@ -37,6 +37,12 @@ validator05 = 100
 [node.seed01]
 mode = "seed"
 perturb = ["restart"]
+persistent_peers = ["validator01"]
+
+p2p_max_connections = 4
+p2p_max_outgoing_connections = 2
+p2p_max_incoming_connection_time = "5s"
+p2p_incoming_connection_window = "10s"
 
 [node.validator01]
 seeds = ["seed01"]
diff --git a/test/e2e/networks/rotate.toml b/test/e2e/networks/rotate.toml
index b8901c37e7..9c2cab4798 100644
--- a/test/e2e/networks/rotate.toml
+++ b/test/e2e/networks/rotate.toml
@@ -49,6 +49,12 @@ validator09 = 100
 [node.seed01]
 mode = "seed"
 perturb = ["restart"]
+persistent_peers = ["validator01"]
+
+p2p_max_connections = 4
+p2p_max_outgoing_connections = 2
+p2p_max_incoming_connection_time = "5s"
+p2p_incoming_connection_window = "10s"
 
 [node.validator01]
 seeds = ["seed01"]
diff --git a/test/e2e/pkg/exec/exec.go b/test/e2e/pkg/exec/exec.go
index 9dcd793844..a8e03dce55 100644
--- a/test/e2e/pkg/exec/exec.go
+++ b/test/e2e/pkg/exec/exec.go
@@ -1,15 +1,18 @@
 package exec
 
 import (
+    "bytes"
     "context"
     "fmt"
+    "io"
     "os"
     osexec "os/exec"
+    "time"
 )
 
 // Command executes a shell command.
 func Command(ctx context.Context, args ...string) error {
-    // nolint: gosec
+    //nolint: gosec
     // G204: Subprocess launched with a potential tainted input or cmd arguments
     cmd := osexec.CommandContext(ctx, args[0], args[1:]...)
     out, err := cmd.CombinedOutput()
@@ -25,10 +28,50 @@ func Command(ctx context.Context, args ...string) error {
 
 // CommandVerbose executes a shell command while displaying its output.
 func CommandVerbose(ctx context.Context, args ...string) error {
-    // nolint: gosec
+    //nolint: gosec
     // G204: Subprocess launched with a potential tainted input or cmd arguments
     cmd := osexec.CommandContext(ctx, args[0], args[1:]...)
-    cmd.Stdout = os.Stdout
-    cmd.Stderr = os.Stderr
+    now := time.Now()
+    cmd.Stdout = &tsWriter{out: os.Stdout, start: now}
+    cmd.Stderr = &tsWriter{out: os.Stderr, start: now}
     return cmd.Run()
 }
+
+// tsWriter prepends each line of written data with the time elapsed since start.
+// It is used mainly to add info about execution time to the output of `e2e runner test`.
+type tsWriter struct {
+    out     io.Writer
+    start   time.Time
+    tsAdded bool // tsAdded is true if timestamp was already added to current line
+}
+
+// Write implements io.Writer.
+func (w *tsWriter) Write(p []byte) (n int, err error) {
+    for n = 0; n < len(p); {
+        if !w.tsAdded {
+            took := time.Since(w.start)
+
+            ts := fmt.Sprintf("%3.5fs ", took.Seconds())
+            if _, err := w.out.Write([]byte(ts)); err != nil {
+                return n, err
+            }
+            w.tsAdded = true
+        }
+
+        index := bytes.IndexByte(p[n:], '\n')
+        if index < 0 {
+            // no newline found; write everything that remains in p
+            index = len(p) - n - 1
+        } else {
+            // we have \n, let's add timestamp in next loop
+            w.tsAdded = false
+        }
+        written, err := w.out.Write(p[n : n+index+1])
+        n += written
+        if err != nil {
+            return n, err
+        }
+    }
+
+    return n, nil
+}
diff --git a/test/e2e/pkg/manifest.go b/test/e2e/pkg/manifest.go
index 888dd592b0..cc768f8784 100644
--- a/test/e2e/pkg/manifest.go
+++ b/test/e2e/pkg/manifest.go
@@ -4,6 +4,7 @@ import (
     "fmt"
     "os"
     "sort"
+    "time"
 
     "github.com/BurntSushi/toml"
 
@@ -165,6 +166,11 @@ type ManifestNode struct {
     // SnapshotInterval and EvidenceAgeHeight.
     RetainBlocks uint64 `toml:"retain_blocks"`
 
+    P2PMaxConnections            uint16        `toml:"p2p_max_connections"`
+    P2PMaxOutgoingConnections    uint16        `toml:"p2p_max_outgoing_connections"`
+    P2PMaxIncomingConnectionTime time.Duration `toml:"p2p_max_incoming_connection_time"`
+    P2PIncomingConnectionWindow  time.Duration `toml:"p2p_incoming_connection_window"`
+
     // Perturb lists perturbations to apply to the node after it has been
     // started and synced with the network:
     //
diff --git a/test/e2e/pkg/testnet.go b/test/e2e/pkg/testnet.go
index 5c3895245a..e6c1f20686 100644
--- a/test/e2e/pkg/testnet.go
+++ b/test/e2e/pkg/testnet.go
@@ -105,29 +105,33 @@ type Testnet struct {
 
 // Node represents a Tenderdash node in a testnet.
 type Node struct {
-    Name                 string
-    Testnet              *Testnet
-    Mode                 Mode
-    PrivvalKeys          map[string]crypto.QuorumKeys
-    PrivvalUpdateHeights map[string]crypto.QuorumHash
-    NodeKey              crypto.PrivKey
-    ProTxHash            crypto.ProTxHash
-    IP                   net.IP
-    ProxyPort            uint32
-    StartAt              int64
-    Mempool              string
-    StateSync            string
-    Database             string
-    PrivvalProtocol      Protocol
-    PersistInterval      uint64
-    SnapshotInterval     uint64
-    RetainBlocks         uint64
-    Seeds                []*Node
-    PersistentPeers      []*Node
-    Perturbations        []Perturbation
-    LogLevel             string
-    QueueType            string
-    HasStarted           bool
+    Name                         string
+    Testnet                      *Testnet
+    Mode                         Mode
+    PrivvalKeys                  map[string]crypto.QuorumKeys
+    PrivvalUpdateHeights         map[string]crypto.QuorumHash
+    NodeKey                      crypto.PrivKey
+    ProTxHash                    crypto.ProTxHash
+    IP                           net.IP
+    ProxyPort                    uint32
+    StartAt                      int64
+    Mempool                      string
+    StateSync                    string
+    Database                     string
+    PrivvalProtocol              Protocol
+    PersistInterval              uint64
+    SnapshotInterval             uint64
+    RetainBlocks                 uint64
+    P2PMaxConnections            uint16
+    P2PMaxOutgoingConnections    uint16
+    P2PMaxIncomingConnectionTime time.Duration
+    P2PIncomingConnectionWindow  time.Duration
+    Seeds                        []*Node
+    PersistentPeers              []*Node
+    Perturbations                []Perturbation
+    LogLevel                     string
+    QueueType                    string
+    HasStarted                   bool
 }
 
 // LoadTestnet loads a testnet from a manifest file, using the filename to
@@ -274,6 +278,19 @@ func LoadTestnet(file string) (*Testnet, error) {
         if nodeManifest.PersistInterval != nil {
             node.PersistInterval = *nodeManifest.PersistInterval
         }
+        if nodeManifest.P2PMaxConnections > 0 {
+            node.P2PMaxConnections = nodeManifest.P2PMaxConnections
+        }
+        if nodeManifest.P2PMaxOutgoingConnections > 0 {
+            node.P2PMaxOutgoingConnections = nodeManifest.P2PMaxOutgoingConnections
+        }
+        if nodeManifest.P2PMaxIncomingConnectionTime > 0 {
+            node.P2PMaxIncomingConnectionTime = nodeManifest.P2PMaxIncomingConnectionTime
+        }
+        if nodeManifest.P2PIncomingConnectionWindow > 0 {
+            node.P2PIncomingConnectionWindow = nodeManifest.P2PIncomingConnectionWindow
+        }
+
         for _, p := range nodeManifest.Perturb {
             node.Perturbations = append(node.Perturbations, Perturbation(p))
         }
@@ -504,7 +521,7 @@ func (n Node) Validate(testnet Testnet) error {
         return fmt.Errorf("unsupported p2p queue type: %s", n.QueueType)
     }
     switch n.Database {
-    case "goleveldb", "cleveldb", "boltdb", "rocksdb", "badgerdb":
+    case "goleveldb", "cleveldb", "boltdb", "rocksdb", "badgerdb", "memdb":
     default:
         return fmt.Errorf("invalid database setting %q", n.Database)
     }
diff --git a/test/e2e/runner/setup.go b/test/e2e/runner/setup.go
index ce497c0057..4ba9cfb250 100644
--- a/test/e2e/runner/setup.go
+++ b/test/e2e/runner/setup.go
@@ -296,12 +296,20 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) {
         }
     }
 
-    cfg.P2P.PersistentPeers = ""
-    for _, peer := range node.PersistentPeers {
-        if len(cfg.P2P.PersistentPeers) > 0 {
-            cfg.P2P.PersistentPeers += ","
-        }
-
-        cfg.P2P.PersistentPeers += peer.AddressP2P(true)
+    cfg.P2P.PersistentPeers = joinNodeP2PAddresses(node.PersistentPeers, true, ",")
+    cfg.P2P.BootstrapPeers = joinNodeP2PAddresses(node.Seeds, true, ",")
+
+    if node.P2PMaxConnections > 0 {
+        cfg.P2P.MaxConnections = node.P2PMaxConnections
+    }
+    if node.P2PMaxOutgoingConnections > 0 {
+        cfg.P2P.MaxOutgoingConnections = node.P2PMaxOutgoingConnections
+    }
+    if node.P2PMaxIncomingConnectionTime > 0 {
+        cfg.P2P.MaxIncomingConnectionTime = node.P2PMaxIncomingConnectionTime
+    }
+    if node.P2PIncomingConnectionWindow > 0 {
+        cfg.P2P.IncomingConnectionWindow = node.P2PIncomingConnectionWindow
     }
 
     cfg.Instrumentation.Prometheus = true
@@ -309,6 +317,14 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) {
     return cfg, nil
 }
 
+func joinNodeP2PAddresses(nodes []*e2e.Node, withID bool, sep string) string {
+    addresses := []string{}
+    for _, node := range nodes {
+        addresses = append(addresses, node.AddressP2P(withID))
+    }
+    return strings.Join(addresses, sep)
+}
+
 // MakeAppConfig generates an ABCI application config for a node.
 func MakeAppConfig(node *e2e.Node) ([]byte, error) {
     cfg := map[string]interface{}{
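
Illustrative usage of the two new P2P settings and their ValidateBasic rules. This is a sketch, not part of the changeset; it assumes the exported config package path github.com/tendermint/tendermint/config used by this repository.

package main

import (
	"fmt"
	"time"

	"github.com/tendermint/tendermint/config"
)

func main() {
	p2p := config.DefaultP2PConfig()

	// Evict incoming peers after 5s; 0 (the default) disables eviction.
	// Mainly useful on seed nodes to make room for new peers.
	p2p.MaxIncomingConnectionTime = 5 * time.Second

	// Allow one incoming connection attempt per IP address every 10s.
	p2p.IncomingConnectionWindow = 10 * time.Second

	fmt.Println(p2p.ValidateBasic()) // <nil>

	// Values below 1ms are rejected.
	p2p.IncomingConnectionWindow = 500 * time.Microsecond
	fmt.Println(p2p.ValidateBasic()) // incoming-connection-window must be set to at least 1ms
}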
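
A standalone sketch of the delayed-wake pattern behind Waker.WakeAfter and PeerManager.scheduleDial. The channel capacity and the retryNever value are assumptions of the sketch, not taken from the changeset.

package main

import (
	"fmt"
	"math"
	"sync"
	"time"
)

// waker mirrors the debounced Waker: Wake never blocks because the channel
// has capacity 1 and extra wakeups are dropped; WakeAfter arms a timer that
// calls Wake once the delay elapses.
type waker struct {
	wakeCh chan struct{} // capacity 1 is an assumption of this sketch
	mtx    sync.Mutex
	timers []*time.Timer
}

func newWaker() *waker { return &waker{wakeCh: make(chan struct{}, 1)} }

func (w *waker) Sleep() <-chan struct{} { return w.wakeCh }

func (w *waker) Wake() {
	select {
	case w.wakeCh <- struct{}{}:
	default: // a wakeup is already pending; debounce
	}
}

func (w *waker) WakeAfter(delay time.Duration) {
	w.mtx.Lock()
	defer w.mtx.Unlock()
	w.timers = append(w.timers, time.AfterFunc(delay, w.Wake))
}

// retryNever is a sentinel assumed for this sketch (a "do not retry" delay).
const retryNever = time.Duration(math.MaxInt64)

// scheduleDial follows the same rule as PeerManager.scheduleDial: a positive,
// finite delay becomes a delayed wakeup, anything else wakes the dialer now.
func scheduleDial(w *waker, delay time.Duration) {
	if delay > 0 && delay != retryNever {
		w.WakeAfter(delay)
	} else {
		w.Wake()
	}
}

func main() {
	w := newWaker()
	scheduleDial(w, 50*time.Millisecond)
	<-w.Sleep()
	fmt.Println("dial loop woken after retry delay")
}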
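
The seed-node eviction added in Accepted/evictPeerAfterTimeout reduces to a fire-and-forget timer that re-checks the peer's state before evicting. Below is a self-contained sketch of that pattern; the manager type and accept function are illustrative names, not the real PeerManager API.

package main

import (
	"fmt"
	"sync"
	"time"
)

type manager struct {
	mtx       sync.Mutex
	connected map[string]time.Time // peer ID -> time the incoming connection was accepted
}

// accept records an incoming peer and, if maxTime > 0, schedules its eviction.
func (m *manager) accept(peerID string, maxTime time.Duration) {
	m.mtx.Lock()
	m.connected[peerID] = time.Now()
	m.mtx.Unlock()

	if maxTime <= 0 {
		return // 0 disables the mechanism, matching MaxIncomingConnectionTime
	}
	time.AfterFunc(maxTime, func() {
		olderThan := time.Now().Add(-maxTime)

		m.mtx.Lock()
		defer m.mtx.Unlock()
		// Only evict if the peer is still connected and has not reconnected
		// since the timer was armed (the LastConnected.Before(olderThan) check
		// in the changeset).
		if connectedAt, ok := m.connected[peerID]; ok && connectedAt.Before(olderThan) {
			delete(m.connected, peerID)
			fmt.Println("evicted incoming peer", peerID)
		}
	})
}

func main() {
	m := &manager{connected: map[string]time.Time{}}
	m.accept("peer1", 50*time.Millisecond)
	time.Sleep(100 * time.Millisecond)
}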
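
peerInfo now implements zerolog's LogObjectMarshaler, so a whole peer can be attached to a trace line as a structured field (the "peer", peer pairs in TryDialNext). A minimal sketch of the interface with a toy type:

package main

import (
	"os"
	"time"

	"github.com/rs/zerolog"
)

// peer is a toy stand-in for peerInfo, carrying only the fields needed here.
type peer struct {
	ID            string
	LastConnected time.Time
	Persistent    bool
	Score         int16
}

// MarshalZerologObject lets zerolog expand the peer into structured fields
// instead of relying on its default string representation.
func (p *peer) MarshalZerologObject(e *zerolog.Event) {
	if p == nil {
		return
	}
	e.Str("node_id", p.ID)
	e.Time("last_connected", p.LastConnected)
	if p.Persistent {
		e.Bool("persistent", p.Persistent)
	}
	e.Int16("score", p.Score)
}

func main() {
	logger := zerolog.New(os.Stdout)
	p := &peer{ID: "0011aabb", LastConnected: time.Now(), Score: 10}
	// The peer is rendered as a nested JSON object alongside the message.
	logger.Debug().Object("peer", p).Msg("dialing skipped")
}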
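
A test-style sketch of the new PeerManager lifecycle calls (SetLogger, Close) as wired in setup.go and the pex tests. It would only compile inside this module, since the p2p package is internal; the test name and option values are illustrative.

package p2p_test

import (
	"testing"
	"time"

	dbm "github.com/tendermint/tm-db"

	"github.com/tendermint/tendermint/internal/p2p"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/types"
)

func TestPeerManagerLifecycle(t *testing.T) {
	selfID := types.NodeID("0011223344556677889900112233445566778899")
	peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
		MaxIncomingConnectionTime: 5 * time.Second, // evict incoming peers after 5s
	})
	if err != nil {
		t.Fatal(err)
	}
	peerManager.SetLogger(log.NewNopLogger())
	defer peerManager.Close() // stops any timers left behind by the wakers
}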
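
The timestamped e2e output can be reproduced with a simplified writer. This sketch forwards bytes one at a time (less efficient than tsWriter, but the same per-line elapsed-time prefix idea) and is not the implementation used by the runner.

package main

import (
	"fmt"
	"io"
	"os"
	"os/exec"
	"time"
)

// prefixWriter is a simplified cousin of the e2e tsWriter: it prefixes every
// line with the time elapsed since start before forwarding it to out.
type prefixWriter struct {
	out     io.Writer
	start   time.Time
	midline bool // true if the current line already got its prefix
}

func (w *prefixWriter) Write(p []byte) (int, error) {
	for i, b := range p {
		if !w.midline {
			fmt.Fprintf(w.out, "%3.5fs ", time.Since(w.start).Seconds())
			w.midline = true
		}
		if _, err := w.out.Write(p[i : i+1]); err != nil {
			return i, err
		}
		if b == '\n' {
			w.midline = false
		}
	}
	return len(p), nil
}

func main() {
	now := time.Now()
	cmd := exec.Command("sh", "-c", "echo one; sleep 1; echo two")
	cmd.Stdout = &prefixWriter{out: os.Stdout, start: now}
	cmd.Stderr = &prefixWriter{out: os.Stderr, start: now}
	if err := cmd.Run(); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}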