From 72c06c761c164b6fac30110e5e98722f250b4f42 Mon Sep 17 00:00:00 2001 From: ironbeer <7997273+ironbeer@users.noreply.github.com> Date: Wed, 16 Oct 2024 18:30:03 +0900 Subject: [PATCH 1/3] Imported stop process of eth.handler from bsc@v1.4.6 --- eth/handler.go | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index 9917124ce..281a245d4 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -153,6 +153,7 @@ type handler struct { // channels for fetcher, syncer, txsyncLoop quitSync chan struct{} + stopCh chan struct{} chainSync *chainSyncer wg sync.WaitGroup @@ -181,6 +182,7 @@ func newHandler(config *handlerConfig) (*handler, error) { quitSync: make(chan struct{}), handlerDoneCh: make(chan struct{}), handlerStartCh: make(chan struct{}), + stopCh: make(chan struct{}), } if config.Sync == downloader.FullSync { // The database seems empty as the current block is the genesis. Yet the snap @@ -620,7 +622,7 @@ func (h *handler) Stop() { h.voteMonitorSub.Unsubscribe() } } - + close(h.stopCh) // Quit chainSync and txsync64. // After this is done, no new peers will be accepted. close(h.quitSync) @@ -779,10 +781,18 @@ func (h *handler) BroadcastVote(vote *types.VoteEnvelope) { func (h *handler) minedBroadcastLoop() { defer h.wg.Done() - for obj := range h.minedBlockSub.Chan() { - if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok { - h.BroadcastBlock(ev.Block, true) // First propagate block to peers - h.BroadcastBlock(ev.Block, false) // Only then announce to the rest + for { + select { + case obj := <-h.minedBlockSub.Chan(): + if obj == nil { + continue + } + if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok { + h.BroadcastBlock(ev.Block, true) // First propagate block to peers + h.BroadcastBlock(ev.Block, false) // Only then announce to the rest + } + case <-h.stopCh: + return } } } @@ -796,6 +806,8 @@ func (h *handler) txBroadcastLoop() { h.BroadcastTransactions(event.Txs) case <-h.txsSub.Err(): return + case <-h.stopCh: + return } } } @@ -811,6 +823,8 @@ func (h *handler) voteBroadcastLoop() { h.BroadcastVote(event.Vote) case <-h.votesSub.Err(): return + case <-h.stopCh: + return } } } @@ -826,6 +840,8 @@ func (h *handler) startMaliciousVoteMonitor() { h.maliciousVoteMonitor.ConflictDetect(event.Vote, pendingBlockNumber) case <-h.voteMonitorSub.Err(): return + case <-h.stopCh: + return } } } From 2ef28a82e53fb551816445226f7b0306f8d00a5e Mon Sep 17 00:00:00 2001 From: ironbeer <7997273+ironbeer@users.noreply.github.com> Date: Thu, 17 Oct 2024 11:40:49 +0900 Subject: [PATCH 2/3] Imported `chainFinalizedHeightFn` of fetcher.BlockFetcher from bsc@v1.4.6 --- eth/fetcher/block_fetcher.go | 90 +++++++++++++++++++------------ eth/fetcher/block_fetcher_test.go | 78 ++++++++++++++++++++++++++- eth/handler.go | 10 +++- 3 files changed, 141 insertions(+), 37 deletions(-) diff --git a/eth/fetcher/block_fetcher.go b/eth/fetcher/block_fetcher.go index 126eaaea7..4c4ee0cba 100644 --- a/eth/fetcher/block_fetcher.go +++ b/eth/fetcher/block_fetcher.go @@ -89,6 +89,9 @@ type blockBroadcasterFn func(block *types.Block, propagate bool) // chainHeightFn is a callback type to retrieve the current chain height. type chainHeightFn func() uint64 +// chainFinalizedHeightFn is a callback type to retrieve the current chain finalized height. +type chainFinalizedHeightFn func() uint64 + // headersInsertFn is a callback type to insert a batch of headers into the local chain. type headersInsertFn func(headers []*types.Header) (int, error) @@ -180,14 +183,15 @@ type BlockFetcher struct { queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports) // Callbacks - getHeader HeaderRetrievalFn // Retrieves a header from the local chain - getBlock blockRetrievalFn // Retrieves a block from the local chain - verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work - broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers - chainHeight chainHeightFn // Retrieves the current chain's height - insertHeaders headersInsertFn // Injects a batch of headers into the chain - insertChain chainInsertFn // Injects a batch of blocks into the chain - dropPeer peerDropFn // Drops a peer for misbehaving + getHeader HeaderRetrievalFn // Retrieves a header from the local chain + getBlock blockRetrievalFn // Retrieves a block from the local chain + verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work + broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers + chainHeight chainHeightFn // Retrieves the current chain's height + chainFinalizedHeight chainFinalizedHeightFn // Retrieves the current chain's finalized height + insertHeaders headersInsertFn // Injects a batch of headers into the chain + insertChain chainInsertFn // Injects a batch of blocks into the chain + dropPeer peerDropFn // Drops a peer for misbehaving // Testing hooks announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the blockAnnounce list @@ -198,31 +202,34 @@ type BlockFetcher struct { } // NewBlockFetcher creates a block fetcher to retrieve blocks based on hash announcements. -func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher { +func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, + broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, chainFinalizedHeight chainFinalizedHeightFn, + insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher { return &BlockFetcher{ - light: light, - notify: make(chan *blockAnnounce), - inject: make(chan *blockOrHeaderInject), - headerFilter: make(chan chan *headerFilterTask), - bodyFilter: make(chan chan *bodyFilterTask), - done: make(chan common.Hash), - quit: make(chan struct{}), - announces: make(map[string]int), - announced: make(map[common.Hash][]*blockAnnounce), - fetching: make(map[common.Hash]*blockAnnounce), - fetched: make(map[common.Hash][]*blockAnnounce), - completing: make(map[common.Hash]*blockAnnounce), - queue: prque.New[int64, *blockOrHeaderInject](nil), - queues: make(map[string]int), - queued: make(map[common.Hash]*blockOrHeaderInject), - getHeader: getHeader, - getBlock: getBlock, - verifyHeader: verifyHeader, - broadcastBlock: broadcastBlock, - chainHeight: chainHeight, - insertHeaders: insertHeaders, - insertChain: insertChain, - dropPeer: dropPeer, + light: light, + notify: make(chan *blockAnnounce), + inject: make(chan *blockOrHeaderInject), + headerFilter: make(chan chan *headerFilterTask), + bodyFilter: make(chan chan *bodyFilterTask), + done: make(chan common.Hash), + quit: make(chan struct{}), + announces: make(map[string]int), + announced: make(map[common.Hash][]*blockAnnounce), + fetching: make(map[common.Hash]*blockAnnounce), + fetched: make(map[common.Hash][]*blockAnnounce), + completing: make(map[common.Hash]*blockAnnounce), + queue: prque.New[int64, *blockOrHeaderInject](nil), + queues: make(map[string]int), + queued: make(map[common.Hash]*blockOrHeaderInject), + getHeader: getHeader, + getBlock: getBlock, + verifyHeader: verifyHeader, + broadcastBlock: broadcastBlock, + chainHeight: chainHeight, + chainFinalizedHeight: chainFinalizedHeight, + insertHeaders: insertHeaders, + insertChain: insertChain, + dropPeer: dropPeer, } } @@ -366,7 +373,8 @@ func (f *BlockFetcher) loop() { break } // Otherwise if fresh and still unknown, try and import - if (number+maxUncleDist < height) || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) { + finalizedHeight := f.chainFinalizedHeight() + if (number+maxUncleDist < height) || number <= finalizedHeight || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) { f.forgetBlock(hash) continue } @@ -397,7 +405,13 @@ func (f *BlockFetcher) loop() { } // If we have a valid block number, check that it's potentially useful if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist { - log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist) + log.Debug("Peer discarded announcement by distance", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist) + blockAnnounceDropMeter.Mark(1) + break + } + finalized := f.chainFinalizedHeight() + if notification.number <= finalized { + log.Debug("Peer discarded announcement by finality", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "finalized", finalized) blockAnnounceDropMeter.Mark(1) break } @@ -782,6 +796,14 @@ func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.B f.forgetHash(hash) return } + // Discard any block that is below the current finalized height + finalizedHeight := f.chainFinalizedHeight() + if number <= finalizedHeight { + log.Debug("Discarded delivered header or block, below or equal to finalized", "peer", peer, "number", number, "hash", hash, "finalized", finalizedHeight) + blockBroadcastDropMeter.Mark(1) + f.forgetHash(hash) + return + } // Schedule the block for future importing if _, ok := f.queued[hash]; !ok { op := &blockOrHeaderInject{origin: peer} diff --git a/eth/fetcher/block_fetcher_test.go b/eth/fetcher/block_fetcher_test.go index 6927300b1..42eacc8e6 100644 --- a/eth/fetcher/block_fetcher_test.go +++ b/eth/fetcher/block_fetcher_test.go @@ -101,7 +101,9 @@ func newTester(light bool) *fetcherTester { blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis}, drops: make(map[string]bool), } - tester.fetcher = NewBlockFetcher(light, tester.getHeader, tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertHeaders, tester.insertChain, tester.dropPeer) + tester.fetcher = NewBlockFetcher(light, tester.getHeader, tester.getBlock, tester.verifyHeader, + tester.broadcastBlock, tester.chainHeight, tester.chainFinalizedHeight, tester.insertHeaders, + tester.insertChain, tester.dropPeer) tester.fetcher.Start() return tester @@ -143,6 +145,18 @@ func (f *fetcherTester) chainHeight() uint64 { return f.blocks[f.hashes[len(f.hashes)-1]].NumberU64() } +func (f *fetcherTester) chainFinalizedHeight() uint64 { + f.lock.RLock() + defer f.lock.RUnlock() + if len(f.hashes) < 3 { + return 0 + } + if f.fetcher.light { + return f.headers[f.hashes[len(f.hashes)-3]].Number.Uint64() + } + return f.blocks[f.hashes[len(f.hashes)-3]].NumberU64() +} + // insertChain injects a new headers into the simulated chain. func (f *fetcherTester) insertHeaders(headers []*types.Header) (int, error) { f.lock.Lock() @@ -735,6 +749,67 @@ func testDistantAnnouncementDiscarding(t *testing.T, light bool) { } } +// Tests that announcements with numbers much lower or equal to the current finalized block +// head get discarded to prevent wasting resources on useless blocks from faulty peers. +func TestFullFinalizedAnnouncementDiscarding(t *testing.T) { + testFinalizedAnnouncementDiscarding(t, false) +} +func TestLightFinalizedAnnouncementDiscarding(t *testing.T) { + testFinalizedAnnouncementDiscarding(t, true) +} + +func testFinalizedAnnouncementDiscarding(t *testing.T, light bool) { + // Create a long chain to import and define the discard boundaries + hashes, blocks := makeChain(3*maxQueueDist, 0, genesis) + + head := hashes[len(hashes)/2] + justified := hashes[len(hashes)/2+1] + finalized := hashes[len(hashes)/2+2] + beforeFinalized := hashes[len(hashes)/2+3] + + low, equal := len(hashes)/2+3, len(hashes)/2+2 + + // Create a tester and simulate a head block being the middle of the above chain + tester := newTester(light) + + tester.lock.Lock() + tester.hashes = []common.Hash{beforeFinalized, finalized, justified, head} + tester.headers = map[common.Hash]*types.Header{ + beforeFinalized: blocks[beforeFinalized].Header(), + finalized: blocks[finalized].Header(), + justified: blocks[justified].Header(), + head: blocks[head].Header(), + } + tester.blocks = map[common.Hash]*types.Block{ + beforeFinalized: blocks[beforeFinalized], + finalized: blocks[finalized], + justified: blocks[justified], + head: blocks[head], + } + tester.lock.Unlock() + + headerFetcher := tester.makeHeaderFetcher("lower", blocks, -gatherSlack) + bodyFetcher := tester.makeBodyFetcher("lower", blocks, 0) + + fetching := make(chan struct{}, 2) + tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- struct{}{} } + + // Ensure that a block with a lower number than the finalized height is discarded + tester.fetcher.Notify("lower", hashes[low], blocks[hashes[low]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher) + select { + case <-time.After(50 * time.Millisecond): + case <-fetching: + t.Fatalf("fetcher requested stale header") + } + // Ensure that a block with a same number of the finalized height is discarded + tester.fetcher.Notify("equal", hashes[equal], blocks[hashes[equal]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher) + select { + case <-time.After(50 * time.Millisecond): + case <-fetching: + t.Fatalf("fetcher requested future header") + } +} + // Tests that peers announcing blocks with invalid numbers (i.e. not matching // the headers provided afterwards) get dropped as malicious. func TestFullInvalidNumberAnnouncement(t *testing.T) { testInvalidNumberAnnouncement(t, false) } @@ -775,7 +850,6 @@ func testInvalidNumberAnnouncement(t *testing.T, light bool) { continue case <-time.After(1 * time.Second): t.Fatal("announce timeout") - return } } } diff --git a/eth/handler.go b/eth/handler.go index 281a245d4..a0d5e990f 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -254,6 +254,13 @@ func newHandler(config *handlerConfig) (*handler, error) { heighter := func() uint64 { return h.chain.CurrentBlock().Number.Uint64() } + finalizeHeighter := func() uint64 { + fblock := h.chain.CurrentFinalBlock() + if fblock == nil { + return 0 + } + return fblock.Number.Uint64() + } inserter := func(blocks types.Blocks) (int, error) { // All the block fetcher activities should be disabled // after the transition. Print the warning log. @@ -303,7 +310,8 @@ func newHandler(config *handlerConfig) (*handler, error) { } return h.chain.InsertChain(blocks) } - h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, h.removePeer) + h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, + heighter, finalizeHeighter, nil, inserter, h.removePeer) fetchTx := func(peer string, hashes []common.Hash) error { p := h.peers.peer(peer) From 10fc0752ff9f866f4a827c9a038e20986461ae98 Mon Sep 17 00:00:00 2001 From: tak Date: Fri, 18 Oct 2024 18:13:57 +0800 Subject: [PATCH 3/3] remove stop chan from eth/handler (#82) --- eth/handler.go | 25 ++++--------------------- 1 file changed, 4 insertions(+), 21 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index a0d5e990f..9704fc373 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -153,7 +153,6 @@ type handler struct { // channels for fetcher, syncer, txsyncLoop quitSync chan struct{} - stopCh chan struct{} chainSync *chainSyncer wg sync.WaitGroup @@ -182,7 +181,6 @@ func newHandler(config *handlerConfig) (*handler, error) { quitSync: make(chan struct{}), handlerDoneCh: make(chan struct{}), handlerStartCh: make(chan struct{}), - stopCh: make(chan struct{}), } if config.Sync == downloader.FullSync { // The database seems empty as the current block is the genesis. Yet the snap @@ -630,7 +628,6 @@ func (h *handler) Stop() { h.voteMonitorSub.Unsubscribe() } } - close(h.stopCh) // Quit chainSync and txsync64. // After this is done, no new peers will be accepted. close(h.quitSync) @@ -789,18 +786,10 @@ func (h *handler) BroadcastVote(vote *types.VoteEnvelope) { func (h *handler) minedBroadcastLoop() { defer h.wg.Done() - for { - select { - case obj := <-h.minedBlockSub.Chan(): - if obj == nil { - continue - } - if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok { - h.BroadcastBlock(ev.Block, true) // First propagate block to peers - h.BroadcastBlock(ev.Block, false) // Only then announce to the rest - } - case <-h.stopCh: - return + for obj := range h.minedBlockSub.Chan() { + if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok { + h.BroadcastBlock(ev.Block, true) // First propagate block to peers + h.BroadcastBlock(ev.Block, false) // Only then announce to the rest } } } @@ -814,8 +803,6 @@ func (h *handler) txBroadcastLoop() { h.BroadcastTransactions(event.Txs) case <-h.txsSub.Err(): return - case <-h.stopCh: - return } } } @@ -831,8 +818,6 @@ func (h *handler) voteBroadcastLoop() { h.BroadcastVote(event.Vote) case <-h.votesSub.Err(): return - case <-h.stopCh: - return } } } @@ -848,8 +833,6 @@ func (h *handler) startMaliciousVoteMonitor() { h.maliciousVoteMonitor.ConflictDetect(event.Vote, pendingBlockNumber) case <-h.voteMonitorSub.Err(): return - case <-h.stopCh: - return } } }