Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

blockchain/v0: relax termination conditions and increase sync timeout #5741

Merged
merged 12 commits into from
Dec 8, 2020
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
- [mempool] \#5673 Cancel `CheckTx` requests if RPC client disconnects or times out (@melekes)
- [abci] \#5706 Added `AbciVersion` to `RequestInfo` allowing applications to check ABCI version when connecting to Tendermint. (@marbar3778)
- [blockchain/v1] \#5728 Remove in favor of v2 (@melekes)
- [blockchain/v0] \#5741 Relax termination conditions and increase sync timeout (@melekes)

### BUG FIXES

Expand Down
17 changes: 17 additions & 0 deletions blockchain/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
Package blockchain provides two implementations of the fast-sync protocol.

- v0 was the very first implementation. It's battle-tested, but does not have a
lot of test coverage.
- v2 is the newest implementation, with a focus on testability and readability.

Check out ADR-40 for the formal model and requirements.

# Termination criteria

1. The maximum peer height is reached.
2. The termination timeout is triggered; the timeout is set if the peer set is
empty or there are no pending requests.

*/
package blockchain
63 changes: 27 additions & 36 deletions blockchain/v0/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,17 @@ var peerTimeout = 15 * time.Second // not const so we can override with tests
are not at peer limits, we can probably switch to consensus reactor
*/

// BlockRequest stores a block request identified by the block Height and the
// PeerID responsible for delivering the block.
type BlockRequest struct {
// Height is the height of the requested block.
Height int64
// PeerID identifies the peer responsible for delivering the block.
PeerID p2p.ID
}

// BlockPool keeps track of the fast sync peers, block requests and block responses.
type BlockPool struct {
service.BaseService
startTime time.Time
lastAdvance time.Time

mtx tmsync.Mutex
// block requests
Expand Down Expand Up @@ -98,8 +105,8 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p
// OnStart implements service.Service by spawning requesters routine and recording
// pool's start time.
func (pool *BlockPool) OnStart() error {
pool.lastAdvance = time.Now()
go pool.makeRequestersRoutine()
pool.startTime = time.Now()
return nil
}

Expand Down Expand Up @@ -134,6 +141,7 @@ func (pool *BlockPool) removeTimedoutPeers() {
defer pool.mtx.Unlock()

for _, peer := range pool.peers {
// check if peer timed out
if !peer.didTimeout && peer.numPending > 0 {
curRate := peer.recvMonitor.Status().CurRate
// curRate can be 0 on start
Expand All @@ -147,6 +155,7 @@ func (pool *BlockPool) removeTimedoutPeers() {
peer.didTimeout = true
}
}

if peer.didTimeout {
pool.removePeer(peer.id)
}
Expand All @@ -163,26 +172,17 @@ func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequester
}

// IsCaughtUp returns true if this node is caught up, false otherwise.
// TODO: relax conditions, prevent abuse.
func (pool *BlockPool) IsCaughtUp() bool {
pool.mtx.Lock()
defer pool.mtx.Unlock()

// Need at least 1 peer to be considered caught up.
if len(pool.peers) == 0 {
pool.Logger.Debug("Blockpool has no peers")
return false
}

// Some conditions to determine if we're caught up.
// Ensures we've either received a block or waited some amount of time,
// and that we're synced to the highest known height.
// Note we use maxPeerHeight - 1 because to sync block H requires block H+1
// NOTE: we use maxPeerHeight - 1 because to sync block H requires block H+1
// to verify the LastCommit.
receivedBlockOrTimedOut := pool.height > 0 || time.Since(pool.startTime) > 5*time.Second
ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= (pool.maxPeerHeight-1)
isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers
return isCaughtUp
return pool.height >= (pool.maxPeerHeight - 1)
}

// PeekTwoBlocks returns blocks at pool.height and pool.height+1.
Expand All @@ -209,16 +209,12 @@ func (pool *BlockPool) PopRequest() {
defer pool.mtx.Unlock()

if r := pool.requesters[pool.height]; r != nil {
/* The block can disappear at any time, due to removePeer().
if r := pool.requesters[pool.height]; r == nil || r.block == nil {
PanicSanity("PopRequest() requires a valid block")
}
*/
if err := r.Stop(); err != nil {
pool.Logger.Error("Error stopping requester", "err", err)
}
delete(pool.requesters, pool.height)
pool.height++
pool.lastAdvance = time.Now()
} else {
panic(fmt.Sprintf("Expected requester to pop, got nothing at height %v", pool.height))
}
Expand Down Expand Up @@ -248,14 +244,8 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int

requester := pool.requesters[block.Height]
if requester == nil {
pool.Logger.Info(
"peer sent us a block we didn't expect",
"peer",
peerID,
"curHeight",
pool.height,
"blockHeight",
block.Height)
pool.Logger.Error("peer sent us a block we didn't expect",
"peer", peerID, "curHeight", pool.height, "blockHeight", block.Height)
diff := pool.height - block.Height
if diff < 0 {
diff *= -1
Expand All @@ -273,8 +263,9 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
peer.decrPending(blockSize)
}
} else {
pool.Logger.Info("invalid peer", "peer", peerID, "blockHeight", block.Height)
pool.sendError(errors.New("invalid peer"), peerID)
err := errors.New("requester is different or block already exists")
pool.Logger.Error(err.Error(), "peer", peerID, "requester", requester.getPeerID(), "blockHeight", block.Height)
pool.sendError(err, peerID)
}
}

Expand All @@ -285,6 +276,14 @@ func (pool *BlockPool) MaxPeerHeight() int64 {
return pool.maxPeerHeight
}

// LastAdvance returns the time when the last block was processed (or start
// time if no blocks were processed).
func (pool *BlockPool) LastAdvance() time.Time {
pool.mtx.Lock()
defer pool.mtx.Unlock()
melekes marked this conversation as resolved.
Show resolved Hide resolved
return pool.lastAdvance
}

// SetPeerRange sets the peer's alleged blockchain base and height.
func (pool *BlockPool) SetPeerRange(peerID p2p.ID, base int64, height int64) {
pool.mtx.Lock()
Expand Down Expand Up @@ -601,7 +600,6 @@ OUTER_LOOP:
}
peer = bpr.pool.pickIncrAvailablePeer(bpr.height)
if peer == nil {
// log.Info("No peers available", "height", height)
time.Sleep(requestIntervalMS * time.Millisecond)
continue PICK_PEER_LOOP
}
Expand Down Expand Up @@ -638,10 +636,3 @@ OUTER_LOOP:
}
}
}

// BlockRequest stores a block request identified by the block Height and the PeerID responsible for
// delivering the block
type BlockRequest struct {
Height int64
PeerID p2p.ID
}
Loading