Skip to content

Commit

Permalink
eth/catalyst, miner: build the execution payload async (ethereum#24866)
Browse files Browse the repository at this point in the history
* eth/catalyst: build the execution payload async

* miner: added comment, added test case

* eth/catalyst: miner: move async block production to miner

* eth/catalyst, miner: support generate seal block async

* miner: rework GetSealingBlockAsync to use a passed channel

* miner: apply rjl's diff

* eth/catalyst: nitpicks

Co-authored-by: Gary Rong <garyrong0905@gmail.com>
  • Loading branch information
2 people authored and cp-wjhan committed Dec 1, 2022
1 parent c0d6a21 commit 3a559ec
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 53 deletions.
30 changes: 10 additions & 20 deletions eth/catalyst/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,18 +204,19 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update beacon.ForkchoiceStateV1, pa
// sealed by the beacon client. The payload will be requested later, and we
// might replace it arbitrarily many times in between.
if payloadAttributes != nil {
log.Info("Creating new payload for sealing")
start := time.Now()

data, err := api.assembleBlock(update.HeadBlockHash, payloadAttributes)
// Create an empty block first which can be used as a fallback
empty, err := api.eth.Miner().GetSealingBlockSync(update.HeadBlockHash, payloadAttributes.Timestamp, payloadAttributes.SuggestedFeeRecipient, payloadAttributes.Random, true)
if err != nil {
return valid(nil), err
}
// Send a request to generate a full block in the background.
// The result can be obtained via the returned channel.
resCh, err := api.eth.Miner().GetSealingBlockAsync(update.HeadBlockHash, payloadAttributes.Timestamp, payloadAttributes.SuggestedFeeRecipient, payloadAttributes.Random, false)
if err != nil {
log.Error("Failed to create sealing payload", "err", err)
return valid(nil), err // valid setHead, invalid payload
return valid(nil), err
}
id := computePayloadId(update.HeadBlockHash, payloadAttributes)
api.localBlocks.put(id, data)

log.Info("Created payload for sealing", "id", id, "elapsed", time.Since(start))
api.localBlocks.put(id, &payload{empty: empty, result: resCh})
return valid(&id), nil
}
return valid(nil), nil
Expand Down Expand Up @@ -351,14 +352,3 @@ func (api *ConsensusAPI) invalid(err error) beacon.PayloadStatusV1 {
errorMsg := err.Error()
return beacon.PayloadStatusV1{Status: beacon.INVALID, LatestValidHash: &currentHash, ValidationError: &errorMsg}
}

// assembleBlock creates a new block and returns the "execution
// data" required for beacon clients to process the new block.
func (api *ConsensusAPI) assembleBlock(parentHash common.Hash, params *beacon.PayloadAttributesV1) (*beacon.ExecutableDataV1, error) {
log.Info("Producing block", "parentHash", parentHash)
block, err := api.eth.Miner().GetSealingBlock(parentHash, params.Timestamp, params.SuggestedFeeRecipient, params.Random)
if err != nil {
return nil, err
}
return beacon.BlockToExecutableData(block), nil
}
82 changes: 78 additions & 4 deletions eth/catalyst/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestEth2AssembleBlock(t *testing.T) {
blockParams := beacon.PayloadAttributesV1{
Timestamp: blocks[9].Time() + 5,
}
execData, err := api.assembleBlock(blocks[9].Hash(), &blockParams)
execData, err := assembleBlock(api, blocks[9].Hash(), &blockParams)
if err != nil {
t.Fatalf("error producing block, err=%v", err)
}
Expand All @@ -114,7 +114,7 @@ func TestEth2AssembleBlockWithAnotherBlocksTxs(t *testing.T) {
blockParams := beacon.PayloadAttributesV1{
Timestamp: blocks[8].Time() + 5,
}
execData, err := api.assembleBlock(blocks[8].Hash(), &blockParams)
execData, err := assembleBlock(api, blocks[8].Hash(), &blockParams)
if err != nil {
t.Fatalf("error producing block, err=%v", err)
}
Expand Down Expand Up @@ -273,7 +273,7 @@ func TestEth2NewBlock(t *testing.T) {
tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey)
ethservice.TxPool().AddLocal(tx)

execData, err := api.assembleBlock(parent.Hash(), &beacon.PayloadAttributesV1{
execData, err := assembleBlock(api, parent.Hash(), &beacon.PayloadAttributesV1{
Timestamp: parent.Time() + 5,
})
if err != nil {
Expand Down Expand Up @@ -313,7 +313,7 @@ func TestEth2NewBlock(t *testing.T) {
)
parent = preMergeBlocks[len(preMergeBlocks)-1]
for i := 0; i < 10; i++ {
execData, err := api.assembleBlock(parent.Hash(), &beacon.PayloadAttributesV1{
execData, err := assembleBlock(api, parent.Hash(), &beacon.PayloadAttributesV1{
Timestamp: parent.Time() + 6,
})
if err != nil {
Expand Down Expand Up @@ -530,3 +530,77 @@ func TestExchangeTransitionConfig(t *testing.T) {
t.Fatalf("expected no error on valid config, got %v", err)
}
}

func TestEmptyBlocks(t *testing.T) {
genesis, preMergeBlocks := generatePreMergeChain(10)
n, ethservice := startEthService(t, genesis, preMergeBlocks)
ethservice.Merger().ReachTTD()
defer n.Close()
var (
api = NewConsensusAPI(ethservice)
parent = ethservice.BlockChain().CurrentBlock()
// This EVM code generates a log when the contract is created.
logCode = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00")
)
for i := 0; i < 10; i++ {
statedb, _ := ethservice.BlockChain().StateAt(parent.Root())
nonce := statedb.GetNonce(testAddr)
tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey)
ethservice.TxPool().AddLocal(tx)

params := beacon.PayloadAttributesV1{
Timestamp: parent.Time() + 1,
Random: crypto.Keccak256Hash([]byte{byte(i)}),
SuggestedFeeRecipient: parent.Coinbase(),
}

fcState := beacon.ForkchoiceStateV1{
HeadBlockHash: parent.Hash(),
SafeBlockHash: common.Hash{},
FinalizedBlockHash: common.Hash{},
}
resp, err := api.ForkchoiceUpdatedV1(fcState, &params)
if err != nil {
t.Fatalf("error preparing payload, err=%v", err)
}
if resp.PayloadStatus.Status != beacon.VALID {
t.Fatalf("error preparing payload, invalid status: %v", resp.PayloadStatus.Status)
}
payload, err := api.GetPayloadV1(*resp.PayloadID)
if err != nil {
t.Fatalf("can't get payload: %v", err)
}
// TODO(493456442, marius) this test can be flaky since we rely on a 100ms
// allowance for block generation internally.
if len(payload.Transactions) == 0 {
t.Fatalf("payload should not be empty")
}
execResp, err := api.NewPayloadV1(*payload)
if err != nil {
t.Fatalf("can't execute payload: %v", err)
}
if execResp.Status != beacon.VALID {
t.Fatalf("invalid status: %v", execResp.Status)
}
fcState = beacon.ForkchoiceStateV1{
HeadBlockHash: payload.BlockHash,
SafeBlockHash: payload.ParentHash,
FinalizedBlockHash: payload.ParentHash,
}
if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil {
t.Fatalf("Failed to insert block: %v", err)
}
if ethservice.BlockChain().CurrentBlock().NumberU64() != payload.Number {
t.Fatalf("Chain head should be updated")
}
parent = ethservice.BlockChain().CurrentBlock()
}
}

func assembleBlock(api *ConsensusAPI, parentHash common.Hash, params *beacon.PayloadAttributesV1) (*beacon.ExecutableDataV1, error) {
block, err := api.eth.Miner().GetSealingBlockSync(parentHash, params.Timestamp, params.SuggestedFeeRecipient, params.Random, false)
if err != nil {
return nil, err
}
return beacon.BlockToExecutableData(block), nil
}
54 changes: 48 additions & 6 deletions eth/catalyst/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package catalyst

import (
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/beacon"
Expand All @@ -34,11 +35,52 @@ const maxTrackedPayloads = 10
// latest one; but have a slight wiggle room for non-ideal conditions.
const maxTrackedHeaders = 10

// payload wraps the miner's block production channel, allowing the mined block
// to be retrieved later upon the GetPayload engine API call.
type payload struct {
lock sync.Mutex
done bool
empty *types.Block
block *types.Block
result chan *types.Block
}

// resolve extracts the generated full block from the given channel if possible
// or fallback to empty block as an alternative.
func (req *payload) resolve() *beacon.ExecutableDataV1 {
// this function can be called concurrently, prevent any
// concurrency issue in the first place.
req.lock.Lock()
defer req.lock.Unlock()

// Try to resolve the full block first if it's not obtained
// yet. The returned block can be nil if the generation fails.

if !req.done {
timeout := time.NewTimer(500 * time.Millisecond)
defer timeout.Stop()

select {
case req.block = <-req.result:
req.done = true
case <-timeout.C:
// TODO(rjl49345642, Marius), should we keep this
// 100ms timeout allowance? Why not just use the
// default and then fallback to empty directly?
}
}

if req.block != nil {
return beacon.BlockToExecutableData(req.block)
}
return beacon.BlockToExecutableData(req.empty)
}

// payloadQueueItem represents an id->payload tuple to store until it's retrieved
// or evicted.
type payloadQueueItem struct {
id beacon.PayloadID
payload *beacon.ExecutableDataV1
id beacon.PayloadID
data *payload
}

// payloadQueue tracks the latest handful of constructed payloads to be retrieved
Expand All @@ -57,14 +99,14 @@ func newPayloadQueue() *payloadQueue {
}

// put inserts a new payload into the queue at the given id.
func (q *payloadQueue) put(id beacon.PayloadID, data *beacon.ExecutableDataV1) {
func (q *payloadQueue) put(id beacon.PayloadID, data *payload) {
q.lock.Lock()
defer q.lock.Unlock()

copy(q.payloads[1:], q.payloads)
q.payloads[0] = &payloadQueueItem{
id: id,
payload: data,
id: id,
data: data,
}
}

Expand All @@ -78,7 +120,7 @@ func (q *payloadQueue) get(id beacon.PayloadID) *beacon.ExecutableDataV1 {
return nil // no more items
}
if item.id == id {
return item.payload
return item.data.resolve()
}
}
return nil
Expand Down
30 changes: 24 additions & 6 deletions miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,14 +235,32 @@ func (miner *Miner) DisablePreseal() {
miner.worker.disablePreseal()
}

// GetSealingBlock retrieves a sealing block based on the given parameters.
// The returned block is not sealed but all other fields should be filled.
func (miner *Miner) GetSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash) (*types.Block, error) {
return miner.worker.getSealingBlock(parent, timestamp, coinbase, random)
}

// SubscribePendingLogs starts delivering logs from pending transactions
// to the given channel.
func (miner *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscription {
return miner.worker.pendingLogsFeed.Subscribe(ch)
}

// GetSealingBlockAsync requests to generate a sealing block according to the
// given parameters. Regardless of whether the generation is successful or not,
// there is always a result that will be returned through the result channel.
// The difference is that if the execution fails, the returned result is nil
// and the concrete error is dropped silently.
func (miner *Miner) GetSealingBlockAsync(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (chan *types.Block, error) {
resCh, _, err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs)
if err != nil {
return nil, err
}
return resCh, nil
}

// GetSealingBlockSync creates a sealing block according to the given parameters.
// If the generation is failed or the underlying work is already closed, an error
// will be returned.
func (miner *Miner) GetSealingBlockSync(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (*types.Block, error) {
resCh, errCh, err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs)
if err != nil {
return nil, err
}
return <-resCh, <-errCh
}
35 changes: 20 additions & 15 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ type newWorkReq struct {
// getWorkReq represents a request for getting a new sealing work with provided parameters.
type getWorkReq struct {
params *generateParams
err error
result chan *types.Block
result chan *types.Block // non-blocking channel
err chan error
}

// intervalAdjust represents a resubmitting interval adjustment.
Expand Down Expand Up @@ -605,12 +605,12 @@ func (w *worker) mainLoop() {
case req := <-w.getWorkCh:
block, err := w.generateWork(req.params)
if err != nil {
req.err = err
req.err <- err
req.result <- nil
} else {
req.err <- nil
req.result <- block
}

case ev := <-w.chainSideCh:
// Short circuit for duplicate side blocks
if _, exist := w.localUncles[ev.Block.Hash()]; exist {
Expand Down Expand Up @@ -1299,6 +1299,7 @@ type generateParams struct {
random common.Hash // The randomness generated by beacon chain, empty before the merge
noUncle bool // Flag whether the uncle block inclusion is allowed
noExtra bool // Flag whether the extra field assignment is allowed
noTxs bool // Flag whether an empty block without any transaction is expected
}

// prepareWork constructs the sealing task according to the given parameters,
Expand Down Expand Up @@ -1496,8 +1497,9 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) {
}
defer work.discard()

w.fillTransactions(nil, work)

if !params.noTxs {
w.fillTransactions(nil, work)
}
return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts)
}

Expand Down Expand Up @@ -1658,7 +1660,6 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) {
work.discard()
return
}

w.commit(work.copy(), w.fullTaskHook, true, start)

// Swap out the old work with the new one, terminating any leftover
Expand Down Expand Up @@ -1803,7 +1804,13 @@ func (w *worker) commitEx(env *environment, interval func(), update bool, start
}

// getSealingBlock generates the sealing block based on the given parameters.
func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash) (*types.Block, error) {
// The generation result will be passed back via the given channel no matter
// the generation itself succeeds or not.
func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (chan *types.Block, chan error, error) {
var (
resCh = make(chan *types.Block, 1)
errCh = make(chan error, 1)
)
req := &getWorkReq{
params: &generateParams{
timestamp: timestamp,
Expand All @@ -1813,18 +1820,16 @@ func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase
random: random,
noUncle: true,
noExtra: true,
noTxs: noTxs,
},
result: make(chan *types.Block, 1),
result: resCh,
err: errCh,
}
select {
case w.getWorkCh <- req:
block := <-req.result
if block == nil {
return nil, req.err
}
return block, nil
return resCh, errCh, nil
case <-w.exitCh:
return nil, errors.New("miner closed")
return nil, nil, errors.New("miner closed")
}
}

Expand Down
Loading

0 comments on commit 3a559ec

Please sign in to comment.