Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: enhancement of the current block generation logic. #1186

Merged
merged 4 commits into from
Nov 21, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions consensus/beacon/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,10 @@ func (beacon *Beacon) SetThreads(threads int) {
}
}

func (p *Beacon) DropOnNewBlock(*types.Header) bool {
return true
}

// IsTTDReached checks if the TotalTerminalDifficulty has been surpassed on the `parentHash` block.
// It depends on the parentHash already being stored in the database.
// If the parentHash is not stored in the database a UnknownAncestor error is returned.
Expand Down
5 changes: 5 additions & 0 deletions consensus/clique/clique.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,11 @@ func (c *Clique) APIs(chain consensus.ChainHeaderReader) []rpc.API {
}}
}

func (p *Clique) DropOnNewBlock(header *types.Header) bool {
// drop the block if it is not in turn.
return header.Difficulty.Cmp(diffNoTurn) == 0
}

// SealHash returns the hash of a block prior to it being sealed.
func SealHash(header *types.Header) (hash common.Hash) {
hasher := sha3.NewLegacyKeccak256()
Expand Down
6 changes: 6 additions & 0 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,12 @@ type Engine interface {

// Close terminates any background threads maintained by the consensus engine.
Close() error

// DropOnNewBlock determine the action of mining when it is interrupted by new imported block.
// Return
// true: the mining result will be dropped
// false: the mining result will be kept and move on to the next mine step.
DropOnNewBlock(header *types.Header) bool
}

// PoW is a consensus engine based on proof-of-work.
Expand Down
4 changes: 4 additions & 0 deletions consensus/ethash/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,10 @@ var (
big32 = big.NewInt(32)
)

func (p *Ethash) DropOnNewBlock(*types.Header) bool {
return true
}

// AccumulateRewards credits the coinbase of the given block with the mining
// reward. The total reward consists of the static block reward and rewards for
// included uncles. The coinbase of each uncle block is also rewarded.
Expand Down
5 changes: 5 additions & 0 deletions consensus/parlia/parlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -976,6 +976,11 @@ func (p *Parlia) CalcDifficulty(chain consensus.ChainHeaderReader, time uint64,
return CalcDifficulty(snap, p.val)
}

func (p *Parlia) DropOnNewBlock(header *types.Header) bool {
// drop the block if it is not in turn.
return header.Difficulty.Cmp(diffNoTurn) == 0
}

// CalcDifficulty is the difficulty adjustment algorithm. It returns the difficulty
// that a new block should have based on the previous blocks in the chain and the
// current signer.
Expand Down
214 changes: 184 additions & 30 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package miner
import (
"errors"
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -67,6 +68,11 @@ const (
var (
writeBlockTimer = metrics.NewRegisteredTimer("worker/writeblock", nil)
finalizeBlockTimer = metrics.NewRegisteredTimer("worker/finalizeblock", nil)

errBlockInterruptedByNewHead = errors.New("new head arrived while building block")
errBlockInterruptedByRecommit = errors.New("recommit interrupt while building block")
errBlockInterruptedByTimeout = errors.New("timeout while building block")
errBlockInterruptedByOutOfGas = errors.New("out of gas while building block")
)

// environment is the worker's current environment and holds all
Expand Down Expand Up @@ -142,8 +148,11 @@ type task struct {
}

const (
commitInterruptNewHead int32 = 1
commitInterruptResubmit int32 = 2
commitInterruptNone int32 = iota
commitInterruptNewHead
commitInterruptResubmit
commitInterruptTimeout
commitInterruptOutOfGas
)

// newWorkReq represents a request for new sealing work submitting with relative interrupt notifier.
Expand Down Expand Up @@ -754,7 +763,7 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction, rece
}

func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce,
interruptCh chan int32, stopTimer *time.Timer) bool {
interruptCh chan int32, stopTimer *time.Timer) error {
gasLimit := env.header.GasLimit
if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(gasLimit)
Expand All @@ -766,7 +775,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
}

var coalescedLogs []*types.Log
// initilise bloom processors
// initialize bloom processors
processorCapacity := 100
if txs.CurrentSize() < processorCapacity {
processorCapacity = txs.CurrentSize()
Expand All @@ -781,6 +790,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
txCurr := &tx
w.prefetcher.PrefetchMining(txsPrefetch, env.header, env.gasPool.Gas(), env.state.CopyDoPrefetch(), *w.chain.GetVMConfig(), stopPrefetchCh, txCurr)

signal := commitInterruptNone
LOOP:
for {
// In the following three cases, we will interrupt the execution of the transaction.
Expand All @@ -791,25 +801,27 @@ LOOP:
// For the third case, the semi-finished work will be submitted to the consensus engine.
if interruptCh != nil {
select {
case reason, ok := <-interruptCh:
case signal, ok := <-interruptCh:
if !ok {
// should never be here, since interruptCh should not be read before
log.Warn("commit transactions stopped unknown")
}
return reason == commitInterruptNewHead
return signalToErr(signal)
default:
}
}
// If we don't have enough gas for any further transactions then we're done
if env.gasPool.Gas() < params.TxGas {
log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas)
signal = commitInterruptOutOfGas
break
}
if stopTimer != nil {
select {
case <-stopTimer.C:
log.Info("Not enough time for further transactions", "txs", len(env.txs))
stopTimer.Reset(0) // re-active the timer, in case it will be used later.
signal = commitInterruptTimeout
break LOOP
default:
}
Expand Down Expand Up @@ -885,7 +897,7 @@ LOOP:
}
w.pendingLogsFeed.Send(cpy)
}
return false
return signalToErr(signal)
}

