netsync: add handleBlockMsgHeadersFirst #2292

Open · wants to merge 6 commits into master
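At a high level, this change routes block messages through a new handleBlockMsgHeadersFirst handler while the sync manager is in headers-first mode: incoming blocks are buffered in a queuedBlocks map keyed by block hash, and whenever the next block expected by the already-validated header list is available, the contiguous run of queued blocks is processed with BFFastAdd, leaving only the checkpoint header in place so the next getheaders request can connect. The sketch below illustrates that queue-and-drain idea using simplified, hypothetical types; it is an illustration only, not the actual SyncManager code.

package main

import "fmt"

// block stands in for the real *blockMsg; only the hash matters here.
type block struct {
    hash string
}

// drain returns the longest prefix of order for which every block has already
// arrived in queued, removing those blocks from the queue, along with the
// remaining expected order. Blocks that arrived out of order stay queued until
// their predecessors show up.
func drain(queued map[string]*block, order []string) (ready []*block, rest []string) {
    for i, hash := range order {
        b, ok := queued[hash]
        if !ok {
            // Missing the next block in sequence; stop here.
            return ready, order[i:]
        }
        ready = append(ready, b)
        delete(queued, hash)
    }
    return ready, nil
}

func main() {
    // The headers are validated first, so the expected block order is known.
    order := []string{"a", "b", "c"}
    queued := make(map[string]*block)

    // Block "b" arrives before "a": nothing can be processed yet.
    queued["b"] = &block{hash: "b"}
    ready, order := drain(queued, order)
    fmt.Println(len(ready)) // 0

    // Once "a" arrives, both "a" and "b" drain in header order.
    queued["a"] = &block{hash: "a"}
    ready, order = drain(queued, order)
    for _, b := range ready {
        fmt.Println("process", b.hash) // a, then b
    }
    fmt.Println(order) // [c]
}

In the diff below, sm.queuedBlocks plays the role of queued, the header list supplies the expected order, and processed headers are removed from the list except for the checkpoint entry.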
263 changes: 200 additions & 63 deletions netsync/manager.go
@@ -6,6 +6,7 @@ package netsync

import (
"container/list"
"fmt"
"math/rand"
"net"
"sync"
@@ -203,6 +204,7 @@ type SyncManager struct {
headerList *list.List
startHeader *list.Element
nextCheckpoint *chaincfg.Checkpoint
queuedBlocks map[chainhash.Hash]*blockMsg

// An optional fee estimator.
feeEstimator *mempool.FeeEstimator
@@ -211,9 +213,9 @@
// resetHeaderState sets the headers-first mode state to values appropriate for
// syncing from a new peer.
func (sm *SyncManager) resetHeaderState(newestHash *chainhash.Hash, newestHeight int32) {
sm.headersFirstMode = false
sm.headerList.Init()
sm.startHeader = nil
if sm.headerList.Len() != 0 {
return
}

// When there is a next checkpoint, add an entry for the latest known
// block into the header pool. This allows the next downloaded header
@@ -328,18 +330,55 @@ func (sm *SyncManager) startSync() {
// Clear the requestedBlocks if the sync peer changes, otherwise
// we may ignore blocks we need that the last sync peer failed
// to send.
sm.requestedBlocks = make(map[chainhash.Hash]struct{})

locator, err := sm.chain.LatestBlockLocator()
if err != nil {
log.Errorf("Failed to get block locator for the "+
"latest block: %v", err)
return
//
// We don't reset it during headersFirstMode since it's not used
// in that mode.
if !sm.headersFirstMode {
sm.requestedBlocks = make(map[chainhash.Hash]struct{})
}

log.Infof("Syncing to block height %d from peer %v",
bestPeer.LastBlock(), bestPeer.Addr())

sm.syncPeer = bestPeer

// Reset the last progress time now that we have a non-nil
// syncPeer to avoid instantly detecting it as stalled in the
// event the progress time hasn't been updated recently.
sm.lastProgressTime = time.Now()

// Check if we have some headers already downloaded.
var locator blockchain.BlockLocator
if sm.headerList.Len() > 0 && sm.nextCheckpoint != nil {
e := sm.headerList.Back()
node := e.Value.(*headerNode)

// If the final hash equals the next checkpoint, the
// downloaded headers have been verified and we can
// start fetching blocks.
if node.hash.IsEqual(sm.nextCheckpoint.Hash) {
sm.startHeader = sm.headerList.Front()
sm.fetchHeaderBlocks()
return
}

// If the last hash doesn't equal the checkpoint,
// build the locator from the last hash.
locator = blockchain.BlockLocator(
[]*chainhash.Hash{node.hash})
}

// If we don't already have headers downloaded, we need to fetch
// the block locator from the chain.
if len(locator) == 0 {
locator, err = sm.chain.LatestBlockLocator()
if err != nil {
log.Errorf("Failed to get block locator for the "+
"latest block: %v", err)
return
}
}

// When the current height is less than a known checkpoint we
// can use block headers to learn about which blocks comprise
// the chain up to the checkpoint and perform less validation
@@ -369,12 +408,6 @@ func (sm *SyncManager) startSync() {
} else {
bestPeer.PushGetBlocksMsg(locator, &zeroHash)
}
sm.syncPeer = bestPeer

// Reset the last progress time now that we have a non-nil
// syncPeer to avoid instantly detecting it as stalled in the
// event the progress time hasn't been updated recently.
sm.lastProgressTime = time.Now()
} else {
log.Warnf("No sync peer candidates available")
}
@@ -565,12 +598,15 @@ func (sm *SyncManager) clearRequestedState(state *peerSyncState) {
delete(sm.requestedTxns, txHash)
}

// Remove requested blocks from the global map so that they will be
// fetched from elsewhere next time we get an inv.
// TODO: we could possibly here check which peers have these blocks
// and request them now to speed things up a little.
for blockHash := range state.requestedBlocks {
delete(sm.requestedBlocks, blockHash)
// The global map of requestedBlocks is not used during headersFirstMode.
if !sm.headersFirstMode {
// Remove requested blocks from the global map so that they will
// be fetched from elsewhere next time we get an inv.
// TODO: we could possibly here check which peers have these
// blocks and request them now to speed things up a little.
for blockHash := range state.requestedBlocks {
delete(sm.requestedBlocks, blockHash)
}
}
}

@@ -710,30 +746,6 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
}
}

// When in headers-first mode, if the block matches the hash of the
// first header in the list of headers that are being fetched, it's
// eligible for less validation since the headers have already been
// verified to link together and are valid up to the next checkpoint.
// Also, remove the list entry for all blocks except the checkpoint
// since it is needed to verify the next round of headers links
// properly.
isCheckpointBlock := false
behaviorFlags := blockchain.BFNone
if sm.headersFirstMode {
firstNodeEl := sm.headerList.Front()
if firstNodeEl != nil {
firstNode := firstNodeEl.Value.(*headerNode)
if blockHash.IsEqual(firstNode.hash) {
behaviorFlags |= blockchain.BFFastAdd
if firstNode.hash.IsEqual(sm.nextCheckpoint.Hash) {
isCheckpointBlock = true
} else {
sm.headerList.Remove(firstNodeEl)
}
}
}
}

// Remove block from request maps. Either chain will know about it and
// so we shouldn't have any more instances of trying to fetch it, or we
// will fail the insert and thus we'll retry next time we get an inv.
@@ -742,7 +754,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {

// Process the block to include validation, best chain selection, orphan
// handling, etc.
_, isOrphan, err := sm.chain.ProcessBlock(bmsg.block, behaviorFlags)
_, isOrphan, err := sm.chain.ProcessBlock(bmsg.block, blockchain.BFNone)
if err != nil {
// When the error is a rule error, it means the block was simply
// rejected as opposed to something actually going wrong, so log
@@ -840,16 +852,131 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
}
}

// If we are not in headers first mode, it's a good time to periodically
// flush the blockchain cache because we don't expect new blocks immediately.
// After that, there is nothing more to do.
if !sm.headersFirstMode {
if err := sm.chain.FlushUtxoCache(blockchain.FlushPeriodic); err != nil {
log.Errorf("Error while flushing the blockchain cache: %v", err)
// It's a good time to periodically flush the blockchain cache because
// we don't expect new blocks immediately. After that, there is
// nothing more to do.
if err := sm.chain.FlushUtxoCache(blockchain.FlushPeriodic); err != nil {
log.Errorf("Error while flushing the blockchain cache: %v", err)
}
}

// handleBlockMsgHeadersFirst handles block messages from all peers when the
// sync manager is in headers-first mode. Blocks received out of order are
// kept in memory and are processed once the next block from the tip is
// available.
func (sm *SyncManager) handleBlockMsgHeadersFirst(bmsg *blockMsg) {
peer := bmsg.peer
state, exists := sm.peerStates[peer]
if !exists {
log.Warnf("Received block message from unknown peer %s", peer)
return
}

// If we didn't ask for this block then the peer is misbehaving.
blockHash := bmsg.block.Hash()
if _, exists = state.requestedBlocks[*blockHash]; !exists {
// The regression test intentionally sends some blocks twice
// to test duplicate block insertion fails. Don't disconnect
// the peer or ignore the block when we're in regression test
// mode in this case so the chain code is actually fed the
// duplicate blocks.
if sm.chainParams != &chaincfg.RegressionNetParams {
log.Warnf("Got unrequested block %v from %s -- "+
"disconnecting", blockHash, peer.Addr())
peer.Disconnect()
return
}
}

// Add the block to the queue.
sm.queuedBlocks[*blockHash] = bmsg

// Remove block from the request map as we've added it to queuedBlocks.
delete(state.requestedBlocks, *blockHash)

// Update the last progress time now that we've received the block.
sm.lastProgressTime = time.Now()

firstNodeEl := sm.headerList.Front()
if firstNodeEl == nil {
log.Errorf("missing first node in the headerlist on block %v",
bmsg.block.Hash())
return
}

// Look for blocks that we can process next.
processBlocks := make([]*blockMsg, 0, len(sm.queuedBlocks)+1)
var next *list.Element
for e := firstNodeEl; e != nil; e = next {
next = e.Next()
node, ok := e.Value.(*headerNode)
if !ok {
log.Warn("Header list node type is not a headerNode")
continue
}

b, found := sm.queuedBlocks[*node.hash]
if !found {
// Break when we're missing the next block in
// sequence.
break
}

// We have a block we can process, so remove it from the queue
// and add it to the processBlocks slice.
processBlocks = append(processBlocks, b)
delete(sm.queuedBlocks, *node.hash)

// Remove the block from the headerList.
//
// NOTE: We leave the checkpoint hash in place so that we can
// connect headers on the next getheaders request.
if !node.hash.IsEqual(sm.nextCheckpoint.Hash) {
sm.headerList.Remove(e)
}
}

// If we have any blocks to process, process them now.
isCheckpointBlock := false
for _, processBlock := range processBlocks {
// Process the block to include validation, best chain selection, orphan
// handling, etc.
_, _, err := sm.chain.ProcessBlock(processBlock.block, blockchain.BFFastAdd)
if err != nil {
// This is a checkpointed block and therefore should never fail
// validation. If it did, then the binary is likely corrupted.
if ruleErr, ok := err.(blockchain.RuleError); ok {
// If it's not a duplicate block error, we can panic.
if ruleErr.ErrorCode != blockchain.ErrDuplicateBlock {
panicErr := fmt.Errorf("Rejected checkpointed block %v from %s: %v"+
" -- The binary is likely corrupted and should "+
"not be trusted. Exiting.",
processBlock.block.Hash(), peer, err)
panic(panicErr)
}
} else {
log.Errorf("Failed to process block %v: %v",
processBlock.block.Hash(), err)
}
if dbErr, ok := err.(database.Error); ok && dbErr.ErrorCode ==
database.ErrCorruption {
panic(dbErr)
}
}

// Look for checkpointed blocks.
if processBlock.block.Hash().IsEqual(sm.nextCheckpoint.Hash) {
isCheckpointBlock = true
}

// If we're processing a lot of blocks, the last progress time
// can fall far enough in the past that we'd incorrectly punish
// a peer, so update lastProgressTime after each block.
sm.lastProgressTime = time.Now()
sm.progressLogger.LogBlockHeight(processBlock.block, sm.chain)
}

// This is headers-first mode, so if the block is not a checkpoint
// request more blocks using the header list when the request queue is
// getting short.
@@ -889,7 +1016,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
sm.headerList.Init()
log.Infof("Reached the final checkpoint -- switching to normal mode")
locator := blockchain.BlockLocator([]*chainhash.Hash{blockHash})
err = peer.PushGetBlocksMsg(locator, &zeroHash)
err := peer.PushGetBlocksMsg(locator, &zeroHash)
if err != nil {
log.Warnf("Failed to send getblocks message to peer %s: %v",
peer.Addr(), err)
@@ -927,8 +1054,6 @@ func (sm *SyncManager) fetchHeaderBlocks() {
}
if !haveInv {
syncPeerState := sm.peerStates[sm.syncPeer]

sm.requestedBlocks[*node.hash] = struct{}{}
syncPeerState.requestedBlocks[*node.hash] = struct{}{}

// If we're fetching from a witness enabled peer
@@ -1076,7 +1201,11 @@ func (sm *SyncManager) handleNotFoundMsg(nfmsg *notFoundMsg) {
case wire.InvTypeBlock:
if _, exists := state.requestedBlocks[inv.Hash]; exists {
delete(state.requestedBlocks, inv.Hash)
delete(sm.requestedBlocks, inv.Hash)
// The global map of requestedBlocks is not used
// during headersFirstMode.
if !sm.headersFirstMode {
delete(sm.requestedBlocks, inv.Hash)
}
}

case wire.InvTypeWitnessTx:
@@ -1186,6 +1315,11 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
}
}

// Don't request advertised inventory when we're in headers-first mode.
if sm.headersFirstMode {
return
}

// Request the advertised inventory if we don't already have it. Also,
// request parent blocks of orphans if we receive one we already have.
// Finally, attempt to detect potential stalls due to long side chains
@@ -1205,11 +1339,6 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
// for the peer.
peer.AddKnownInventory(iv)

// Ignore inventory when we're in headers-first mode.
if sm.headersFirstMode {
continue
}

// Request the inventory if we don't already have it.
haveInv, err := sm.haveInventory(iv)
if err != nil {
@@ -1297,6 +1426,9 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
case wire.InvTypeBlock:
// Request the block if there is not already a pending
// request.
//
// There's no headers-first check here since it's already
// done earlier in the method.
if _, exists := sm.requestedBlocks[iv.Hash]; !exists {
limitAdd(sm.requestedBlocks, iv.Hash, maxRequestedBlocks)
limitAdd(state.requestedBlocks, iv.Hash, maxRequestedBlocks)
@@ -1362,7 +1494,11 @@ out:
msg.reply <- struct{}{}

case *blockMsg:
sm.handleBlockMsg(msg)
if sm.headersFirstMode {
sm.handleBlockMsgHeadersFirst(msg)
} else {
sm.handleBlockMsg(msg)
}
msg.reply <- struct{}{}

case *invMsg:
@@ -1681,6 +1817,7 @@ func New(config *Config) (*SyncManager, error) {
progressLogger: newBlockProgressLogger("Processed", log),
msgChan: make(chan interface{}, config.MaxPeers*3),
headerList: list.New(),
queuedBlocks: make(map[chainhash.Hash]*blockMsg),
quit: make(chan struct{}),
feeEstimator: config.FeeEstimator,
}
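A note on the error handling inside handleBlockMsgHeadersFirst above: every block processed there sits below an already-verified checkpoint, so a rule error other than a duplicate block, or a database corruption error, is treated as fatal. The standalone helper below restates that decision for illustration; the helper and its name are assumptions made for this sketch, and only the blockchain.RuleError and database.Error handling mirrors the diff.

package main

import (
    "fmt"

    "github.com/btcsuite/btcd/blockchain"
    "github.com/btcsuite/btcd/database"
)

// fatalCheckpointedBlockErr reports whether an error returned by ProcessBlock
// for a checkpointed, headers-first block indicates local corruption and
// therefore warrants a panic. Illustrative only; not part of the PR.
func fatalCheckpointedBlockErr(err error) bool {
    if err == nil {
        return false
    }
    if ruleErr, ok := err.(blockchain.RuleError); ok {
        // Checkpointed blocks should never fail rule validation, so
        // anything other than a duplicate-block error suggests a
        // corrupted binary or chain state.
        return ruleErr.ErrorCode != blockchain.ErrDuplicateBlock
    }
    if dbErr, ok := err.(database.Error); ok {
        // Database corruption is always fatal.
        return dbErr.ErrorCode == database.ErrCorruption
    }
    return false
}

func main() {
    dup := blockchain.RuleError{ErrorCode: blockchain.ErrDuplicateBlock}
    fmt.Println(fatalCheckpointedBlockErr(dup)) // false: duplicates are tolerated
    fmt.Println(fatalCheckpointedBlockErr(nil)) // false: no error at all
}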