Skip to content

Commit

Permalink
miner, eth: implement recommit mechanism for payload building (#25836)
Browse files Browse the repository at this point in the history
* miner, eth: implement recommit for payload building

* miner: address comments from marius
  • Loading branch information
rjl493456442 authored Nov 2, 2022
1 parent 2415911 commit a2a144c
Show file tree
Hide file tree
Showing 9 changed files with 338 additions and 131 deletions.
21 changes: 10 additions & 11 deletions eth/catalyst/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
)
Expand Down Expand Up @@ -279,23 +280,21 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update beacon.ForkchoiceStateV1, pa
}
// If payload generation was requested, create a new block to be potentially
// sealed by the beacon client. The payload will be requested later, and we
// might replace it arbitrarily many times in between.
// will replace it arbitrarily many times in between.
if payloadAttributes != nil {
// Create an empty block first which can be used as a fallback
empty, err := api.eth.Miner().GetSealingBlockSync(update.HeadBlockHash, payloadAttributes.Timestamp, payloadAttributes.SuggestedFeeRecipient, payloadAttributes.Random, true)
if err != nil {
log.Error("Failed to create empty sealing payload", "err", err)
return valid(nil), beacon.InvalidPayloadAttributes.With(err)
args := &miner.BuildPayloadArgs{
Parent: update.HeadBlockHash,
Timestamp: payloadAttributes.Timestamp,
FeeRecipient: payloadAttributes.SuggestedFeeRecipient,
Random: payloadAttributes.Random,
}
// Send a request to generate a full block in the background.
// The result can be obtained via the returned channel.
resCh, err := api.eth.Miner().GetSealingBlockAsync(update.HeadBlockHash, payloadAttributes.Timestamp, payloadAttributes.SuggestedFeeRecipient, payloadAttributes.Random, false)
payload, err := api.eth.Miner().BuildPayload(args)
if err != nil {
log.Error("Failed to create async sealing payload", "err", err)
log.Error("Failed to build payload", "err", err)
return valid(nil), beacon.InvalidPayloadAttributes.With(err)
}
id := computePayloadId(update.HeadBlockHash, payloadAttributes)
api.localBlocks.put(id, &payload{empty: empty, result: resCh})
api.localBlocks.put(id, payload)
return valid(&id), nil
}
return valid(nil), nil
Expand Down
30 changes: 20 additions & 10 deletions eth/catalyst/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/params"
Expand Down Expand Up @@ -181,6 +182,8 @@ func TestEth2PrepareAndGetPayload(t *testing.T) {
if err != nil {
t.Fatalf("error preparing payload, err=%v", err)
}
// give the payload some time to be built
time.Sleep(100 * time.Millisecond)
payloadID := computePayloadId(fcState.HeadBlockHash, &blockParams)
execData, err := api.GetPayloadV1(payloadID)
if err != nil {
Expand Down Expand Up @@ -586,12 +589,12 @@ func TestNewPayloadOnInvalidChain(t *testing.T) {
if resp.PayloadStatus.Status != beacon.VALID {
t.Fatalf("error preparing payload, invalid status: %v", resp.PayloadStatus.Status)
}
// give the payload some time to be built
time.Sleep(100 * time.Millisecond)
payload, err := api.GetPayloadV1(*resp.PayloadID)
if err != nil {
t.Fatalf("can't get payload: %v", err)
}
// TODO(493456442, marius) this test can be flaky since we rely on a 100ms
// allowance for block generation internally.
if len(payload.Transactions) == 0 {
t.Fatalf("payload should not be empty")
}
Expand All @@ -618,11 +621,17 @@ func TestNewPayloadOnInvalidChain(t *testing.T) {
}

func assembleBlock(api *ConsensusAPI, parentHash common.Hash, params *beacon.PayloadAttributesV1) (*beacon.ExecutableDataV1, error) {
block, err := api.eth.Miner().GetSealingBlockSync(parentHash, params.Timestamp, params.SuggestedFeeRecipient, params.Random, false)
args := &miner.BuildPayloadArgs{
Parent: parentHash,
Timestamp: params.Timestamp,
FeeRecipient: params.SuggestedFeeRecipient,
Random: params.Random,
}
payload, err := api.eth.Miner().BuildPayload(args)
if err != nil {
return nil, err
}
return beacon.BlockToExecutableData(block), nil
return payload.ResolveFull(), nil
}

func TestEmptyBlocks(t *testing.T) {
Expand Down Expand Up @@ -854,16 +863,17 @@ func TestNewPayloadOnInvalidTerminalBlock(t *testing.T) {
}

// Test parent already post TTD in NewPayload
params := beacon.PayloadAttributesV1{
Timestamp: parent.Time() + 1,
Random: crypto.Keccak256Hash([]byte{byte(1)}),
SuggestedFeeRecipient: parent.Coinbase(),
args := &miner.BuildPayloadArgs{
Parent: parent.Hash(),
Timestamp: parent.Time() + 1,
Random: crypto.Keccak256Hash([]byte{byte(1)}),
FeeRecipient: parent.Coinbase(),
}
empty, err := api.eth.Miner().GetSealingBlockSync(parent.Hash(), params.Timestamp, params.SuggestedFeeRecipient, params.Random, true)
payload, err := api.eth.Miner().BuildPayload(args)
if err != nil {
t.Fatalf("error preparing payload, err=%v", err)
}
data := *beacon.BlockToExecutableData(empty)
data := *payload.Resolve()
resp2, err := api.NewPayloadV1(data)
if err != nil {
t.Fatalf("error sending NewPayload, err=%v", err)
Expand Down
55 changes: 7 additions & 48 deletions eth/catalyst/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ package catalyst

import (
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/beacon"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/miner"
)

// maxTrackedPayloads is the maximum number of prepared payloads the execution
Expand All @@ -35,52 +35,11 @@ const maxTrackedPayloads = 10
// latest one; but have a slight wiggle room for non-ideal conditions.
const maxTrackedHeaders = 10

// payload wraps the miner's block production channel, allowing the mined block
// to be retrieved later upon the GetPayload engine API call.
type payload struct {
lock sync.Mutex
done bool
empty *types.Block
block *types.Block
result chan *types.Block
}

// resolve extracts the generated full block from the given channel if possible
// or fallback to empty block as an alternative.
func (req *payload) resolve() *beacon.ExecutableDataV1 {
// this function can be called concurrently, prevent any
// concurrency issue in the first place.
req.lock.Lock()
defer req.lock.Unlock()

// Try to resolve the full block first if it's not obtained
// yet. The returned block can be nil if the generation fails.

if !req.done {
timeout := time.NewTimer(500 * time.Millisecond)
defer timeout.Stop()

select {
case req.block = <-req.result:
req.done = true
case <-timeout.C:
// TODO(rjl49345642, Marius), should we keep this
// 100ms timeout allowance? Why not just use the
// default and then fallback to empty directly?
}
}

if req.block != nil {
return beacon.BlockToExecutableData(req.block)
}
return beacon.BlockToExecutableData(req.empty)
}

// payloadQueueItem represents an id->payload tuple to store until it's retrieved
// or evicted.
type payloadQueueItem struct {
id beacon.PayloadID
data *payload
id beacon.PayloadID
payload *miner.Payload
}

// payloadQueue tracks the latest handful of constructed payloads to be retrieved
Expand All @@ -99,14 +58,14 @@ func newPayloadQueue() *payloadQueue {
}

// put inserts a new payload into the queue at the given id.
func (q *payloadQueue) put(id beacon.PayloadID, data *payload) {
func (q *payloadQueue) put(id beacon.PayloadID, payload *miner.Payload) {
q.lock.Lock()
defer q.lock.Unlock()

copy(q.payloads[1:], q.payloads)
q.payloads[0] = &payloadQueueItem{
id: id,
data: data,
id: id,
payload: payload,
}
}

Expand All @@ -120,7 +79,7 @@ func (q *payloadQueue) get(id beacon.PayloadID) *beacon.ExecutableDataV1 {
return nil // no more items
}
if item.id == id {
return item.data.resolve()
return item.payload.Resolve()
}
}
return nil
Expand Down
25 changes: 3 additions & 22 deletions miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,26 +251,7 @@ func (miner *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscript
return miner.worker.pendingLogsFeed.Subscribe(ch)
}

// GetSealingBlockAsync requests to generate a sealing block according to the
// given parameters. Regardless of whether the generation is successful or not,
// there is always a result that will be returned through the result channel.
// The difference is that if the execution fails, the returned result is nil
// and the concrete error is dropped silently.
func (miner *Miner) GetSealingBlockAsync(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (chan *types.Block, error) {
resCh, _, err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs)
if err != nil {
return nil, err
}
return resCh, nil
}

// GetSealingBlockSync creates a sealing block according to the given parameters.
// If the generation is failed or the underlying work is already closed, an error
// will be returned.
func (miner *Miner) GetSealingBlockSync(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (*types.Block, error) {
resCh, errCh, err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs)
if err != nil {
return nil, err
}
return <-resCh, <-errCh
// BuildPayload builds the payload according to the provided parameters.
func (miner *Miner) BuildPayload(args *BuildPayloadArgs) (*Payload, error) {
return miner.worker.buildPayload(args)
}
Loading

0 comments on commit a2a144c

Please sign in to comment.