Skip to content

Commit

Permalink
fix: block fetcher efficiency
Browse files Browse the repository at this point in the history
  • Loading branch information
j75689 committed Jul 29, 2021
1 parent 6ce2cef commit 0a05176
Showing 1 changed file with 16 additions and 4 deletions.
20 changes: 16 additions & 4 deletions eth/fetcher/block_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ type BlockFetcher struct {
done chan common.Hash
quit chan struct{}

requeue chan *blockOrHeaderInject

// Announce states
announces map[string]int // Per peer blockAnnounce counts to prevent memory exhaustion
announced map[common.Hash][]*blockAnnounce // Announced blocks, scheduled for fetching
Expand Down Expand Up @@ -370,9 +372,9 @@ func (f *BlockFetcher) loop() {
continue
}
if f.light {
f.importHeaders(op.origin, op.header)
f.importHeaders(op)
} else {
f.importBlocks(op.origin, op.block)
f.importBlocks(op)
}
}
// Wait for an outside event to occur
Expand Down Expand Up @@ -415,6 +417,10 @@ func (f *BlockFetcher) loop() {
f.rescheduleFetch(fetchTimer)
}

case op := <-f.requeue:
// Re-queue blocks that have not been written due to fork block competition
f.enqueue(op.origin, op.header, op.block)

case op := <-f.inject:
// A direct block insertion was requested, try and fill any pending gaps
blockBroadcastInMeter.Mark(1)
Expand Down Expand Up @@ -750,7 +756,9 @@ func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.B
// importHeaders spawns a new goroutine to run a header insertion into the chain.
// If the header's number is at the same height as the current import phase, it
// updates the phase states accordingly.
func (f *BlockFetcher) importHeaders(peer string, header *types.Header) {
func (f *BlockFetcher) importHeaders(op *blockOrHeaderInject) {
peer := op.origin
header := op.header
hash := header.Hash()
log.Debug("Importing propagated header", "peer", peer, "number", header.Number, "hash", hash)

Expand All @@ -760,6 +768,7 @@ func (f *BlockFetcher) importHeaders(peer string, header *types.Header) {
parent := f.getHeader(header.ParentHash)
if parent == nil {
log.Debug("Unknown parent of propagated header", "peer", peer, "number", header.Number, "hash", hash, "parent", header.ParentHash)
f.requeue <- op
return
}
// Validate the header and if something went wrong, drop the peer
Expand All @@ -783,7 +792,9 @@ func (f *BlockFetcher) importHeaders(peer string, header *types.Header) {
// importBlocks spawns a new goroutine to run a block insertion into the chain. If the
// block's number is at the same height as the current import phase, it updates
// the phase states accordingly.
func (f *BlockFetcher) importBlocks(peer string, block *types.Block) {
func (f *BlockFetcher) importBlocks(op *blockOrHeaderInject) {
peer := op.origin
block := op.block
hash := block.Hash()

// Run the import on a new thread
Expand All @@ -795,6 +806,7 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block) {
parent := f.getBlock(block.ParentHash())
if parent == nil {
log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
f.requeue <- op
return
}
// Quickly validate the header and propagate the block if it passes
Expand Down

0 comments on commit 0a05176

Please sign in to comment.