Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: blocksync.bpRequester should stop procedure if block was received #546

Merged
merged 5 commits into from
Jan 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 69 additions & 48 deletions internal/blocksync/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ const (

var peerTimeout = 15 * time.Second // not const so we can override with tests

var (
errPeerNotResponded = errors.New("peer did not send us anything")
errUnableToFindPeer = errors.New("unable to find a peer, a requester is stopped")
)

/*
Peers self report their heights when we join the block pool.
Starting from our latest pool.height, we request blocks
Expand Down Expand Up @@ -547,9 +552,8 @@ func (peer *bpPeer) onTimeout() {
peer.pool.mtx.Lock()
defer peer.pool.mtx.Unlock()

err := errors.New("peer did not send us anything")
peer.pool.sendError(err, peer.id)
peer.logger.Error("SendTimeout", "reason", err, "timeout", peerTimeout)
peer.pool.sendError(errPeerNotResponded, peer.id)
peer.logger.Error("SendTimeout", "reason", errPeerNotResponded, "timeout", peerTimeout)
peer.didTimeout = true
}

Expand Down Expand Up @@ -591,19 +595,25 @@ func (bpr *bpRequester) OnStart(ctx context.Context) error {

func (*bpRequester) OnStop() {}

// Returns true if the peer matches and block doesn't already exist.
func (bpr *bpRequester) setBlock(block *types.Block, commit *types.Commit, peerID types.NodeID) bool {
func (bpr *bpRequester) updateBlock(block *types.Block, commit *types.Commit, peerID types.NodeID) bool {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()
if bpr.block != nil || bpr.peerID != peerID {
bpr.mtx.Unlock()
return false
}
bpr.block = block
if commit != nil {
bpr.commit = commit
}
bpr.mtx.Unlock()
return true
}

// Returns true if the peer matches and block doesn't already exist.
func (bpr *bpRequester) setBlock(block *types.Block, commit *types.Commit, peerID types.NodeID) bool {
updated := bpr.updateBlock(block, commit, peerID)
if !updated {
return false
}
select {
case bpr.gotBlockCh <- struct{}{}:
default:
Expand Down Expand Up @@ -656,52 +666,63 @@ func (bpr *bpRequester) redo(peerID types.NodeID) {
// Responsible for making more requests as necessary
// Returns only when a block is found (e.g. AddBlock() is called)
func (bpr *bpRequester) requestRoutine(ctx context.Context) {
OUTER_LOOP:
for {
for bpr.isReqRoutineRunning() {
// Pick a peer to send request to.
var peer *bpPeer
PICK_PEER_LOOP:
for {
if !bpr.IsRunning() || !bpr.pool.IsRunning() {
return
}
if ctx.Err() != nil {
return
}

peer = bpr.pool.pickIncrAvailablePeer(bpr.height)
if peer == nil {
// This is preferable to using a timer because the request
// interval is so small. Larger request intervals may
// necessitate using a timer/ticker.
time.Sleep(requestInterval)
continue PICK_PEER_LOOP
}
break PICK_PEER_LOOP
peer, err := bpr.findPeer(ctx)
if err != nil {
return
}
bpr.mtx.Lock()
bpr.peerID = peer.id
bpr.mtx.Unlock()

bpr.updatePeerID(peer)
// Send request and wait.
bpr.pool.sendRequest(bpr.height, peer.id)
WAIT_LOOP:
for {
select {
case <-ctx.Done():
return
case peerID := <-bpr.redoCh:
if peerID == bpr.peerID {
bpr.reset()
continue OUTER_LOOP
} else {
continue WAIT_LOOP
}
case <-bpr.gotBlockCh:
// We got a block!
// Continue the for-loop and wait til Quit.
continue WAIT_LOOP
shouldStop := bpr.waitForResponse(ctx)
if shouldStop {
return
}
}
}

func (bpr *bpRequester) isReqRoutineRunning() bool {
return bpr.IsRunning() && bpr.pool.IsRunning()
}

func (bpr *bpRequester) updatePeerID(peer *bpPeer) {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()
bpr.peerID = peer.id
}

func (bpr *bpRequester) findPeer(ctx context.Context) (*bpPeer, error) {
var peer *bpPeer
for bpr.isReqRoutineRunning() {
if ctx.Err() != nil {
return nil, ctx.Err()
}
peer = bpr.pool.pickIncrAvailablePeer(bpr.height)
if peer != nil {
return peer, nil
}
// This is preferable to using a timer because the request
// interval is so small. Larger request intervals may
// necessitate using a timer/ticker.
time.Sleep(requestInterval)
}
return nil, errUnableToFindPeer
}

func (bpr *bpRequester) waitForResponse(ctx context.Context) bool {
for {
select {
case <-ctx.Done():
return true
case peerID := <-bpr.redoCh:
if peerID == bpr.peerID {
bpr.reset()
return false
}
case <-bpr.gotBlockCh:
// We got a block!
return true
}
}
}
14 changes: 7 additions & 7 deletions internal/blocksync/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package blocksync

import (
"context"
"fmt"
"os"
"testing"
"time"
Expand Down Expand Up @@ -354,13 +355,12 @@ func TestReactor_NoBlockResponse(t *testing.T) {
"expected node to be fully synced",
)

for _, tc := range testCases {
block := rts.reactors[rts.nodes[1]].store.LoadBlock(tc.height)
if tc.existent {
require.True(t, block != nil)
} else {
require.Nil(t, block)
}
reactor := rts.reactors[rts.nodes[1]]
for i, tc := range testCases {
t.Run(fmt.Sprintf("test-case #%d", i), func(t *testing.T) {
block := reactor.store.LoadBlock(tc.height)
require.Equal(t, tc.existent, block != nil)
})
}
}

Expand Down