From 4ec78728a38b516b633e690b8c1f4c4eb220ca0d Mon Sep 17 00:00:00 2001
From: Adam Tucker
Date: Wed, 6 Mar 2024 16:28:44 -0700
Subject: [PATCH 1/5] add all upstream p2p fixes

---
 blocksync/pool.go      | 420 +++++++++++++++++++++++++++--------------
 blocksync/pool_test.go |  20 +-
 blocksync/reactor.go   |   9 +-
 3 files changed, 299 insertions(+), 150 deletions(-)

diff --git a/blocksync/pool.go b/blocksync/pool.go
index 9a95fe5621..e4a4bad547 100644
--- a/blocksync/pool.go
+++ b/blocksync/pool.go
@@ -4,6 +4,7 @@
 import (
 	"errors"
 	"fmt"
 	"math"
+	"sort"
 	"sync/atomic"
 	"time"
 
@@ -29,24 +30,25 @@ eg, L = latency = 0.1s
 const (
 	requestIntervalMS         = 2
-	maxTotalRequesters        = 600
-	maxPendingRequests        = maxTotalRequesters
 	maxPendingRequestsPerPeer = 20
 	requestRetrySeconds       = 30
 
+	// peerConnWait is the time that must have elapsed since the pool routine
+	// was created before we start making requests. This is to give the peer
+	// routine time to connect to peers.
+	peerConnWait = 3 * time.Second
+
 	// Minimum recv rate to ensure we're receiving blocks from a peer fast
 	// enough. If a peer is not sending us data at at least that rate, we
 	// consider them to have timedout and we disconnect.
 	//
-	// Assuming a DSL connection (not a good choice) 128 Kbps (upload) ~ 15 KB/s,
-	// sending data across atlantic ~ 7.5 KB/s.
-	minRecvRate = 7680
-
-	// Maximum difference between current and new block's height.
-	maxDiffBetweenCurrentAndReceivedBlockHeight = 100
+	// Based on the experiments with [Osmosis](https://osmosis.zone/), the
+	// minimum rate could be as high as 500 KB/s. However, we're setting it to
+	// 128 KB/s for now to be conservative.
+	minRecvRate = 128 * 1024 // 128 KB/s
 )
 
-var peerTimeout = 15 * time.Second // not const so we can override with tests
+var peerTimeout = 7 * time.Second // not const so we can override with tests
 
 /*
 Peers self report their heights when we join the block pool.
@@ -62,7 +64,8 @@ var peerTimeout = 15 * time.Second // not const so we can override with tests
 // BlockPool keeps track of the block sync peers, block requests and block responses.
 type BlockPool struct {
 	service.BaseService
-	startTime time.Time
+	startTime   time.Time
+	startHeight int64
 
 	mtx cmtsync.Mutex
 	// block requests
@@ -70,7 +73,8 @@ type BlockPool struct {
 	requesters map[int64]*bpRequester
 	height     int64 // the lowest key in requesters.
 	// peers
 	peers         map[p2p.ID]*bpPeer
-	maxPeerHeight int64 // the biggest reported height
+	sortedPeers   []*bpPeer // sorted by curRate, highest first
+	maxPeerHeight int64     // the biggest reported height
 
 	// atomic
 	numPending int32 // number of requests pending assignment or block response
@@ -85,9 +89,10 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p
 	bp := &BlockPool{
 		peers: make(map[p2p.ID]*bpPeer),
 
-		requesters: make(map[int64]*bpRequester),
-		height:     start,
-		numPending: 0,
+		requesters:  make(map[int64]*bpRequester),
+		height:      start,
+		startHeight: start,
+		numPending:  0,
 
 		requestsCh: requestsCh,
 		errorsCh:   errorsCh,
@@ -108,24 +113,45 @@ func (pool *BlockPool) OnStart() error {
 
 func (pool *BlockPool) makeRequestersRoutine() {
 	for {
 		if !pool.IsRunning() {
-			break
+			return
+		}
+
+		pool.mtx.Lock()
+		var (
+			maxRequestersCreated = len(pool.requesters) >= len(pool.peers)*maxPendingRequestsPerPeer
+
+			nextHeight           = pool.height + int64(len(pool.requesters))
+			maxPeerHeightReached = nextHeight > pool.maxPeerHeight
+		)
+		pool.mtx.Unlock()
+
+		// Check if we are within peerConnWait of the start time.
+		// This gives us some time to connect to peers before starting a wave of requests.
+		if time.Since(pool.startTime) < peerConnWait {
+			// Sleep until peerConnWait has passed since pool.startTime.
+			sleepDuration := peerConnWait - time.Since(pool.startTime)
+			time.Sleep(sleepDuration)
 		}
-		_, numPending, lenRequesters := pool.GetStatus()
 		switch {
-		case numPending >= maxPendingRequests:
-			// sleep for a bit.
+		case maxRequestersCreated: // If we have enough requesters, wait for them to finish.
 			time.Sleep(requestIntervalMS * time.Millisecond)
-			// check for timed out peers
 			pool.removeTimedoutPeers()
-		case lenRequesters >= maxTotalRequesters:
-			// sleep for a bit.
+		case maxPeerHeightReached: // If we're caught up, wait for a bit so the reactor can finish or a higher height can be reported.
 			time.Sleep(requestIntervalMS * time.Millisecond)
-			// check for timed out peers
-			pool.removeTimedoutPeers()
 		default:
 			// request for more blocks.
-			pool.makeNextRequester()
+			pool.makeNextRequester(nextHeight)
+			// Sleep for a bit to make the requests more ordered.
+			time.Sleep(requestIntervalMS * time.Millisecond)
 		}
 	}
 }
@@ -147,11 +173,13 @@ func (pool *BlockPool) removeTimedoutPeers() {
 					"minRate", fmt.Sprintf("%d KB/s", minRecvRate/1024))
 				peer.didTimeout = true
 			}
+			peer.curRate = curRate
 		}
 
 		if peer.didTimeout {
 			pool.removePeer(peer.id)
 		}
 	}
+	pool.sortPeers()
 }
 
 // GetStatus returns pool's height, numPending requests and the number of
@@ -203,44 +231,61 @@ func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block)
 	return
 }
 
-// PopRequest pops the first block at pool.height.
-// It must have been validated by 'second'.Commit from PeekTwoBlocks().
+// PopRequest removes the requester at pool.height and increments pool.height.
 func (pool *BlockPool) PopRequest() {
 	pool.mtx.Lock()
 	defer pool.mtx.Unlock()
 
-	if r := pool.requesters[pool.height]; r != nil {
-		/* The block can disappear at any time, due to removePeer().
-		if r := pool.requesters[pool.height]; r == nil || r.block == nil {
-			PanicSanity("PopRequest() requires a valid block")
-		}
-		*/
-		if err := r.Stop(); err != nil {
-			pool.Logger.Error("Error stopping requester", "err", err)
-		}
-		delete(pool.requesters, pool.height)
-		pool.height++
-	} else {
+	r := pool.requesters[pool.height]
+	if r == nil {
 		panic(fmt.Sprintf("Expected requester to pop, got nothing at height %v", pool.height))
 	}
+
+	if err := r.Stop(); err != nil {
+		pool.Logger.Error("Error stopping requester", "err", err)
+	}
+	delete(pool.requesters, pool.height)
+	pool.height++
+
+	// Notify the next minBlocksForSingleRequest requesters about new height, so
+	// they can potentially request a block from the second peer.
+	for i := int64(0); i < minBlocksForSingleRequest && i < int64(len(pool.requesters)); i++ {
+		pool.requesters[pool.height+i].newHeight(pool.height)
+	}
 }
 
-// RedoRequest invalidates the block at pool.height,
-// Remove the peer and redo request from others.
+// RemovePeerAndRedoAllPeerRequests retries the request at the given height and
+// all the requests made to the same peer. The peer is removed from the pool.
 // Returns the ID of the removed peer.
-func (pool *BlockPool) RedoRequest(height int64) p2p.ID {
+func (pool *BlockPool) RemovePeerAndRedoAllPeerRequests(height int64) p2p.ID {
 	pool.mtx.Lock()
 	defer pool.mtx.Unlock()
 
 	request := pool.requesters[height]
-	peerID := request.getPeerID()
-	if peerID != p2p.ID("") {
-		// RemovePeer will redo all requesters associated with this peer.
-		pool.removePeer(peerID)
-	}
+	peerID := request.gotBlockFromPeerID()
+	// RemovePeer will redo all requesters associated with this peer.
+	pool.removePeer(peerID)
 	return peerID
 }
 
+// RedoRequestFrom retries the request at the given height. It does not remove the
+// peer.
+func (pool *BlockPool) RedoRequestFrom(height int64, peerID p2p.ID) {
+	pool.mtx.Lock()
+	defer pool.mtx.Unlock()
+
+	if requester, ok := pool.requesters[height]; ok { // If we requested this block
+		if requester.didRequestFrom(peerID) { // From this specific peer
+			requester.redo(peerID)
+		}
+	}
+}
+
+// Deprecated: use RemovePeerAndRedoAllPeerRequests instead.
+func (pool *BlockPool) RedoRequest(height int64) p2p.ID {
+	return pool.RemovePeerAndRedoAllPeerRequests(height)
+}
+
 // AddBlock validates that the block comes from the peer it was expected from and calls the requester to store it.
 // TODO: ensure that blocks come in order for each peer.
 func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int) {
@@ -249,33 +294,27 @@
 	requester := pool.requesters[block.Height]
 	if requester == nil {
-		pool.Logger.Info(
-			"peer sent us a block we didn't expect",
-			"peer",
-			peerID,
-			"curHeight",
-			pool.height,
-			"blockHeight",
-			block.Height)
-		diff := pool.height - block.Height
-		if diff < 0 {
-			diff *= -1
-		}
-		if diff > maxDiffBetweenCurrentAndReceivedBlockHeight {
-			pool.sendError(errors.New("peer sent us a block we didn't expect with a height too far ahead/behind"), peerID)
+		// Because we're issuing 2nd requests for closer blocks, it's possible to
+		// receive a block we've already processed from a second peer. Hence, we
+		// can't punish it. But if the peer sent us a block we clearly didn't
+		// request, we disconnect.
+		if block.Height > pool.height || block.Height < pool.startHeight {
+			err := fmt.Errorf("peer sent us block #%d we didn't expect (current height: %d, start height: %d)",
+				block.Height, pool.height, pool.startHeight)
+			pool.sendError(err, peerID)
 		}
 		return
 	}
 
-	if requester.setBlock(block, peerID) {
-		atomic.AddInt32(&pool.numPending, -1)
-		peer := pool.peers[peerID]
-		if peer != nil {
-			peer.decrPending(blockSize)
-		}
-	} else {
-		pool.Logger.Info("invalid peer", "peer", peerID, "blockHeight", block.Height)
-		pool.sendError(errors.New("invalid peer"), peerID)
+	if !requester.setBlock(block, peerID) {
+		err := fmt.Errorf("requested block #%d from %v, not %s", block.Height, requester.requestedFrom(), peerID)
+		pool.sendError(err, peerID)
+		return
+	}
+
+	atomic.AddInt32(&pool.numPending, -1)
+	peer := pool.peers[peerID]
+	if peer != nil {
+		peer.decrPending(blockSize)
 	}
 }
 
@@ -299,6 +338,9 @@ func (pool *BlockPool) SetPeerRange(peerID p2p.ID, base int64, height int64) {
 		peer = newBPPeer(pool, peerID, base, height)
 		peer.setLogger(pool.Logger.With("peer", peerID))
 		pool.peers[peerID] = peer
+		// no need to sort because curRate is 0 at start.
+		// just add to the beginning so it's picked first by pickIncrAvailablePeer.
+		pool.sortedPeers = append([]*bpPeer{peer}, pool.sortedPeers...)
 	}
 
 	if height > pool.maxPeerHeight {
@@ -317,7 +359,7 @@ func (pool *BlockPool) RemovePeer(peerID p2p.ID) {
 
 func (pool *BlockPool) removePeer(peerID p2p.ID) {
 	for _, requester := range pool.requesters {
-		if requester.getPeerID() == peerID {
+		if requester.didRequestFrom(peerID) {
 			requester.redo(peerID)
 		}
 	}
@@ -329,6 +371,12 @@ func (pool *BlockPool) removePeer(peerID p2p.ID) {
 	}
 
 	delete(pool.peers, peerID)
+	for i, p := range pool.sortedPeers {
+		if p.id == peerID {
+			pool.sortedPeers = append(pool.sortedPeers[:i], pool.sortedPeers[i+1:]...)
+			break
+		}
+	}
 
 	// Find a new peer with the biggest height and update maxPeerHeight if the
 	// peer's height was the biggest.
@@ -351,11 +399,14 @@ func (pool *BlockPool) updateMaxPeerHeight() {
 
 // Pick an available peer with the given height available.
 // If no peers are available, returns nil.
-func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer {
+func (pool *BlockPool) pickIncrAvailablePeer(height int64, excludePeerID p2p.ID) *bpPeer {
 	pool.mtx.Lock()
 	defer pool.mtx.Unlock()
 
-	for _, peer := range pool.peers {
+	for _, peer := range pool.sortedPeers {
+		if peer.id == excludePeerID {
+			continue
+		}
 		if peer.didTimeout {
 			pool.removePeer(peer.id)
 			continue
@@ -372,30 +423,29 @@
 	return nil
 }
 
-func (pool *BlockPool) makeNextRequester() {
+// Sort peers by curRate, highest first.
+//
+// CONTRACT: pool.mtx must be locked.
+func (pool *BlockPool) sortPeers() {
+	sort.Slice(pool.sortedPeers, func(i, j int) bool {
+		return pool.sortedPeers[i].curRate > pool.sortedPeers[j].curRate
+	})
+}
+
+func (pool *BlockPool) makeNextRequester(nextHeight int64) {
 	pool.mtx.Lock()
 	defer pool.mtx.Unlock()
 
-	nextHeight := pool.height + pool.requestersLen()
-	if nextHeight > pool.maxPeerHeight {
-		return
-	}
-
 	request := newBPRequester(pool, nextHeight)
 	pool.requesters[nextHeight] = request
 	atomic.AddInt32(&pool.numPending, 1)
 
-	err := request.Start()
-	if err != nil {
+	if err := request.Start(); err != nil {
 		request.Logger.Error("Error starting request", "err", err)
 	}
 }
 
-func (pool *BlockPool) requestersLen() int64 {
-	return int64(len(pool.requesters))
-}
-
 func (pool *BlockPool) sendRequest(height int64, peerID p2p.ID) {
 	if !pool.IsRunning() {
 		return
@@ -418,7 +468,7 @@ func (pool *BlockPool) debug() string {
 	defer pool.mtx.Unlock()
 
 	str := ""
-	nextHeight := pool.height + pool.requestersLen()
+	nextHeight := pool.height + int64(len(pool.requesters))
 	for h := pool.height; h < nextHeight; h++ {
 		if pool.requesters[h] == nil {
 			str += fmt.Sprintf("H(%v):X ", h)
@@ -434,6 +484,7 @@ func (pool *BlockPool) debug() string {
 
 type bpPeer struct {
 	didTimeout bool
+	curRate    int64
 	numPending int32
 	height     int64
 	base       int64
@@ -506,27 +557,34 @@ func (peer *bpPeer) onTimeout() {
 
 //-------------------------------------
 
+const minBlocksForSingleRequest = 50
+
 type bpRequester struct {
 	service.BaseService
-	pool       *BlockPool
-	height     int64
-	gotBlockCh chan struct{}
-	redoCh     chan p2p.ID // redo may send multitime, add peerId to identify repeat
+	pool        *BlockPool
+	height      int64
+	gotBlockCh  chan struct{}
+	redoCh      chan p2p.ID // redo may receive multiple messages; peerID identifies the peer to redo
+	newHeightCh chan int64
 
-	mtx    cmtsync.Mutex
-	peerID p2p.ID
-	block  *types.Block
+	mtx          cmtsync.Mutex
+	peerID       p2p.ID
+	secondPeerID p2p.ID // alternative peer to request from (if close to pool's height)
+	gotBlockFrom p2p.ID
+	block        *types.Block
 }
 
 func newBPRequester(pool *BlockPool, height int64) *bpRequester {
 	bpr := &bpRequester{
-		pool:       pool,
-		height:     height,
-		gotBlockCh: make(chan struct{}, 1),
-		redoCh:     make(chan p2p.ID, 1),
-
-		peerID: "",
-		block:  nil,
+		pool:        pool,
+		height:      height,
+		gotBlockCh:  make(chan struct{}, 1),
+		redoCh:      make(chan p2p.ID, 1),
+		newHeightCh: make(chan int64, 1),
+
+		peerID:       "",
+		secondPeerID: "",
+		block:        nil,
 	}
 	bpr.BaseService = *service.NewBaseService(nil, "bpRequester", bpr)
 	return bpr
@@ -540,11 +598,17 @@ func (bpr *bpRequester) OnStart() error {
 
 // Returns true if the peer matches and block doesn't already exist.
 func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool {
 	bpr.mtx.Lock()
-	if bpr.block != nil || bpr.peerID != peerID {
+	if bpr.peerID != peerID && bpr.secondPeerID != peerID {
 		bpr.mtx.Unlock()
 		return false
 	}
+	if bpr.block != nil {
+		bpr.mtx.Unlock()
+		return true // getting a block from both peers is not an error
+	}
+
 	bpr.block = block
+	bpr.gotBlockFrom = peerID
 	bpr.mtx.Unlock()
 
 	select {
@@ -560,23 +624,54 @@ func (bpr *bpRequester) getBlock() *types.Block {
 	return bpr.block
 }
 
-func (bpr *bpRequester) getPeerID() p2p.ID {
+// Returns the IDs of peers we've requested a block from.
+func (bpr *bpRequester) requestedFrom() []p2p.ID {
 	bpr.mtx.Lock()
 	defer bpr.mtx.Unlock()
-	return bpr.peerID
+	peerIDs := make([]p2p.ID, 0, 2)
+	if bpr.peerID != "" {
+		peerIDs = append(peerIDs, bpr.peerID)
+	}
+	if bpr.secondPeerID != "" {
+		peerIDs = append(peerIDs, bpr.secondPeerID)
+	}
+	return peerIDs
 }
 
-// This is called from the requestRoutine, upon redo().
-func (bpr *bpRequester) reset() {
+// Returns true if we've requested a block from the given peer.
+func (bpr *bpRequester) didRequestFrom(peerID p2p.ID) bool {
 	bpr.mtx.Lock()
 	defer bpr.mtx.Unlock()
+	return bpr.peerID == peerID || bpr.secondPeerID == peerID
+}
 
-	if bpr.block != nil {
+// Returns the ID of the peer who sent us the block.
+func (bpr *bpRequester) gotBlockFromPeerID() p2p.ID {
+	bpr.mtx.Lock()
+	defer bpr.mtx.Unlock()
+	return bpr.gotBlockFrom
+}
+
+// Removes the block (IF we got it from the given peer) and resets the peer.
+func (bpr *bpRequester) reset(peerID p2p.ID) (removedBlock bool) {
+	bpr.mtx.Lock()
+	defer bpr.mtx.Unlock()
+
+	// Only remove the block if we got it from that peer.
+	if bpr.gotBlockFrom == peerID {
 		atomic.AddInt32(&bpr.pool.numPending, 1)
+		bpr.block = nil
+		bpr.gotBlockFrom = ""
+		removedBlock = true
 	}
 
-	bpr.peerID = ""
-	bpr.block = nil
+	if bpr.peerID == peerID {
+		bpr.peerID = ""
+	} else {
+		bpr.secondPeerID = ""
+	}
+
+	return removedBlock
 }
 
 // Tells bpRequester to pick another peer and try again.
@@ -589,34 +684,72 @@ func (bpr *bpRequester) redo(peerID p2p.ID) {
 	}
 }
 
+func (bpr *bpRequester) pickPeerAndSendRequest() {
+	bpr.mtx.Lock()
+	secondPeerID := bpr.secondPeerID
+	bpr.mtx.Unlock()
+	var peer *bpPeer
+PICK_PEER_LOOP:
+	for {
+		if !bpr.IsRunning() || !bpr.pool.IsRunning() {
+			return
+		}
+		peer = bpr.pool.pickIncrAvailablePeer(bpr.height, secondPeerID)
+		if peer == nil {
+			bpr.Logger.Debug("No peers currently available; will retry shortly", "height", bpr.height)
+			time.Sleep(requestIntervalMS * time.Millisecond)
+			continue PICK_PEER_LOOP
+		}
+		break PICK_PEER_LOOP
+	}
+	bpr.mtx.Lock()
+	bpr.peerID = peer.id
+	bpr.mtx.Unlock()
+	bpr.pool.sendRequest(bpr.height, peer.id)
+}
+
+// Picks a second peer and sends a request to it. If the second peer is already
+// set, does nothing.
+func (bpr *bpRequester) pickSecondPeerAndSendRequest() {
+	bpr.mtx.Lock()
+	if bpr.secondPeerID != "" {
+		bpr.mtx.Unlock()
+		return
+	}
+	peerID := bpr.peerID
+	bpr.mtx.Unlock()
+
+	secondPeer := bpr.pool.pickIncrAvailablePeer(bpr.height, peerID)
+	if secondPeer != nil {
+		bpr.mtx.Lock()
+		bpr.secondPeerID = secondPeer.id
+		bpr.mtx.Unlock()
+		bpr.pool.sendRequest(bpr.height, secondPeer.id)
+	}
+}
+
+// Informs the requester of the pool's new height.
+func (bpr *bpRequester) newHeight(height int64) {
+	select {
+	case bpr.newHeightCh <- height:
+	default:
+	}
+}
+
 // Responsible for making more requests as necessary
 // Returns only when a block is found (e.g. AddBlock() is called)
 func (bpr *bpRequester) requestRoutine() {
+	gotBlock := false
+
 OUTER_LOOP:
 	for {
-		// Pick a peer to send request to.
-		var peer *bpPeer
-	PICK_PEER_LOOP:
-		for {
-			if !bpr.IsRunning() || !bpr.pool.IsRunning() {
-				return
-			}
-			peer = bpr.pool.pickIncrAvailablePeer(bpr.height)
-			if peer == nil {
-				bpr.Logger.Debug("No peers currently available; will retry shortly", "height", bpr.height)
-				time.Sleep(requestIntervalMS * time.Millisecond)
-				continue PICK_PEER_LOOP
-			}
-			break PICK_PEER_LOOP
-		}
-		bpr.mtx.Lock()
-		bpr.peerID = peer.id
-		bpr.mtx.Unlock()
+		bpr.pickPeerAndSendRequest()
+
+		poolHeight, _, _ := bpr.pool.GetStatus()
+		if bpr.height-poolHeight < minBlocksForSingleRequest {
+			bpr.pickSecondPeerAndSendRequest()
+		}
 
-		to := time.NewTimer(requestRetrySeconds * time.Second)
-		// Send request and wait.
-		bpr.pool.sendRequest(bpr.height, peer.id)
-	WAIT_LOOP:
 		for {
 			select {
 			case <-bpr.pool.Quit():
@@ -626,22 +759,31 @@ OUTER_LOOP:
 				return
 			case <-bpr.Quit():
 				return
-			case <-to.C:
-				bpr.Logger.Debug("Retrying block request after timeout", "height", bpr.height, "peer", bpr.peerID)
-				// Simulate a redo
-				bpr.reset()
-				continue OUTER_LOOP
-			case peerID := <-bpr.redoCh:
-				if peerID == bpr.peerID {
-					bpr.reset()
+			case <-time.After(requestRetrySeconds * time.Second):
+				if !gotBlock {
+					bpr.Logger.Debug("Retrying block request(s) after timeout", "height", bpr.height, "peer", bpr.peerID, "secondPeerID", bpr.secondPeerID)
+					bpr.reset(bpr.peerID)
+					bpr.reset(bpr.secondPeerID)
 					continue OUTER_LOOP
-				} else {
-					continue WAIT_LOOP
+				}
+			case peerID := <-bpr.redoCh:
+				if bpr.didRequestFrom(peerID) {
+					removedBlock := bpr.reset(peerID)
+					if removedBlock {
+						gotBlock = false
+					}
+				}
+				// If both peers returned NoBlockResponse or bad block, reschedule both
+				// requests. If not, wait for the other peer.
+			case newHeight := <-bpr.newHeightCh:
+				if !gotBlock && bpr.height-newHeight < minBlocksForSingleRequest {
+					// The operation is a noop if the second peer is already set. The cost is checking a mutex.
+					bpr.pickSecondPeerAndSendRequest()
 				}
 			case <-bpr.gotBlockCh:
+				gotBlock = true
 				// We got a block!
 				// Continue the for-loop and wait til Quit.
-				continue WAIT_LOOP
 			}
 		}
 	}
diff --git a/blocksync/pool_test.go b/blocksync/pool_test.go
index dd7c02b9da..73ef7bf970 100644
--- a/blocksync/pool_test.go
+++ b/blocksync/pool_test.go
@@ -78,10 +78,12 @@ func makePeers(numPeers int, minHeight, maxHeight int64) testPeers {
 }
 
 func TestBlockPoolBasic(t *testing.T) {
-	start := int64(42)
-	peers := makePeers(10, start+1, 1000)
-	errorsCh := make(chan peerError, 1000)
-	requestsCh := make(chan BlockRequest, 1000)
+	var (
+		start      = int64(42)
+		peers      = makePeers(10, start, 1000)
+		errorsCh   = make(chan peerError)
+		requestsCh = make(chan BlockRequest)
+	)
 
 	pool := NewBlockPool(start, requestsCh, errorsCh)
 	pool.SetLogger(log.TestingLogger())
@@ -138,10 +140,12 @@ func TestBlockPoolBasic(t *testing.T) {
 }
 
 func TestBlockPoolTimeout(t *testing.T) {
-	start := int64(42)
-	peers := makePeers(10, start+1, 1000)
-	errorsCh := make(chan peerError, 1000)
-	requestsCh := make(chan BlockRequest, 1000)
+	var (
+		start      = int64(42)
+		peers      = makePeers(10, start, 1000)
+		errorsCh   = make(chan peerError)
+		requestsCh = make(chan BlockRequest)
+	)
 	pool := NewBlockPool(start, requestsCh, errorsCh)
 	pool.SetLogger(log.TestingLogger())
 	err := pool.Start()
diff --git a/blocksync/reactor.go b/blocksync/reactor.go
index d6c01c81e4..97cdc40bfa 100644
--- a/blocksync/reactor.go
+++ b/blocksync/reactor.go
@@ -79,7 +79,10 @@ func NewReactorWithOfflineStateSync(state sm.State, blockExec *sm.BlockExecutor,
 		panic(fmt.Sprintf("state (%v) and store (%v) height mismatch, stores were left in an inconsistent state",
 			state.LastBlockHeight, storeHeight))
 	}
-	requestsCh := make(chan BlockRequest, maxTotalRequesters)
+
+	// It's okay to block since sendRequest is called from a separate goroutine
+	// (bpRequester#requestRoutine; one per requested block).
+	requestsCh := make(chan BlockRequest)
 
 	const capacity = 1000                      // must be bigger than peers count
 	errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock
@@ -397,14 +400,14 @@ FOR_LOOP:
 			}
 
 			bcR.Logger.Error("Error in validation", "err", err)
-			peerID := bcR.pool.RedoRequest(first.Height)
+			peerID := bcR.pool.RemovePeerAndRedoAllPeerRequests(first.Height)
 			peer := bcR.Switch.Peers().Get(peerID)
 			if peer != nil {
 				// NOTE: we've already removed the peer's request, but we
 				// still need to clean up the rest.
 				bcR.Switch.StopPeerForError(peer, fmt.Errorf("Reactor validation error: %v", err))
 			}
-			peerID2 := bcR.pool.RedoRequest(second.Height)
+			peerID2 := bcR.pool.RemovePeerAndRedoAllPeerRequests(second.Height)
 			peer2 := bcR.Switch.Peers().Get(peerID2)
 			if peer2 != nil && peer2 != peer {
 				// NOTE: we've already removed the peer's request, but we

From 39024eb656487da8f40edca1a08cf3d354da9d94 Mon Sep 17 00:00:00 2001
From: Adam Tucker
Date: Wed, 6 Mar 2024 16:45:45 -0700
Subject: [PATCH 2/5] fix

---
 blocksync/pool.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/blocksync/pool.go b/blocksync/pool.go
index e4a4bad547..00e29c686f 100644
--- a/blocksync/pool.go
+++ b/blocksync/pool.go
@@ -775,6 +775,9 @@ OUTER_LOOP:
 				}
 				// If both peers returned NoBlockResponse or bad block, reschedule both
 				// requests. If not, wait for the other peer.
+				if len(bpr.requestedFrom()) == 0 {
+					continue OUTER_LOOP
+				}
 			case newHeight := <-bpr.newHeightCh:
 				if !gotBlock && bpr.height-newHeight < minBlocksForSingleRequest {
 					// The operation is a noop if the second peer is already set. The cost is checking a mutex.
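
Note on the two-peer request scheme in patches 1 and 2: a requester normally
asks a single peer for its block, and only issues a redundant request to a
second peer once the pool's height comes within minBlocksForSingleRequest (50)
of the requester's height, so bandwidth is doubled only for blocks that will
be needed soon. The standalone Go sketch below isolates that decision rule;
the helper function and the harness around it are illustrative only and do
not appear in the patches.

    package main

    import "fmt"

    const minBlocksForSingleRequest = 50

    // shouldRequestFromSecondPeer mirrors the checks in requestRoutine and the
    // newHeightCh handler: issue a second request only if we don't have the
    // block yet and the pool is close to this requester's height.
    func shouldRequestFromSecondPeer(requesterHeight, poolHeight int64, gotBlock bool) bool {
    	return !gotBlock && requesterHeight-poolHeight < minBlocksForSingleRequest
    }

    func main() {
    	poolHeight := int64(100)
    	for _, h := range []int64{120, 149, 150, 400} {
    		fmt.Printf("height %d: second request = %v\n",
    			h, shouldRequestFromSecondPeer(h, poolHeight, false))
    	}
    	// height 120: true, height 149: true (49 < 50),
    	// height 150: false (the boundary is exclusive), height 400: false.
    }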
From 5981f9f6497ae9d6384f7217ad4d82f55b40b434 Mon Sep 17 00:00:00 2001
From: Adam Tucker
Date: Mon, 11 Mar 2024 17:35:12 -0600
Subject: [PATCH 3/5] fix

---
 blocksync/pool.go | 2 --
 1 file changed, 2 deletions(-)

diff --git a/blocksync/pool.go b/blocksync/pool.go
index 09470c12bb..b4584d0e6f 100644
--- a/blocksync/pool.go
+++ b/blocksync/pool.go
@@ -425,8 +425,6 @@ func (pool *BlockPool) sortPeers() {
 
 func (pool *BlockPool) makeNextRequester(nextHeight int64) {
 	pool.mtx.Lock()
-	defer pool.mtx.Unlock()
-
 	request := newBPRequester(pool, nextHeight)
 	pool.requesters[nextHeight] = request
 	pool.mtx.Unlock()

From 50417860b98aec879f02863b0393662e86adfaa1 Mon Sep 17 00:00:00 2001
From: Adam Tucker
Date: Mon, 11 Mar 2024 17:51:29 -0600
Subject: [PATCH 4/5] add redo request

---
 blocksync/reactor.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/blocksync/reactor.go b/blocksync/reactor.go
index 2fa0434c69..f7eb6efb95 100644
--- a/blocksync/reactor.go
+++ b/blocksync/reactor.go
@@ -252,6 +252,7 @@ func (bcR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
 		bcR.pool.SetPeerRange(e.Src.ID(), msg.Base, msg.Height)
 	case *bcproto.NoBlockResponse:
 		bcR.Logger.Debug("Peer does not have requested block", "peer", e.Src, "height", msg.Height)
+		bcR.pool.RedoRequestFrom(msg.Height, e.Src.ID())
 	default:
 		bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
 	}

From 1d0aefb45ea1808d7c5c318621079dbdf6724a32 Mon Sep 17 00:00:00 2001
From: Adam Tucker
Date: Mon, 11 Mar 2024 17:54:48 -0600
Subject: [PATCH 5/5] data race

---
 blocksync/pool.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/blocksync/pool.go b/blocksync/pool.go
index b4584d0e6f..4f75be3337 100644
--- a/blocksync/pool.go
+++ b/blocksync/pool.go
@@ -99,8 +99,8 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p
 // OnStart implements service.Service by spawning requesters routine and recording
 // pool's start time.
 func (pool *BlockPool) OnStart() error {
-	go pool.makeRequestersRoutine()
 	pool.startTime = time.Now()
+	go pool.makeRequestersRoutine()
 	return nil
 }
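
Note on the data-race fix in patch 5: pool.startTime is read by
makeRequestersRoutine (via time.Since) without holding pool.mtx, so the write
must happen before the goroutine is spawned; the `go` statement then supplies
the happens-before edge that makes the unlocked read safe. A standalone sketch
of the pattern follows, assuming a trimmed-down pool type that is not part of
the patches.

    package main

    import (
    	"fmt"
    	"time"
    )

    type pool struct {
    	startTime time.Time // read by the spawned routine without a lock
    }

    func (p *pool) makeRequestersRoutine(done chan<- struct{}) {
    	// Safe: the write to startTime happened before the `go` statement
    	// below, so this unlocked read cannot race with it.
    	fmt.Println("elapsed since start:", time.Since(p.startTime))
    	close(done)
    }

    func main() {
    	p := &pool{}
    	done := make(chan struct{})
    	p.startTime = time.Now()         // initialize shared state first...
    	go p.makeRequestersRoutine(done) // ...then start the goroutine
    	<-done
    }

The same ordering applies to any field a spawned routine reads without
synchronization.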