From 2e5f2ac5c408d69cb90cd55261659f36b512284b Mon Sep 17 00:00:00 2001 From: zjubfd <296179868@qq.com> Date: Fri, 7 Jan 2022 12:32:39 +0800 Subject: [PATCH 1/4] prefetch state by apply transactions within one block --- core/blockchain.go | 23 +++++++++--- core/state_prefetcher.go | 75 ++++++++++++++++++++++++---------------- core/state_processor.go | 1 - 3 files changed, 64 insertions(+), 35 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 4a7cdf9a54..9be99e5528 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -90,6 +90,7 @@ const ( maxFutureBlocks = 256 maxTimeFutureBlocks = 30 maxBeyondBlocks = 2048 + prefetchTxNumber = 100 diffLayerFreezerRecheckInterval = 3 * time.Second diffLayerPruneRecheckInterval = 1 * time.Second // The interval to prune unverified diff layers @@ -233,10 +234,11 @@ type BlockChain struct { running int32 // 0 if chain is running, 1 when stopped procInterrupt int32 // interrupt signaler for block processing - engine consensus.Engine - validator Validator // Block and state validator interface - processor Processor // Block transaction processor interface - vmConfig vm.Config + engine consensus.Engine + prefetcher Prefetcher + validator Validator // Block and state validator interface + processor Processor // Block transaction processor interface + vmConfig vm.Config shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion. @@ -295,6 +297,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par diffNumToBlockHashes: make(map[uint64]map[common.Hash]struct{}), diffPeersToDiffHashes: make(map[string]map[common.Hash]struct{}), } + bc.prefetcher = NewStatePrefetcher(chainConfig, bc, engine) bc.validator = NewBlockValidator(chainConfig, bc, engine) bc.processor = NewStateProcessor(chainConfig, bc, engine) @@ -2051,13 +2054,21 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er // Enable prefetching to pull in trie node paths while processing transactions statedb.StartPrefetcher("chain") - + var followupInterrupt uint32 + // For diff sync, it may fallback to full sync, so we still do prefetch + if len(block.Transactions()) >= prefetchTxNumber { + throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps) + go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) { + bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt) + }(time.Now(), block, throwaway, &followupInterrupt) + } //Process block using the parent state as reference point substart := time.Now() statedb, receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig) activeState = statedb if err != nil { bc.reportBlock(block, receipts, err) + atomic.StoreUint32(&followupInterrupt, 1) return it.index, err } // Update the metrics touched during block processing @@ -2076,6 +2087,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { log.Error("validate state failed", "error", err) bc.reportBlock(block, receipts, err) + atomic.StoreUint32(&followupInterrupt, 1) return it.index, err } } @@ -2093,6 +2105,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er substart = time.Now() status, err := bc.writeBlockWithState(block, receipts, logs, statedb, false) if err != nil { + atomic.StoreUint32(&followupInterrupt, 1) return it.index, err } // Update the metrics touched during block commit diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go index dacd8df404..2ff745276c 100644 --- a/core/state_prefetcher.go +++ b/core/state_prefetcher.go @@ -17,6 +17,7 @@ package core import ( + "runtime" "sync/atomic" "github.com/ethereum/go-ethereum/consensus" @@ -35,42 +36,58 @@ type statePrefetcher struct { engine consensus.Engine // Consensus engine used for block rewards } +// newStatePrefetcher initialises a new statePrefetcher. +func NewStatePrefetcher(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *statePrefetcher { + return &statePrefetcher{ + config: config, + bc: bc, + engine: engine, + } +} + // Prefetch processes the state changes according to the Ethereum rules by running // the transaction messages using the statedb, but any changes are discarded. The -// only goal is to pre-cache transaction signatures and state trie nodes. +// only goal is to pre-cache transaction signatures and snapshot clean state. func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *uint32) { var ( - header = block.Header() - gaspool = new(GasPool).AddGas(block.GasLimit()) - blockContext = NewEVMBlockContext(header, p.bc, nil) - evm = vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg) - signer = types.MakeSigner(p.config, header.Number) + header = block.Header() + signer = types.MakeSigner(p.config, header.Number) ) - // Iterate over and process the individual transactions - byzantium := p.config.IsByzantium(block.Number()) - for i, tx := range block.Transactions() { - // If block precaching was interrupted, abort - if interrupt != nil && atomic.LoadUint32(interrupt) == 1 { - return - } - // Convert the transaction into an executable message and pre-cache its sender - msg, err := tx.AsMessage(signer) - if err != nil { - return // Also invalid block, bail out - } - statedb.Prepare(tx.Hash(), block.Hash(), i) - if err := precacheTransaction(msg, p.config, gaspool, statedb, header, evm); err != nil { - return // Ugh, something went horribly wrong, bail out - } - // If we're pre-byzantium, pre-load trie nodes for the intermediate root - if !byzantium { - statedb.IntermediateRoot(true) + transactions := block.Transactions() + threads := runtime.NumCPU() + batch := len(transactions) / (threads + 1) + + // No need to execute the first batch, since the main processor will do it. + for i := 1; i <= threads; i++ { + start := i * batch + end := (i + 1) * batch + if i == threads { + end = len(transactions) } + go func(start, end int) { + newStatedb := statedb.Copy() + gaspool := new(GasPool).AddGas(block.GasLimit()) + blockContext := NewEVMBlockContext(header, p.bc, nil) + evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg) + // Iterate over and process the individual transactions + for i, tx := range transactions[start:end] { + // If block precaching was interrupted, abort + if interrupt != nil && atomic.LoadUint32(interrupt) == 1 { + return + } + // Convert the transaction into an executable message and pre-cache its sender + msg, err := tx.AsMessage(signer) + if err != nil { + return // Also invalid block, bail out + } + newStatedb.Prepare(tx.Hash(), block.Hash(), i) + if err := precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm); err != nil { + return // Ugh, something went horribly wrong, bail out + } + } + }(start, end) } - // If were post-byzantium, pre-load trie nodes for the final root hash - if byzantium { - statedb.IntermediateRoot(true) - } + } // precacheTransaction attempts to apply a transaction to the given state database diff --git a/core/state_processor.go b/core/state_processor.go index 6ca99f2eec..5652547db7 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -378,7 +378,6 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg gp = new(GasPool).AddGas(block.GasLimit()) ) signer := types.MakeSigner(p.bc.chainConfig, block.Number()) - statedb.TryPreload(block, signer) var receipts = make([]*types.Receipt, 0) // Mutate the block and state according to any hard-fork specs if p.config.DAOForkSupport && p.config.DAOForkBlock != nil && p.config.DAOForkBlock.Cmp(block.Number()) == 0 { From 899d77e793d7c65c06c986aa66fbacdfa2f9583b Mon Sep 17 00:00:00 2001 From: zjubfd <296179868@qq.com> Date: Mon, 10 Jan 2022 14:19:33 +0800 Subject: [PATCH 2/4] resolve comments --- core/blockchain.go | 2 +- core/state_prefetcher.go | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 9be99e5528..e0e1ad68ad 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2057,7 +2057,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er var followupInterrupt uint32 // For diff sync, it may fallback to full sync, so we still do prefetch if len(block.Transactions()) >= prefetchTxNumber { - throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps) + throwaway := statedb.Copy() go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) { bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt) }(time.Now(), block, throwaway, &followupInterrupt) diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go index 2ff745276c..a6b80d2449 100644 --- a/core/state_prefetcher.go +++ b/core/state_prefetcher.go @@ -56,7 +56,9 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c transactions := block.Transactions() threads := runtime.NumCPU() batch := len(transactions) / (threads + 1) - + if batch == 0 { + return + } // No need to execute the first batch, since the main processor will do it. for i := 1; i <= threads; i++ { start := i * batch From 9ea676304fd4fd3e5ab7972de8e700c1b7d7b841 Mon Sep 17 00:00:00 2001 From: zjubfd <296179868@qq.com> Date: Mon, 10 Jan 2022 17:42:18 +0800 Subject: [PATCH 3/4] stop prefetch once process is done --- core/blockchain.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index e0e1ad68ad..6aa5916218 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2065,10 +2065,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er //Process block using the parent state as reference point substart := time.Now() statedb, receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig) + atomic.StoreUint32(&followupInterrupt, 1) activeState = statedb if err != nil { bc.reportBlock(block, receipts, err) - atomic.StoreUint32(&followupInterrupt, 1) return it.index, err } // Update the metrics touched during block processing @@ -2087,7 +2087,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { log.Error("validate state failed", "error", err) bc.reportBlock(block, receipts, err) - atomic.StoreUint32(&followupInterrupt, 1) return it.index, err } } @@ -2105,7 +2104,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er substart = time.Now() status, err := bc.writeBlockWithState(block, receipts, logs, statedb, false) if err != nil { - atomic.StoreUint32(&followupInterrupt, 1) return it.index, err } // Update the metrics touched during block commit From 565e9bf79a17e04077603900639389a5cd8a41a2 Mon Sep 17 00:00:00 2001 From: zjubfd <296179868@qq.com> Date: Tue, 11 Jan 2022 11:01:22 +0800 Subject: [PATCH 4/4] update comments fix ut --- core/blockchain_diff_test.go | 2 +- core/state_prefetcher.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/blockchain_diff_test.go b/core/blockchain_diff_test.go index facd86b52b..451a966589 100644 --- a/core/blockchain_diff_test.go +++ b/core/blockchain_diff_test.go @@ -372,7 +372,7 @@ func TestFreezeDiffLayer(t *testing.T) { t.Errorf("size of diff queue is wrong, expected: %d, get: %d", blockNum-1, fullBackend.chain.diffQueue.Size()) } - time.Sleep(diffLayerFreezerRecheckInterval + 1*time.Second) + time.Sleep(diffLayerFreezerRecheckInterval + 2*time.Second) if fullBackend.chain.diffQueue.Size() != int(fullBackend.chain.triesInMemory) { t.Errorf("size of diff queue is wrong, expected: %d, get: %d", blockNum, fullBackend.chain.diffQueue.Size()) } diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go index a6b80d2449..ec4e7bf972 100644 --- a/core/state_prefetcher.go +++ b/core/state_prefetcher.go @@ -36,7 +36,7 @@ type statePrefetcher struct { engine consensus.Engine // Consensus engine used for block rewards } -// newStatePrefetcher initialises a new statePrefetcher. +// NewStatePrefetcher initialises a new statePrefetcher. func NewStatePrefetcher(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *statePrefetcher { return &statePrefetcher{ config: config,