Skip to content

Commit 624241e

Browse files
committed
feat: refactor miner/dBFT to properly support post-merge behaviour
1 parent 2e589ba commit 624241e

File tree

18 files changed

+252
-52
lines changed

18 files changed

+252
-52
lines changed

consensus/consensus.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,3 +118,11 @@ type Engine interface {
118118
// Close terminates any background threads maintained by the consensus engine.
119119
Close() error
120120
}
121+
122+
// PoS consensus engine.
123+
type PoS interface {
124+
Engine
125+
126+
// GetFinalizedHeader retrieves the finalized header for a given header.
127+
GetFinalizedHeader(chain ChainHeaderReader, header *types.Header) *types.Header
128+
}

consensus/dbft/dbft.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2948,3 +2948,15 @@ func unpackContractExecutionResult(res interface{}, result *core.ExecutionResult
29482948
}
29492949
return contractAbi.UnpackIntoInterface(&res, method, result.Return())
29502950
}
2951+
2952+
// GetFinalizedHeader returns highest finalized block header.
2953+
func (c *DBFT) GetFinalizedHeader(chain consensus.ChainHeaderReader, header *types.Header) *types.Header {
2954+
if chain == nil || header == nil {
2955+
return nil
2956+
}
2957+
if !chain.Config().IsNeoXDKG(header.Number) || header.Number.Uint64() < 1 {
2958+
return chain.GetHeaderByNumber(0)
2959+
}
2960+
2961+
return chain.GetHeaderByNumber(header.Number.Uint64() - 1)
2962+
}

