Skip to content

Commit

Permalink
fix: block fetcher efficiency (ethereum#333)
Browse files Browse the repository at this point in the history
  • Loading branch information
j75689 committed Jul 29, 2021
1 parent 2ce00ad commit 7e8d9fb
Showing 1 changed file with 36 additions and 8 deletions.
44 changes: 36 additions & 8 deletions eth/fetcher/block_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ import (
)

const (
lightTimeout = time.Millisecond // Time allowance before an announced header is explicitly requested
arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested
gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches
fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block/transaction
lightTimeout = time.Millisecond // Time allowance before an announced header is explicitly requested
arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested
gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches
fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block/transaction
reQueueBlockTimeout = 500 * time.Millisecond // Time allowance before blocks are requeued for import

)

const (
Expand Down Expand Up @@ -167,6 +169,8 @@ type BlockFetcher struct {
done chan common.Hash
quit chan struct{}

requeue chan *blockOrHeaderInject

// Announce states
announces map[string]int // Per peer blockAnnounce counts to prevent memory exhaustion
announced map[common.Hash][]*blockAnnounce // Announced blocks, scheduled for fetching
Expand Down Expand Up @@ -207,6 +211,7 @@ func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetr
bodyFilter: make(chan chan *bodyFilterTask),
done: make(chan common.Hash),
quit: make(chan struct{}),
requeue: make(chan *blockOrHeaderInject),
announces: make(map[string]int),
announced: make(map[common.Hash][]*blockAnnounce),
fetching: make(map[common.Hash]*blockAnnounce),
Expand Down Expand Up @@ -371,9 +376,9 @@ func (f *BlockFetcher) loop() {
continue
}
if f.light {
f.importHeaders(op.origin, op.header)
f.importHeaders(op)
} else {
f.importBlocks(op.origin, op.block)
f.importBlocks(op)
}
}
// Wait for an outside event to occur
Expand Down Expand Up @@ -416,6 +421,21 @@ func (f *BlockFetcher) loop() {
f.rescheduleFetch(fetchTimer)
}

case op := <-f.requeue:
// Re-queue blocks that have not been written due to fork block competition
number := int64(0)
hash := ""
if op.header != nil {
number = op.header.Number.Int64()
hash = op.header.Hash().String()
} else if op.block != nil {
number = op.block.Number().Int64()
hash = op.block.Hash().String()
}

log.Info("Re-queue blocks", "number", number, "hash", hash)
f.enqueue(op.origin, op.header, op.block)

case op := <-f.inject:
// A direct block insertion was requested, try and fill any pending gaps
blockBroadcastInMeter.Mark(1)
Expand Down Expand Up @@ -751,7 +771,9 @@ func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.B
// importHeaders spawns a new goroutine to run a header insertion into the chain.
// If the header's number is at the same height as the current import phase, it
// updates the phase states accordingly.
func (f *BlockFetcher) importHeaders(peer string, header *types.Header) {
func (f *BlockFetcher) importHeaders(op *blockOrHeaderInject) {
peer := op.origin
header := op.header
hash := header.Hash()
log.Debug("Importing propagated header", "peer", peer, "number", header.Number, "hash", hash)

Expand All @@ -761,6 +783,8 @@ func (f *BlockFetcher) importHeaders(peer string, header *types.Header) {
parent := f.getHeader(header.ParentHash)
if parent == nil {
log.Debug("Unknown parent of propagated header", "peer", peer, "number", header.Number, "hash", hash, "parent", header.ParentHash)
time.Sleep(reQueueBlockTimeout)
f.requeue <- op
return
}
// Validate the header and if something went wrong, drop the peer
Expand All @@ -784,7 +808,9 @@ func (f *BlockFetcher) importHeaders(peer string, header *types.Header) {
// importBlocks spawns a new goroutine to run a block insertion into the chain. If the
// block's number is at the same height as the current import phase, it updates
// the phase states accordingly.
func (f *BlockFetcher) importBlocks(peer string, block *types.Block) {
func (f *BlockFetcher) importBlocks(op *blockOrHeaderInject) {
peer := op.origin
block := op.block
hash := block.Hash()

// Run the import on a new thread
Expand All @@ -796,6 +822,8 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block) {
parent := f.getBlock(block.ParentHash())
if parent == nil {
log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
time.Sleep(reQueueBlockTimeout)
f.requeue <- op
return
}
// Quickly validate the header and propagate the block if it passes
Expand Down

0 comments on commit 7e8d9fb

Please sign in to comment.