// generateParams wraps various of settings for generating sealing task.
Expand Down Expand Up @@ -988,7 +1000,7 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) {
// fillTransactions retrieves the pending transactions from the txpool and fills them
// into the given sealing block. The transaction selection and ordering strategy can
// be customized with the plugin in the future.
func (w *worker) fillTransactions(interruptCh chan int32, env *environment) {
func (w *worker) fillTransactions(interruptCh chan int32, env *environment, stopTimer *time.Timer) (err error) {
// Split the pending transactions into locals and remotes
// Fill the block with all available pending transactions.
pending := w.eth.TxPool().Pending(false)
Expand All @@ -1000,26 +1012,23 @@ func (w *worker) fillTransactions(interruptCh chan int32, env *environment) {
}
}

var stopTimer *time.Timer
delay := w.engine.Delay(w.chain, env.header, &w.config.DelayLeftOver)
if delay != nil {
stopTimer = time.NewTimer(*delay)
log.Debug("Time left for mining work", "delay", delay.String())
defer stopTimer.Stop()
}

err = nil
if len(localTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
if w.commitTransactions(env, txs, interruptCh, stopTimer) {
err = w.commitTransactions(env, txs, interruptCh, stopTimer)
if err == errBlockInterruptedByNewHead || err == errBlockInterruptedByOutOfGas || err == errBlockInterruptedByTimeout {
return
}
}
if len(remoteTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee)
if w.commitTransactions(env, txs, interruptCh, stopTimer) {
err = w.commitTransactions(env, txs, interruptCh, stopTimer)
if err == errBlockInterruptedByNewHead || err == errBlockInterruptedByOutOfGas || err == errBlockInterruptedByTimeout {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this if seems useless here, would return anyway

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

accepted.

return
}
}

return
}

// generateWork generates a sealing block based on the given parameters.
Expand All @@ -1030,7 +1039,7 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) {
}
defer work.discard()

w.fillTransactions(nil, work)
w.fillTransactions(nil, work, nil)
block, _, err := w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts)
return block, err
}
Expand All @@ -1049,24 +1058,150 @@ func (w *worker) commitWork(interruptCh chan int32, timestamp int64) {
}
coinbase = w.coinbase // Use the preset address as the fee recipient
}
work, err := w.prepareWork(&generateParams{
timestamp: uint64(timestamp),
coinbase: coinbase,
})
if err != nil {
return
}

// Fill pending transactions from the txpool
w.fillTransactions(interruptCh, work)
w.commit(work, w.fullTaskHook, true, start)
stopTimer := time.NewTimer(0)
defer stopTimer.Stop()
<-stopTimer.C // discard the initial tick

stopWaitTimer := time.NewTimer(0)
defer stopWaitTimer.Stop()
<-stopWaitTimer.C // discard the initial tick

