Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core, miner, internal/cli : added interruptCommitFlag and cache #832

Merged
merged 6 commits into from
Apr 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 83 additions & 6 deletions core/vm/interpreter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,24 @@ import (
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"

lru "github.com/hashicorp/golang-lru"
)

var (
opcodeCommitInterruptCounter = metrics.NewRegisteredCounter("worker/opcodeCommitInterrupt", nil)
ErrInterrupt = errors.New("EVM execution interrupted")
ErrNoCache = errors.New("no tx cache found")
ErrNoCurrentTx = errors.New("no current tx found in interruptCtx")
)

const (
// These are keys for the interruptCtx
InterruptCtxDelayKey = "delay"
InterruptCtxOpcodeDelayKey = "opcodeDelay"

// InterruptedTxCacheSize is size of lru cache for interrupted txs
InterruptedTxCacheSize = 90000
)

// Config are the configuration options for the Interpreter
Expand Down Expand Up @@ -78,6 +86,54 @@ type EVMInterpreter struct {
returnData []byte // Last CALL's return data for subsequent reuse
}

// TxCacher is an wrapper of lru.cache for caching transactions that get interrupted
type TxCache struct {
Cache *lru.Cache
}

type txCacheKey struct{}
type InterruptedTxContext_currenttxKey struct{}

// SetCurrentTxOnContext sets the current tx on the context
func SetCurrentTxOnContext(ctx context.Context, txHash common.Hash) context.Context {
return context.WithValue(ctx, InterruptedTxContext_currenttxKey{}, txHash)
}

// GetCurrentTxFromContext gets the current tx from the context
func GetCurrentTxFromContext(ctx context.Context) (common.Hash, error) {
val := ctx.Value(InterruptedTxContext_currenttxKey{})
if val == nil {
return common.Hash{}, ErrNoCurrentTx
}

c, ok := val.(common.Hash)
if !ok {
return common.Hash{}, ErrNoCurrentTx
}

return c, nil
}

// GetCache returns the txCache from the context
func GetCache(ctx context.Context) (*TxCache, error) {
val := ctx.Value(txCacheKey{})
if val == nil {
return nil, ErrNoCache
}

c, ok := val.(*TxCache)
if !ok {
return nil, ErrNoCache
}

return c, nil
}

// PutCache puts the txCache into the context
func PutCache(ctx context.Context, cache *TxCache) context.Context {
return context.WithValue(ctx, txCacheKey{}, cache)
}

