Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

State processor interrupt #819

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1803,7 +1803,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)

// Process block using the parent state as reference point
substart := time.Now()
receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig)
receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig, nil)
if err != nil {
bc.reportBlock(block, receipts, err)
atomic.StoreUint32(&followupInterrupt, 1)
Expand Down
34 changes: 30 additions & 4 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package core

import (
"context"
"errors"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -123,9 +124,11 @@ func testFork(t *testing.T, blockchain *BlockChain, i, n int, full bool, compara
if full {
cur := blockchain.CurrentBlock()
tdPre = blockchain.GetTd(cur.Hash(), cur.NumberU64())
if err := testBlockChainImport(blockChainB, blockchain); err != nil {

if err := testBlockChainImport(blockChainB, blockchain, nil); err != nil {
t.Fatalf("failed to import forked block chain: %v", err)
}

last := blockChainB[len(blockChainB)-1]
tdPost = blockchain.GetTd(last.Hash(), last.NumberU64())
} else {
Expand All @@ -143,7 +146,7 @@ func testFork(t *testing.T, blockchain *BlockChain, i, n int, full bool, compara

// testBlockChainImport tries to process a chain of blocks, writing them into
// the database if successful.
func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
func testBlockChainImport(chain types.Blocks, blockchain *BlockChain, ctx context.Context) error {
for _, block := range chain {
// Try and process the block
err := blockchain.engine.VerifyHeader(blockchain, block.Header(), true)
Expand All @@ -156,11 +159,14 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
}
return err
}

statedb, err := state.New(blockchain.GetBlockByHash(block.ParentHash()).Root(), blockchain.stateCache, nil)

if err != nil {
return err
}
receipts, _, usedGas, err := blockchain.processor.Process(block, statedb, vm.Config{})

receipts, _, usedGas, err := blockchain.processor.Process(block, statedb, vm.Config{}, ctx)
if err != nil {
blockchain.reportBlock(block, receipts, err)
return err
Expand All @@ -180,6 +186,26 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
return nil
}

func TestBlockChainImportInterrupt(t *testing.T) {
t.Parallel()

db, blockchain, err := newCanonical(ethash.NewFaker(), 10, true)
if err != nil {
t.Fatalf("failed to make new canonical chain: %v", err)
}

defer blockchain.Stop()

blockChainB := makeFakeNonEmptyBlockChain(blockchain.CurrentBlock(), 5, ethash.NewFaker(), db, forkSeed, 5)

ctx, cancel := context.WithCancel(context.Background())
cancel()

if err := testBlockChainImport(blockChainB, blockchain, ctx); err != context.Canceled {
t.Errorf("block chain import is not cancelled correctly, got %v, want %v", err, context.Canceled)
}
}

// testHeaderChainImport tries to process a chain of header, writing them into
// the database if successful.
func testHeaderChainImport(chain []*types.Header, blockchain *BlockChain) error {
Expand Down Expand Up @@ -476,7 +502,7 @@ func testBrokenChain(t *testing.T, full bool) {
// Create a forked chain, and try to insert with a missing link
if full {
chain := makeBlockChain(blockchain.CurrentBlock(), 5, ethash.NewFaker(), db, forkSeed)[1:]
if err := testBlockChainImport(chain, blockchain); err == nil {
if err := testBlockChainImport(chain, blockchain, nil); err == nil {
t.Errorf("broken block chain not reported")
}
} else {
Expand Down
13 changes: 13 additions & 0 deletions core/chain_makers.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,19 @@ func makeBlockChain(parent *types.Block, n int, engine consensus.Engine, db ethd
return blocks
}

// makeBlockChain creates a deterministic chain of blocks rooted at parent with fake invalid transactions.
func makeFakeNonEmptyBlockChain(parent *types.Block, n int, engine consensus.Engine, db ethdb.Database, seed int, numTx int) []*types.Block {
blocks, _ := GenerateChain(params.TestChainConfig, parent, engine, db, n, func(i int, b *BlockGen) {
addr := common.Address{0: byte(seed), 19: byte(i)}
b.SetCoinbase(addr)
for j := 0; j < numTx; j++ {
b.txs = append(b.txs, types.NewTransaction(0, addr, big.NewInt(1000), params.TxGas, nil, nil))
}
})

return blocks
}

type fakeChainReader struct {
config *params.ChainConfig
stateSyncData []*types.StateSyncData
Expand Down
3 changes: 2 additions & 1 deletion core/parallel_state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package core

import (
"context"
"fmt"
"math/big"
"time"
Expand Down Expand Up @@ -273,7 +274,7 @@ var parallelizabilityTimer = metrics.NewRegisteredTimer("block/parallelizability
// returns the amount of gas that was used in the process. If any of the
// transactions failed to execute due to insufficient gas it will return an error.
// nolint:gocognit
func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error) {
func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config, interruptCtx context.Context) (types.Receipts, []*types.Log, uint64, error) {
blockstm.SetProcs(cfg.ParallelSpeculativeProcesses)

var (
Expand Down
11 changes: 10 additions & 1 deletion core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package core

import (
"context"
"fmt"
"math/big"

Expand Down Expand Up @@ -56,7 +57,7 @@ func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consen
// Process returns the receipts and logs accumulated during the process and
// returns the amount of gas that was used in the process. If any of the
// transactions failed to execute due to insufficient gas it will return an error.
func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error) {
func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config, interruptCtx context.Context) (types.Receipts, []*types.Log, uint64, error) {
var (
receipts types.Receipts
usedGas = new(uint64)
Expand All @@ -74,6 +75,14 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg)
// Iterate over and process the individual transactions
for i, tx := range block.Transactions() {
if interruptCtx != nil {
select {
case <-interruptCtx.Done():
return nil, nil, 0, interruptCtx.Err()
default:
}
}

msg, err := tx.AsMessage(types.MakeSigner(p.config, header.Number), header.BaseFee)
if err != nil {
return nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
Expand Down
4 changes: 3 additions & 1 deletion core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package core

import (
"context"

"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
Expand Down Expand Up @@ -47,5 +49,5 @@ type Processor interface {
// Process processes the state changes according to the Ethereum rules by running
// the transaction messages using the statedb and applying any rewards to both
// the processor (coinbase) and any included uncles.
Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error)
Process(block *types.Block, statedb *state.StateDB, cfg vm.Config, interruptCtx context.Context) (types.Receipts, []*types.Log, uint64, error)
}
22 changes: 12 additions & 10 deletions eth/state_accessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ import (
// base layer statedb can be passed then it's regarded as the statedb of the
// parent block.
// Parameters:
// - block: The block for which we want the state (== state at the stateRoot of the parent)
// - reexec: The maximum number of blocks to reprocess trying to obtain the desired state
// - base: If the caller is tracing multiple blocks, the caller can provide the parent state
// continuously from the callsite.
// - checklive: if true, then the live 'blockchain' state database is used. If the caller want to
// perform Commit or other 'save-to-disk' changes, this should be set to false to avoid
// storing trash persistently
// - preferDisk: this arg can be used by the caller to signal that even though the 'base' is provided,
// it would be preferrable to start from a fresh state, if we have it on disk.
// - block: The block for which we want the state (== state at the stateRoot of the parent)
// - reexec: The maximum number of blocks to reprocess trying to obtain the desired state
// - base: If the caller is tracing multiple blocks, the caller can provide the parent state
// continuously from the callsite.
// - checklive: if true, then the live 'blockchain' state database is used. If the caller want to
// perform Commit or other 'save-to-disk' changes, this should be set to false to avoid
// storing trash persistently
// - preferDisk: this arg can be used by the caller to signal that even though the 'base' is provided,
// it would be preferrable to start from a fresh state, if we have it on disk.
func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (statedb *state.StateDB, err error) {
var (
current *types.Block
Expand Down Expand Up @@ -131,7 +131,9 @@ func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state
if current = eth.blockchain.GetBlockByNumber(next); current == nil {
return nil, fmt.Errorf("block #%d not found", next)
}
_, _, _, err := eth.blockchain.Processor().Process(current, statedb, vm.Config{})

_, _, _, err := eth.blockchain.Processor().Process(current, statedb, vm.Config{}, nil)

if err != nil {
return nil, fmt.Errorf("processing block %d failed: %v", current.NumberU64(), err)
}
Expand Down