core/blockchain.go

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -225,14 +225,15 @@ type BlockChain struct {
225225
statedb *state.CachingDB // State database to reuse between imports (contains state cache)
226226
txIndexer *txIndexer // Transaction indexer, might be nil if not enabled
227227

228-
hc *HeaderChain
229-
rmLogsFeed event.Feed
230-
chainFeed event.Feed
231-
chainHeadFeed event.Feed
232-
logsFeed event.Feed
233-
blockProcFeed event.Feed
234-
scope event.SubscriptionScope
235-
genesisBlock *types.Block
228+
hc *HeaderChain
229+
rmLogsFeed event.Feed
230+
chainFeed event.Feed
231+
chainHeadFeed event.Feed
232+
logsFeed event.Feed
233+
blockProcFeed event.Feed
234+
finalizedHeaderFeed event.Feed
235+
scope event.SubscriptionScope
236+
genesisBlock *types.Block
236237

237238
// This mutex synchronizes chain write operations.
238239
// Readers don't need to take it, they can just read the database.
@@ -497,6 +498,17 @@ func (bc *BlockChain) empty() bool {
497498
return true
498499
}
499500

501+
// GetFinalizedNumber returns the highest finalized number before the specific block.
502+
func (bc *BlockChain) GetFinalizedNumber(header *types.Header) uint64 {
503+
if p, ok := bc.engine.(consensus.PoS); ok {
504+
if finalizedHeader := p.GetFinalizedHeader(bc, header); finalizedHeader != nil {
505+
return finalizedHeader.Number.Uint64()
506+
}
507+
}
508+
509+
return 0
510+
}
511+
500512
// loadLastState loads the last known chain state from the database. This method
501513
// assumes that the chain manager mutex is held.
502514
func (bc *BlockChain) loadLastState() error {
@@ -551,8 +563,7 @@ func (bc *BlockChain) loadLastState() error {
551563
}
552564
// Issue a status log for the user
553565
var (
554-
currentSnapBlock = bc.CurrentSnapBlock()
555-
currentFinalBlock = bc.CurrentFinalBlock()
566+
currentSnapBlock = bc.CurrentSnapBlock()
556567

557568
headerTd = bc.GetTd(headHeader.Hash(), headHeader.Number.Uint64())
558569
blockTd = bc.GetTd(headBlock.Hash(), headBlock.NumberU64())
@@ -565,9 +576,14 @@ func (bc *BlockChain) loadLastState() error {
565576
snapTd := bc.GetTd(currentSnapBlock.Hash(), currentSnapBlock.Number.Uint64())
566577
log.Info("Loaded most recent local snap block", "number", currentSnapBlock.Number, "hash", currentSnapBlock.Hash(), "td", snapTd, "age", common.PrettyAge(time.Unix(int64(currentSnapBlock.Time), 0)))
567578
}
568-
if currentFinalBlock != nil {
569-
finalTd := bc.GetTd(currentFinalBlock.Hash(), currentFinalBlock.Number.Uint64())
570-
log.Info("Loaded most recent local finalized block", "number", currentFinalBlock.Number, "hash", currentFinalBlock.Hash(), "td", finalTd, "age", common.PrettyAge(time.Unix(int64(currentFinalBlock.Time), 0)))
579+
if p, ok := bc.engine.(consensus.PoS); ok {
580+
if currentFinalizedHeader := p.GetFinalizedHeader(bc, headHeader); currentFinalizedHeader != nil {
581+
bc.currentFinalBlock.Store(currentFinalizedHeader)
582+
if currentFinalizedBlock := bc.GetBlockByHash(currentFinalizedHeader.Hash()); currentFinalizedBlock != nil {
583+
finalTd := bc.GetTd(currentFinalizedBlock.Hash(), currentFinalizedBlock.NumberU64())
584+
log.Info("Loaded most recent local finalized block", "number", currentFinalizedBlock.Number(), "hash", currentFinalizedBlock.Hash(), "root", currentFinalizedBlock.Root(), "td", finalTd, "age", common.PrettyAge(time.Unix(int64(currentFinalizedBlock.Time()), 0)))
585+
}
586+
}
571587
}
572588
if pivot := rawdb.ReadLastPivotNumber(bc.db); pivot != nil {
573589
log.Info("Loaded last snap-sync pivot marker", "number", *pivot)
@@ -1615,13 +1631,24 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
16151631
if len(logs) > 0 {
16161632
bc.logsFeed.Send(logs)
16171633
}
1634+
1635+
var finalizedHeader *types.Header
1636+
if p, ok := bc.Engine().(consensus.PoS); ok {
1637+
if finalizedHeader = p.GetFinalizedHeader(bc, block.Header()); finalizedHeader != nil {
1638+
bc.SetFinalized(finalizedHeader)
1639+
}
1640+
}
1641+
16181642
// In theory, we should fire a ChainHeadEvent when we inject
16191643
// a canonical block, but sometimes we can insert a batch of
16201644
// canonical blocks. Avoid firing too many ChainHeadEvents,
16211645
// we will fire an accumulated ChainHeadEvent and disable fire
16221646
// event here.
16231647
if emitHeadEvent {
16241648
bc.chainHeadFeed.Send(ChainHeadEvent{Header: block.Header()})
1649+
if finalizedHeader != nil {
1650+
bc.finalizedHeaderFeed.Send(FinalizedHeaderEvent{finalizedHeader})
1651+
}
16251652
}
16261653
}
16271654
return status, nil
@@ -1708,6 +1735,11 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness
17081735
defer func() {
17091736
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
17101737
bc.chainHeadFeed.Send(ChainHeadEvent{Header: lastCanon.Header()})
1738+
if p, ok := bc.Engine().(consensus.PoS); ok {
1739+
if finalizedHeader := p.GetFinalizedHeader(bc, lastCanon.Header()); finalizedHeader != nil {
1740+
bc.finalizedHeaderFeed.Send(FinalizedHeaderEvent{finalizedHeader})
1741+
}
1742+
}
17111743
}
17121744
}()
17131745
// Start the parallel header verifier

core/blockchain_reader.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,29 @@ func (bc *BlockChain) CurrentSnapBlock() *types.Header {
5454
// CurrentFinalBlock retrieves the current finalized block of the canonical
5555
// chain. The block is retrieved from the blockchain's internal cache.
5656
func (bc *BlockChain) CurrentFinalBlock() *types.Header {
57-
return bc.currentFinalBlock.Load()
57+
if p, ok := bc.engine.(consensus.PoS); ok {
58+
currentHeader := bc.CurrentHeader()
59+
if currentHeader == nil {
60+
return nil
61+
}
62+
return p.GetFinalizedHeader(bc, currentHeader)
63+
}
64+
65+
return nil
5866
}
5967

6068
// CurrentSafeBlock retrieves the current safe block of the canonical
6169
// chain. The block is retrieved from the blockchain's internal cache.
6270
func (bc *BlockChain) CurrentSafeBlock() *types.Header {
63-
return bc.currentSafeBlock.Load()
71+
if p, ok := bc.engine.(consensus.PoS); ok {
72+
currentHeader := bc.CurrentHeader()
73+
if currentHeader == nil {
74+
return nil
75+
}
76+
return p.GetFinalizedHeader(bc, currentHeader)
77+
}
78+
79+
return nil
6480
}
6581

6682
// HasHeader checks if a block header is present in the database or not, caching
@@ -440,3 +456,8 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript
440456
func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription {
441457
return bc.scope.Track(bc.blockProcFeed.Subscribe(ch))
442458
}
459+
460+
// SubscribeFinalizedHeaderEvent registers a subscription of FinalizedHeaderEvent.
461+
func (bc *BlockChain) SubscribeFinalizedHeaderEvent(ch chan<- FinalizedHeaderEvent) event.Subscription {
462+
return bc.scope.Track(bc.finalizedHeaderFeed.Subscribe(ch))
463+
}

core/events.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ type NewMinedBlockEvent struct{ Block *types.Block }
3232
// RemovedLogsEvent is posted when a reorg happens
3333
type RemovedLogsEvent struct{ Logs []*types.Log }
3434

35+
// FinalizedHeaderEvent is posted when a finalized header is reached.
36+
type FinalizedHeaderEvent struct{ Header *types.Header }
37+
3538
type ChainEvent struct {
3639
Header *types.Header
3740
}

core/headerchain.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,17 @@ func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine c
9999
return hc, nil
100100
}
101101

102+
// GetFinalizedNumber returns the highest finalized number before the specific block.
103+
func (hc *HeaderChain) GetFinalizedNumber(header *types.Header) uint64 {
104+
if p, ok := hc.engine.(consensus.PoS); ok {
105+
if finalizedHeader := p.GetFinalizedHeader(hc, header); finalizedHeader != nil {
106+
return finalizedHeader.Number.Uint64()
107+
}
108+
}
109+
110+
return 0
111+
}
112+
102113
// GetBlockNumber retrieves the block number belonging to the given hash
103114
// from the cache or database
104115
func (hc *HeaderChain) GetBlockNumber(hash common.Hash) *uint64 {

eth/api_backend.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,10 @@ func (b *EthAPIBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) e
277277
return b.eth.BlockChain().SubscribeChainHeadEvent(ch)
278278
}
279279

280+
func (b *EthAPIBackend) SubscribeFinalizedHeaderEvent(ch chan<- core.FinalizedHeaderEvent) event.Subscription {
281+
return b.eth.BlockChain().SubscribeFinalizedHeaderEvent(ch)
282+
}
283+
280284
func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
281285
return b.eth.BlockChain().SubscribeLogsEvent(ch)
282286
}

eth/ethconfig/config.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -187,19 +187,19 @@ type Config struct {
187187
// Clique is allowed for now to live standalone, but ethash is forbidden and can
188188
// only exist on already merged networks.
189189
func CreateConsensusEngine(config *params.ChainConfig, db ethdb.Database, statisticsCfg dbft.StatisticsConfig) (consensus.Engine, error) {
190+
if config.DBFT != nil {
191+
bft, err := dbft.New(config, db, statisticsCfg)
192+
if err != nil {
193+
return nil, fmt.Errorf("failed to create dBFT engine: %w", err)
194+
}
195+
return bft, nil
196+
}
190197
if config.TerminalTotalDifficulty == nil {
191198
return nil, fmt.Errorf("only PoS networks are supported, please transition old ones with Geth v1.13.x")
192199
}
193200
// If proof-of-authority is requested, set it up
194201
if config.Clique != nil {
195202
return beacon.New(clique.New(config.Clique, db)), nil
196203
}
197-
if config.DBFT != nil {
198-
bft, err := dbft.New(config, db, statisticsCfg)
199-
if err != nil {
200-
return nil, fmt.Errorf("failed to create dBFT engine: %w", err)
201-
}
202-
return beacon.New(bft), nil
203-
}
204204
return beacon.New(ethash.NewFaker()), nil
205205
}

eth/filters/api.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,65 @@ func (api *FilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
269269
return rpcSub, nil
270270
}
271271

272+
// NewFinalizedHeaderFilter creates a filter that fetches finalized headers that are reached.
273+
func (api *FilterAPI) NewFinalizedHeaderFilter() rpc.ID {
274+
var (
275+
headers = make(chan *types.Header)
276+
headerSub = api.events.SubscribeNewFinalizedHeaders(headers)
277+
)
278+
279+
api.filtersMu.Lock()
280+
api.filters[headerSub.ID] = &filter{typ: FinalizedHeadersSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: headerSub}
281+
api.filtersMu.Unlock()
282+
283+
go func() {
284+
for {
285+
select {
286+
case h := <-headers:
287+
api.filtersMu.Lock()
288+
if f, found := api.filters[headerSub.ID]; found {
289+
f.hashes = append(f.hashes, h.Hash())
290+
}
291+
api.filtersMu.Unlock()
292+
case <-headerSub.Err():
293+
api.filtersMu.Lock()
294+
delete(api.filters, headerSub.ID)
295+
api.filtersMu.Unlock()
296+
return
297+
}
298+
}
299+
}()
300+
301+
return headerSub.ID
302+
}
303+
304+
// NewFinalizedHeaders send a notification each time a new finalized header is reached.
305+
func (api *FilterAPI) NewFinalizedHeaders(ctx context.Context) (*rpc.Subscription, error) {
306+
notifier, supported := rpc.NotifierFromContext(ctx)
307+
if !supported {
308+
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
309+
}
310+
311+
rpcSub := notifier.CreateSubscription()
312+
313+
go func() {
314+
headers := make(chan *types.Header)
315+
headersSub := api.events.SubscribeNewFinalizedHeaders(headers)
316+
defer headersSub.Unsubscribe()
317+
318+
for {
319+
select {
320+
case h := <-headers:
321+
notifier.Notify(rpcSub.ID, h)
322+
case <-rpcSub.Err():
323+
return
324+
}
325+
}
326+
}()
327+
328+
return rpcSub, nil
329+
}
330+
272331
// Logs creates a subscription that fires for all new log that match the given filter criteria.
273332
func (api *FilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
274333
notifier, supported := rpc.NotifierFromContext(ctx)

0 commit comments

Comments
 (0)