// NewEVMInterpreter returns a new instance of the Interpreter.
func NewEVMInterpreter(evm *EVM, cfg Config) *EVMInterpreter {
// If jump table was not initialised we set the default one.
Expand Down Expand Up @@ -215,10 +271,20 @@ func (in *EVMInterpreter) Run(contract *Contract, input []byte, readOnly bool, i
// case of interrupting by timeout
select {
case <-interruptCtx.Done():
opcodeCommitInterruptCounter.Inc(1)
log.Warn("OPCODE Level interrupt")
txHash, _ := GetCurrentTxFromContext(interruptCtx)
interruptedTxCache, _ := GetCache(interruptCtx)

// if the tx is already in the cache, it means that it has been interrupted before and we will not interrupt it again
found, _ := interruptedTxCache.Cache.ContainsOrAdd(txHash, true)
if found {
interruptedTxCache.Cache.Remove(txHash)
} else {
// if the tx is not in the cache, it means that it has not been interrupted before and we will interrupt it
opcodeCommitInterruptCounter.Inc(1)
log.Warn("OPCODE Level interrupt")

return nil, ErrInterrupt
return nil, ErrInterrupt
}
default:
}
}
Expand Down Expand Up @@ -365,10 +431,21 @@ func (in *EVMInterpreter) RunWithDelay(contract *Contract, input []byte, readOnl
// case of interrupting by timeout
select {
case <-interruptCtx.Done():
opcodeCommitInterruptCounter.Inc(1)
log.Warn("OPCODE Level interrupt")
txHash, _ := GetCurrentTxFromContext(interruptCtx)
interruptedTxCache, _ := GetCache(interruptCtx)
// if the tx is already in the cache, it means that it has been interrupted before and we will not interrupt it again
found, _ := interruptedTxCache.Cache.ContainsOrAdd(txHash, true)
log.Info("FOUND", "found", found, "txHash", txHash)

if found {
interruptedTxCache.Cache.Remove(txHash)
} else {
// if the tx is not in the cache, it means that it has not been interrupted before and we will interrupt it
opcodeCommitInterruptCounter.Inc(1)
log.Warn("OPCODE Level interrupt")

return nil, ErrInterrupt
return nil, ErrInterrupt
}
default:
}
}
Expand Down
19 changes: 11 additions & 8 deletions internal/cli/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,9 @@ type SealerConfig struct {
GasPriceRaw string `hcl:"gasprice,optional" toml:"gasprice,optional"`

// The time interval for miner to re-create mining work.
Recommit time.Duration `hcl:"-,optional" toml:"-"`
RecommitRaw string `hcl:"recommit,optional" toml:"recommit,optional"`
Recommit time.Duration `hcl:"-,optional" toml:"-"`
RecommitRaw string `hcl:"recommit,optional" toml:"recommit,optional"`
CommitInterruptFlag bool `hcl:"commitinterrupt,optional" toml:"commitinterrupt,optional"`
}

type JsonRPCConfig struct {
Expand Down Expand Up @@ -622,12 +623,13 @@ func DefaultConfig() *Config {
LifeTime: 3 * time.Hour,
},
Sealer: &SealerConfig{
Enabled: false,
Etherbase: "",
GasCeil: 30_000_000, // geth's default
GasPrice: big.NewInt(1 * params.GWei), // geth's default
ExtraData: "",
Recommit: 125 * time.Second,
Enabled: false,
Etherbase: "",
GasCeil: 30_000_000, // geth's default
GasPrice: big.NewInt(1 * params.GWei), // geth's default
ExtraData: "",
Recommit: 125 * time.Second,
CommitInterruptFlag: true,
},
Gpo: &GpoConfig{
Blocks: 20,
Expand Down Expand Up @@ -916,6 +918,7 @@ func (c *Config) buildEth(stack *node.Node, accountManager *accounts.Manager) (*
n.Miner.GasPrice = c.Sealer.GasPrice
n.Miner.GasCeil = c.Sealer.GasCeil
n.Miner.ExtraData = []byte(c.Sealer.ExtraData)
n.Miner.CommitInterruptFlag = c.Sealer.CommitInterruptFlag

if etherbase := c.Sealer.Etherbase; etherbase != "" {
if !common.IsHexAddress(etherbase) {
Expand Down
7 changes: 7 additions & 0 deletions internal/cli/server/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,13 @@ func (c *Command) Flags() *flagset.Flagset {
Default: c.cliConfig.Sealer.Recommit,
Group: "Sealer",
})
f.BoolFlag(&flagset.BoolFlag{
Name: "miner.interruptcommit",
Usage: "Interrupt block commit when block creation time is passed",
Value: &c.cliConfig.Sealer.CommitInterruptFlag,
Default: c.cliConfig.Sealer.CommitInterruptFlag,
Group: "Sealer",
})

// ethstats
f.StringFlag(&flagset.StringFlag{
Expand Down
7 changes: 4 additions & 3 deletions miner/fake_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ var (
// Test accounts
testBankKey, _ = crypto.GenerateKey()
TestBankAddress = crypto.PubkeyToAddress(testBankKey.PublicKey)
testBankFunds = big.NewInt(1000000000000000000)
testBankFunds = big.NewInt(8000000000000000000)

testUserKey, _ = crypto.GenerateKey()
testUserAddress = crypto.PubkeyToAddress(testUserKey.PublicKey)
Expand All @@ -222,8 +222,9 @@ var (
newTxs []*types.Transaction

testConfig = &Config{
Recommit: time.Second,
GasCeil: params.GenesisGasLimit,
Recommit: time.Second,
GasCeil: params.GenesisGasLimit,
CommitInterruptFlag: true,
}
)

Expand Down
19 changes: 10 additions & 9 deletions miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,16 @@ type Backend interface {

// Config is the configuration parameters of mining.
type Config struct {
Etherbase common.Address `toml:",omitempty"` // Public address for block mining rewards (default = first account)
Notify []string `toml:",omitempty"` // HTTP URL list to be notified of new work packages (only useful in ethash).
NotifyFull bool `toml:",omitempty"` // Notify with pending block headers instead of work packages
ExtraData hexutil.Bytes `toml:",omitempty"` // Block extra data set by the miner
GasFloor uint64 // Target gas floor for mined blocks.
GasCeil uint64 // Target gas ceiling for mined blocks.
GasPrice *big.Int // Minimum gas price for mining a transaction
Recommit time.Duration // The time interval for miner to re-create mining work.
Noverify bool // Disable remote mining solution verification(only useful in ethash).
Etherbase common.Address `toml:",omitempty"` // Public address for block mining rewards (default = first account)
Notify []string `toml:",omitempty"` // HTTP URL list to be notified of new work packages (only useful in ethash).
NotifyFull bool `toml:",omitempty"` // Notify with pending block headers instead of work packages
ExtraData hexutil.Bytes `toml:",omitempty"` // Block extra data set by the miner
GasFloor uint64 // Target gas floor for mined blocks.
GasCeil uint64 // Target gas ceiling for mined blocks.
GasPrice *big.Int // Minimum gas price for mining a transaction
Recommit time.Duration // The time interval for miner to re-create mining work.
Noverify bool // Disable remote mining solution verification(only useful in ethash).
CommitInterruptFlag bool // Interrupt commit when time is up ( default = true)
}

// Miner creates blocks and searches for proof-of-work values.
Expand Down
80 changes: 55 additions & 25 deletions miner/test_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

lru "github.com/hashicorp/golang-lru"
)

const (
Expand Down Expand Up @@ -251,29 +253,30 @@ func NewTestWorker(t TensingObject, chainConfig *params.ChainConfig, engine cons
// nolint:staticcheck
func newWorkerWithDelay(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(header *types.Header) bool, init bool, delay uint, opcodeDelay uint) *worker {
worker := &worker{
config: config,
chainConfig: chainConfig,
engine: engine,
eth: eth,
mux: mux,
chain: eth.BlockChain(),
isLocalBlock: isLocalBlock,
localUncles: make(map[common.Hash]*types.Block),
remoteUncles: make(map[common.Hash]*types.Block),
unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), sealingLogAtDepth),
pendingTasks: make(map[common.Hash]*task),
txsCh: make(chan core.NewTxsEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
newWorkCh: make(chan *newWorkReq),
getWorkCh: make(chan *getWorkReq),
taskCh: make(chan *task),
resultCh: make(chan *types.Block, resultQueueSize),
exitCh: make(chan struct{}),
startCh: make(chan struct{}, 1),
resubmitIntervalCh: make(chan time.Duration),
resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
noempty: 1,
config: config,
chainConfig: chainConfig,
engine: engine,
eth: eth,
mux: mux,
chain: eth.BlockChain(),
isLocalBlock: isLocalBlock,
localUncles: make(map[common.Hash]*types.Block),
remoteUncles: make(map[common.Hash]*types.Block),
unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), sealingLogAtDepth),
pendingTasks: make(map[common.Hash]*task),
txsCh: make(chan core.NewTxsEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
newWorkCh: make(chan *newWorkReq),
getWorkCh: make(chan *getWorkReq),
taskCh: make(chan *task),
resultCh: make(chan *types.Block, resultQueueSize),
exitCh: make(chan struct{}),
startCh: make(chan struct{}, 1),
resubmitIntervalCh: make(chan time.Duration),
resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
noempty: 1,
interruptCommitFlag: config.CommitInterruptFlag,
}
worker.profileCount = new(int32)
// Subscribe NewTxsEvent for tx pool
Expand All @@ -282,6 +285,19 @@ func newWorkerWithDelay(config *Config, chainConfig *params.ChainConfig, engine
worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)

interruptedTxCache, err := lru.New(vm.InterruptedTxCacheSize)
if err != nil {
log.Warn("Failed to create interrupted tx cache", "err", err)
}

worker.interruptedTxCache = &vm.TxCache{
Cache: interruptedTxCache,
}

if !worker.interruptCommitFlag {
worker.noempty = 0
}

// Sanitize recommit interval if the user-specified one is too short.
recommit := worker.config.Recommit
if recommit < minRecommitInterval {
Expand Down Expand Up @@ -388,6 +404,7 @@ func (w *worker) mainLoopWithDelay(ctx context.Context, delay uint, opcodeDelay
// Note all transactions received may not be continuous with transactions
// already included in the current sealing block. These transactions will
// be automatically eliminated.
// nolint : nestif
if !w.isRunning() && w.current != nil {
// If block is already full, abort
if gp := w.current.gasPool; gp != nil && gp.Gas() < params.TxGas {
Expand All @@ -404,7 +421,18 @@ func (w *worker) mainLoopWithDelay(ctx context.Context, delay uint, opcodeDelay
txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, cmath.FromBig(w.current.header.BaseFee))
tcount := w.current.tcount

interruptCtx, stopFn := getInterruptTimer(ctx, w.current, w.chain.CurrentBlock())
var interruptCtx = context.Background()
stopFn := func() {}
defer func() {
stopFn()
}()

if w.interruptCommitFlag {
interruptCtx, stopFn = getInterruptTimer(ctx, w.current, w.chain.CurrentBlock())
// nolint : staticcheck
interruptCtx = vm.PutCache(interruptCtx, w.interruptedTxCache)
}

w.commitTransactionsWithDelay(w.current, txset, nil, interruptCtx)

// Only update the snapshot if any new transactions were added
Expand Down Expand Up @@ -477,9 +505,11 @@ func (w *worker) commitWorkWithDelay(ctx context.Context, interrupt *int32, noem
stopFn()
}()

if !noempty {
if !noempty && w.interruptCommitFlag {
interruptCtx, stopFn = getInterruptTimer(ctx, work, w.chain.CurrentBlock())
// nolint : staticcheck
interruptCtx = vm.PutCache(interruptCtx, w.interruptedTxCache)
// nolint : staticcheck
interruptCtx = context.WithValue(interruptCtx, vm.InterruptCtxDelayKey, delay)
// nolint : staticcheck
interruptCtx = context.WithValue(interruptCtx, vm.InterruptCtxOpcodeDelayKey, opcodeDelay)
Expand Down
Loading