diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index 2431b5644a..57cfd5a438 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -637,8 +637,8 @@ func (b *SimulatedBackend) callContract(ctx context.Context, call ethereum.CallM // about the transaction and calling mechanisms. vmEnv := vm.NewEVM(evmContext, txContext, stateDB, b.config, vm.Config{NoBaseFee: true}) gasPool := new(core.GasPool).AddGas(math.MaxUint64) - - return core.NewStateTransition(vmEnv, msg, gasPool).TransitionDb() + // nolint : contextcheck + return core.NewStateTransition(vmEnv, msg, gasPool).TransitionDb(context.Background()) } // SendTransaction updates the pending block to include the given transaction. diff --git a/cmd/evm/internal/t8ntool/execution.go b/cmd/evm/internal/t8ntool/execution.go index c848b953f8..ca0dd5b0d4 100644 --- a/cmd/evm/internal/t8ntool/execution.go +++ b/cmd/evm/internal/t8ntool/execution.go @@ -173,7 +173,7 @@ func (pre *Prestate) Apply(vmConfig vm.Config, chainConfig *params.ChainConfig, evm := vm.NewEVM(vmContext, txContext, statedb, chainConfig, vmConfig) // (ret []byte, usedGas uint64, failed bool, err error) - msgResult, err := core.ApplyMessage(evm, msg, gaspool) + msgResult, err := core.ApplyMessage(evm, msg, gaspool, nil) if err != nil { statedb.RevertToSnapshot(snapshot) log.Info("rejected tx", "index", i, "hash", tx.Hash(), "from", msg.From(), "error", err) diff --git a/consensus/bor/statefull/processor.go b/consensus/bor/statefull/processor.go index 0fe9baeeba..a78359a309 100644 --- a/consensus/bor/statefull/processor.go +++ b/consensus/bor/statefull/processor.go @@ -83,6 +83,7 @@ func ApplyMessage( msg.Data(), msg.Gas(), msg.Value(), + nil, ) // Update the state with pending changes if err != nil { @@ -104,6 +105,7 @@ func ApplyBorMessage(vmenv vm.EVM, msg Callmsg) (*core.ExecutionResult, error) { msg.Data(), msg.Gas(), msg.Value(), + nil, ) // Update the state with pending changes if err != nil { diff --git a/core/chain_makers.go b/core/chain_makers.go index e9944e4744..ee44cc4d75 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -105,7 +105,7 @@ func (b *BlockGen) AddTxWithChain(bc *BlockChain, tx *types.Transaction) { b.SetCoinbase(common.Address{}) } b.statedb.Prepare(tx.Hash(), len(b.txs)) - receipt, err := ApplyTransaction(b.config, bc, &b.header.Coinbase, b.gasPool, b.statedb, b.header, tx, &b.header.GasUsed, vm.Config{}) + receipt, err := ApplyTransaction(b.config, bc, &b.header.Coinbase, b.gasPool, b.statedb, b.header, tx, &b.header.GasUsed, vm.Config{}, nil) if err != nil { panic(err) } diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go index 10a1722940..215734a590 100644 --- a/core/state_prefetcher.go +++ b/core/state_prefetcher.go @@ -17,6 +17,7 @@ package core import ( + "context" "sync/atomic" "github.com/ethereum/go-ethereum/consensus" @@ -89,6 +90,6 @@ func precacheTransaction(msg types.Message, config *params.ChainConfig, gaspool // Update the evm with the new transaction context. evm.Reset(NewEVMTxContext(msg), statedb) // Add addresses to access list if applicable - _, err := ApplyMessage(evm, msg, gaspool) + _, err := ApplyMessage(evm, msg, gaspool, context.Background()) return err } diff --git a/core/state_processor.go b/core/state_processor.go index d4c77ae410..28c4cbfddb 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -17,6 +17,7 @@ package core import ( + "context" "fmt" "math/big" @@ -79,7 +80,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg return nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err) } statedb.Prepare(tx.Hash(), i) - receipt, err := applyTransaction(msg, p.config, p.bc, nil, gp, statedb, blockNumber, blockHash, tx, usedGas, vmenv) + receipt, err := applyTransaction(msg, p.config, p.bc, nil, gp, statedb, blockNumber, blockHash, tx, usedGas, vmenv, nil) if err != nil { return nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err) } @@ -92,17 +93,22 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg return receipts, allLogs, *usedGas, nil } -func applyTransaction(msg types.Message, config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, blockNumber *big.Int, blockHash common.Hash, tx *types.Transaction, usedGas *uint64, evm *vm.EVM) (*types.Receipt, error) { +// nolint : unparam +func applyTransaction(msg types.Message, config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, blockNumber *big.Int, blockHash common.Hash, tx *types.Transaction, usedGas *uint64, evm *vm.EVM, interruptCtx context.Context) (*types.Receipt, error) { // Create a new context to be used in the EVM environment. txContext := NewEVMTxContext(msg) evm.Reset(txContext, statedb) // Apply the transaction to the current state (included in the env). - result, err := ApplyMessage(evm, msg, gp) + result, err := ApplyMessage(evm, msg, gp, interruptCtx) if err != nil { return nil, err } + if result.Err == vm.ErrInterrupt { + return nil, result.Err + } + // Update the state with pending changes. var root []byte if config.IsByzantium(blockNumber) { @@ -141,7 +147,7 @@ func applyTransaction(msg types.Message, config *params.ChainConfig, bc ChainCon // and uses the input parameters for its environment. It returns the receipt // for the transaction, gas used and an error if the transaction failed, // indicating the block was invalid. -func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, cfg vm.Config) (*types.Receipt, error) { +func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, cfg vm.Config, interruptCtx context.Context) (*types.Receipt, error) { msg, err := tx.AsMessage(types.MakeSigner(config, header.Number), header.BaseFee) if err != nil { return nil, err @@ -149,5 +155,6 @@ func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *commo // Create a new context to be used in the EVM environment blockContext := NewEVMBlockContext(header, bc, author) vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, config, cfg) - return applyTransaction(msg, config, bc, author, gp, statedb, header.Number, header.Hash(), tx, usedGas, vmenv) + + return applyTransaction(msg, config, bc, author, gp, statedb, header.Number, header.Hash(), tx, usedGas, vmenv, interruptCtx) } diff --git a/core/state_transition.go b/core/state_transition.go index 3fc5a635e9..3e6d384719 100644 --- a/core/state_transition.go +++ b/core/state_transition.go @@ -17,6 +17,7 @@ package core import ( + "context" "fmt" "math" "math/big" @@ -179,8 +180,8 @@ func NewStateTransition(evm *vm.EVM, msg Message, gp *GasPool) *StateTransition // the gas used (which includes gas refunds) and an error if it failed. An error always // indicates a core error meaning that the message would always fail for that particular // state and would never be accepted within a block. -func ApplyMessage(evm *vm.EVM, msg Message, gp *GasPool) (*ExecutionResult, error) { - return NewStateTransition(evm, msg, gp).TransitionDb() +func ApplyMessage(evm *vm.EVM, msg Message, gp *GasPool, interruptCtx context.Context) (*ExecutionResult, error) { + return NewStateTransition(evm, msg, gp).TransitionDb(interruptCtx) } // to returns the recipient of the message. @@ -274,7 +275,7 @@ func (st *StateTransition) preCheck() error { // // However if any consensus issue encountered, return the error directly with // nil evm execution result. -func (st *StateTransition) TransitionDb() (*ExecutionResult, error) { +func (st *StateTransition) TransitionDb(interruptCtx context.Context) (*ExecutionResult, error) { input1 := st.state.GetBalance(st.msg.From()) input2 := st.state.GetBalance(st.evm.Context.Coinbase) @@ -327,7 +328,7 @@ func (st *StateTransition) TransitionDb() (*ExecutionResult, error) { } else { // Increment the nonce for the next transaction st.state.SetNonce(msg.From(), st.state.GetNonce(sender.Address())+1) - ret, st.gas, vmerr = st.evm.Call(sender, st.to(), st.data, st.gas, st.value) + ret, st.gas, vmerr = st.evm.Call(sender, st.to(), st.data, st.gas, st.value, interruptCtx) } if !london { diff --git a/core/tests/blockchain_repair_test.go b/core/tests/blockchain_repair_test.go index 0d4a86b069..9cb6b8f899 100644 --- a/core/tests/blockchain_repair_test.go +++ b/core/tests/blockchain_repair_test.go @@ -1815,7 +1815,7 @@ func testRepair(t *testing.T, tt *rewindTest, snapshots bool) { chainConfig.LondonBlock = big.NewInt(0) - _, back, closeFn := miner.NewTestWorker(t, chainConfig, engine, db, 0, 0, 0) + _, back, closeFn := miner.NewTestWorker(t, chainConfig, engine, db, 0, 0, 0, 0) defer closeFn() genesis := back.BlockChain().Genesis() diff --git a/core/vm/evm.go b/core/vm/evm.go index dd55618bf8..31689fa506 100644 --- a/core/vm/evm.go +++ b/core/vm/evm.go @@ -17,6 +17,7 @@ package vm import ( + "context" "math/big" "sync/atomic" "time" @@ -165,7 +166,7 @@ func (evm *EVM) Interpreter() *EVMInterpreter { // parameters. It also handles any necessary value transfer required and takes // the necessary steps to create accounts and reverses the state in case of an // execution error or failed value transfer. -func (evm *EVM) Call(caller ContractRef, addr common.Address, input []byte, gas uint64, value *big.Int) (ret []byte, leftOverGas uint64, err error) { +func (evm *EVM) Call(caller ContractRef, addr common.Address, input []byte, gas uint64, value *big.Int, interruptCtx context.Context) (ret []byte, leftOverGas uint64, err error) { // Fail if we're trying to execute above the call depth limit if evm.depth > int(params.CallCreateDepth) { return nil, gas, ErrDepth @@ -225,7 +226,7 @@ func (evm *EVM) Call(caller ContractRef, addr common.Address, input []byte, gas // The depth-check is already done, and precompiles handled above contract := NewContract(caller, AccountRef(addrCopy), value, gas) contract.SetCallCode(&addrCopy, evm.StateDB.GetCodeHash(addrCopy), code) - ret, err = evm.interpreter.Run(contract, input, false) + ret, err = evm.interpreter.PreRun(contract, input, false, interruptCtx) gas = contract.Gas } } @@ -282,7 +283,7 @@ func (evm *EVM) CallCode(caller ContractRef, addr common.Address, input []byte, // The contract is a scoped environment for this execution context only. contract := NewContract(caller, AccountRef(caller.Address()), value, gas) contract.SetCallCode(&addrCopy, evm.StateDB.GetCodeHash(addrCopy), evm.StateDB.GetCode(addrCopy)) - ret, err = evm.interpreter.Run(contract, input, false) + ret, err = evm.interpreter.PreRun(contract, input, false, nil) gas = contract.Gas } if err != nil { @@ -322,7 +323,7 @@ func (evm *EVM) DelegateCall(caller ContractRef, addr common.Address, input []by // Initialise a new contract and make initialise the delegate values contract := NewContract(caller, AccountRef(caller.Address()), nil, gas).AsDelegate() contract.SetCallCode(&addrCopy, evm.StateDB.GetCodeHash(addrCopy), evm.StateDB.GetCode(addrCopy)) - ret, err = evm.interpreter.Run(contract, input, false) + ret, err = evm.interpreter.PreRun(contract, input, false, nil) gas = contract.Gas } if err != nil { @@ -378,7 +379,7 @@ func (evm *EVM) StaticCall(caller ContractRef, addr common.Address, input []byte // When an error was returned by the EVM or when setting the creation code // above we revert to the snapshot and consume any gas remaining. Additionally // when we're in Homestead this also counts for code storage gas errors. - ret, err = evm.interpreter.Run(contract, input, true) + ret, err = evm.interpreter.PreRun(contract, input, true, nil) gas = contract.Gas } if err != nil { @@ -450,7 +451,7 @@ func (evm *EVM) create(caller ContractRef, codeAndHash *codeAndHash, gas uint64, start := time.Now() - ret, err := evm.interpreter.Run(contract, nil, false) + ret, err := evm.interpreter.PreRun(contract, nil, false, nil) // Check whether the max code size has been exceeded, assign err if the case. if err == nil && evm.chainRules.IsEIP158 && len(ret) > params.MaxCodeSize { diff --git a/core/vm/gas_table_test.go b/core/vm/gas_table_test.go index 6cd126c9b4..f9aec3b40d 100644 --- a/core/vm/gas_table_test.go +++ b/core/vm/gas_table_test.go @@ -93,7 +93,7 @@ func TestEIP2200(t *testing.T) { } vmenv := NewEVM(vmctx, TxContext{}, statedb, params.AllEthashProtocolChanges, Config{ExtraEips: []int{2200}}) - _, gas, err := vmenv.Call(AccountRef(common.Address{}), address, nil, tt.gaspool, new(big.Int)) + _, gas, err := vmenv.Call(AccountRef(common.Address{}), address, nil, tt.gaspool, new(big.Int), nil) if err != tt.failure { t.Errorf("test %d: failure mismatch: have %v, want %v", i, err, tt.failure) } diff --git a/core/vm/instructions.go b/core/vm/instructions.go index db507c4811..2f3608f2b2 100644 --- a/core/vm/instructions.go +++ b/core/vm/instructions.go @@ -392,16 +392,21 @@ func opExtCodeCopy(pc *uint64, interpreter *EVMInterpreter, scope *ScopeContext) // opExtCodeHash returns the code hash of a specified account. // There are several cases when the function is called, while we can relay everything // to `state.GetCodeHash` function to ensure the correctness. -// (1) Caller tries to get the code hash of a normal contract account, state +// +// (1) Caller tries to get the code hash of a normal contract account, state +// // should return the relative code hash and set it as the result. // -// (2) Caller tries to get the code hash of a non-existent account, state should +// (2) Caller tries to get the code hash of a non-existent account, state should +// // return common.Hash{} and zero will be set as the result. // -// (3) Caller tries to get the code hash for an account without contract code, +// (3) Caller tries to get the code hash for an account without contract code, +// // state should return emptyCodeHash(0xc5d246...) as the result. // -// (4) Caller tries to get the code hash of a precompiled account, the result +// (4) Caller tries to get the code hash of a precompiled account, the result +// // should be zero or emptyCodeHash. // // It is worth noting that in order to avoid unnecessary create and clean, @@ -410,10 +415,12 @@ func opExtCodeCopy(pc *uint64, interpreter *EVMInterpreter, scope *ScopeContext) // If the precompile account is not transferred any amount on a private or // customized chain, the return value will be zero. // -// (5) Caller tries to get the code hash for an account which is marked as suicided +// (5) Caller tries to get the code hash for an account which is marked as suicided +// // in the current transaction, the code hash of this account should be returned. // -// (6) Caller tries to get the code hash for an account which is marked as deleted, +// (6) Caller tries to get the code hash for an account which is marked as deleted, +// // this account should be regarded as a non-existent account and zero should be returned. func opExtCodeHash(pc *uint64, interpreter *EVMInterpreter, scope *ScopeContext) ([]byte, error) { slot := scope.Stack.peek() @@ -688,7 +695,7 @@ func opCall(pc *uint64, interpreter *EVMInterpreter, scope *ScopeContext) ([]byt bigVal = value.ToBig() } - ret, returnGas, err := interpreter.evm.Call(scope.Contract, toAddr, args, gas, bigVal) + ret, returnGas, err := interpreter.evm.Call(scope.Contract, toAddr, args, gas, bigVal, nil) if err != nil { temp.Clear() diff --git a/core/vm/interpreter.go b/core/vm/interpreter.go index 21e3c914e1..91f6254039 100644 --- a/core/vm/interpreter.go +++ b/core/vm/interpreter.go @@ -17,11 +17,33 @@ package vm import ( + "context" + "errors" "hash" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + + lru "github.com/hashicorp/golang-lru" +) + +var ( + opcodeCommitInterruptCounter = metrics.NewRegisteredCounter("worker/opcodeCommitInterrupt", nil) + ErrInterrupt = errors.New("EVM execution interrupted") + ErrNoCache = errors.New("no tx cache found") + ErrNoCurrentTx = errors.New("no current tx found in interruptCtx") +) + +const ( + // These are keys for the interruptCtx + InterruptCtxDelayKey = "delay" + InterruptCtxOpcodeDelayKey = "opcodeDelay" + + // InterruptedTxCacheSize is size of lru cache for interrupted txs + InterruptedTxCacheSize = 90000 ) // Config are the configuration options for the Interpreter @@ -64,6 +86,54 @@ type EVMInterpreter struct { returnData []byte // Last CALL's return data for subsequent reuse } +// TxCacher is an wrapper of lru.cache for caching transactions that get interrupted +type TxCache struct { + Cache *lru.Cache +} + +type txCacheKey struct{} +type InterruptedTxContext_currenttxKey struct{} + +// SetCurrentTxOnContext sets the current tx on the context +func SetCurrentTxOnContext(ctx context.Context, txHash common.Hash) context.Context { + return context.WithValue(ctx, InterruptedTxContext_currenttxKey{}, txHash) +} + +// GetCurrentTxFromContext gets the current tx from the context +func GetCurrentTxFromContext(ctx context.Context) (common.Hash, error) { + val := ctx.Value(InterruptedTxContext_currenttxKey{}) + if val == nil { + return common.Hash{}, ErrNoCurrentTx + } + + c, ok := val.(common.Hash) + if !ok { + return common.Hash{}, ErrNoCurrentTx + } + + return c, nil +} + +// GetCache returns the txCache from the context +func GetCache(ctx context.Context) (*TxCache, error) { + val := ctx.Value(txCacheKey{}) + if val == nil { + return nil, ErrNoCache + } + + c, ok := val.(*TxCache) + if !ok { + return nil, ErrNoCache + } + + return c, nil +} + +// PutCache puts the txCache into the context +func PutCache(ctx context.Context, cache *TxCache) context.Context { + return context.WithValue(ctx, txCacheKey{}, cache) +} + // NewEVMInterpreter returns a new instance of the Interpreter. func NewEVMInterpreter(evm *EVM, cfg Config) *EVMInterpreter { // If jump table was not initialised we set the default one. @@ -107,14 +177,192 @@ func NewEVMInterpreter(evm *EVM, cfg Config) *EVMInterpreter { } } +// PreRun is a wrapper around Run that allows for a delay to be injected before each opcode when induced by tests else it calls the lagace Run() method +func (in *EVMInterpreter) PreRun(contract *Contract, input []byte, readOnly bool, interruptCtx context.Context) (ret []byte, err error) { + var opcodeDelay interface{} + + if interruptCtx != nil { + if interruptCtx.Value(InterruptCtxOpcodeDelayKey) != nil { + opcodeDelay = interruptCtx.Value(InterruptCtxOpcodeDelayKey) + } + } + + if opcodeDelay != nil { + return in.RunWithDelay(contract, input, readOnly, interruptCtx, opcodeDelay.(uint)) + } + + return in.Run(contract, input, readOnly, interruptCtx) +} + // Run loops and evaluates the contract's code with the given input data and returns // the return byte-slice and an error if one occurred. // // It's important to note that any errors returned by the interpreter should be // considered a revert-and-consume-all-gas operation except for // ErrExecutionReverted which means revert-and-keep-gas-left. -func (in *EVMInterpreter) Run(contract *Contract, input []byte, readOnly bool) (ret []byte, err error) { +// nolint: gocognit +func (in *EVMInterpreter) Run(contract *Contract, input []byte, readOnly bool, interruptCtx context.Context) (ret []byte, err error) { + // Increment the call depth which is restricted to 1024 + in.evm.depth++ + defer func() { in.evm.depth-- }() + + // Make sure the readOnly is only set if we aren't in readOnly yet. + // This also makes sure that the readOnly flag isn't removed for child calls. + if readOnly && !in.readOnly { + in.readOnly = true + defer func() { in.readOnly = false }() + } + + // Reset the previous call's return data. It's unimportant to preserve the old buffer + // as every returning call will return new data anyway. + in.returnData = nil + + // Don't bother with the execution if there's no code. + if len(contract.Code) == 0 { + return nil, nil + } + + var ( + op OpCode // current opcode + mem = NewMemory() // bound memory + stack = newstack() // local stack + callContext = &ScopeContext{ + Memory: mem, + Stack: stack, + Contract: contract, + } + // For optimisation reason we're using uint64 as the program counter. + // It's theoretically possible to go above 2^64. The YP defines the PC + // to be uint256. Practically much less so feasible. + pc = uint64(0) // program counter + cost uint64 + // copies used by tracer + pcCopy uint64 // needed for the deferred EVMLogger + gasCopy uint64 // for EVMLogger to log gas remaining before execution + logged bool // deferred EVMLogger should ignore already logged steps + res []byte // result of the opcode execution function + ) + // Don't move this deferrred function, it's placed before the capturestate-deferred method, + // so that it get's executed _after_: the capturestate needs the stacks before + // they are returned to the pools + defer func() { + returnStack(stack) + }() + + contract.Input = input + + if in.cfg.Debug { + defer func() { + if err != nil { + if !logged { + in.cfg.Tracer.CaptureState(pcCopy, op, gasCopy, cost, callContext, in.returnData, in.evm.depth, err) + } else { + in.cfg.Tracer.CaptureFault(pcCopy, op, gasCopy, cost, callContext, in.evm.depth, err) + } + } + }() + } + // The Interpreter main run loop (contextual). This loop runs until either an + // explicit STOP, RETURN or SELFDESTRUCT is executed, an error occurred during + // the execution of one of the operations or until the done flag is set by the + // parent context. + for { + if interruptCtx != nil { + // case of interrupting by timeout + select { + case <-interruptCtx.Done(): + txHash, _ := GetCurrentTxFromContext(interruptCtx) + interruptedTxCache, _ := GetCache(interruptCtx) + + // if the tx is already in the cache, it means that it has been interrupted before and we will not interrupt it again + found, _ := interruptedTxCache.Cache.ContainsOrAdd(txHash, true) + if found { + interruptedTxCache.Cache.Remove(txHash) + } else { + // if the tx is not in the cache, it means that it has not been interrupted before and we will interrupt it + opcodeCommitInterruptCounter.Inc(1) + log.Warn("OPCODE Level interrupt") + + return nil, ErrInterrupt + } + default: + } + } + + if in.cfg.Debug { + // Capture pre-execution values for tracing. + logged, pcCopy, gasCopy = false, pc, contract.Gas + } + // Get the operation from the jump table and validate the stack to ensure there are + // enough stack items available to perform the operation. + op = contract.GetOp(pc) + operation := in.cfg.JumpTable[op] + cost = operation.constantGas // For tracing + // Validate stack + if sLen := stack.len(); sLen < operation.minStack { + return nil, &ErrStackUnderflow{stackLen: sLen, required: operation.minStack} + } else if sLen > operation.maxStack { + return nil, &ErrStackOverflow{stackLen: sLen, limit: operation.maxStack} + } + if !contract.UseGas(cost) { + return nil, ErrOutOfGas + } + // nolint : nestif + if operation.dynamicGas != nil { + // All ops with a dynamic memory usage also has a dynamic gas cost. + var memorySize uint64 + // calculate the new memory size and expand the memory to fit + // the operation + // Memory check needs to be done prior to evaluating the dynamic gas portion, + // to detect calculation overflows + if operation.memorySize != nil { + memSize, overflow := operation.memorySize(stack) + if overflow { + return nil, ErrGasUintOverflow + } + // memory is expanded in words of 32 bytes. Gas + // is also calculated in words. + if memorySize, overflow = math.SafeMul(toWordSize(memSize), 32); overflow { + return nil, ErrGasUintOverflow + } + } + // Consume the gas and return an error if not enough gas is available. + // cost is explicitly set so that the capture state defer method can get the proper cost + var dynamicCost uint64 + dynamicCost, err = operation.dynamicGas(in.evm, contract, stack, mem, memorySize) + cost += dynamicCost // for tracing + if err != nil || !contract.UseGas(dynamicCost) { + return nil, ErrOutOfGas + } + if memorySize > 0 { + mem.Resize(memorySize) + } + } + + if in.cfg.Debug { + in.cfg.Tracer.CaptureState(pc, op, gasCopy, cost, callContext, in.returnData, in.evm.depth, err) + + logged = true + } + // execute the operation + res, err = operation.execute(&pc, in, callContext) + if err != nil { + break + } + pc++ + } + + if err == errStopToken { + err = nil // clear stop token error + } + + return res, err +} + +// nolint: gocognit +// RunWithDelay is Run() with a delay between each opcode. Only used by testcases. +func (in *EVMInterpreter) RunWithDelay(contract *Contract, input []byte, readOnly bool, interruptCtx context.Context, opcodeDelay uint) (ret []byte, err error) { // Increment the call depth which is restricted to 1024 in.evm.depth++ defer func() { in.evm.depth-- }() @@ -179,6 +427,31 @@ func (in *EVMInterpreter) Run(contract *Contract, input []byte, readOnly bool) ( // the execution of one of the operations or until the done flag is set by the // parent context. for { + if interruptCtx != nil { + // case of interrupting by timeout + select { + case <-interruptCtx.Done(): + txHash, _ := GetCurrentTxFromContext(interruptCtx) + interruptedTxCache, _ := GetCache(interruptCtx) + // if the tx is already in the cache, it means that it has been interrupted before and we will not interrupt it again + found, _ := interruptedTxCache.Cache.ContainsOrAdd(txHash, true) + log.Info("FOUND", "found", found, "txHash", txHash) + + if found { + interruptedTxCache.Cache.Remove(txHash) + } else { + // if the tx is not in the cache, it means that it has not been interrupted before and we will interrupt it + opcodeCommitInterruptCounter.Inc(1) + log.Warn("OPCODE Level interrupt") + + return nil, ErrInterrupt + } + default: + } + } + + time.Sleep(time.Duration(opcodeDelay) * time.Millisecond) + if in.cfg.Debug { // Capture pre-execution values for tracing. logged, pcCopy, gasCopy = false, pc, contract.Gas diff --git a/core/vm/interpreter_test.go b/core/vm/interpreter_test.go index dfae0f2e2a..9a7affb53d 100644 --- a/core/vm/interpreter_test.go +++ b/core/vm/interpreter_test.go @@ -53,7 +53,7 @@ func TestLoopInterrupt(t *testing.T) { timeout := make(chan bool) go func(evm *EVM) { - _, _, err := evm.Call(AccountRef(common.Address{}), address, nil, math.MaxUint64, new(big.Int)) + _, _, err := evm.Call(AccountRef(common.Address{}), address, nil, math.MaxUint64, new(big.Int), nil) errChannel <- err }(evm) diff --git a/core/vm/runtime/runtime.go b/core/vm/runtime/runtime.go index 7861fb92db..2c5505ec85 100644 --- a/core/vm/runtime/runtime.go +++ b/core/vm/runtime/runtime.go @@ -131,6 +131,7 @@ func Execute(code, input []byte, cfg *Config) ([]byte, *state.StateDB, error) { input, cfg.GasLimit, cfg.Value, + nil, ) return ret, cfg.State, err @@ -186,6 +187,7 @@ func Call(address common.Address, input []byte, cfg *Config) ([]byte, uint64, er input, cfg.GasLimit, cfg.Value, + nil, ) return ret, leftOverGas, err } diff --git a/core/vm/runtime/runtime_test.go b/core/vm/runtime/runtime_test.go index 97673b4906..cf744e3f29 100644 --- a/core/vm/runtime/runtime_test.go +++ b/core/vm/runtime/runtime_test.go @@ -386,12 +386,15 @@ func benchmarkNonModifyingCode(gas uint64, code []byte, name string, tracerCode //cfg.State.CreateAccount(cfg.Origin) // set the receiver's (the executing contract) code for execution. cfg.State.SetCode(destination, code) - vmenv.Call(sender, destination, nil, gas, cfg.Value) + + // nolint: errcheck + vmenv.Call(sender, destination, nil, gas, cfg.Value, nil) b.Run(name, func(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { - vmenv.Call(sender, destination, nil, gas, cfg.Value) + // nolint: errcheck + vmenv.Call(sender, destination, nil, gas, cfg.Value, nil) } }) } diff --git a/eth/state_accessor.go b/eth/state_accessor.go index f01db93a67..956ba5ee1e 100644 --- a/eth/state_accessor.go +++ b/eth/state_accessor.go @@ -36,15 +36,15 @@ import ( // base layer statedb can be passed then it's regarded as the statedb of the // parent block. // Parameters: -// - block: The block for which we want the state (== state at the stateRoot of the parent) -// - reexec: The maximum number of blocks to reprocess trying to obtain the desired state -// - base: If the caller is tracing multiple blocks, the caller can provide the parent state -// continuously from the callsite. -// - checklive: if true, then the live 'blockchain' state database is used. If the caller want to -// perform Commit or other 'save-to-disk' changes, this should be set to false to avoid -// storing trash persistently -// - preferDisk: this arg can be used by the caller to signal that even though the 'base' is provided, -// it would be preferrable to start from a fresh state, if we have it on disk. +// - block: The block for which we want the state (== state at the stateRoot of the parent) +// - reexec: The maximum number of blocks to reprocess trying to obtain the desired state +// - base: If the caller is tracing multiple blocks, the caller can provide the parent state +// continuously from the callsite. +// - checklive: if true, then the live 'blockchain' state database is used. If the caller want to +// perform Commit or other 'save-to-disk' changes, this should be set to false to avoid +// storing trash persistently +// - preferDisk: this arg can be used by the caller to signal that even though the 'base' is provided, +// it would be preferrable to start from a fresh state, if we have it on disk. func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (statedb *state.StateDB, err error) { var ( current *types.Block @@ -191,9 +191,11 @@ func (eth *Ethereum) stateAtTransaction(block *types.Block, txIndex int, reexec // Not yet the searched for transaction, execute on top of the current state vmenv := vm.NewEVM(context, txContext, statedb, eth.blockchain.Config(), vm.Config{}) statedb.Prepare(tx.Hash(), idx) - if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(tx.Gas())); err != nil { + + if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(tx.Gas()), nil); err != nil { return nil, vm.BlockContext{}, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err) } + // Ensure any modifications are committed to the state // Only delete empty objects if EIP158/161 (a.k.a Spurious Dragon) is in effect statedb.Finalise(vmenv.ChainConfig().IsEIP158(block.Number())) diff --git a/eth/tracers/api.go b/eth/tracers/api.go index 13f5c627cd..9fb349878c 100644 --- a/eth/tracers/api.go +++ b/eth/tracers/api.go @@ -643,7 +643,8 @@ func (api *API) IntermediateRoots(ctx context.Context, hash common.Hash, config break } } else { - if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(msg.Gas())); err != nil { + // nolint : contextcheck + if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(msg.Gas()), context.Background()); err != nil { log.Warn("Tracing intermediate roots did not complete", "txindex", i, "txhash", tx.Hash(), "err", err) // We intentionally don't return the error here: if we do, then the RPC server will not // return the roots. Most likely, the caller already knows that a certain transaction fails to @@ -779,7 +780,8 @@ func (api *API) traceBlock(ctx context.Context, block *types.Block, config *Trac break } } else { - if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(msg.Gas())); err != nil { + // nolint : contextcheck + if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(msg.Gas()), context.Background()); err != nil { failed = err break } @@ -926,7 +928,8 @@ func (api *API) standardTraceBlockToFile(ctx context.Context, block *types.Block } } } else { - _, err = core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(msg.Gas())) + // nolint : contextcheck + _, err = core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(msg.Gas()), context.Background()) if writer != nil { writer.Flush() } @@ -1138,7 +1141,8 @@ func (api *API) traceTx(ctx context.Context, message core.Message, txctx *Contex return nil, fmt.Errorf("tracing failed: %w", err) } } else { - result, err = core.ApplyMessage(vmenv, message, new(core.GasPool).AddGas(message.Gas())) + // nolint : contextcheck + result, err = core.ApplyMessage(vmenv, message, new(core.GasPool).AddGas(message.Gas()), context.Background()) if err != nil { return nil, fmt.Errorf("tracing failed: %w", err) } diff --git a/eth/tracers/api_bor.go b/eth/tracers/api_bor.go index b93baae432..2351aed2a9 100644 --- a/eth/tracers/api_bor.go +++ b/eth/tracers/api_bor.go @@ -95,7 +95,7 @@ func (api *API) traceBorBlock(ctx context.Context, block *types.Block, config *T callmsg := prepareCallMessage(message) execRes, err = statefull.ApplyBorMessage(*vmenv, callmsg) } else { - execRes, err = core.ApplyMessage(vmenv, message, new(core.GasPool).AddGas(message.Gas())) + execRes, err = core.ApplyMessage(vmenv, message, new(core.GasPool).AddGas(message.Gas()), nil) } if err != nil { diff --git a/eth/tracers/api_test.go b/eth/tracers/api_test.go index d394e4fbe3..48307337c7 100644 --- a/eth/tracers/api_test.go +++ b/eth/tracers/api_test.go @@ -167,14 +167,17 @@ func (b *testBackend) StateAtTransaction(ctx context.Context, block *types.Block for idx, tx := range block.Transactions() { msg, _ := tx.AsMessage(signer, block.BaseFee()) txContext := core.NewEVMTxContext(msg) - context := core.NewEVMBlockContext(block.Header(), b.chain, nil) + blockContext := core.NewEVMBlockContext(block.Header(), b.chain, nil) if idx == txIndex { - return msg, context, statedb, nil + return msg, blockContext, statedb, nil } - vmenv := vm.NewEVM(context, txContext, statedb, b.chainConfig, vm.Config{}) - if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(tx.Gas())); err != nil { + + vmenv := vm.NewEVM(blockContext, txContext, statedb, b.chainConfig, vm.Config{}) + // nolint : contextcheck + if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(tx.Gas()), context.Background()); err != nil { return nil, vm.BlockContext{}, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err) } + statedb.Finalise(vmenv.ChainConfig().IsEIP158(block.Number())) } return nil, vm.BlockContext{}, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, block.Hash()) diff --git a/eth/tracers/internal/tracetest/calltrace_test.go b/eth/tracers/internal/tracetest/calltrace_test.go index cf7c1e6c0d..938edccbd8 100644 --- a/eth/tracers/internal/tracetest/calltrace_test.go +++ b/eth/tracers/internal/tracetest/calltrace_test.go @@ -17,6 +17,7 @@ package tracetest import ( + "context" "encoding/json" "io/ioutil" "math/big" @@ -168,7 +169,7 @@ func testCallTracer(tracerName string, dirPath string, t *testing.T) { Origin: origin, GasPrice: tx.GasPrice(), } - context = vm.BlockContext{ + blockContext = vm.BlockContext{ CanTransfer: core.CanTransfer, Transfer: core.Transfer, Coinbase: test.Context.Miner, @@ -183,13 +184,13 @@ func testCallTracer(tracerName string, dirPath string, t *testing.T) { if err != nil { t.Fatalf("failed to create call tracer: %v", err) } - evm := vm.NewEVM(context, txContext, statedb, test.Genesis.Config, vm.Config{Debug: true, Tracer: tracer}) + evm := vm.NewEVM(blockContext, txContext, statedb, test.Genesis.Config, vm.Config{Debug: true, Tracer: tracer}) msg, err := tx.AsMessage(signer, nil) if err != nil { t.Fatalf("failed to prepare transaction for tracing: %v", err) } st := core.NewStateTransition(evm, msg, new(core.GasPool).AddGas(tx.Gas())) - if _, err = st.TransitionDb(); err != nil { + if _, err = st.TransitionDb(context.Background()); err != nil { t.Fatalf("failed to execute transaction: %v", err) } // Retrieve the trace result and compare against the etalon @@ -279,7 +280,7 @@ func benchTracer(tracerName string, test *callTracerTest, b *testing.B) { Origin: origin, GasPrice: tx.GasPrice(), } - context := vm.BlockContext{ + blockContext := vm.BlockContext{ CanTransfer: core.CanTransfer, Transfer: core.Transfer, Coinbase: test.Context.Miner, @@ -297,15 +298,19 @@ func benchTracer(tracerName string, test *callTracerTest, b *testing.B) { if err != nil { b.Fatalf("failed to create call tracer: %v", err) } - evm := vm.NewEVM(context, txContext, statedb, test.Genesis.Config, vm.Config{Debug: true, Tracer: tracer}) + + evm := vm.NewEVM(blockContext, txContext, statedb, test.Genesis.Config, vm.Config{Debug: true, Tracer: tracer}) snap := statedb.Snapshot() st := core.NewStateTransition(evm, msg, new(core.GasPool).AddGas(tx.Gas())) - if _, err = st.TransitionDb(); err != nil { + + if _, err = st.TransitionDb(context.Background()); err != nil { b.Fatalf("failed to execute transaction: %v", err) } + if _, err = tracer.GetResult(); err != nil { b.Fatal(err) } + statedb.RevertToSnapshot(snap) } } @@ -333,7 +338,7 @@ func TestZeroValueToNotExitCall(t *testing.T) { Origin: origin, GasPrice: big.NewInt(1), } - context := vm.BlockContext{ + blockContext := vm.BlockContext{ CanTransfer: core.CanTransfer, Transfer: core.Transfer, Coinbase: common.Address{}, @@ -363,15 +368,18 @@ func TestZeroValueToNotExitCall(t *testing.T) { if err != nil { t.Fatalf("failed to create call tracer: %v", err) } - evm := vm.NewEVM(context, txContext, statedb, params.MainnetChainConfig, vm.Config{Debug: true, Tracer: tracer}) + + evm := vm.NewEVM(blockContext, txContext, statedb, params.MainnetChainConfig, vm.Config{Debug: true, Tracer: tracer}) msg, err := tx.AsMessage(signer, nil) if err != nil { t.Fatalf("failed to prepare transaction for tracing: %v", err) } st := core.NewStateTransition(evm, msg, new(core.GasPool).AddGas(tx.Gas())) - if _, err = st.TransitionDb(); err != nil { + + if _, err = st.TransitionDb(context.Background()); err != nil { t.Fatalf("failed to execute transaction: %v", err) } + // Retrieve the trace result and compare against the etalon res, err := tracer.GetResult() if err != nil { diff --git a/eth/tracers/js/tracer_test.go b/eth/tracers/js/tracer_test.go index cf0a4aa828..a17e5f3150 100644 --- a/eth/tracers/js/tracer_test.go +++ b/eth/tracers/js/tracer_test.go @@ -69,7 +69,7 @@ func runTrace(tracer tracers.Tracer, vmctx *vmContext, chaincfg *params.ChainCon contract.Code = []byte{byte(vm.PUSH1), 0x1, byte(vm.PUSH1), 0x1, 0x0} tracer.CaptureStart(env, contract.Caller(), contract.Address(), false, []byte{}, startGas, value) - ret, err := env.Interpreter().Run(contract, []byte{}, false) + ret, err := env.Interpreter().Run(contract, []byte{}, false, nil) tracer.CaptureEnd(ret, startGas-contract.Gas, 1, err) if err != nil { return nil, err diff --git a/eth/tracers/logger/logger_test.go b/eth/tracers/logger/logger_test.go index 205ee31120..546a3df87c 100644 --- a/eth/tracers/logger/logger_test.go +++ b/eth/tracers/logger/logger_test.go @@ -59,7 +59,7 @@ func TestStoreCapture(t *testing.T) { contract.Code = []byte{byte(vm.PUSH1), 0x1, byte(vm.PUSH1), 0x0, byte(vm.SSTORE)} var index common.Hash logger.CaptureStart(env, common.Address{}, contract.Address(), false, nil, 0, nil) - _, err := env.Interpreter().Run(contract, []byte{}, false) + _, err := env.Interpreter().PreRun(contract, []byte{}, false, nil) if err != nil { t.Fatal(err) } diff --git a/eth/tracers/tracers_test.go b/eth/tracers/tracers_test.go index ce9289dd75..85cb16a985 100644 --- a/eth/tracers/tracers_test.go +++ b/eth/tracers/tracers_test.go @@ -17,6 +17,7 @@ package tracers import ( + "context" "math/big" "testing" @@ -66,7 +67,7 @@ func BenchmarkTransactionTrace(b *testing.B) { Origin: from, GasPrice: tx.GasPrice(), } - context := vm.BlockContext{ + blockContext := vm.BlockContext{ CanTransfer: core.CanTransfer, Transfer: core.Transfer, Coinbase: common.Address{}, @@ -102,7 +103,7 @@ func BenchmarkTransactionTrace(b *testing.B) { //EnableMemory: false, //EnableReturnData: false, }) - evm := vm.NewEVM(context, txContext, statedb, params.AllEthashProtocolChanges, vm.Config{Debug: true, Tracer: tracer}) + evm := vm.NewEVM(blockContext, txContext, statedb, params.AllEthashProtocolChanges, vm.Config{Debug: true, Tracer: tracer}) msg, err := tx.AsMessage(signer, nil) if err != nil { b.Fatalf("failed to prepare transaction for tracing: %v", err) @@ -113,7 +114,7 @@ func BenchmarkTransactionTrace(b *testing.B) { for i := 0; i < b.N; i++ { snap := statedb.Snapshot() st := core.NewStateTransition(evm, msg, new(core.GasPool).AddGas(tx.Gas())) - _, err = st.TransitionDb() + _, err = st.TransitionDb(context.Background()) if err != nil { b.Fatal(err) } diff --git a/internal/cli/server/config.go b/internal/cli/server/config.go index 47cb9a7848..964770e503 100644 --- a/internal/cli/server/config.go +++ b/internal/cli/server/config.go @@ -304,8 +304,9 @@ type SealerConfig struct { GasPriceRaw string `hcl:"gasprice,optional" toml:"gasprice,optional"` // The time interval for miner to re-create mining work. - Recommit time.Duration `hcl:"-,optional" toml:"-"` - RecommitRaw string `hcl:"recommit,optional" toml:"recommit,optional"` + Recommit time.Duration `hcl:"-,optional" toml:"-"` + RecommitRaw string `hcl:"recommit,optional" toml:"recommit,optional"` + CommitInterruptFlag bool `hcl:"commitinterrupt,optional" toml:"commitinterrupt,optional"` } type JsonRPCConfig struct { @@ -622,12 +623,13 @@ func DefaultConfig() *Config { LifeTime: 3 * time.Hour, }, Sealer: &SealerConfig{ - Enabled: false, - Etherbase: "", - GasCeil: 30_000_000, // geth's default - GasPrice: big.NewInt(1 * params.GWei), // geth's default - ExtraData: "", - Recommit: 125 * time.Second, + Enabled: false, + Etherbase: "", + GasCeil: 30_000_000, // geth's default + GasPrice: big.NewInt(1 * params.GWei), // geth's default + ExtraData: "", + Recommit: 125 * time.Second, + CommitInterruptFlag: true, }, Gpo: &GpoConfig{ Blocks: 20, @@ -916,6 +918,7 @@ func (c *Config) buildEth(stack *node.Node, accountManager *accounts.Manager) (* n.Miner.GasPrice = c.Sealer.GasPrice n.Miner.GasCeil = c.Sealer.GasCeil n.Miner.ExtraData = []byte(c.Sealer.ExtraData) + n.Miner.CommitInterruptFlag = c.Sealer.CommitInterruptFlag if etherbase := c.Sealer.Etherbase; etherbase != "" { if !common.IsHexAddress(etherbase) { diff --git a/internal/cli/server/flags.go b/internal/cli/server/flags.go index 82b99090d4..7e68f249eb 100644 --- a/internal/cli/server/flags.go +++ b/internal/cli/server/flags.go @@ -302,6 +302,13 @@ func (c *Command) Flags() *flagset.Flagset { Default: c.cliConfig.Sealer.Recommit, Group: "Sealer", }) + f.BoolFlag(&flagset.BoolFlag{ + Name: "miner.interruptcommit", + Usage: "Interrupt block commit when block creation time is passed", + Value: &c.cliConfig.Sealer.CommitInterruptFlag, + Default: c.cliConfig.Sealer.CommitInterruptFlag, + Group: "Sealer", + }) // ethstats f.StringFlag(&flagset.StringFlag{ diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 3ce2c6552b..885a5ebf2b 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1027,7 +1027,8 @@ func DoCall(ctx context.Context, b Backend, args TransactionArgs, blockNrOrHash // Execute the message. gp := new(core.GasPool).AddGas(math.MaxUint64) - result, err := core.ApplyMessage(evm, msg, gp) + // nolint : contextcheck + result, err := core.ApplyMessage(evm, msg, gp, context.Background()) if err := vmError(); err != nil { return nil, err } @@ -1595,13 +1596,16 @@ func AccessList(ctx context.Context, b Backend, blockNrOrHash rpc.BlockNumberOrH if err != nil { return nil, 0, nil, err } - res, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(msg.Gas())) + // nolint : contextcheck + res, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(msg.Gas()), context.Background()) if err != nil { return nil, 0, nil, fmt.Errorf("failed to apply transaction: %v err: %v", args.toTransaction().Hash(), err) } + if tracer.Equal(prevTracer) { return accessList, res.UsedGas, res.Err, nil } + prevTracer = tracer } } diff --git a/les/odr_test.go b/les/odr_test.go index ad77abf5b9..291755cfd3 100644 --- a/les/odr_test.go +++ b/les/odr_test.go @@ -137,13 +137,14 @@ func odrContractCall(ctx context.Context, db ethdb.Database, config *params.Chai msg := callmsg{types.NewMessage(from.Address(), &testContractAddr, 0, new(big.Int), 100000, big.NewInt(params.InitialBaseFee), big.NewInt(params.InitialBaseFee), new(big.Int), data, nil, true)} - context := core.NewEVMBlockContext(header, bc, nil) + blockContext := core.NewEVMBlockContext(header, bc, nil) txContext := core.NewEVMTxContext(msg) - vmenv := vm.NewEVM(context, txContext, statedb, config, vm.Config{NoBaseFee: true}) + vmenv := vm.NewEVM(blockContext, txContext, statedb, config, vm.Config{NoBaseFee: true}) //vmenv := core.NewEnv(statedb, config, bc, msg, header, vm.Config{}) gp := new(core.GasPool).AddGas(math.MaxUint64) - result, _ := core.ApplyMessage(vmenv, msg, gp) + // nolint : contextcheck + result, _ := core.ApplyMessage(vmenv, msg, gp, context.Background()) res = append(res, result.Return()...) } } else { @@ -151,11 +152,12 @@ func odrContractCall(ctx context.Context, db ethdb.Database, config *params.Chai state := light.NewState(ctx, header, lc.Odr()) state.SetBalance(bankAddr, math.MaxBig256) msg := callmsg{types.NewMessage(bankAddr, &testContractAddr, 0, new(big.Int), 100000, big.NewInt(params.InitialBaseFee), big.NewInt(params.InitialBaseFee), new(big.Int), data, nil, true)} - context := core.NewEVMBlockContext(header, lc, nil) + blockContext := core.NewEVMBlockContext(header, lc, nil) txContext := core.NewEVMTxContext(msg) - vmenv := vm.NewEVM(context, txContext, state, config, vm.Config{NoBaseFee: true}) + vmenv := vm.NewEVM(blockContext, txContext, state, config, vm.Config{NoBaseFee: true}) gp := new(core.GasPool).AddGas(math.MaxUint64) - result, _ := core.ApplyMessage(vmenv, msg, gp) + // nolint : contextcheck + result, _ := core.ApplyMessage(vmenv, msg, gp, context.Background()) if state.Error() == nil { res = append(res, result.Return()...) } diff --git a/les/state_accessor.go b/les/state_accessor.go index 112e6fd44d..de881032f4 100644 --- a/les/state_accessor.go +++ b/les/state_accessor.go @@ -57,14 +57,15 @@ func (leth *LightEthereum) stateAtTransaction(ctx context.Context, block *types. // Assemble the transaction call message and return if the requested offset msg, _ := tx.AsMessage(signer, block.BaseFee()) txContext := core.NewEVMTxContext(msg) - context := core.NewEVMBlockContext(block.Header(), leth.blockchain, nil) + blockContext := core.NewEVMBlockContext(block.Header(), leth.blockchain, nil) statedb.Prepare(tx.Hash(), idx) if idx == txIndex { - return msg, context, statedb, nil + return msg, blockContext, statedb, nil } // Not yet the searched for transaction, execute on top of the current state - vmenv := vm.NewEVM(context, txContext, statedb, leth.blockchain.Config(), vm.Config{}) - if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(tx.Gas())); err != nil { + vmenv := vm.NewEVM(blockContext, txContext, statedb, leth.blockchain.Config(), vm.Config{}) + // nolint : contextcheck + if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(tx.Gas()), context.Background()); err != nil { return nil, vm.BlockContext{}, nil, fmt.Errorf("transaction %#x failed: %v", tx.Hash(), err) } // Ensure any modifications are committed to the state diff --git a/light/odr_test.go b/light/odr_test.go index 9f4b42e675..c8e5c27ed9 100644 --- a/light/odr_test.go +++ b/light/odr_test.go @@ -196,10 +196,11 @@ func odrContractCall(ctx context.Context, db ethdb.Database, bc *core.BlockChain st.SetBalance(testBankAddress, math.MaxBig256) msg := callmsg{types.NewMessage(testBankAddress, &testContractAddr, 0, new(big.Int), 1000000, big.NewInt(params.InitialBaseFee), big.NewInt(params.InitialBaseFee), new(big.Int), data, nil, true)} txContext := core.NewEVMTxContext(msg) - context := core.NewEVMBlockContext(header, chain, nil) - vmenv := vm.NewEVM(context, txContext, st, config, vm.Config{NoBaseFee: true}) + blockContext := core.NewEVMBlockContext(header, chain, nil) + vmenv := vm.NewEVM(blockContext, txContext, st, config, vm.Config{NoBaseFee: true}) gp := new(core.GasPool).AddGas(math.MaxUint64) - result, _ := core.ApplyMessage(vmenv, msg, gp) + // nolint : contextcheck + result, _ := core.ApplyMessage(vmenv, msg, gp, context.Background()) res = append(res, result.Return()...) if st.Error() != nil { return res, st.Error() diff --git a/miner/fake_miner.go b/miner/fake_miner.go index 38fd2b82d7..b33ae1e05a 100644 --- a/miner/fake_miner.go +++ b/miner/fake_miner.go @@ -212,7 +212,7 @@ var ( // Test accounts testBankKey, _ = crypto.GenerateKey() TestBankAddress = crypto.PubkeyToAddress(testBankKey.PublicKey) - testBankFunds = big.NewInt(1000000000000000000) + testBankFunds = big.NewInt(8000000000000000000) testUserKey, _ = crypto.GenerateKey() testUserAddress = crypto.PubkeyToAddress(testUserKey.PublicKey) @@ -222,8 +222,9 @@ var ( newTxs []*types.Transaction testConfig = &Config{ - Recommit: time.Second, - GasCeil: params.GenesisGasLimit, + Recommit: time.Second, + GasCeil: params.GenesisGasLimit, + CommitInterruptFlag: true, } ) diff --git a/miner/miner.go b/miner/miner.go index 20e12c240e..14a6de7c14 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -45,15 +45,16 @@ type Backend interface { // Config is the configuration parameters of mining. type Config struct { - Etherbase common.Address `toml:",omitempty"` // Public address for block mining rewards (default = first account) - Notify []string `toml:",omitempty"` // HTTP URL list to be notified of new work packages (only useful in ethash). - NotifyFull bool `toml:",omitempty"` // Notify with pending block headers instead of work packages - ExtraData hexutil.Bytes `toml:",omitempty"` // Block extra data set by the miner - GasFloor uint64 // Target gas floor for mined blocks. - GasCeil uint64 // Target gas ceiling for mined blocks. - GasPrice *big.Int // Minimum gas price for mining a transaction - Recommit time.Duration // The time interval for miner to re-create mining work. - Noverify bool // Disable remote mining solution verification(only useful in ethash). + Etherbase common.Address `toml:",omitempty"` // Public address for block mining rewards (default = first account) + Notify []string `toml:",omitempty"` // HTTP URL list to be notified of new work packages (only useful in ethash). + NotifyFull bool `toml:",omitempty"` // Notify with pending block headers instead of work packages + ExtraData hexutil.Bytes `toml:",omitempty"` // Block extra data set by the miner + GasFloor uint64 // Target gas floor for mined blocks. + GasCeil uint64 // Target gas ceiling for mined blocks. + GasPrice *big.Int // Minimum gas price for mining a transaction + Recommit time.Duration // The time interval for miner to re-create mining work. + Noverify bool // Disable remote mining solution verification(only useful in ethash). + CommitInterruptFlag bool // Interrupt commit when time is up ( default = true) } // Miner creates blocks and searches for proof-of-work values. diff --git a/miner/test_backend.go b/miner/test_backend.go index 3d4934e052..8573607bf7 100644 --- a/miner/test_backend.go +++ b/miner/test_backend.go @@ -31,6 +31,8 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + + lru "github.com/hashicorp/golang-lru" ) const ( @@ -40,6 +42,10 @@ const ( // testGas is the gas required for contract deployment. testGas = 144109 + + storageContractByteCode = "608060405234801561001057600080fd5b50610150806100206000396000f3fe608060405234801561001057600080fd5b50600436106100365760003560e01c80632e64cec11461003b5780636057361d14610059575b600080fd5b610043610075565b60405161005091906100a1565b60405180910390f35b610073600480360381019061006e91906100ed565b61007e565b005b60008054905090565b8060008190555050565b6000819050919050565b61009b81610088565b82525050565b60006020820190506100b66000830184610092565b92915050565b600080fd5b6100ca81610088565b81146100d557600080fd5b50565b6000813590506100e7816100c1565b92915050565b600060208284031215610103576101026100bc565b5b6000610111848285016100d8565b9150509291505056fea2646970667358221220322c78243e61b783558509c9cc22cb8493dde6925aa5e89a08cdf6e22f279ef164736f6c63430008120033" + storageContractTxCallData = "0x6057361d0000000000000000000000000000000000000000000000000000000000000001" + storageCallTxGas = 100000 ) func init() { @@ -167,6 +173,7 @@ func (b *testWorkerBackend) newRandomUncle() (*types.Block, error) { return blocks[0], err } +// newRandomTx creates a new transaction. func (b *testWorkerBackend) newRandomTx(creation bool) *types.Transaction { var tx *types.Transaction @@ -181,15 +188,54 @@ func (b *testWorkerBackend) newRandomTx(creation bool) *types.Transaction { return tx } -func NewTestWorker(t TensingObject, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, blocks int, noempty uint32, delay uint) (*worker, *testWorkerBackend, func()) { +// newRandomTxWithNonce creates a new transaction with the given nonce. +func (b *testWorkerBackend) newRandomTxWithNonce(creation bool, nonce uint64) *types.Transaction { + var tx *types.Transaction + + gasPrice := big.NewInt(100 * params.InitialBaseFee) + + if creation { + tx, _ = types.SignTx(types.NewContractCreation(b.txPool.Nonce(TestBankAddress), big.NewInt(0), testGas, gasPrice, common.FromHex(testCode)), types.HomesteadSigner{}, testBankKey) + } else { + tx, _ = types.SignTx(types.NewTransaction(nonce, testUserAddress, big.NewInt(1000), params.TxGas, gasPrice, nil), types.HomesteadSigner{}, testBankKey) + } + + return tx +} + +// newRandomTxWithGas creates a new transactionto deploy a storage smart contract. +func (b *testWorkerBackend) newStorageCreateContractTx() (*types.Transaction, common.Address) { + var tx *types.Transaction + + gasPrice := big.NewInt(10 * params.InitialBaseFee) + + tx, _ = types.SignTx(types.NewContractCreation(b.txPool.Nonce(TestBankAddress), big.NewInt(0), testGas, gasPrice, common.FromHex(storageContractByteCode)), types.HomesteadSigner{}, testBankKey) + contractAddr := crypto.CreateAddress(TestBankAddress, b.txPool.Nonce(TestBankAddress)) + + return tx, contractAddr +} + +// newStorageContractCallTx creates a new transaction to call a storage smart contract. +func (b *testWorkerBackend) newStorageContractCallTx(to common.Address, nonce uint64) *types.Transaction { + var tx *types.Transaction + + gasPrice := big.NewInt(10 * params.InitialBaseFee) + + tx, _ = types.SignTx(types.NewTransaction(nonce, to, nil, storageCallTxGas, gasPrice, common.FromHex(storageContractTxCallData)), types.HomesteadSigner{}, testBankKey) + + return tx +} + +// NewTestWorker creates a new test worker with the given parameters. +func NewTestWorker(t TensingObject, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, blocks int, noempty uint32, delay uint, opcodeDelay uint) (*worker, *testWorkerBackend, func()) { backend := newTestWorkerBackend(t, chainConfig, engine, db, blocks) backend.txPool.AddLocals(pendingTxs) var w *worker - if delay != 0 { + if delay != 0 || opcodeDelay != 0 { //nolint:staticcheck - w = newWorkerWithDelay(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil, false, delay) + w = newWorkerWithDelay(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil, false, delay, opcodeDelay) } else { //nolint:staticcheck w = newWorker(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil, false) @@ -203,32 +249,34 @@ func NewTestWorker(t TensingObject, chainConfig *params.ChainConfig, engine cons return w, backend, w.close } -//nolint:staticcheck -func newWorkerWithDelay(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(header *types.Header) bool, init bool, delay uint) *worker { +// newWorkerWithDelay is newWorker() with extra params to induce artficial delays for tests such as commit-interrupt. +// nolint:staticcheck +func newWorkerWithDelay(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(header *types.Header) bool, init bool, delay uint, opcodeDelay uint) *worker { worker := &worker{ - config: config, - chainConfig: chainConfig, - engine: engine, - eth: eth, - mux: mux, - chain: eth.BlockChain(), - isLocalBlock: isLocalBlock, - localUncles: make(map[common.Hash]*types.Block), - remoteUncles: make(map[common.Hash]*types.Block), - unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), sealingLogAtDepth), - pendingTasks: make(map[common.Hash]*task), - txsCh: make(chan core.NewTxsEvent, txChanSize), - chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), - chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), - newWorkCh: make(chan *newWorkReq), - getWorkCh: make(chan *getWorkReq), - taskCh: make(chan *task), - resultCh: make(chan *types.Block, resultQueueSize), - exitCh: make(chan struct{}), - startCh: make(chan struct{}, 1), - resubmitIntervalCh: make(chan time.Duration), - resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), - noempty: 1, + config: config, + chainConfig: chainConfig, + engine: engine, + eth: eth, + mux: mux, + chain: eth.BlockChain(), + isLocalBlock: isLocalBlock, + localUncles: make(map[common.Hash]*types.Block), + remoteUncles: make(map[common.Hash]*types.Block), + unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), sealingLogAtDepth), + pendingTasks: make(map[common.Hash]*task), + txsCh: make(chan core.NewTxsEvent, txChanSize), + chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), + chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), + newWorkCh: make(chan *newWorkReq), + getWorkCh: make(chan *getWorkReq), + taskCh: make(chan *task), + resultCh: make(chan *types.Block, resultQueueSize), + exitCh: make(chan struct{}), + startCh: make(chan struct{}, 1), + resubmitIntervalCh: make(chan time.Duration), + resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), + noempty: 1, + interruptCommitFlag: config.CommitInterruptFlag, } worker.profileCount = new(int32) // Subscribe NewTxsEvent for tx pool @@ -237,6 +285,19 @@ func newWorkerWithDelay(config *Config, chainConfig *params.ChainConfig, engine worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh) + interruptedTxCache, err := lru.New(vm.InterruptedTxCacheSize) + if err != nil { + log.Warn("Failed to create interrupted tx cache", "err", err) + } + + worker.interruptedTxCache = &vm.TxCache{ + Cache: interruptedTxCache, + } + + if !worker.interruptCommitFlag { + worker.noempty = 0 + } + // Sanitize recommit interval if the user-specified one is too short. recommit := worker.config.Recommit if recommit < minRecommitInterval { @@ -248,7 +309,7 @@ func newWorkerWithDelay(config *Config, chainConfig *params.ChainConfig, engine worker.wg.Add(4) - go worker.mainLoopWithDelay(ctx, delay) + go worker.mainLoopWithDelay(ctx, delay, opcodeDelay) go worker.newWorkLoop(ctx, recommit) go worker.resultLoop() go worker.taskLoop() @@ -261,8 +322,9 @@ func newWorkerWithDelay(config *Config, chainConfig *params.ChainConfig, engine return worker } +// mainLoopWithDelay is mainLoop() with extra params to induce artficial delays for tests such as commit-interrupt. // nolint:gocognit -func (w *worker) mainLoopWithDelay(ctx context.Context, delay uint) { +func (w *worker) mainLoopWithDelay(ctx context.Context, delay uint, opcodeDelay uint) { defer w.wg.Done() defer w.txsSub.Unsubscribe() defer w.chainHeadSub.Unsubscribe() @@ -280,7 +342,7 @@ func (w *worker) mainLoopWithDelay(ctx context.Context, delay uint) { select { case req := <-w.newWorkCh: //nolint:contextcheck - w.commitWorkWithDelay(req.ctx, req.interrupt, req.noempty, req.timestamp, delay) + w.commitWorkWithDelay(req.ctx, req.interrupt, req.noempty, req.timestamp, delay, opcodeDelay) case req := <-w.getWorkCh: //nolint:contextcheck @@ -342,6 +404,7 @@ func (w *worker) mainLoopWithDelay(ctx context.Context, delay uint) { // Note all transactions received may not be continuous with transactions // already included in the current sealing block. These transactions will // be automatically eliminated. + // nolint : nestif if !w.isRunning() && w.current != nil { // If block is already full, abort if gp := w.current.gasPool; gp != nil && gp.Gas() < params.TxGas { @@ -358,8 +421,19 @@ func (w *worker) mainLoopWithDelay(ctx context.Context, delay uint) { txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, cmath.FromBig(w.current.header.BaseFee)) tcount := w.current.tcount - interruptCh, stopFn := getInterruptTimer(ctx, w.current, w.chain.CurrentBlock()) - w.commitTransactionsWithDelay(w.current, txset, nil, interruptCh, delay) + var interruptCtx = context.Background() + stopFn := func() {} + defer func() { + stopFn() + }() + + if w.interruptCommitFlag { + interruptCtx, stopFn = getInterruptTimer(ctx, w.current, w.chain.CurrentBlock()) + // nolint : staticcheck + interruptCtx = vm.PutCache(interruptCtx, w.interruptedTxCache) + } + + w.commitTransactionsWithDelay(w.current, txset, nil, interruptCtx) // Only update the snapshot if any new transactions were added // to the pending block @@ -392,166 +466,8 @@ func (w *worker) mainLoopWithDelay(ctx context.Context, delay uint) { } } -// nolint:gocognit -func (w *worker) commitTransactionsWithDelay(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32, interruptCh chan struct{}, delay uint) bool { - gasLimit := env.header.GasLimit - if env.gasPool == nil { - env.gasPool = new(core.GasPool).AddGas(gasLimit) - } - - var coalescedLogs []*types.Log - - initialGasLimit := env.gasPool.Gas() - initialTxs := txs.GetTxs() - - var breakCause string - - defer func() { - log.OnDebug(func(lg log.Logging) { - lg("commitTransactions-stats", - "initialTxsCount", initialTxs, - "initialGasLimit", initialGasLimit, - "resultTxsCount", txs.GetTxs(), - "resultGapPool", env.gasPool.Gas(), - "exitCause", breakCause) - }) - }() - -mainloop: - for { - // case of interrupting by timeout - select { - case <-interruptCh: - commitInterruptCounter.Inc(1) - break mainloop - default: - } - // In the following three cases, we will interrupt the execution of the transaction. - // (1) new head block event arrival, the interrupt signal is 1 - // (2) worker start or restart, the interrupt signal is 1 - // (3) worker recreate the sealing block with any newly arrived transactions, the interrupt signal is 2. - // For the first two cases, the semi-finished work will be discarded. - // For the third case, the semi-finished work will be submitted to the consensus engine. - if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone { - // Notify resubmit loop to increase resubmitting interval due to too frequent commits. - if atomic.LoadInt32(interrupt) == commitInterruptResubmit { - ratio := float64(gasLimit-env.gasPool.Gas()) / float64(gasLimit) - if ratio < 0.1 { - // nolint:goconst - ratio = 0.1 - } - w.resubmitAdjustCh <- &intervalAdjust{ - ratio: ratio, - inc: true, - } - } - // nolint:goconst - breakCause = "interrupt" - return atomic.LoadInt32(interrupt) == commitInterruptNewHead - } - // If we don't have enough gas for any further transactions then we're done - if env.gasPool.Gas() < params.TxGas { - log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas) - // nolint:goconst - breakCause = "Not enough gas for further transactions" - break - } - // Retrieve the next transaction and abort if all done - tx := txs.Peek() - if tx == nil { - // nolint:goconst - breakCause = "all transactions has been included" - break - } - // Error may be ignored here. The error has already been checked - // during transaction acceptance is the transaction pool. - // - // We use the eip155 signer regardless of the current hf. - from, _ := types.Sender(env.signer, tx) - // Check whether the tx is replay protected. If we're not in the EIP155 hf - // phase, start ignoring the sender until we do. - if tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) { - log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block) - - txs.Pop() - continue - } - // Start executing the transaction - env.state.Prepare(tx.Hash(), env.tcount) - - var start time.Time - - log.OnDebug(func(log.Logging) { - start = time.Now() - }) - - logs, err := w.commitTransaction(env, tx) - time.Sleep(time.Duration(delay) * time.Millisecond) - - switch { - case errors.Is(err, core.ErrGasLimitReached): - // Pop the current out-of-gas transaction without shifting in the next from the account - log.Trace("Gas limit exceeded for current block", "sender", from) - txs.Pop() - - case errors.Is(err, core.ErrNonceTooLow): - // New head notification data race between the transaction pool and miner, shift - log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce()) - txs.Shift() - - case errors.Is(err, core.ErrNonceTooHigh): - // Reorg notification data race between the transaction pool and miner, skip account = - log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce()) - txs.Pop() - - case errors.Is(err, nil): - // Everything ok, collect the logs and shift in the next transaction from the same account - coalescedLogs = append(coalescedLogs, logs...) - env.tcount++ - txs.Shift() - - log.OnDebug(func(lg log.Logging) { - lg("Committed new tx", "tx hash", tx.Hash(), "from", from, "to", tx.To(), "nonce", tx.Nonce(), "gas", tx.Gas(), "gasPrice", tx.GasPrice(), "value", tx.Value(), "time spent", time.Since(start)) - }) - - case errors.Is(err, core.ErrTxTypeNotSupported): - // Pop the unsupported transaction without shifting in the next from the account - log.Trace("Skipping unsupported transaction type", "sender", from, "type", tx.Type()) - txs.Pop() - - default: - // Strange error, discard the transaction and get the next in line (note, the - // nonce-too-high clause will prevent us from executing in vain). - log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err) - txs.Shift() - } - } - - if !w.isRunning() && len(coalescedLogs) > 0 { - // We don't push the pendingLogsEvent while we are sealing. The reason is that - // when we are sealing, the worker will regenerate a sealing block every 3 seconds. - // In order to avoid pushing the repeated pendingLog, we disable the pending log pushing. - // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined - // logs by filling in the block hash when the block was mined by the local miner. This can - // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed. - cpy := make([]*types.Log, len(coalescedLogs)) - for i, l := range coalescedLogs { - cpy[i] = new(types.Log) - *cpy[i] = *l - } - - w.pendingLogsFeed.Send(cpy) - } - // Notify resubmit loop to decrease resubmitting interval if current interval is larger - // than the user-specified one. - if interrupt != nil { - w.resubmitAdjustCh <- &intervalAdjust{inc: false} - } - - return false -} - -func (w *worker) commitWorkWithDelay(ctx context.Context, interrupt *int32, noempty bool, timestamp int64, delay uint) { +// commitWorkWithDelay is commitWork() with extra params to induce artficial delays for tests such as commit-interrupt. +func (w *worker) commitWorkWithDelay(ctx context.Context, interrupt *int32, noempty bool, timestamp int64, delay uint, opcodeDelay uint) { start := time.Now() var ( @@ -581,15 +497,22 @@ func (w *worker) commitWorkWithDelay(ctx context.Context, interrupt *int32, noem return } - var interruptCh chan struct{} + // nolint : contextcheck + var interruptCtx = context.Background() stopFn := func() {} defer func() { stopFn() }() - if !noempty { - interruptCh, stopFn = getInterruptTimer(ctx, work, w.chain.CurrentBlock()) + if !noempty && w.interruptCommitFlag { + interruptCtx, stopFn = getInterruptTimer(ctx, work, w.chain.CurrentBlock()) + // nolint : staticcheck + interruptCtx = vm.PutCache(interruptCtx, w.interruptedTxCache) + // nolint : staticcheck + interruptCtx = context.WithValue(interruptCtx, vm.InterruptCtxDelayKey, delay) + // nolint : staticcheck + interruptCtx = context.WithValue(interruptCtx, vm.InterruptCtxOpcodeDelayKey, opcodeDelay) } ctx, span := tracing.StartSpan(ctx, "commitWork") @@ -610,7 +533,7 @@ func (w *worker) commitWorkWithDelay(ctx context.Context, interrupt *int32, noem } // Fill pending transactions from the txpool - w.fillTransactionsWithDelay(ctx, interrupt, work, interruptCh, delay) + w.fillTransactionsWithDelay(ctx, interrupt, work, interruptCtx) err = w.commit(ctx, work.copy(), w.fullTaskHook, true, start) if err != nil { @@ -626,8 +549,9 @@ func (w *worker) commitWorkWithDelay(ctx context.Context, interrupt *int32, noem w.current = work } +// fillTransactionsWithDelay is fillTransactions() with extra params to induce artficial delays for tests such as commit-interrupt. // nolint:gocognit -func (w *worker) fillTransactionsWithDelay(ctx context.Context, interrupt *int32, env *environment, interruptCh chan struct{}, delay uint) { +func (w *worker) fillTransactionsWithDelay(ctx context.Context, interrupt *int32, env *environment, interruptCtx context.Context) { ctx, span := tracing.StartSpan(ctx, "fillTransactions") defer tracing.EndSpan(span) @@ -751,7 +675,7 @@ func (w *worker) fillTransactionsWithDelay(ctx context.Context, interrupt *int32 }) tracing.Exec(ctx, "", "worker.LocalCommitTransactions", func(ctx context.Context, span trace.Span) { - committed = w.commitTransactionsWithDelay(env, txs, interrupt, interruptCh, delay) + committed = w.commitTransactionsWithDelay(env, txs, interrupt, interruptCtx) }) if committed { @@ -774,7 +698,7 @@ func (w *worker) fillTransactionsWithDelay(ctx context.Context, interrupt *int32 }) tracing.Exec(ctx, "", "worker.RemoteCommitTransactions", func(ctx context.Context, span trace.Span) { - committed = w.commitTransactionsWithDelay(env, txs, interrupt, interruptCh, delay) + committed = w.commitTransactionsWithDelay(env, txs, interrupt, interruptCtx) }) if committed { @@ -790,3 +714,176 @@ func (w *worker) fillTransactionsWithDelay(ctx context.Context, interrupt *int32 attribute.Int("len of final remote txs", remoteEnvTCount), ) } + +// commitTransactionsWithDelay is commitTransactions() with extra params to induce artficial delays for tests such as commit-interrupt. +// nolint:gocognit, unparam +func (w *worker) commitTransactionsWithDelay(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32, interruptCtx context.Context) bool { + gasLimit := env.header.GasLimit + if env.gasPool == nil { + env.gasPool = new(core.GasPool).AddGas(gasLimit) + } + + var coalescedLogs []*types.Log + + initialGasLimit := env.gasPool.Gas() + initialTxs := txs.GetTxs() + + var breakCause string + + defer func() { + log.OnDebug(func(lg log.Logging) { + lg("commitTransactions-stats", + "initialTxsCount", initialTxs, + "initialGasLimit", initialGasLimit, + "resultTxsCount", txs.GetTxs(), + "resultGapPool", env.gasPool.Gas(), + "exitCause", breakCause) + }) + }() + +mainloop: + for { + if interruptCtx != nil { + // case of interrupting by timeout + select { + case <-interruptCtx.Done(): + log.Warn("Interrupt") + break mainloop + default: + } + } + + // In the following three cases, we will interrupt the execution of the transaction. + // (1) new head block event arrival, the interrupt signal is 1 + // (2) worker start or restart, the interrupt signal is 1 + // (3) worker recreate the sealing block with any newly arrived transactions, the interrupt signal is 2. + // For the first two cases, the semi-finished work will be discarded. + // For the third case, the semi-finished work will be submitted to the consensus engine. + if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone { + // Notify resubmit loop to increase resubmitting interval due to too frequent commits. + if atomic.LoadInt32(interrupt) == commitInterruptResubmit { + ratio := float64(gasLimit-env.gasPool.Gas()) / float64(gasLimit) + if ratio < 0.1 { + // nolint:goconst + ratio = 0.1 + } + w.resubmitAdjustCh <- &intervalAdjust{ + ratio: ratio, + inc: true, + } + } + // nolint:goconst + breakCause = "interrupt" + + return atomic.LoadInt32(interrupt) == commitInterruptNewHead + } + // If we don't have enough gas for any further transactions then we're done + if env.gasPool.Gas() < params.TxGas { + log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas) + // nolint:goconst + breakCause = "Not enough gas for further transactions" + + break + } + // Retrieve the next transaction and abort if all done + tx := txs.Peek() + if tx == nil { + // nolint:goconst + breakCause = "all transactions has been included" + break + } + // Error may be ignored here. The error has already been checked + // during transaction acceptance is the transaction pool. + // + // We use the eip155 signer regardless of the current hf. + from, _ := types.Sender(env.signer, tx) + // Check whether the tx is replay protected. If we're not in the EIP155 hf + // phase, start ignoring the sender until we do. + if tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) { + log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block) + + txs.Pop() + + continue + } + // Start executing the transaction + env.state.Prepare(tx.Hash(), env.tcount) + + var start time.Time + + log.OnDebug(func(log.Logging) { + start = time.Now() + }) + + logs, err := w.commitTransaction(env, tx, interruptCtx) + + if interruptCtx != nil { + if delay := interruptCtx.Value(vm.InterruptCtxDelayKey); delay != nil { + // nolint : durationcheck + time.Sleep(time.Duration(delay.(uint)) * time.Millisecond) + } + } + + switch { + case errors.Is(err, core.ErrGasLimitReached): + // Pop the current out-of-gas transaction without shifting in the next from the account + log.Trace("Gas limit exceeded for current block", "sender", from) + txs.Pop() + + case errors.Is(err, core.ErrNonceTooLow): + // New head notification data race between the transaction pool and miner, shift + log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce()) + txs.Shift() + + case errors.Is(err, core.ErrNonceTooHigh): + // Reorg notification data race between the transaction pool and miner, skip account = + log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce()) + txs.Pop() + + case errors.Is(err, nil): + // Everything ok, collect the logs and shift in the next transaction from the same account + coalescedLogs = append(coalescedLogs, logs...) + env.tcount++ + + txs.Shift() + + log.OnDebug(func(lg log.Logging) { + lg("Committed new tx", "tx hash", tx.Hash(), "from", from, "to", tx.To(), "nonce", tx.Nonce(), "gas", tx.Gas(), "gasPrice", tx.GasPrice(), "value", tx.Value(), "time spent", time.Since(start)) + }) + + case errors.Is(err, core.ErrTxTypeNotSupported): + // Pop the unsupported transaction without shifting in the next from the account + log.Trace("Skipping unsupported transaction type", "sender", from, "type", tx.Type()) + txs.Pop() + + default: + // Strange error, discard the transaction and get the next in line (note, the + // nonce-too-high clause will prevent us from executing in vain). + log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err) + txs.Shift() + } + } + + if !w.isRunning() && len(coalescedLogs) > 0 { + // We don't push the pendingLogsEvent while we are sealing. The reason is that + // when we are sealing, the worker will regenerate a sealing block every 3 seconds. + // In order to avoid pushing the repeated pendingLog, we disable the pending log pushing. + // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined + // logs by filling in the block hash when the block was mined by the local miner. This can + // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed. + cpy := make([]*types.Log, len(coalescedLogs)) + for i, l := range coalescedLogs { + cpy[i] = new(types.Log) + *cpy[i] = *l + } + + w.pendingLogsFeed.Send(cpy) + } + // Notify resubmit loop to decrease resubmitting interval if current interval is larger + // than the user-specified one. + if interrupt != nil { + w.resubmitAdjustCh <- &intervalAdjust{inc: false} + } + + return false +} diff --git a/miner/worker.go b/miner/worker.go index 7094245c7a..f79657c580 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -31,6 +31,7 @@ import ( "time" mapset "github.com/deckarep/golang-set" + lru "github.com/hashicorp/golang-lru" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -44,6 +45,7 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" @@ -95,7 +97,7 @@ const ( var ( sealedBlocksCounter = metrics.NewRegisteredCounter("worker/sealedBlocks", nil) sealedEmptyBlocksCounter = metrics.NewRegisteredCounter("worker/sealedEmptyBlocks", nil) - commitInterruptCounter = metrics.NewRegisteredCounter("worker/commitInterrupt", nil) + txCommitInterruptCounter = metrics.NewRegisteredCounter("worker/txCommitInterrupt", nil) ) // environment is the worker's current environment and holds all @@ -273,35 +275,38 @@ type worker struct { fullTaskHook func() // Method to call before pushing the full sealing task. resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval. - profileCount *int32 // Global count for profiling + profileCount *int32 // Global count for profiling + interruptCommitFlag bool // Interrupt commit ( Default true ) + interruptedTxCache *vm.TxCache } //nolint:staticcheck func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(header *types.Header) bool, init bool) *worker { worker := &worker{ - config: config, - chainConfig: chainConfig, - engine: engine, - eth: eth, - mux: mux, - chain: eth.BlockChain(), - isLocalBlock: isLocalBlock, - localUncles: make(map[common.Hash]*types.Block), - remoteUncles: make(map[common.Hash]*types.Block), - unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), sealingLogAtDepth), - pendingTasks: make(map[common.Hash]*task), - txsCh: make(chan core.NewTxsEvent, txChanSize), - chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), - chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), - newWorkCh: make(chan *newWorkReq), - getWorkCh: make(chan *getWorkReq), - taskCh: make(chan *task), - resultCh: make(chan *types.Block, resultQueueSize), - exitCh: make(chan struct{}), - startCh: make(chan struct{}, 1), - resubmitIntervalCh: make(chan time.Duration), - resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), - noempty: 1, + config: config, + chainConfig: chainConfig, + engine: engine, + eth: eth, + mux: mux, + chain: eth.BlockChain(), + isLocalBlock: isLocalBlock, + localUncles: make(map[common.Hash]*types.Block), + remoteUncles: make(map[common.Hash]*types.Block), + unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), sealingLogAtDepth), + pendingTasks: make(map[common.Hash]*task), + txsCh: make(chan core.NewTxsEvent, txChanSize), + chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), + chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), + newWorkCh: make(chan *newWorkReq), + getWorkCh: make(chan *getWorkReq), + taskCh: make(chan *task), + resultCh: make(chan *types.Block, resultQueueSize), + exitCh: make(chan struct{}), + startCh: make(chan struct{}, 1), + resubmitIntervalCh: make(chan time.Duration), + resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), + noempty: 1, + interruptCommitFlag: config.CommitInterruptFlag, } worker.profileCount = new(int32) // Subscribe NewTxsEvent for tx pool @@ -310,6 +315,19 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh) + interruptedTxCache, err := lru.New(vm.InterruptedTxCacheSize) + if err != nil { + log.Warn("Failed to create interrupted tx cache", "err", err) + } + + worker.interruptedTxCache = &vm.TxCache{ + Cache: interruptedTxCache, + } + + if !worker.interruptCommitFlag { + worker.noempty = 0 + } + // Sanitize recommit interval if the user-specified one is too short. recommit := worker.config.Recommit if recommit < minRecommitInterval { @@ -656,8 +674,22 @@ func (w *worker) mainLoop(ctx context.Context) { txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, cmath.FromBig(w.current.header.BaseFee)) tcount := w.current.tcount - interruptCh, stopFn := getInterruptTimer(ctx, w.current, w.chain.CurrentBlock()) - w.commitTransactions(w.current, txset, nil, interruptCh) + // nolint : contextcheck + var interruptCtx = context.Background() + + stopFn := func() {} + + defer func() { + stopFn() + }() + + if w.interruptCommitFlag { + interruptCtx, stopFn = getInterruptTimer(ctx, w.current, w.chain.CurrentBlock()) + // nolint : staticcheck + interruptCtx = vm.PutCache(interruptCtx, w.interruptedTxCache) + } + + w.commitTransactions(w.current, txset, nil, interruptCtx) // Only update the snapshot if any new transactions were added // to the pending block @@ -933,10 +965,12 @@ func (w *worker) updateSnapshot(env *environment) { w.snapshotState = env.state.Copy() } -func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]*types.Log, error) { +func (w *worker) commitTransaction(env *environment, tx *types.Transaction, interruptCtx context.Context) ([]*types.Log, error) { snap := env.state.Snapshot() - receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &env.coinbase, env.gasPool, env.state, env.header, tx, &env.header.GasUsed, *w.chain.GetVMConfig()) + // nolint : staticcheck + interruptCtx = vm.SetCurrentTxOnContext(interruptCtx, tx.Hash()) + receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &env.coinbase, env.gasPool, env.state, env.header, tx, &env.header.GasUsed, *w.chain.GetVMConfig(), interruptCtx) if err != nil { env.state.RevertToSnapshot(snap) return nil, err @@ -948,7 +982,7 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]* } //nolint:gocognit -func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32, interruptCh chan struct{}) bool { +func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32, interruptCtx context.Context) bool { gasLimit := env.header.GasLimit if env.gasPool == nil { env.gasPool = new(core.GasPool).AddGas(gasLimit) @@ -973,12 +1007,15 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP mainloop: for { - // case of interrupting by timeout - select { - case <-interruptCh: - commitInterruptCounter.Inc(1) - break mainloop - default: + if interruptCtx != nil { + // case of interrupting by timeout + select { + case <-interruptCtx.Done(): + txCommitInterruptCounter.Inc(1) + log.Warn("Tx Level Interrupt") + break mainloop + default: + } } // In the following three cases, we will interrupt the execution of the transaction. @@ -1038,7 +1075,7 @@ mainloop: start = time.Now() }) - logs, err := w.commitTransaction(env, tx) + logs, err := w.commitTransaction(env, tx, interruptCtx) switch { case errors.Is(err, core.ErrGasLimitReached): @@ -1269,7 +1306,7 @@ func startProfiler(profile string, filepath string, number uint64) (func() error // be customized with the plugin in the future. // //nolint:gocognit -func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *environment, interruptCh chan struct{}) { +func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *environment, interruptCtx context.Context) { ctx, span := tracing.StartSpan(ctx, "fillTransactions") defer tracing.EndSpan(span) @@ -1393,7 +1430,7 @@ func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *en }) tracing.Exec(ctx, "", "worker.LocalCommitTransactions", func(ctx context.Context, span trace.Span) { - committed = w.commitTransactions(env, txs, interrupt, interruptCh) + committed = w.commitTransactions(env, txs, interrupt, interruptCtx) }) if committed { @@ -1416,7 +1453,7 @@ func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *en }) tracing.Exec(ctx, "", "worker.RemoteCommitTransactions", func(ctx context.Context, span trace.Span) { - committed = w.commitTransactions(env, txs, interrupt, interruptCh) + committed = w.commitTransactions(env, txs, interrupt, interruptCtx) }) if committed { @@ -1441,10 +1478,22 @@ func (w *worker) generateWork(ctx context.Context, params *generateParams) (*typ } defer work.discard() - interruptCh, stopFn := getInterruptTimer(ctx, work, w.chain.CurrentBlock()) - defer stopFn() + // nolint : contextcheck + var interruptCtx = context.Background() + + stopFn := func() {} - w.fillTransactions(ctx, nil, work, interruptCh) + defer func() { + stopFn() + }() + + if w.interruptCommitFlag { + interruptCtx, stopFn = getInterruptTimer(ctx, work, w.chain.CurrentBlock()) + // nolint : staticcheck + interruptCtx = vm.PutCache(interruptCtx, w.interruptedTxCache) + } + + w.fillTransactions(ctx, nil, work, interruptCtx) return w.engine.FinalizeAndAssemble(ctx, w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts) } @@ -1482,15 +1531,18 @@ func (w *worker) commitWork(ctx context.Context, interrupt *int32, noempty bool, return } - var interruptCh chan struct{} + // nolint : contextcheck + var interruptCtx = context.Background() stopFn := func() {} defer func() { stopFn() }() - if !noempty { - interruptCh, stopFn = getInterruptTimer(ctx, work, w.chain.CurrentBlock()) + if !noempty && w.interruptCommitFlag { + interruptCtx, stopFn = getInterruptTimer(ctx, work, w.chain.CurrentBlock()) + // nolint : staticcheck + interruptCtx = vm.PutCache(interruptCtx, w.interruptedTxCache) } ctx, span := tracing.StartSpan(ctx, "commitWork") @@ -1511,7 +1563,7 @@ func (w *worker) commitWork(ctx context.Context, interrupt *int32, noempty bool, } // Fill pending transactions from the txpool - w.fillTransactions(ctx, interrupt, work, interruptCh) + w.fillTransactions(ctx, interrupt, work, interruptCtx) err = w.commit(ctx, work.copy(), w.fullTaskHook, true, start) if err != nil { @@ -1527,28 +1579,25 @@ func (w *worker) commitWork(ctx context.Context, interrupt *int32, noempty bool, w.current = work } -func getInterruptTimer(ctx context.Context, work *environment, current *types.Block) (chan struct{}, func()) { +func getInterruptTimer(ctx context.Context, work *environment, current *types.Block) (context.Context, func()) { delay := time.Until(time.Unix(int64(work.header.Time), 0)) - timeoutTimer := time.NewTimer(delay) - stopFn := func() { - timeoutTimer.Stop() - } + interruptCtx, cancel := context.WithTimeout(context.Background(), delay) blockNumber := current.NumberU64() + 1 - interruptCh := make(chan struct{}) go func() { select { - case <-timeoutTimer.C: - log.Info("Commit Interrupt. Pre-committing the current block", "block", blockNumber) - - close(interruptCh) + case <-interruptCtx.Done(): + if interruptCtx.Err() != context.Canceled { + log.Info("Commit Interrupt. Pre-committing the current block", "block", blockNumber) + cancel() + } case <-ctx.Done(): // nothing to do } }() - return interruptCh, stopFn + return interruptCtx, cancel } // commit runs any post-transaction state modifications, assembles the final block diff --git a/miner/worker_test.go b/miner/worker_test.go index 24577a4fb2..0b4e2757fd 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -19,7 +19,6 @@ package miner import ( "math/big" "os" - "sync" "sync/atomic" "testing" "time" @@ -43,21 +42,18 @@ import ( "github.com/ethereum/go-ethereum/tests/bor/mocks" ) +// nolint : paralleltest func TestGenerateBlockAndImportEthash(t *testing.T) { - t.Parallel() - testGenerateBlockAndImport(t, false, false) } +// nolint : paralleltest func TestGenerateBlockAndImportClique(t *testing.T) { - t.Parallel() - testGenerateBlockAndImport(t, true, false) } +// nolint : paralleltest func TestGenerateBlockAndImportBor(t *testing.T) { - t.Parallel() - testGenerateBlockAndImport(t, false, true) } @@ -90,7 +86,7 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool, isBor bool) { chainConfig.LondonBlock = big.NewInt(0) - w, b, _ := NewTestWorker(t, chainConfig, engine, db, 0, 0, 0) + w, b, _ := NewTestWorker(t, chainConfig, engine, db, 0, 0, 0, 0) defer w.close() // This test chain imports the mined blocks. @@ -196,7 +192,7 @@ func TestEmptyWorkClique(t *testing.T) { func testEmptyWork(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) { defer engine.Close() - w, _, _ := NewTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0, 0, 0) + w, _, _ := NewTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0, 0, 0, 0) defer w.close() var ( @@ -250,7 +246,7 @@ func TestStreamUncleBlock(t *testing.T) { ethash := ethash.NewFaker() defer ethash.Close() - w, b, _ := NewTestWorker(t, ethashChainConfig, ethash, rawdb.NewMemoryDatabase(), 1, 0, 0) + w, b, _ := NewTestWorker(t, ethashChainConfig, ethash, rawdb.NewMemoryDatabase(), 1, 0, 0, 0) defer w.close() var taskCh = make(chan struct{}) @@ -312,7 +308,7 @@ func TestRegenerateMiningBlockClique(t *testing.T) { func testRegenerateMiningBlock(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) { defer engine.Close() - w, b, _ := NewTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0, 0, 0) + w, b, _ := NewTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0, 0, 0, 0) defer w.close() var taskCh = make(chan struct{}, 3) @@ -383,7 +379,7 @@ func TestAdjustIntervalClique(t *testing.T) { func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) { defer engine.Close() - w, _, _ := NewTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0, 0, 0) + w, _, _ := NewTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0, 0, 0, 0) defer w.close() w.skipSealHook = func(task *task) bool { @@ -491,7 +487,7 @@ func TestGetSealingWorkPostMerge(t *testing.T) { func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, postMerge bool) { defer engine.Close() - w, b, _ := NewTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0, 0, 0) + w, b, _ := NewTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0, 0, 0, 0) defer w.close() w.setExtra([]byte{0x01, 0x02}) @@ -627,23 +623,41 @@ func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine co } } +// nolint : paralleltest +// TestCommitInterruptExperimentBor tests the commit interrupt experiment for bor consensus by inducing an artificial delay at transaction level. func TestCommitInterruptExperimentBor(t *testing.T) { - t.Parallel() // with 1 sec block time and 200 millisec tx delay we should get 5 txs per block - testCommitInterruptExperimentBor(t, 200, 5) + testCommitInterruptExperimentBor(t, 200, 5, 0) + + time.Sleep(2 * time.Second) // with 1 sec block time and 100 millisec tx delay we should get 10 txs per block - testCommitInterruptExperimentBor(t, 100, 10) + testCommitInterruptExperimentBor(t, 100, 10, 0) } -func testCommitInterruptExperimentBor(t *testing.T, delay uint, txCount int) { - t.Helper() +// nolint : paralleltest +// TestCommitInterruptExperimentBorContract tests the commit interrupt experiment for bor consensus by inducing an artificial delay at OPCODE level. +func TestCommitInterruptExperimentBorContract(t *testing.T) { + // pre-calculated number of OPCODES = 123. 7*123=861 < 1000, 1 tx is possible but 2 tx per block will not be possible. + testCommitInterruptExperimentBorContract(t, 0, 1, 7) + time.Sleep(2 * time.Second) + // pre-calculated number of OPCODES = 123. 2*123=246 < 1000, 4 tx is possible but 5 tx per block will not be possible. But 3 happen due to other overheads. + testCommitInterruptExperimentBorContract(t, 0, 3, 2) + time.Sleep(2 * time.Second) + // pre-calculated number of OPCODES = 123. 3*123=369 < 1000, 2 tx is possible but 3 tx per block will not be possible. + testCommitInterruptExperimentBorContract(t, 0, 2, 3) +} +// nolint : thelper +// testCommitInterruptExperimentBorContract is a helper function for testing the commit interrupt experiment for bor consensus. +func testCommitInterruptExperimentBorContract(t *testing.T, delay uint, txCount int, opcodeDelay uint) { var ( engine consensus.Engine chainConfig *params.ChainConfig db = rawdb.NewMemoryDatabase() ctrl *gomock.Controller + txInTxpool = 100 + txs = make([]*types.Transaction, 0, txInTxpool) ) chainConfig = params.BorUnittestChainConfig @@ -651,38 +665,91 @@ func testCommitInterruptExperimentBor(t *testing.T, delay uint, txCount int) { log.Root().SetHandler(log.LvlFilterHandler(4, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) engine, ctrl = getFakeBorFromConfig(t, chainConfig) + + w, b, _ := NewTestWorker(t, chainConfig, engine, db, 0, 1, delay, opcodeDelay) defer func() { + w.close() engine.Close() + db.Close() ctrl.Finish() }() - w, b, _ := NewTestWorker(t, chainConfig, engine, db, 0, 1, delay) - defer w.close() + // nonce 0 tx + tx, addr := b.newStorageCreateContractTx() + if err := b.TxPool().AddRemote(tx); err != nil { + t.Fatal(err) + } - wg := new(sync.WaitGroup) - wg.Add(1) + time.Sleep(4 * time.Second) - go func() { - wg.Done() + // nonce starts from 1 because we already have one tx + initNonce := uint64(1) - for { - tx := b.newRandomTx(false) - if err := b.TxPool().AddRemote(tx); err != nil { - t.Log(err) - } + for i := 0; i < txInTxpool; i++ { + tx := b.newStorageContractCallTx(addr, initNonce+uint64(i)) + txs = append(txs, tx) + } - time.Sleep(20 * time.Millisecond) - } + if err := b.TxPool().AddRemotes(txs); err != nil { + t.Fatal(err) + } + + // Start mining! + w.start() + time.Sleep(5 * time.Second) + w.stop() + + currentBlockNumber := w.current.header.Number.Uint64() + assert.Check(t, txCount >= w.chain.GetBlockByNumber(currentBlockNumber-1).Transactions().Len()) + assert.Check(t, 0 < w.chain.GetBlockByNumber(currentBlockNumber-1).Transactions().Len()+1) +} + +// nolint : thelper +// testCommitInterruptExperimentBor is a helper function for testing the commit interrupt experiment for bor consensus. +func testCommitInterruptExperimentBor(t *testing.T, delay uint, txCount int, opcodeDelay uint) { + var ( + engine consensus.Engine + chainConfig *params.ChainConfig + db = rawdb.NewMemoryDatabase() + ctrl *gomock.Controller + txInTxpool = 100 + txs = make([]*types.Transaction, 0, txInTxpool) + ) + + chainConfig = params.BorUnittestChainConfig + + log.Root().SetHandler(log.LvlFilterHandler(4, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) + + engine, ctrl = getFakeBorFromConfig(t, chainConfig) + + w, b, _ := NewTestWorker(t, chainConfig, engine, db, 0, 1, delay, opcodeDelay) + defer func() { + w.close() + engine.Close() + db.Close() + ctrl.Finish() }() - wg.Wait() + // nonce starts from 0 because have no txs yet + initNonce := uint64(0) + + for i := 0; i < txInTxpool; i++ { + tx := b.newRandomTxWithNonce(false, initNonce+uint64(i)) + txs = append(txs, tx) + } + + if err := b.TxPool().AddRemotes(txs); err != nil { + t.Fatal(err) + } // Start mining! w.start() time.Sleep(5 * time.Second) w.stop() - assert.Equal(t, txCount, w.chain.CurrentBlock().Transactions().Len()) + currentBlockNumber := w.current.header.Number.Uint64() + assert.Check(t, txCount >= w.chain.GetBlockByNumber(currentBlockNumber-1).Transactions().Len()) + assert.Check(t, 0 < w.chain.GetBlockByNumber(currentBlockNumber-1).Transactions().Len()) } func BenchmarkBorMining(b *testing.B) { @@ -716,7 +783,7 @@ func BenchmarkBorMining(b *testing.B) { chainConfig.LondonBlock = big.NewInt(0) - w, back, _ := NewTestWorker(b, chainConfig, engine, db, 0, 0, 0) + w, back, _ := NewTestWorker(b, chainConfig, engine, db, 0, 0, 0, 0) defer w.close() // This test chain imports the mined blocks. diff --git a/tests/bor/helper.go b/tests/bor/helper.go index c4b45f970d..06d2c6a069 100644 --- a/tests/bor/helper.go +++ b/tests/bor/helper.go @@ -257,7 +257,7 @@ func (b *blockGen) addTxWithChain(bc *core.BlockChain, statedb *state.StateDB, t statedb.Prepare(tx.Hash(), len(b.txs)) - receipt, err := core.ApplyTransaction(bc.Config(), bc, &b.header.Coinbase, b.gasPool, statedb, b.header, tx, &b.header.GasUsed, vm.Config{}) + receipt, err := core.ApplyTransaction(bc.Config(), bc, &b.header.Coinbase, b.gasPool, statedb, b.header, tx, &b.header.GasUsed, vm.Config{}, nil) if err != nil { panic(err) } diff --git a/tests/state_test.go b/tests/state_test.go index f18b84d16e..3ef251de14 100644 --- a/tests/state_test.go +++ b/tests/state_test.go @@ -303,7 +303,7 @@ func runBenchmark(b *testing.B, t *StateTest) { for n := 0; n < b.N; n++ { // Execute the message. snapshot := statedb.Snapshot() - _, _, err = evm.Call(sender, *msg.To(), msg.Data(), msg.Gas(), msg.Value()) + _, _, err = evm.Call(sender, *msg.To(), msg.Data(), msg.Gas(), msg.Value(), nil) if err != nil { b.Error(err) return diff --git a/tests/state_test_util.go b/tests/state_test_util.go index 65f93bfbe3..ffee265e3f 100644 --- a/tests/state_test_util.go +++ b/tests/state_test_util.go @@ -232,7 +232,8 @@ func (t *StateTest) RunNoVerify(subtest StateSubtest, vmconfig vm.Config, snapsh snapshot := statedb.Snapshot() gaspool := new(core.GasPool) gaspool.AddGas(block.GasLimit()) - if _, err := core.ApplyMessage(evm, msg, gaspool); err != nil { + + if _, err := core.ApplyMessage(evm, msg, gaspool, nil); err != nil { statedb.RevertToSnapshot(snapshot) }