From 5c88f9f964c7106090e74e35e089839289c3127b Mon Sep 17 00:00:00 2001 From: setunapo Date: Fri, 28 Oct 2022 12:01:18 +0800 Subject: [PATCH 1/4] worker: enhancement of the current block generation logic. Currently, validator only try once to get transactions from TxPool to produce the block. However, new transactions could arrive while the validator is committing transaction. Validator should be allowed to add these new arrived transactions as long as Header.Timestamp is not reached This commit will: ** commitTransactions return with error code ** drop current mining block on new block imported ** try fillTransactions several times for the best not use append mode to follow the GasPrice rule. ** check if there is enough time for another fillTransactions. --- consensus/beacon/consensus.go | 4 + consensus/clique/clique.go | 5 + consensus/consensus.go | 6 ++ consensus/ethash/consensus.go | 4 + consensus/parlia/parlia.go | 5 + miner/worker.go | 181 ++++++++++++++++++++++++++++------ 6 files changed, 175 insertions(+), 30 deletions(-) diff --git a/consensus/beacon/consensus.go b/consensus/beacon/consensus.go index 8282ed7cb4..4f4c272a0a 100644 --- a/consensus/beacon/consensus.go +++ b/consensus/beacon/consensus.go @@ -363,6 +363,10 @@ func (beacon *Beacon) SetThreads(threads int) { } } +func (p *Beacon) DropOnNewBlock(*types.Header) bool { + return true +} + // IsTTDReached checks if the TotalTerminalDifficulty has been surpassed on the `parentHash` block. // It depends on the parentHash already being stored in the database. // If the parentHash is not stored in the database a UnknownAncestor error is returned. diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index a258f1fe5f..11287e74ef 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -705,6 +705,11 @@ func (c *Clique) APIs(chain consensus.ChainHeaderReader) []rpc.API { }} } +func (p *Clique) DropOnNewBlock(header *types.Header) bool { + // drop the block if it is not in turn. + return header.Difficulty.Cmp(diffNoTurn) == 0 +} + // SealHash returns the hash of a block prior to it being sealed. func SealHash(header *types.Header) (hash common.Hash) { hasher := sha3.NewLegacyKeccak256() diff --git a/consensus/consensus.go b/consensus/consensus.go index 87632a9d0d..367a703678 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -130,6 +130,12 @@ type Engine interface { // Close terminates any background threads maintained by the consensus engine. Close() error + + // DropOnNewBlock determine the action of mining when it is interrupted by new imported block. + // Return + // true: the mining result will be dropped + // false: the mining result will be kept and move on to the next mine step. + DropOnNewBlock(header *types.Header) bool } // PoW is a consensus engine based on proof-of-work. diff --git a/consensus/ethash/consensus.go b/consensus/ethash/consensus.go index 12a69c127a..0bf77a8ae0 100644 --- a/consensus/ethash/consensus.go +++ b/consensus/ethash/consensus.go @@ -647,6 +647,10 @@ var ( big32 = big.NewInt(32) ) +func (p *Ethash) DropOnNewBlock(*types.Header) bool { + return true +} + // AccumulateRewards credits the coinbase of the given block with the mining // reward. The total reward consists of the static block reward and rewards for // included uncles. The coinbase of each uncle block is also rewarded. diff --git a/consensus/parlia/parlia.go b/consensus/parlia/parlia.go index 2e544803ef..c16b36ac12 100644 --- a/consensus/parlia/parlia.go +++ b/consensus/parlia/parlia.go @@ -976,6 +976,11 @@ func (p *Parlia) CalcDifficulty(chain consensus.ChainHeaderReader, time uint64, return CalcDifficulty(snap, p.val) } +func (p *Parlia) DropOnNewBlock(header *types.Header) bool { + // drop the block if it is not in turn. + return header.Difficulty.Cmp(diffNoTurn) == 0 +} + // CalcDifficulty is the difficulty adjustment algorithm. It returns the difficulty // that a new block should have based on the previous blocks in the chain and the // current signer. diff --git a/miner/worker.go b/miner/worker.go index 1d44ee0fa5..407dc7fcc1 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -19,6 +19,7 @@ package miner import ( "errors" "fmt" + "math/big" "sync" "sync/atomic" "time" @@ -67,6 +68,11 @@ const ( var ( writeBlockTimer = metrics.NewRegisteredTimer("worker/writeblock", nil) finalizeBlockTimer = metrics.NewRegisteredTimer("worker/finalizeblock", nil) + + errBlockInterruptedByNewHead = errors.New("new head arrived while building block") + errBlockInterruptedByRecommit = errors.New("recommit interrupt while building block") + errBlockInterruptedByTimeout = errors.New("timeout while building block") + errBlockInterruptedByOutOfGas = errors.New("out of gas while building block") ) // environment is the worker's current environment and holds all @@ -142,8 +148,11 @@ type task struct { } const ( - commitInterruptNewHead int32 = 1 - commitInterruptResubmit int32 = 2 + commitInterruptNone int32 = iota + commitInterruptNewHead + commitInterruptResubmit + commitInterruptTimeout + commitInterruptOutOfGas ) // newWorkReq represents a request for new sealing work submitting with relative interrupt notifier. @@ -754,7 +763,7 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction, rece } func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, - interruptCh chan int32, stopTimer *time.Timer) bool { + interruptCh chan int32, stopTimer *time.Timer) error { gasLimit := env.header.GasLimit if env.gasPool == nil { env.gasPool = new(core.GasPool).AddGas(gasLimit) @@ -766,7 +775,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP } var coalescedLogs []*types.Log - // initilise bloom processors + // initialize bloom processors processorCapacity := 100 if txs.CurrentSize() < processorCapacity { processorCapacity = txs.CurrentSize() @@ -781,6 +790,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP txCurr := &tx w.prefetcher.PrefetchMining(txsPrefetch, env.header, env.gasPool.Gas(), env.state.CopyDoPrefetch(), *w.chain.GetVMConfig(), stopPrefetchCh, txCurr) + signal := commitInterruptNone LOOP: for { // In the following three cases, we will interrupt the execution of the transaction. @@ -791,18 +801,19 @@ LOOP: // For the third case, the semi-finished work will be submitted to the consensus engine. if interruptCh != nil { select { - case reason, ok := <-interruptCh: + case signal, ok := <-interruptCh: if !ok { // should never be here, since interruptCh should not be read before log.Warn("commit transactions stopped unknown") } - return reason == commitInterruptNewHead + return signalToErr(signal) default: } } // If we don't have enough gas for any further transactions then we're done if env.gasPool.Gas() < params.TxGas { log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas) + signal = commitInterruptOutOfGas break } if stopTimer != nil { @@ -810,6 +821,7 @@ LOOP: case <-stopTimer.C: log.Info("Not enough time for further transactions", "txs", len(env.txs)) stopTimer.Reset(0) // re-active the timer, in case it will be used later. + signal = commitInterruptTimeout break LOOP default: } @@ -885,7 +897,7 @@ LOOP: } w.pendingLogsFeed.Send(cpy) } - return false + return signalToErr(signal) } // generateParams wraps various of settings for generating sealing task. @@ -988,7 +1000,7 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) { // fillTransactions retrieves the pending transactions from the txpool and fills them // into the given sealing block. The transaction selection and ordering strategy can // be customized with the plugin in the future. -func (w *worker) fillTransactions(interruptCh chan int32, env *environment) { +func (w *worker) fillTransactions(interruptCh chan int32, env *environment, stopTimer *time.Timer) (err error) { // Split the pending transactions into locals and remotes // Fill the block with all available pending transactions. pending := w.eth.TxPool().Pending(false) @@ -1000,26 +1012,23 @@ func (w *worker) fillTransactions(interruptCh chan int32, env *environment) { } } - var stopTimer *time.Timer - delay := w.engine.Delay(w.chain, env.header, &w.config.DelayLeftOver) - if delay != nil { - stopTimer = time.NewTimer(*delay) - log.Debug("Time left for mining work", "delay", delay.String()) - defer stopTimer.Stop() - } - + err = nil if len(localTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee) - if w.commitTransactions(env, txs, interruptCh, stopTimer) { + err = w.commitTransactions(env, txs, interruptCh, stopTimer) + if err == errBlockInterruptedByNewHead || err == errBlockInterruptedByOutOfGas || err == errBlockInterruptedByTimeout { return } } if len(remoteTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee) - if w.commitTransactions(env, txs, interruptCh, stopTimer) { + err = w.commitTransactions(env, txs, interruptCh, stopTimer) + if err == errBlockInterruptedByNewHead || err == errBlockInterruptedByOutOfGas || err == errBlockInterruptedByTimeout { return } } + + return } // generateWork generates a sealing block based on the given parameters. @@ -1030,7 +1039,7 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) { } defer work.discard() - w.fillTransactions(nil, work) + w.fillTransactions(nil, work, nil) block, _, err := w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts) return block, err } @@ -1049,24 +1058,117 @@ func (w *worker) commitWork(interruptCh chan int32, timestamp int64) { } coinbase = w.coinbase // Use the preset address as the fee recipient } - work, err := w.prepareWork(&generateParams{ - timestamp: uint64(timestamp), - coinbase: coinbase, - }) - if err != nil { - return - } - // Fill pending transactions from the txpool - w.fillTransactions(interruptCh, work) - w.commit(work, w.fullTaskHook, true, start) + stopTimer := time.NewTimer(0) + defer stopTimer.Stop() + <-stopTimer.C // discard the initial tick + + // validator can try several times to get the most profitable block, + // as long as the timestamp is not reached. + workList := make([]*environment, 0, 10) + var bestWork *environment + // workList clean up + defer func() { + for _, w := range workList { + // only keep the best work, discard others. + if w == bestWork { + continue + } + w.discard() + } + }() +LOOP: + for { + work, err := w.prepareWork(&generateParams{ + timestamp: uint64(timestamp), + coinbase: coinbase, + }) + if err != nil { + return + } + + workList = append(workList, work) + + delay := w.engine.Delay(w.chain, work.header, &w.config.DelayLeftOver) + if delay == nil { + log.Warn("commitWork delay is nil, something is wrong") + stopTimer = nil + } else if *delay <= 0 { + log.Debug("Not enough time for commitWork") + break + } else { + log.Debug("commitWork stopTimer", "block", work.header.Number, + "header time", time.Until(time.Unix(int64(work.header.Time), 0)), + "commit delay", *delay, "DelayLeftOver", w.config.DelayLeftOver) + stopTimer.Reset(*delay) + } + + // subscribe before fillTransactions + txsCh := make(chan core.NewTxsEvent, txChanSize) + sub := w.eth.TxPool().SubscribeNewTxsEvent(txsCh) + defer sub.Unsubscribe() + + // Fill pending transactions from the txpool + fillStart := time.Now() + err = w.fillTransactions(interruptCh, work, stopTimer) + fillDuration := time.Since(fillStart) + switch { + case errors.Is(err, errBlockInterruptedByNewHead): + // For Parlia, it will drop the work on receiving new block if it is not inturn. + if w.engine.DropOnNewBlock(work.header) { + log.Debug("drop the block, when new block is imported") + return + } + case errors.Is(err, errBlockInterruptedByTimeout): + // break the loop to get the best work + log.Debug("commitWork timeout") + break LOOP + case errors.Is(err, errBlockInterruptedByOutOfGas): + log.Debug("commitWork out of gas") + break LOOP + } + + if interruptCh == nil || stopTimer == nil { + // it is single commit work, no need to try several time. + log.Info("commitWork interruptCh or stopTimer is nil") + break + } + + select { + case <-txsCh: + delay := w.engine.Delay(w.chain, work.header, &w.config.DelayLeftOver) + log.Debug("commitWork txsCh arrived", "fillDuration", fillDuration.String(), "delay", delay.String()) + if fillDuration > *delay { + // there may not have enough time for another fillTransactions + break LOOP + } + case <-stopTimer.C: + log.Debug("commitWork stopTimer expired") + break LOOP + case <-interruptCh: + log.Debug("commitWork interruptCh closed, new block imported or resubmit triggered") + return + } + } + // get the most profitable work + bestWork = workList[0] + bestReward := new(big.Int) + for i, w := range workList { + balance := w.state.GetBalance(consensus.SystemAddress) + log.Debug("Get the most profitable work", "index", i, "balance", balance, "bestReward", bestReward) + if balance.Cmp(bestReward) > 0 { + bestWork = w + bestReward = balance + } + } + w.commit(bestWork, w.fullTaskHook, true, start) // Swap out the old work with the new one, terminating any leftover // prefetcher processes in the mean time and starting a new one. if w.current != nil { w.current.discard() } - w.current = work + w.current = bestWork } // commit runs any post-transaction state modifications, assembles the final block @@ -1167,3 +1269,22 @@ func (w *worker) postSideBlock(event core.ChainSideEvent) { case <-w.exitCh: } } + +// signalToErr converts the interruption signal to a concrete error type for return. +// The given signal must be a valid interruption signal. +func signalToErr(signal int32) error { + switch signal { + case commitInterruptNone: + return nil + case commitInterruptNewHead: + return errBlockInterruptedByNewHead + case commitInterruptResubmit: + return errBlockInterruptedByRecommit + case commitInterruptTimeout: + return errBlockInterruptedByTimeout + case commitInterruptOutOfGas: + return errBlockInterruptedByOutOfGas + default: + panic(fmt.Errorf("undefined signal %d", signal)) + } +} From dad8b3fd76492c256eec8bb20df122134a1d19c6 Mon Sep 17 00:00:00 2001 From: setunapo Date: Thu, 17 Nov 2022 15:38:50 +0800 Subject: [PATCH 2/4] worker: do not fillTransactions immediately on new transaction. It may not efficient if schedule fillTransactions when new transactions arrive. It could make the CPU keep running. To make is more efficient: 1.schedule fillTransactions when a certain amount of transaction are arrived. 2.or there is not much time left. --- miner/worker.go | 57 ++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 45 insertions(+), 12 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index 407dc7fcc1..696cbefdcf 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1063,6 +1063,10 @@ func (w *worker) commitWork(interruptCh chan int32, timestamp int64) { defer stopTimer.Stop() <-stopTimer.C // discard the initial tick + stopWaitTimer := time.NewTimer(0) + defer stopWaitTimer.Stop() + <-stopWaitTimer.C // discard the initial tick + // validator can try several times to get the most profitable block, // as long as the timestamp is not reached. workList := make([]*environment, 0, 10) @@ -1134,20 +1138,49 @@ LOOP: break } - select { - case <-txsCh: - delay := w.engine.Delay(w.chain, work.header, &w.config.DelayLeftOver) - log.Debug("commitWork txsCh arrived", "fillDuration", fillDuration.String(), "delay", delay.String()) - if fillDuration > *delay { - // there may not have enough time for another fillTransactions + newTxsNum := 0 + // stopTimer was the maximum delay for each fillTransactions + // but now it is used to wait until (head.Time - DelayLeftOver) is reached. + stopTimer.Reset(time.Until(time.Unix(int64(work.header.Time), 0)) - w.config.DelayLeftOver) + LOOP_WAIT: + for { + select { + case <-stopTimer.C: + log.Debug("commitWork stopTimer expired") break LOOP + case <-interruptCh: + log.Debug("commitWork interruptCh closed, new block imported or resubmit triggered") + return + case ev := <-txsCh: + delay := w.engine.Delay(w.chain, work.header, &w.config.DelayLeftOver) + log.Debug("commitWork txsCh arrived", "fillDuration", fillDuration.String(), + "delay", delay.String(), "work.tcount", work.tcount, + "newTxsNum", newTxsNum, "len(ev.Txs)", len(ev.Txs)) + if *delay < fillDuration { + // There may not have enough time for another fillTransactions. + break LOOP + } else if *delay < fillDuration*2 { + // We can schedule another fillTransactions, but the time is limited, + // probably it is the last chance, schedule it immediately. + break LOOP_WAIT + } else { + // There is still plenty of time left. + // We can wait a while to collect more transactions before + // schedule another fillTransaction to reduce CPU cost. + // There will be 2 cases to schedule another fillTransactions: + // 1.newTxsNum >= work.tcount + // 2.no much time left, have to schedule it immediately. + newTxsNum = newTxsNum + len(ev.Txs) + if newTxsNum >= work.tcount { + break LOOP_WAIT + } + stopWaitTimer.Reset(*delay - fillDuration*2) + } + case <-stopWaitTimer.C: + if newTxsNum > 0 { + break LOOP_WAIT + } } - case <-stopTimer.C: - log.Debug("commitWork stopTimer expired") - break LOOP - case <-interruptCh: - log.Debug("commitWork interruptCh closed, new block imported or resubmit triggered") - return } } // get the most profitable work From 24976d000ff2852142ec8cda003970e92bd0cf0d Mon Sep 17 00:00:00 2001 From: setunapo Date: Fri, 18 Nov 2022 22:55:17 +0800 Subject: [PATCH 3/4] worker: always drop on new block imported. When new block is imported, there is no need to commit the current work, even the new imported block is offturn and itself is inturn. That is because when offturn block is received, the inturn block is already later to broadcast block, deliver the later block will cause many reorg, which is not reasonable. And also make sure all useless work can be discarded, to avoid goroutine leak. --- consensus/beacon/consensus.go | 4 ---- consensus/clique/clique.go | 5 ----- consensus/consensus.go | 6 ------ consensus/ethash/consensus.go | 4 ---- consensus/parlia/parlia.go | 5 ----- miner/worker.go | 13 ++++--------- 6 files changed, 4 insertions(+), 33 deletions(-) diff --git a/consensus/beacon/consensus.go b/consensus/beacon/consensus.go index 4f4c272a0a..8282ed7cb4 100644 --- a/consensus/beacon/consensus.go +++ b/consensus/beacon/consensus.go @@ -363,10 +363,6 @@ func (beacon *Beacon) SetThreads(threads int) { } } -func (p *Beacon) DropOnNewBlock(*types.Header) bool { - return true -} - // IsTTDReached checks if the TotalTerminalDifficulty has been surpassed on the `parentHash` block. // It depends on the parentHash already being stored in the database. // If the parentHash is not stored in the database a UnknownAncestor error is returned. diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index 11287e74ef..a258f1fe5f 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -705,11 +705,6 @@ func (c *Clique) APIs(chain consensus.ChainHeaderReader) []rpc.API { }} } -func (p *Clique) DropOnNewBlock(header *types.Header) bool { - // drop the block if it is not in turn. - return header.Difficulty.Cmp(diffNoTurn) == 0 -} - // SealHash returns the hash of a block prior to it being sealed. func SealHash(header *types.Header) (hash common.Hash) { hasher := sha3.NewLegacyKeccak256() diff --git a/consensus/consensus.go b/consensus/consensus.go index 367a703678..87632a9d0d 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -130,12 +130,6 @@ type Engine interface { // Close terminates any background threads maintained by the consensus engine. Close() error - - // DropOnNewBlock determine the action of mining when it is interrupted by new imported block. - // Return - // true: the mining result will be dropped - // false: the mining result will be kept and move on to the next mine step. - DropOnNewBlock(header *types.Header) bool } // PoW is a consensus engine based on proof-of-work. diff --git a/consensus/ethash/consensus.go b/consensus/ethash/consensus.go index 0bf77a8ae0..12a69c127a 100644 --- a/consensus/ethash/consensus.go +++ b/consensus/ethash/consensus.go @@ -647,10 +647,6 @@ var ( big32 = big.NewInt(32) ) -func (p *Ethash) DropOnNewBlock(*types.Header) bool { - return true -} - // AccumulateRewards credits the coinbase of the given block with the mining // reward. The total reward consists of the static block reward and rewards for // included uncles. The coinbase of each uncle block is also rewarded. diff --git a/consensus/parlia/parlia.go b/consensus/parlia/parlia.go index c16b36ac12..2e544803ef 100644 --- a/consensus/parlia/parlia.go +++ b/consensus/parlia/parlia.go @@ -976,11 +976,6 @@ func (p *Parlia) CalcDifficulty(chain consensus.ChainHeaderReader, time uint64, return CalcDifficulty(snap, p.val) } -func (p *Parlia) DropOnNewBlock(header *types.Header) bool { - // drop the block if it is not in turn. - return header.Difficulty.Cmp(diffNoTurn) == 0 -} - // CalcDifficulty is the difficulty adjustment algorithm. It returns the difficulty // that a new block should have based on the previous blocks in the chain and the // current signer. diff --git a/miner/worker.go b/miner/worker.go index 696cbefdcf..7c79b73270 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1118,17 +1118,12 @@ LOOP: fillDuration := time.Since(fillStart) switch { case errors.Is(err, errBlockInterruptedByNewHead): - // For Parlia, it will drop the work on receiving new block if it is not inturn. - if w.engine.DropOnNewBlock(work.header) { - log.Debug("drop the block, when new block is imported") - return - } + log.Debug("commitWork abort", "err", err) + return case errors.Is(err, errBlockInterruptedByTimeout): - // break the loop to get the best work - log.Debug("commitWork timeout") - break LOOP case errors.Is(err, errBlockInterruptedByOutOfGas): - log.Debug("commitWork out of gas") + // break the loop to get the best work + log.Debug("commitWork finish", "reason", err) break LOOP } From 1b2cde030d898be00a02ecf6d5facd7a84b7e322 Mon Sep 17 00:00:00 2001 From: setunapo Date: Mon, 21 Nov 2022 13:30:57 +0800 Subject: [PATCH 4/4] worker: return for resubmit signal. it is not necssary to add more transaction when resubmit is fired. the resubmit logic was for PoW and can be removed later. --- miner/worker.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index 7c79b73270..05a93c934e 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1016,16 +1016,19 @@ func (w *worker) fillTransactions(interruptCh chan int32, env *environment, stop if len(localTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee) err = w.commitTransactions(env, txs, interruptCh, stopTimer) - if err == errBlockInterruptedByNewHead || err == errBlockInterruptedByOutOfGas || err == errBlockInterruptedByTimeout { + // we will abort here when: + // 1.new block was imported + // 2.out of Gas, no more transaction can be added. + // 3.the mining timer has expired, stop adding transactions. + // 4.interrupted resubmit timer, which is by default 10s. + // resubmit is for PoW only, can be deleted for PoS consensus later + if err != nil { return } } if len(remoteTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee) err = w.commitTransactions(env, txs, interruptCh, stopTimer) - if err == errBlockInterruptedByNewHead || err == errBlockInterruptedByOutOfGas || err == errBlockInterruptedByTimeout { - return - } } return @@ -1120,6 +1123,7 @@ LOOP: case errors.Is(err, errBlockInterruptedByNewHead): log.Debug("commitWork abort", "err", err) return + case errors.Is(err, errBlockInterruptedByRecommit): case errors.Is(err, errBlockInterruptedByTimeout): case errors.Is(err, errBlockInterruptedByOutOfGas): // break the loop to get the best work