Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: enhancement of the current block generation logic. #1186

Merged
merged 4 commits into from
Nov 21, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 185 additions & 32 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package miner
import (
"errors"
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -67,6 +68,11 @@ const (
var (
writeBlockTimer = metrics.NewRegisteredTimer("worker/writeblock", nil)
finalizeBlockTimer = metrics.NewRegisteredTimer("worker/finalizeblock", nil)

errBlockInterruptedByNewHead = errors.New("new head arrived while building block")
errBlockInterruptedByRecommit = errors.New("recommit interrupt while building block")
errBlockInterruptedByTimeout = errors.New("timeout while building block")
errBlockInterruptedByOutOfGas = errors.New("out of gas while building block")
)

// environment is the worker's current environment and holds all
Expand Down Expand Up @@ -142,8 +148,11 @@ type task struct {
}

const (
commitInterruptNewHead int32 = 1
commitInterruptResubmit int32 = 2
commitInterruptNone int32 = iota
commitInterruptNewHead
commitInterruptResubmit
commitInterruptTimeout
commitInterruptOutOfGas
)

// newWorkReq represents a request for new sealing work submitting with relative interrupt notifier.
Expand Down Expand Up @@ -754,7 +763,7 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction, rece
}

func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce,
interruptCh chan int32, stopTimer *time.Timer) bool {
interruptCh chan int32, stopTimer *time.Timer) error {
gasLimit := env.header.GasLimit
if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(gasLimit)
Expand All @@ -766,7 +775,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
}

var coalescedLogs []*types.Log
// initilise bloom processors
// initialize bloom processors
processorCapacity := 100
if txs.CurrentSize() < processorCapacity {
processorCapacity = txs.CurrentSize()
Expand All @@ -781,6 +790,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
txCurr := &tx
w.prefetcher.PrefetchMining(txsPrefetch, env.header, env.gasPool.Gas(), env.state.CopyDoPrefetch(), *w.chain.GetVMConfig(), stopPrefetchCh, txCurr)

signal := commitInterruptNone
LOOP:
for {
// In the following three cases, we will interrupt the execution of the transaction.
Expand All @@ -791,25 +801,27 @@ LOOP:
// For the third case, the semi-finished work will be submitted to the consensus engine.
if interruptCh != nil {
select {
case reason, ok := <-interruptCh:
case signal, ok := <-interruptCh:
if !ok {
// should never be here, since interruptCh should not be read before
log.Warn("commit transactions stopped unknown")
}
return reason == commitInterruptNewHead
return signalToErr(signal)
default:
}
}
// If we don't have enough gas for any further transactions then we're done
if env.gasPool.Gas() < params.TxGas {
log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas)
signal = commitInterruptOutOfGas
break
}
if stopTimer != nil {
select {
case <-stopTimer.C:
log.Info("Not enough time for further transactions", "txs", len(env.txs))
stopTimer.Reset(0) // re-active the timer, in case it will be used later.
signal = commitInterruptTimeout
break LOOP
default:
}
Expand Down Expand Up @@ -885,7 +897,7 @@ LOOP:
}
w.pendingLogsFeed.Send(cpy)
}
return false
return signalToErr(signal)
}

// generateParams wraps various of settings for generating sealing task.
Expand Down Expand Up @@ -988,7 +1000,7 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) {
// fillTransactions retrieves the pending transactions from the txpool and fills them
// into the given sealing block. The transaction selection and ordering strategy can
// be customized with the plugin in the future.
func (w *worker) fillTransactions(interruptCh chan int32, env *environment) {
func (w *worker) fillTransactions(interruptCh chan int32, env *environment, stopTimer *time.Timer) (err error) {
// Split the pending transactions into locals and remotes
// Fill the block with all available pending transactions.
pending := w.eth.TxPool().Pending(false)
Expand All @@ -1000,26 +1012,26 @@ func (w *worker) fillTransactions(interruptCh chan int32, env *environment) {
}
}

var stopTimer *time.Timer
delay := w.engine.Delay(w.chain, env.header, &w.config.DelayLeftOver)
if delay != nil {
stopTimer = time.NewTimer(*delay)
log.Debug("Time left for mining work", "delay", delay.String())
defer stopTimer.Stop()
}

err = nil
if len(localTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
if w.commitTransactions(env, txs, interruptCh, stopTimer) {
err = w.commitTransactions(env, txs, interruptCh, stopTimer)
// we will abort here when:
// 1.new block was imported
// 2.out of Gas, no more transaction can be added.
// 3.the mining timer has expired, stop adding transactions.
// 4.interrupted resubmit timer, which is by default 10s.
// resubmit is for PoW only, can be deleted for PoS consensus later
if err != nil {
return
}
}
if len(remoteTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee)
if w.commitTransactions(env, txs, interruptCh, stopTimer) {
return
}
err = w.commitTransactions(env, txs, interruptCh, stopTimer)
}

return
}

// generateWork generates a sealing block based on the given parameters.
Expand All @@ -1030,7 +1042,7 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) {
}
defer work.discard()

w.fillTransactions(nil, work)
w.fillTransactions(nil, work, nil)
block, _, err := w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts)
return block, err
}
Expand All @@ -1049,24 +1061,146 @@ func (w *worker) commitWork(interruptCh chan int32, timestamp int64) {
}
coinbase = w.coinbase // Use the preset address as the fee recipient
}
work, err := w.prepareWork(&generateParams{
timestamp: uint64(timestamp),
coinbase: coinbase,
})
if err != nil {
return
}

