From 650e1413f3f61ce9378670fbe80b08bd57887925 Mon Sep 17 00:00:00 2001 From: Jerry Date: Tue, 11 Apr 2023 15:30:08 -0700 Subject: [PATCH] Allow process in state processor to be interrupted by caller --- core/blockchain.go | 2 +- core/blockchain_test.go | 34 ++++++++++++++++++++++++++++---- core/chain_makers.go | 13 ++++++++++++ core/parallel_state_processor.go | 3 ++- core/state_processor.go | 11 ++++++++++- core/types.go | 4 +++- eth/state_accessor.go | 22 +++++++++++---------- 7 files changed, 71 insertions(+), 18 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 37d2aff25b..92d74a8be7 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1803,7 +1803,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool) // Process block using the parent state as reference point substart := time.Now() - receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig) + receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig, nil) if err != nil { bc.reportBlock(block, receipts, err) atomic.StoreUint32(&followupInterrupt, 1) diff --git a/core/blockchain_test.go b/core/blockchain_test.go index fa6b61225e..3c80ef3718 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -17,6 +17,7 @@ package core import ( + "context" "errors" "fmt" "io/ioutil" @@ -123,9 +124,11 @@ func testFork(t *testing.T, blockchain *BlockChain, i, n int, full bool, compara if full { cur := blockchain.CurrentBlock() tdPre = blockchain.GetTd(cur.Hash(), cur.NumberU64()) - if err := testBlockChainImport(blockChainB, blockchain); err != nil { + + if err := testBlockChainImport(blockChainB, blockchain, nil); err != nil { t.Fatalf("failed to import forked block chain: %v", err) } + last := blockChainB[len(blockChainB)-1] tdPost = blockchain.GetTd(last.Hash(), last.NumberU64()) } else { @@ -143,7 +146,7 @@ func testFork(t *testing.T, blockchain *BlockChain, i, n int, full bool, compara // testBlockChainImport tries to process a chain of blocks, writing them into // the database if successful. -func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error { +func testBlockChainImport(chain types.Blocks, blockchain *BlockChain, ctx context.Context) error { for _, block := range chain { // Try and process the block err := blockchain.engine.VerifyHeader(blockchain, block.Header(), true) @@ -156,11 +159,14 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error { } return err } + statedb, err := state.New(blockchain.GetBlockByHash(block.ParentHash()).Root(), blockchain.stateCache, nil) + if err != nil { return err } - receipts, _, usedGas, err := blockchain.processor.Process(block, statedb, vm.Config{}) + + receipts, _, usedGas, err := blockchain.processor.Process(block, statedb, vm.Config{}, ctx) if err != nil { blockchain.reportBlock(block, receipts, err) return err @@ -180,6 +186,26 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error { return nil } +func TestBlockChainImportInterrupt(t *testing.T) { + t.Parallel() + + db, blockchain, err := newCanonical(ethash.NewFaker(), 10, true) + if err != nil { + t.Fatalf("failed to make new canonical chain: %v", err) + } + + defer blockchain.Stop() + + blockChainB := makeFakeNonEmptyBlockChain(blockchain.CurrentBlock(), 5, ethash.NewFaker(), db, forkSeed, 5) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + if err := testBlockChainImport(blockChainB, blockchain, ctx); err != context.Canceled { + t.Errorf("block chain import is not cancelled correctly, got %v, want %v", err, context.Canceled) + } +} + // testHeaderChainImport tries to process a chain of header, writing them into // the database if successful. func testHeaderChainImport(chain []*types.Header, blockchain *BlockChain) error { @@ -476,7 +502,7 @@ func testBrokenChain(t *testing.T, full bool) { // Create a forked chain, and try to insert with a missing link if full { chain := makeBlockChain(blockchain.CurrentBlock(), 5, ethash.NewFaker(), db, forkSeed)[1:] - if err := testBlockChainImport(chain, blockchain); err == nil { + if err := testBlockChainImport(chain, blockchain, nil); err == nil { t.Errorf("broken block chain not reported") } } else { diff --git a/core/chain_makers.go b/core/chain_makers.go index e9944e4744..0857128288 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -335,6 +335,19 @@ func makeBlockChain(parent *types.Block, n int, engine consensus.Engine, db ethd return blocks } +// makeBlockChain creates a deterministic chain of blocks rooted at parent with fake invalid transactions. +func makeFakeNonEmptyBlockChain(parent *types.Block, n int, engine consensus.Engine, db ethdb.Database, seed int, numTx int) []*types.Block { + blocks, _ := GenerateChain(params.TestChainConfig, parent, engine, db, n, func(i int, b *BlockGen) { + addr := common.Address{0: byte(seed), 19: byte(i)} + b.SetCoinbase(addr) + for j := 0; j < numTx; j++ { + b.txs = append(b.txs, types.NewTransaction(0, addr, big.NewInt(1000), params.TxGas, nil, nil)) + } + }) + + return blocks +} + type fakeChainReader struct { config *params.ChainConfig stateSyncData []*types.StateSyncData diff --git a/core/parallel_state_processor.go b/core/parallel_state_processor.go index c4f3530374..4d0de960fa 100644 --- a/core/parallel_state_processor.go +++ b/core/parallel_state_processor.go @@ -17,6 +17,7 @@ package core import ( + "context" "fmt" "math/big" "time" @@ -273,7 +274,7 @@ var parallelizabilityTimer = metrics.NewRegisteredTimer("block/parallelizability // returns the amount of gas that was used in the process. If any of the // transactions failed to execute due to insufficient gas it will return an error. // nolint:gocognit -func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error) { +func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config, interruptCtx context.Context) (types.Receipts, []*types.Log, uint64, error) { blockstm.SetProcs(cfg.ParallelSpeculativeProcesses) var ( diff --git a/core/state_processor.go b/core/state_processor.go index 7cfe613080..90dcfdccf0 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -17,6 +17,7 @@ package core import ( + "context" "fmt" "math/big" @@ -56,7 +57,7 @@ func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consen // Process returns the receipts and logs accumulated during the process and // returns the amount of gas that was used in the process. If any of the // transactions failed to execute due to insufficient gas it will return an error. -func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error) { +func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config, interruptCtx context.Context) (types.Receipts, []*types.Log, uint64, error) { var ( receipts types.Receipts usedGas = new(uint64) @@ -74,6 +75,14 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg) // Iterate over and process the individual transactions for i, tx := range block.Transactions() { + if interruptCtx != nil { + select { + case <-interruptCtx.Done(): + return nil, nil, 0, interruptCtx.Err() + default: + } + } + msg, err := tx.AsMessage(types.MakeSigner(p.config, header.Number), header.BaseFee) if err != nil { return nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err) diff --git a/core/types.go b/core/types.go index 4c5b74a498..9cdab38483 100644 --- a/core/types.go +++ b/core/types.go @@ -17,6 +17,8 @@ package core import ( + "context" + "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" @@ -47,5 +49,5 @@ type Processor interface { // Process processes the state changes according to the Ethereum rules by running // the transaction messages using the statedb and applying any rewards to both // the processor (coinbase) and any included uncles. - Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error) + Process(block *types.Block, statedb *state.StateDB, cfg vm.Config, interruptCtx context.Context) (types.Receipts, []*types.Log, uint64, error) } diff --git a/eth/state_accessor.go b/eth/state_accessor.go index f01db93a67..d45b62671b 100644 --- a/eth/state_accessor.go +++ b/eth/state_accessor.go @@ -36,15 +36,15 @@ import ( // base layer statedb can be passed then it's regarded as the statedb of the // parent block. // Parameters: -// - block: The block for which we want the state (== state at the stateRoot of the parent) -// - reexec: The maximum number of blocks to reprocess trying to obtain the desired state -// - base: If the caller is tracing multiple blocks, the caller can provide the parent state -// continuously from the callsite. -// - checklive: if true, then the live 'blockchain' state database is used. If the caller want to -// perform Commit or other 'save-to-disk' changes, this should be set to false to avoid -// storing trash persistently -// - preferDisk: this arg can be used by the caller to signal that even though the 'base' is provided, -// it would be preferrable to start from a fresh state, if we have it on disk. +// - block: The block for which we want the state (== state at the stateRoot of the parent) +// - reexec: The maximum number of blocks to reprocess trying to obtain the desired state +// - base: If the caller is tracing multiple blocks, the caller can provide the parent state +// continuously from the callsite. +// - checklive: if true, then the live 'blockchain' state database is used. If the caller want to +// perform Commit or other 'save-to-disk' changes, this should be set to false to avoid +// storing trash persistently +// - preferDisk: this arg can be used by the caller to signal that even though the 'base' is provided, +// it would be preferrable to start from a fresh state, if we have it on disk. func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (statedb *state.StateDB, err error) { var ( current *types.Block @@ -131,7 +131,9 @@ func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state if current = eth.blockchain.GetBlockByNumber(next); current == nil { return nil, fmt.Errorf("block #%d not found", next) } - _, _, _, err := eth.blockchain.Processor().Process(current, statedb, vm.Config{}) + + _, _, _, err := eth.blockchain.Processor().Process(current, statedb, vm.Config{}, nil) + if err != nil { return nil, fmt.Errorf("processing block %d failed: %v", current.NumberU64(), err) }