diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go
index 1732b4751e..62879dac45 100644
--- a/internal/blocksync/pool.go
+++ b/internal/blocksync/pool.go
@@ -49,6 +49,11 @@ const (
 
 var peerTimeout = 15 * time.Second // not const so we can override with tests
 
+var (
+	errPeerNotResponded = errors.New("peer did not send us anything")
+	errUnableToFindPeer = errors.New("unable to find a peer, a requester is stopped")
+)
+
 /*
 	Peers self report their heights when we join the block pool.
 	Starting from our latest pool.height, we request blocks
@@ -547,9 +552,8 @@ func (peer *bpPeer) onTimeout() {
 	peer.pool.mtx.Lock()
 	defer peer.pool.mtx.Unlock()
 
-	err := errors.New("peer did not send us anything")
-	peer.pool.sendError(err, peer.id)
-	peer.logger.Error("SendTimeout", "reason", err, "timeout", peerTimeout)
+	peer.pool.sendError(errPeerNotResponded, peer.id)
+	peer.logger.Error("SendTimeout", "reason", errPeerNotResponded, "timeout", peerTimeout)
 	peer.didTimeout = true
 }
 
@@ -591,19 +595,27 @@ func (bpr *bpRequester) OnStart(ctx context.Context) error {
 
 func (*bpRequester) OnStop() {}
 
-// Returns true if the peer matches and block doesn't already exist.
-func (bpr *bpRequester) setBlock(block *types.Block, commit *types.Commit, peerID types.NodeID) bool {
+// updateBlock stores block (and commit, when non-nil) while holding bpr.mtx.
+// It returns false and stores nothing if a block is already set or peerID is not bpr.peerID.
+func (bpr *bpRequester) updateBlock(block *types.Block, commit *types.Commit, peerID types.NodeID) bool {
 	bpr.mtx.Lock()
+	defer bpr.mtx.Unlock()
 	if bpr.block != nil || bpr.peerID != peerID {
-		bpr.mtx.Unlock()
 		return false
 	}
 	bpr.block = block
 	if commit != nil {
 		bpr.commit = commit
 	}
-	bpr.mtx.Unlock()
+	return true
+}
 
+// Returns true if the peer matches and block doesn't already exist.
+func (bpr *bpRequester) setBlock(block *types.Block, commit *types.Commit, peerID types.NodeID) bool {
+	updated := bpr.updateBlock(block, commit, peerID)
+	if !updated {
+		return false
+	}
 	select {
 	case bpr.gotBlockCh <- struct{}{}:
 	default:
@@ -656,52 +668,72 @@ func (bpr *bpRequester) redo(peerID types.NodeID) {
 // Responsible for making more requests as necessary
 // Returns only when a block is found (e.g. AddBlock() is called)
 func (bpr *bpRequester) requestRoutine(ctx context.Context) {
-OUTER_LOOP:
-	for {
+	for bpr.isReqRoutineRunning() {
 		// Pick a peer to send request to.
-		var peer *bpPeer
-	PICK_PEER_LOOP:
-		for {
-			if !bpr.IsRunning() || !bpr.pool.IsRunning() {
-				return
-			}
-			if ctx.Err() != nil {
-				return
-			}
-
-			peer = bpr.pool.pickIncrAvailablePeer(bpr.height)
-			if peer == nil {
-				// This is preferable to using a timer because the request
-				// interval is so small. Larger request intervals may
-				// necessitate using a timer/ticker.
-				time.Sleep(requestInterval)
-				continue PICK_PEER_LOOP
-			}
-			break PICK_PEER_LOOP
+		peer, err := bpr.findPeer(ctx)
+		if err != nil {
+			return
 		}
-		bpr.mtx.Lock()
-		bpr.peerID = peer.id
-		bpr.mtx.Unlock()
-
+		bpr.updatePeerID(peer)
 		// Send request and wait.
 		bpr.pool.sendRequest(bpr.height, peer.id)
-	WAIT_LOOP:
-		for {
-			select {
-			case <-ctx.Done():
-				return
-			case peerID := <-bpr.redoCh:
-				if peerID == bpr.peerID {
-					bpr.reset()
-					continue OUTER_LOOP
-				} else {
-					continue WAIT_LOOP
-				}
-			case <-bpr.gotBlockCh:
-				// We got a block!
-				// Continue the for-loop and wait til Quit.
-				continue WAIT_LOOP
+		shouldStop := bpr.waitForResponse(ctx)
+		if shouldStop {
+			return
+		}
+	}
+}
+
+// isReqRoutineRunning reports whether both the requester and its pool are running.
+func (bpr *bpRequester) isReqRoutineRunning() bool {
+	return bpr.IsRunning() && bpr.pool.IsRunning()
+}
+
+// updatePeerID records peer.id as the requester's current peer while holding bpr.mtx.
+func (bpr *bpRequester) updatePeerID(peer *bpPeer) {
+	bpr.mtx.Lock()
+	defer bpr.mtx.Unlock()
+	bpr.peerID = peer.id
+}
+
+// findPeer polls the pool every requestInterval until it yields a peer for bpr.height.
+// It returns ctx.Err() once ctx is done, or errUnableToFindPeer once the requester
+// or its pool is no longer running.
+func (bpr *bpRequester) findPeer(ctx context.Context) (*bpPeer, error) {
+	var peer *bpPeer
+	for bpr.isReqRoutineRunning() {
+		if ctx.Err() != nil {
+			return nil, ctx.Err()
+		}
+		peer = bpr.pool.pickIncrAvailablePeer(bpr.height)
+		if peer != nil {
+			return peer, nil
+		}
+		// This is preferable to using a timer because the request
+		// interval is so small. Larger request intervals may
+		// necessitate using a timer/ticker.
+		time.Sleep(requestInterval)
+	}
+	return nil, errUnableToFindPeer
+}
+
+// waitForResponse blocks until the current request has to be retried or abandoned.
+// It returns false after a redo for the current peer (the requester is reset first)
+// and true once ctx is done.
+func (bpr *bpRequester) waitForResponse(ctx context.Context) bool {
+	for {
+		select {
+		case <-ctx.Done():
+			return true
+		case peerID := <-bpr.redoCh:
+			if peerID == bpr.peerID {
+				bpr.reset()
+				return false
 			}
+		case <-bpr.gotBlockCh:
+			// We got a block! Keep waiting until ctx is done so that a redo
+			// issued after the block arrived is still handled.
+			continue
 		}
 	}
 }
diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go
index 5bf696b686..a34e9a4b62 100644
--- a/internal/blocksync/reactor_test.go
+++ b/internal/blocksync/reactor_test.go
@@ -2,6 +2,7 @@ package blocksync
 
 import (
 	"context"
+	"fmt"
 	"os"
 	"testing"
 	"time"
@@ -354,13 +355,12 @@ func TestReactor_NoBlockResponse(t *testing.T) {
 		"expected node to be fully synced",
 	)
 
-	for _, tc := range testCases {
-		block := rts.reactors[rts.nodes[1]].store.LoadBlock(tc.height)
-		if tc.existent {
-			require.True(t, block != nil)
-		} else {
-			require.Nil(t, block)
-		}
+	reactor := rts.reactors[rts.nodes[1]]
+	for i, tc := range testCases {
+		t.Run(fmt.Sprintf("test-case #%d", i), func(t *testing.T) {
+			block := reactor.store.LoadBlock(tc.height)
+			require.Equal(t, tc.existent, block != nil)
+		})
 	}
 }
 