// Fill pending transactions from the txpool
w.fillTransactions(interruptCh, work)
w.commit(work, w.fullTaskHook, true, start)
stopTimer := time.NewTimer(0)
defer stopTimer.Stop()
<-stopTimer.C // discard the initial tick

stopWaitTimer := time.NewTimer(0)
defer stopWaitTimer.Stop()
<-stopWaitTimer.C // discard the initial tick

// validator can try several times to get the most profitable block,
// as long as the timestamp is not reached.
workList := make([]*environment, 0, 10)
var bestWork *environment
// workList clean up
defer func() {
for _, w := range workList {
// only keep the best work, discard others.
if w == bestWork {
continue
}
w.discard()
}
}()
LOOP:
for {
work, err := w.prepareWork(&generateParams{
timestamp: uint64(timestamp),
coinbase: coinbase,
})
if err != nil {
return
}

workList = append(workList, work)

delay := w.engine.Delay(w.chain, work.header, &w.config.DelayLeftOver)
if delay == nil {
log.Warn("commitWork delay is nil, something is wrong")
stopTimer = nil
} else if *delay <= 0 {
log.Debug("Not enough time for commitWork")
break
} else {
log.Debug("commitWork stopTimer", "block", work.header.Number,
"header time", time.Until(time.Unix(int64(work.header.Time), 0)),
"commit delay", *delay, "DelayLeftOver", w.config.DelayLeftOver)
stopTimer.Reset(*delay)
}

// subscribe before fillTransactions
txsCh := make(chan core.NewTxsEvent, txChanSize)
sub := w.eth.TxPool().SubscribeNewTxsEvent(txsCh)
defer sub.Unsubscribe()

// Fill pending transactions from the txpool
fillStart := time.Now()
err = w.fillTransactions(interruptCh, work, stopTimer)
fillDuration := time.Since(fillStart)
switch {
case errors.Is(err, errBlockInterruptedByNewHead):
log.Debug("commitWork abort", "err", err)
return
case errors.Is(err, errBlockInterruptedByRecommit):
case errors.Is(err, errBlockInterruptedByTimeout):
case errors.Is(err, errBlockInterruptedByOutOfGas):
// break the loop to get the best work
log.Debug("commitWork finish", "reason", err)
break LOOP
}

if interruptCh == nil || stopTimer == nil {
// it is single commit work, no need to try several time.
log.Info("commitWork interruptCh or stopTimer is nil")
break
}

newTxsNum := 0
// stopTimer was the maximum delay for each fillTransactions
// but now it is used to wait until (head.Time - DelayLeftOver) is reached.
stopTimer.Reset(time.Until(time.Unix(int64(work.header.Time), 0)) - w.config.DelayLeftOver)
LOOP_WAIT:
for {
select {
case <-stopTimer.C:
log.Debug("commitWork stopTimer expired")
break LOOP
case <-interruptCh:
log.Debug("commitWork interruptCh closed, new block imported or resubmit triggered")
return
case ev := <-txsCh:
delay := w.engine.Delay(w.chain, work.header, &w.config.DelayLeftOver)
log.Debug("commitWork txsCh arrived", "fillDuration", fillDuration.String(),
"delay", delay.String(), "work.tcount", work.tcount,
"newTxsNum", newTxsNum, "len(ev.Txs)", len(ev.Txs))
if *delay < fillDuration {
// There may not have enough time for another fillTransactions.
break LOOP
} else if *delay < fillDuration*2 {
// We can schedule another fillTransactions, but the time is limited,
// probably it is the last chance, schedule it immediately.
break LOOP_WAIT
} else {
// There is still plenty of time left.
// We can wait a while to collect more transactions before
// schedule another fillTransaction to reduce CPU cost.
// There will be 2 cases to schedule another fillTransactions:
// 1.newTxsNum >= work.tcount
// 2.no much time left, have to schedule it immediately.
newTxsNum = newTxsNum + len(ev.Txs)
if newTxsNum >= work.tcount {
break LOOP_WAIT
}
stopWaitTimer.Reset(*delay - fillDuration*2)
}
case <-stopWaitTimer.C:
if newTxsNum > 0 {
break LOOP_WAIT
}
}
}
}
// get the most profitable work
bestWork = workList[0]
bestReward := new(big.Int)
for i, w := range workList {
balance := w.state.GetBalance(consensus.SystemAddress)
log.Debug("Get the most profitable work", "index", i, "balance", balance, "bestReward", bestReward)
if balance.Cmp(bestReward) > 0 {
bestWork = w
bestReward = balance
}
}
w.commit(bestWork, w.fullTaskHook, true, start)

// Swap out the old work with the new one, terminating any leftover
// prefetcher processes in the mean time and starting a new one.
if w.current != nil {
w.current.discard()
}
w.current = work
w.current = bestWork
}

// commit runs any post-transaction state modifications, assembles the final block
Expand Down Expand Up @@ -1167,3 +1301,22 @@ func (w *worker) postSideBlock(event core.ChainSideEvent) {
case <-w.exitCh:
}
}

// signalToErr converts the interruption signal to a concrete error type for return.
// The given signal must be a valid interruption signal.
func signalToErr(signal int32) error {
switch signal {
case commitInterruptNone:
return nil
case commitInterruptNewHead:
return errBlockInterruptedByNewHead
case commitInterruptResubmit:
return errBlockInterruptedByRecommit
case commitInterruptTimeout:
return errBlockInterruptedByTimeout
case commitInterruptOutOfGas:
return errBlockInterruptedByOutOfGas
default:
panic(fmt.Errorf("undefined signal %d", signal))
}
}