diff --git a/blocksync/pool.go b/blocksync/pool.go index 7645fd586d..4f75be3337 100644 --- a/blocksync/pool.go +++ b/blocksync/pool.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "math" + "sort" "time" flow "github.com/cometbft/cometbft/libs/flowrate" @@ -27,22 +28,26 @@ eg, L = latency = 0.1s */ const ( + requestIntervalMS = 2 maxPendingRequestsPerPeer = 20 requestRetrySeconds = 30 + // peerConnWait is the time that must have elapsed since the pool routine + // was created before we start making requests. This is to give the peer + // routine time to connect to peers. + peerConnWait = 3 * time.Second + // Minimum recv rate to ensure we're receiving blocks from a peer fast // enough. If a peer is not sending us data at at least that rate, we // consider them to have timedout and we disconnect. // - // Assuming a DSL connection (not a good choice) 128 Kbps (upload) ~ 15 KB/s, - // sending data across atlantic ~ 7.5 KB/s. - minRecvRate = 7680 + // Based on the experiments with [Osmosis](https://osmosis.zone/), the + // minimum rate could be as high as 500 KB/s. However, we're setting it to + // 128 KB/s for now to be conservative. + minRecvRate = 128 * 1024 // 128 KB/s ) -var ( - requestInterval = 10 * time.Millisecond // timeout between requests - peerTimeout = 15 * time.Second // not const so we can override with tests -) +var peerTimeout = 7 * time.Second // not const so we can override with tests /* Peers self report their heights when we join the block pool. @@ -58,7 +63,8 @@ var ( // BlockPool keeps track of the block sync peers, block requests and block responses. type BlockPool struct { service.BaseService - startTime time.Time + startTime time.Time + startHeight int64 mtx cmtsync.Mutex // block requests @@ -66,7 +72,8 @@ type BlockPool struct { height int64 // the lowest key in requesters. // peers peers map[p2p.ID]*bpPeer - maxPeerHeight int64 // the biggest reported height + sortedPeers []*bpPeer // sorted by curRate, highest first + maxPeerHeight int64 // the biggest reported height requestsCh chan<- BlockRequest errorsCh chan<- peerError @@ -78,8 +85,9 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p bp := &BlockPool{ peers: make(map[p2p.ID]*bpPeer), - requesters: make(map[int64]*bpRequester), - height: start, + requesters: make(map[int64]*bpRequester), + height: start, + startHeight: start, requestsCh: requestsCh, errorsCh: errorsCh, @@ -91,8 +99,8 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p // OnStart implements service.Service by spawning requesters routine and recording // pool's start time. func (pool *BlockPool) OnStart() error { - go pool.makeRequestersRoutine() pool.startTime = time.Now() + go pool.makeRequestersRoutine() return nil } @@ -111,14 +119,33 @@ func (pool *BlockPool) makeRequestersRoutine() { ) pool.mtx.Unlock() + // Check if we are within peerConnWait seconds of start time + // This gives us some time to connect to peers before starting a wave of requests + if time.Since(pool.startTime) < peerConnWait { + // Calculate the duration to sleep until peerConnWait seconds have passed since pool.startTime + sleepDuration := peerConnWait - time.Since(pool.startTime) + time.Sleep(sleepDuration) + } + + // Check if we are within 2 seconds of start time + // This gives us some time to connect to peers before starting a wave of requests + if time.Since(pool.startTime) < 3*time.Second { + // Calculate the duration to sleep until 2 seconds have passed since pool.startTime + sleepDuration := 3*time.Second - time.Since(pool.startTime) + time.Sleep(sleepDuration) + } + switch { case maxRequestersCreated: // If we have enough requesters, wait for them to finish. - time.Sleep(requestInterval) + time.Sleep(requestIntervalMS * time.Millisecond) pool.removeTimedoutPeers() case maxPeerHeightReached: // If we're caught up, wait for a bit so reactor could finish or a higher height is reported. - time.Sleep(requestInterval) + time.Sleep(requestIntervalMS * time.Millisecond) default: + // request for more blocks. pool.makeNextRequester(nextHeight) + // Sleep for a bit to make the requests more ordered. + time.Sleep(requestIntervalMS * time.Millisecond) } } } @@ -140,11 +167,13 @@ func (pool *BlockPool) removeTimedoutPeers() { "minRate", fmt.Sprintf("%d KB/s", minRecvRate/1024)) peer.didTimeout = true } + peer.curRate = curRate } if peer.didTimeout { pool.removePeer(peer.id) } } + pool.sortPeers() } // IsCaughtUp returns true if this node is caught up, false - otherwise. @@ -187,44 +216,61 @@ func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) return } -// PopRequest pops the first block at pool.height. -// It must have been validated by 'second'.Commit from PeekTwoBlocks(). +// PopRequest removes the requester at pool.height and increments pool.height. func (pool *BlockPool) PopRequest() { pool.mtx.Lock() defer pool.mtx.Unlock() - if r := pool.requesters[pool.height]; r != nil { - /* The block can disappear at any time, due to removePeer(). - if r := pool.requesters[pool.height]; r == nil || r.block == nil { - PanicSanity("PopRequest() requires a valid block") - } - */ - if err := r.Stop(); err != nil { - pool.Logger.Error("Error stopping requester", "err", err) - } - delete(pool.requesters, pool.height) - pool.height++ - } else { + r := pool.requesters[pool.height] + if r == nil { panic(fmt.Sprintf("Expected requester to pop, got nothing at height %v", pool.height)) } + + if err := r.Stop(); err != nil { + pool.Logger.Error("Error stopping requester", "err", err) + } + delete(pool.requesters, pool.height) + pool.height++ + + // Notify the next minBlocksForSingleRequest requesters about new height, so + // they can potentially request a block from the second peer. + for i := int64(0); i < minBlocksForSingleRequest && i < int64(len(pool.requesters)); i++ { + pool.requesters[pool.height+i].newHeight(pool.height) + } } -// RedoRequest invalidates the block at pool.height, -// Remove the peer and redo request from others. +// RemovePeerAndRedoAllPeerRequests retries the request at the given height and +// all the requests made to the same peer. The peer is removed from the pool. // Returns the ID of the removed peer. -func (pool *BlockPool) RedoRequest(height int64) p2p.ID { +func (pool *BlockPool) RemovePeerAndRedoAllPeerRequests(height int64) p2p.ID { pool.mtx.Lock() defer pool.mtx.Unlock() request := pool.requesters[height] - peerID := request.getPeerID() - if peerID != p2p.ID("") { - // RemovePeer will redo all requesters associated with this peer. - pool.removePeer(peerID) - } + peerID := request.gotBlockFromPeerID() + // RemovePeer will redo all requesters associated with this peer. + pool.removePeer(peerID) return peerID } +// RedoRequestFrom retries the request at the given height. It does not remove the +// peer. +func (pool *BlockPool) RedoRequestFrom(height int64, peerID p2p.ID) { + pool.mtx.Lock() + defer pool.mtx.Unlock() + + if requester, ok := pool.requesters[height]; ok { // If we requested this block + if requester.didRequestFrom(peerID) { // From this specific peer + requester.redo(peerID) + } + } +} + +// Deprecated: use RemovePeerAndRedoAllPeerRequests instead. +func (pool *BlockPool) RedoRequest(height int64) p2p.ID { + return pool.RemovePeerAndRedoAllPeerRequests(height) +} + // AddBlock validates that the block comes from the peer it was expected from and calls the requester to store it. // TODO: ensure that blocks come in order for each peer. func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int) { @@ -233,29 +279,21 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int requester := pool.requesters[block.Height] if requester == nil { - pool.Logger.Info( - "peer sent us a block we didn't expect", - "peer", - peerID, - "curHeight", - pool.height, - "blockHeight", - block.Height) - diff := pool.height - block.Height - if diff < 0 { - diff *= -1 - } - const maxDiff = 100 // maximum difference between current and received block height - if diff > maxDiff { - pool.sendError(errors.New("peer sent us a block we didn't expect with a height too far ahead/behind"), peerID) + // Because we're issuing 2nd requests for closer blocks, it's possible to + // receive a block we've already processed from a second peer. Hence, we + // can't punish it. But if the peer sent us a block we clearly didn't + // request, we disconnect. + if block.Height > pool.height || block.Height < pool.startHeight { + err := fmt.Errorf("peer sent us block #%d we didn't expect (current height: %d, start height: %d)", + block.Height, pool.height, pool.startHeight) + pool.sendError(err, peerID) } return } if !requester.setBlock(block, peerID) { - err := errors.New("requester is different or block already exists") + err := fmt.Errorf("requested block #%d from %v, not %s", block.Height, requester.requestedFrom(), peerID) pool.sendError(err, peerID) - return } peer := pool.peers[peerID] @@ -264,6 +302,13 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int } } +// Height returns the pool's height. +func (pool *BlockPool) Height() int64 { + pool.mtx.Lock() + defer pool.mtx.Unlock() + return pool.height +} + // MaxPeerHeight returns the highest reported height. func (pool *BlockPool) MaxPeerHeight() int64 { pool.mtx.Lock() @@ -284,6 +329,9 @@ func (pool *BlockPool) SetPeerRange(peerID p2p.ID, base int64, height int64) { peer = newBPPeer(pool, peerID, base, height) peer.setLogger(pool.Logger.With("peer", peerID)) pool.peers[peerID] = peer + // no need to sort because curRate is 0 at start. + // just add to the beginning so it's picked first by pickIncrAvailablePeer. + pool.sortedPeers = append([]*bpPeer{peer}, pool.sortedPeers...) } if height > pool.maxPeerHeight { @@ -302,7 +350,7 @@ func (pool *BlockPool) RemovePeer(peerID p2p.ID) { func (pool *BlockPool) removePeer(peerID p2p.ID) { for _, requester := range pool.requesters { - if requester.getPeerID() == peerID { + if requester.didRequestFrom(peerID) { requester.redo(peerID) } } @@ -314,6 +362,12 @@ func (pool *BlockPool) removePeer(peerID p2p.ID) { } delete(pool.peers, peerID) + for i, p := range pool.sortedPeers { + if p.id == peerID { + pool.sortedPeers = append(pool.sortedPeers[:i], pool.sortedPeers[i+1:]...) + break + } + } // Find a new peer with the biggest height and update maxPeerHeight if the // peer's height was the biggest. @@ -336,11 +390,14 @@ func (pool *BlockPool) updateMaxPeerHeight() { // Pick an available peer with the given height available. // If no peers are available, returns nil. -func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer { +func (pool *BlockPool) pickIncrAvailablePeer(height int64, excludePeerID p2p.ID) *bpPeer { pool.mtx.Lock() defer pool.mtx.Unlock() - for _, peer := range pool.peers { + for _, peer := range pool.sortedPeers { + if peer.id == excludePeerID { + continue + } if peer.didTimeout { pool.removePeer(peer.id) continue @@ -357,6 +414,15 @@ func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer { return nil } +// Sort peers by curRate, highest first. +// +// CONTRACT: pool.mtx must be locked. +func (pool *BlockPool) sortPeers() { + sort.Slice(pool.sortedPeers, func(i, j int) bool { + return pool.sortedPeers[i].curRate > pool.sortedPeers[j].curRate + }) +} + func (pool *BlockPool) makeNextRequester(nextHeight int64) { pool.mtx.Lock() request := newBPRequester(pool, nextHeight) @@ -406,6 +472,7 @@ func (pool *BlockPool) debug() string { type bpPeer struct { didTimeout bool + curRate int64 numPending int32 height int64 base int64 @@ -478,27 +545,34 @@ func (peer *bpPeer) onTimeout() { //------------------------------------- +const minBlocksForSingleRequest = 50 + type bpRequester struct { service.BaseService - pool *BlockPool - height int64 - gotBlockCh chan struct{} - redoCh chan p2p.ID // redo may send multitime, add peerId to identify repeat + pool *BlockPool + height int64 + gotBlockCh chan struct{} + redoCh chan p2p.ID // redo may got multiple messages, add peerId to identify + newHeightCh chan int64 - mtx cmtsync.Mutex - peerID p2p.ID - block *types.Block + mtx cmtsync.Mutex + peerID p2p.ID + secondPeerID p2p.ID // alternative peer to request from (if close to pool's height) + gotBlockFrom p2p.ID + block *types.Block } func newBPRequester(pool *BlockPool, height int64) *bpRequester { bpr := &bpRequester{ - pool: pool, - height: height, - gotBlockCh: make(chan struct{}, 1), - redoCh: make(chan p2p.ID, 1), - - peerID: "", - block: nil, + pool: pool, + height: height, + gotBlockCh: make(chan struct{}, 1), + redoCh: make(chan p2p.ID, 1), + newHeightCh: make(chan int64, 1), + + peerID: "", + secondPeerID: "", + block: nil, } bpr.BaseService = *service.NewBaseService(nil, "bpRequester", bpr) return bpr @@ -512,11 +586,17 @@ func (bpr *bpRequester) OnStart() error { // Returns true if the peer matches and block doesn't already exist. func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool { bpr.mtx.Lock() - if bpr.block != nil || bpr.peerID != peerID { + if bpr.peerID != peerID && bpr.secondPeerID != peerID { bpr.mtx.Unlock() return false } + if bpr.block != nil { + bpr.mtx.Unlock() + return true // getting a block from both peers is not an error + } + bpr.block = block + bpr.gotBlockFrom = peerID bpr.mtx.Unlock() select { @@ -532,19 +612,53 @@ func (bpr *bpRequester) getBlock() *types.Block { return bpr.block } -func (bpr *bpRequester) getPeerID() p2p.ID { +// Returns the IDs of peers we've requested a block from. +func (bpr *bpRequester) requestedFrom() []p2p.ID { bpr.mtx.Lock() defer bpr.mtx.Unlock() - return bpr.peerID + peerIDs := make([]p2p.ID, 0, 2) + if bpr.peerID != "" { + peerIDs = append(peerIDs, bpr.peerID) + } + if bpr.secondPeerID != "" { + peerIDs = append(peerIDs, bpr.secondPeerID) + } + return peerIDs +} + +// Returns true if we've requested a block from the given peer. +func (bpr *bpRequester) didRequestFrom(peerID p2p.ID) bool { + bpr.mtx.Lock() + defer bpr.mtx.Unlock() + return bpr.peerID == peerID || bpr.secondPeerID == peerID +} + +// Returns the ID of the peer who sent us the block. +func (bpr *bpRequester) gotBlockFromPeerID() p2p.ID { + bpr.mtx.Lock() + defer bpr.mtx.Unlock() + return bpr.gotBlockFrom } -// This is called from the requestRoutine, upon redo(). -func (bpr *bpRequester) reset() { +// Removes the block (IF we got it from the given peer) and resets the peer. +func (bpr *bpRequester) reset(peerID p2p.ID) (removedBlock bool) { bpr.mtx.Lock() defer bpr.mtx.Unlock() - bpr.peerID = "" - bpr.block = nil + // Only remove the block if we got it from that peer. + if bpr.gotBlockFrom == peerID { + bpr.block = nil + bpr.gotBlockFrom = "" + removedBlock = true + } + + if bpr.peerID == peerID { + bpr.peerID = "" + } else { + bpr.secondPeerID = "" + } + + return removedBlock } // Tells bpRequester to pick another peer and try again. @@ -557,34 +671,72 @@ func (bpr *bpRequester) redo(peerID p2p.ID) { } } +func (bpr *bpRequester) pickPeerAndSendRequest() { + bpr.mtx.Lock() + secondPeerID := bpr.secondPeerID + bpr.mtx.Unlock() + var peer *bpPeer +PICK_PEER_LOOP: + for { + if !bpr.IsRunning() || !bpr.pool.IsRunning() { + return + } + peer = bpr.pool.pickIncrAvailablePeer(bpr.height, secondPeerID) + if peer == nil { + bpr.Logger.Debug("No peers currently available; will retry shortly", "height", bpr.height) + time.Sleep(requestIntervalMS * time.Millisecond) + continue PICK_PEER_LOOP + } + break PICK_PEER_LOOP + } + bpr.mtx.Lock() + bpr.peerID = peer.id + bpr.mtx.Unlock() + bpr.pool.sendRequest(bpr.height, peer.id) +} + +// Picks a second peer and sends a request to it. If the second peer is already +// set, does nothing. +func (bpr *bpRequester) pickSecondPeerAndSendRequest() { + bpr.mtx.Lock() + if bpr.secondPeerID != "" { + bpr.mtx.Unlock() + return + } + peerID := bpr.peerID + bpr.mtx.Unlock() + + secondPeer := bpr.pool.pickIncrAvailablePeer(bpr.height, peerID) + if secondPeer != nil { + bpr.mtx.Lock() + bpr.secondPeerID = secondPeer.id + bpr.mtx.Unlock() + bpr.pool.sendRequest(bpr.height, secondPeer.id) + } +} + +// Informs the requester of a new pool's height. +func (bpr *bpRequester) newHeight(height int64) { + select { + case bpr.newHeightCh <- height: + default: + } +} + // Responsible for making more requests as necessary // Returns only when a block is found (e.g. AddBlock() is called) func (bpr *bpRequester) requestRoutine() { + gotBlock := false + OUTER_LOOP: for { - // Pick a peer to send request to. - var peer *bpPeer - PICK_PEER_LOOP: - for { - if !bpr.IsRunning() || !bpr.pool.IsRunning() { - return - } - peer = bpr.pool.pickIncrAvailablePeer(bpr.height) - if peer == nil { - bpr.Logger.Debug("No peers currently available; will retry shortly", "height", bpr.height) - time.Sleep(requestInterval) - continue PICK_PEER_LOOP - } - break PICK_PEER_LOOP + bpr.pickPeerAndSendRequest() + + poolHeight := bpr.pool.Height() + if bpr.height-poolHeight < minBlocksForSingleRequest { + bpr.pickSecondPeerAndSendRequest() } - bpr.mtx.Lock() - bpr.peerID = peer.id - bpr.mtx.Unlock() - to := time.NewTimer(requestRetrySeconds * time.Second) - // Send request and wait. - bpr.pool.sendRequest(bpr.height, peer.id) - WAIT_LOOP: for { select { case <-bpr.pool.Quit(): @@ -594,22 +746,34 @@ OUTER_LOOP: return case <-bpr.Quit(): return - case <-to.C: - bpr.Logger.Debug("Retrying block request after timeout", "height", bpr.height, "peer", bpr.peerID) - // Simulate a redo - bpr.reset() - continue OUTER_LOOP + case <-time.After(requestRetrySeconds * time.Second): + if !gotBlock { + bpr.Logger.Debug("Retrying block request(s) after timeout", "height", bpr.height, "peer", bpr.peerID, "secondPeerID", bpr.secondPeerID) + bpr.reset(bpr.peerID) + bpr.reset(bpr.secondPeerID) + continue OUTER_LOOP + } case peerID := <-bpr.redoCh: - if peerID == bpr.peerID { - bpr.reset() + if bpr.didRequestFrom(peerID) { + removedBlock := bpr.reset(peerID) + if removedBlock { + gotBlock = false + } + } + // If both peers returned NoBlockResponse or bad block, reschedule both + // requests. If not, wait for the other peer. + if len(bpr.requestedFrom()) == 0 { continue OUTER_LOOP - } else { - continue WAIT_LOOP + } + case newHeight := <-bpr.newHeightCh: + if !gotBlock && bpr.height-newHeight < minBlocksForSingleRequest { + // The operation is a noop if the second peer is already set. The cost is checking a mutex. + bpr.pickSecondPeerAndSendRequest() } case <-bpr.gotBlockCh: + gotBlock = true // We got a block! // Continue the for-loop and wait til Quit. - continue WAIT_LOOP } } } diff --git a/blocksync/pool_test.go b/blocksync/pool_test.go index 8309b19e42..acada65585 100644 --- a/blocksync/pool_test.go +++ b/blocksync/pool_test.go @@ -84,7 +84,6 @@ func TestBlockPoolBasic(t *testing.T) { errorsCh = make(chan peerError) requestsCh = make(chan BlockRequest) ) - pool := NewBlockPool(start, requestsCh, errorsCh) pool.SetLogger(log.TestingLogger()) @@ -146,7 +145,6 @@ func TestBlockPoolTimeout(t *testing.T) { errorsCh = make(chan peerError) requestsCh = make(chan BlockRequest) ) - pool := NewBlockPool(start, requestsCh, errorsCh) pool.SetLogger(log.TestingLogger()) diff --git a/blocksync/reactor.go b/blocksync/reactor.go index 9867ac1b7e..f7eb6efb95 100644 --- a/blocksync/reactor.go +++ b/blocksync/reactor.go @@ -252,6 +252,7 @@ func (bcR *Reactor) ReceiveEnvelope(e p2p.Envelope) { bcR.pool.SetPeerRange(e.Src.ID(), msg.Base, msg.Height) case *bcproto.NoBlockResponse: bcR.Logger.Debug("Peer does not have requested block", "peer", e.Src, "height", msg.Height) + bcR.pool.RedoRequestFrom(msg.Height, e.Src.ID()) default: bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) } @@ -399,14 +400,14 @@ FOR_LOOP: } bcR.Logger.Error("Error in validation", "err", err) - peerID := bcR.pool.RedoRequest(first.Height) + peerID := bcR.pool.RemovePeerAndRedoAllPeerRequests(first.Height) peer := bcR.Switch.Peers().Get(peerID) if peer != nil { // NOTE: we've already removed the peer's request, but we // still need to clean up the rest. bcR.Switch.StopPeerForError(peer, fmt.Errorf("Reactor validation error: %v", err)) } - peerID2 := bcR.pool.RedoRequest(second.Height) + peerID2 := bcR.pool.RemovePeerAndRedoAllPeerRequests(second.Height) peer2 := bcR.Switch.Peers().Get(peerID2) if peer2 != nil && peer2 != peer { // NOTE: we've already removed the peer's request, but we