diff --git a/config/config.go b/config/config.go index 538f191fb..dac0546b9 100644 --- a/config/config.go +++ b/config/config.go @@ -643,6 +643,9 @@ type P2PConfig struct { //nolint: maligned // Comma separated list of nodes to keep persistent connections to PersistentPeers string `mapstructure:"persistent-peers"` + // Comma separated list of nodes for block sync only + BlockSyncPeers string `mapstructure:"blocksync-peers"` + // UPNP port forwarding UPNP bool `mapstructure:"upnp"` @@ -712,7 +715,7 @@ func DefaultP2PConfig() *P2PConfig { RecvRate: 5120000, // 5 mB/s PexReactor: true, AllowDuplicateIP: false, - HandshakeTimeout: 20 * time.Second, + HandshakeTimeout: 5 * time.Second, DialTimeout: 3 * time.Second, TestDialFail: false, QueueType: "simple-priority", diff --git a/config/toml.go b/config/toml.go index 0dceb12e5..db2fc9c0d 100644 --- a/config/toml.go +++ b/config/toml.go @@ -306,6 +306,9 @@ bootstrap-peers = "{{ .P2P.BootstrapPeers }}" # Comma separated list of nodes to keep persistent connections to persistent-peers = "{{ .P2P.PersistentPeers }}" +# Comma separated list of nodes for block sync only +blocksync-peers = "{{ .P2P.BlockSyncPeers }}" + # UPNP port forwarding upnp = {{ .P2P.UPNP }} diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go index 0bea807ca..939741ceb 100644 --- a/internal/blocksync/pool.go +++ b/internal/blocksync/pool.go @@ -31,9 +31,8 @@ eg, L = latency = 0.1s */ const ( - requestInterval = 10 * time.Millisecond - inactiveSleepInterval = 1 * time.Second - maxTotalRequesters = 600 + requestInterval = 100 * time.Millisecond + maxTotalRequesters = 50 maxPeerErrBuffer = 1000 maxPendingRequests = maxTotalRequesters maxPendingRequestsPerPeer = 20 @@ -54,7 +53,7 @@ const ( BadBlock RetryReason = "BadBlock" ) -var peerTimeout = 10 * time.Second // not const so we can override with tests +var peerTimeout = 2 * time.Second // not const so we can override with tests /* Peers self report their heights when we join the block pool. @@ -356,6 +355,12 @@ func (pool *BlockPool) SetPeerRange(peerID types.NodeID, base int64, height int6 pool.mtx.Lock() defer pool.mtx.Unlock() + blockSyncPeers := pool.peerManager.GetBlockSyncPeers() + if len(blockSyncPeers) > 0 && !blockSyncPeers[peerID] { + pool.logger.Info(fmt.Sprintf("Skip adding peer %s for blocksync, num of blocksync peers: %d, num of pool peers: %d", peerID, len(blockSyncPeers), len(pool.peers))) + return + } + peer := pool.peers[peerID] if peer != nil { peer.base = base @@ -370,7 +375,7 @@ func (pool *BlockPool) SetPeerRange(peerID types.NodeID, base int64, height int6 logger: pool.logger.With("peer", peerID), startAt: time.Now(), } - + pool.logger.Info(fmt.Sprintf("Adding peer %s to blocksync pool", peerID)) pool.peers[peerID] = peer } @@ -431,6 +436,7 @@ func (pool *BlockPool) getSortedPeers(peers map[types.NodeID]*bpPeer) []types.No for peer := range peers { sortedPeers = append(sortedPeers, peer) } + // Sort from high to low score sort.Slice(sortedPeers, func(i, j int) bool { return pool.peerManager.Score(sortedPeers[i]) > pool.peerManager.Score(sortedPeers[j]) diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index 5504f075e..d2a5f1a73 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -560,10 +560,11 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh continue case r.pool.IsCaughtUp() && r.previousMaxPeerHeight <= r.pool.MaxPeerHeight(): - r.logger.Info("switching to consensus reactor", "height", height) + r.logger.Info("switching to consensus reactor after caught up", "height", height) case time.Since(lastAdvance) > syncTimeout: r.logger.Error("no progress since last advance", "last_advance", lastAdvance) + continue default: r.logger.Info( @@ -611,8 +612,7 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh // See https://github.com/tendermint/tendermint/pull/8433#discussion_r866790631 panic(fmt.Errorf("peeked first block without extended commit at height %d - possible node store corruption", first.Height)) } else if first == nil || second == nil { - // we need to have fetched two consecutive blocks in order to - // perform blocksync verification + // we need to have fetched two consecutive blocks in order to perform blocksync verification continue } diff --git a/internal/p2p/peermanager.go b/internal/p2p/peermanager.go index 44fd3dd03..f006ab150 100644 --- a/internal/p2p/peermanager.go +++ b/internal/p2p/peermanager.go @@ -25,7 +25,7 @@ import ( const ( // retryNever is returned by retryDelay() when retries are disabled. retryNever time.Duration = math.MaxInt64 - // DefaultScore is the default score for a peer during initialization + // DefaultMutableScore is the default score for a peer during initialization DefaultMutableScore int64 = 10 ) @@ -101,6 +101,9 @@ type PeerManagerOptions struct { // Peers to which a connection will be (re)established ignoring any existing limits UnconditionalPeers []types.NodeID + // Only include those peers for block sync + BlockSyncPeers []types.NodeID + // MaxPeers is the maximum number of peers to track information about, i.e. // store in the peer store. When exceeded, the lowest-scored unconnected peers // will be deleted. 0 means no limit. @@ -157,6 +160,9 @@ type PeerManagerOptions struct { // List of node IDs, to which a connection will be (re)established ignoring any existing limits unconditionalPeers map[types.NodeID]struct{} + + // blocksyncPeers provides fast blocksyncPeers lookups. + blocksyncPeers map[types.NodeID]bool } // Validate validates the options. @@ -217,6 +223,13 @@ func (o *PeerManagerOptions) isPersistent(id types.NodeID) bool { return o.persistentPeers[id] } +func (o *PeerManagerOptions) isBlockSync(id types.NodeID) bool { + if o.blocksyncPeers == nil { + panic("isBlockSync() called before optimize()") + } + return o.blocksyncPeers[id] +} + func (o *PeerManagerOptions) isUnconditional(id types.NodeID) bool { if o.unconditionalPeers == nil { panic("isUnconditional() called before optimize()") @@ -234,6 +247,11 @@ func (o *PeerManagerOptions) optimize() { o.persistentPeers[p] = true } + o.blocksyncPeers = make(map[types.NodeID]bool, len(o.BlockSyncPeers)) + for _, p := range o.BlockSyncPeers { + o.blocksyncPeers[p] = true + } + o.unconditionalPeers = make(map[types.NodeID]struct{}, len(o.UnconditionalPeers)) for _, p := range o.UnconditionalPeers { o.unconditionalPeers[p] = struct{}{} @@ -367,6 +385,9 @@ func (m *PeerManager) configurePeers() error { for _, id := range m.options.UnconditionalPeers { configure[id] = true } + for _, id := range m.options.BlockSyncPeers { + configure[id] = true + } for id := range m.options.PeerScores { configure[id] = true } @@ -384,6 +405,7 @@ func (m *PeerManager) configurePeers() error { func (m *PeerManager) configurePeer(peer peerInfo) peerInfo { peer.Persistent = m.options.isPersistent(peer.ID) peer.Unconditional = m.options.isUnconditional(peer.ID) + peer.BlockSync = m.options.isBlockSync(peer.ID) peer.FixedScore = m.options.PeerScores[peer.ID] return peer } @@ -464,6 +486,10 @@ func (m *PeerManager) Add(address NodeAddress) (bool, error) { return true, nil } +func (m *PeerManager) GetBlockSyncPeers() map[types.NodeID]bool { + return m.options.blocksyncPeers +} + // PeerRatio returns the ratio of peer addresses stored to the maximum size. func (m *PeerManager) PeerRatio() float64 { m.mtx.Lock() @@ -1318,6 +1344,7 @@ type peerInfo struct { // These fields are ephemeral, i.e. not persisted to the database. Persistent bool Unconditional bool + BlockSync bool Seed bool Height int64 FixedScore PeerScore // mainly for tests @@ -1388,7 +1415,7 @@ func (p *peerInfo) Score() PeerScore { } score := p.MutableScore - if p.Persistent { + if p.Persistent || p.BlockSync { score = int64(PeerScorePersistent) } diff --git a/node/setup.go b/node/setup.go index 921f37f21..526382440 100644 --- a/node/setup.go +++ b/node/setup.go @@ -255,6 +255,16 @@ func createPeerManager( peers = append(peers, address) } + for _, p := range tmstrings.SplitAndTrimEmpty(cfg.P2P.BlockSyncPeers, ",", " ") { + address, err := p2p.ParseNodeAddress(p) + if err != nil { + return nil, func() error { return nil }, fmt.Errorf("invalid peer address %q: %w", p, err) + } + + peers = append(peers, address) + options.BlockSyncPeers = append(options.BlockSyncPeers, address.NodeID) + } + for _, p := range tmstrings.SplitAndTrimEmpty(cfg.P2P.UnconditionalPeerIDs, ",", " ") { options.UnconditionalPeers = append(options.UnconditionalPeers, types.NodeID(p)) }