// validator can try several times to get the most profitable block,
// as long as the timestamp is not reached.
workList := make([]*environment, 0, 10)
var bestWork *environment
// workList clean up
defer func() {
for _, w := range workList {
// only keep the best work, discard others.
if w == bestWork {
continue
}
w.discard()
}
}()
LOOP:
for {
work, err := w.prepareWork(&generateParams{
timestamp: uint64(timestamp),
coinbase: coinbase,
})
if err != nil {
return
}

workList = append(workList, work)

delay := w.engine.Delay(w.chain, work.header, &w.config.DelayLeftOver)
if delay == nil {
log.Warn("commitWork delay is nil, something is wrong")
stopTimer = nil
} else if *delay <= 0 {
log.Debug("Not enough time for commitWork")
break
} else {
log.Debug("commitWork stopTimer", "block", work.header.Number,
"header time", time.Until(time.Unix(int64(work.header.Time), 0)),
"commit delay", *delay, "DelayLeftOver", w.config.DelayLeftOver)
stopTimer.Reset(*delay)
}

// subscribe before fillTransactions
txsCh := make(chan core.NewTxsEvent, txChanSize)
sub := w.eth.TxPool().SubscribeNewTxsEvent(txsCh)
defer sub.Unsubscribe()

// Fill pending transactions from the txpool
fillStart := time.Now()
err = w.fillTransactions(interruptCh, work, stopTimer)
fillDuration := time.Since(fillStart)
switch {
case errors.Is(err, errBlockInterruptedByNewHead):
// For Parlia, it will drop the work on receiving new block if it is not inturn.
if w.engine.DropOnNewBlock(work.header) {
log.Debug("drop the block, when new block is imported")
return
}
case errors.Is(err, errBlockInterruptedByTimeout):
// break the loop to get the best work
log.Debug("commitWork timeout")
break LOOP
case errors.Is(err, errBlockInterruptedByOutOfGas):
log.Debug("commitWork out of gas")
break LOOP
}

if interruptCh == nil || stopTimer == nil {
// it is single commit work, no need to try several time.
log.Info("commitWork interruptCh or stopTimer is nil")
break
}

newTxsNum := 0
// stopTimer was the maximum delay for each fillTransactions
// but now it is used to wait until (head.Time - DelayLeftOver) is reached.
stopTimer.Reset(time.Until(time.Unix(int64(work.header.Time), 0)) - w.config.DelayLeftOver)
LOOP_WAIT:
for {
select {
case <-stopTimer.C:
log.Debug("commitWork stopTimer expired")
break LOOP
case <-interruptCh:
log.Debug("commitWork interruptCh closed, new block imported or resubmit triggered")
return
case ev := <-txsCh:
delay := w.engine.Delay(w.chain, work.header, &w.config.DelayLeftOver)
log.Debug("commitWork txsCh arrived", "fillDuration", fillDuration.String(),
"delay", delay.String(), "work.tcount", work.tcount,
"newTxsNum", newTxsNum, "len(ev.Txs)", len(ev.Txs))
if *delay < fillDuration {
// There may not have enough time for another fillTransactions.
break LOOP
} else if *delay < fillDuration*2 {
// We can schedule another fillTransactions, but the time is limited,
// probably it is the last chance, schedule it immediately.
break LOOP_WAIT
} else {
// There is still plenty of time left.
// We can wait a while to collect more transactions before
// schedule another fillTransaction to reduce CPU cost.
// There will be 2 cases to schedule another fillTransactions:
// 1.newTxsNum >= work.tcount
// 2.no much time left, have to schedule it immediately.
newTxsNum = newTxsNum + len(ev.Txs)
if newTxsNum >= work.tcount {
break LOOP_WAIT
}
stopWaitTimer.Reset(*delay - fillDuration*2)
}
case <-stopWaitTimer.C:
if newTxsNum > 0 {
break LOOP_WAIT
}
}
}
}
// get the most profitable work
bestWork = workList[0]
bestReward := new(big.Int)
for i, w := range workList {
balance := w.state.GetBalance(consensus.SystemAddress)
log.Debug("Get the most profitable work", "index", i, "balance", balance, "bestReward", bestReward)
if balance.Cmp(bestReward) > 0 {
bestWork = w
bestReward = balance
}
}
w.commit(bestWork, w.fullTaskHook, true, start)

// Swap out the old work with the new one, terminating any leftover
// prefetcher processes in the mean time and starting a new one.
if w.current != nil {
w.current.discard()
}
w.current = work
w.current = bestWork
}

// commit runs any post-transaction state modifications, assembles the final block
Expand Down Expand Up @@ -1167,3 +1302,22 @@ func (w *worker) postSideBlock(event core.ChainSideEvent) {
case <-w.exitCh:
}
}

// signalToErr converts the interruption signal to a concrete error type for return.
// The given signal must be a valid interruption signal.
func signalToErr(signal int32) error {
switch signal {
case commitInterruptNone:
return nil
case commitInterruptNewHead:
return errBlockInterruptedByNewHead
case commitInterruptResubmit:
return errBlockInterruptedByRecommit
case commitInterruptTimeout:
return errBlockInterruptedByTimeout
case commitInterruptOutOfGas:
return errBlockInterruptedByOutOfGas
default:
panic(fmt.Errorf("undefined signal %d", signal))
}
}