From b4241b225371624359fc28d52b1bdcbf7349856c Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Thu, 12 May 2022 16:48:28 +0200 Subject: [PATCH 1/7] eth/catalyst: build the execution payload async --- eth/catalyst/api.go | 33 +++++++++++++++++++++++++++------ miner/miner.go | 17 +++++++++++++++++ 2 files changed, 44 insertions(+), 6 deletions(-) diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index 1525766604d0..e9121c9ae1a2 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -188,18 +188,28 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update beacon.ForkchoiceStateV1, pa // sealed by the beacon client. The payload will be requested later, and we // might replace it arbitrarily many times in between. if payloadAttributes != nil { - log.Info("Creating new payload for sealing") - start := time.Now() - - data, err := api.assembleBlock(update.HeadBlockHash, payloadAttributes) + // Create an empty block + emptyBlock, err := api.createEmptyBlock(update.HeadBlockHash, payloadAttributes) if err != nil { log.Error("Failed to create sealing payload", "err", err) return valid(nil), err // valid setHead, invalid payload } id := computePayloadId(update.HeadBlockHash, payloadAttributes) - api.localBlocks.put(id, data) + api.localBlocks.put(id, emptyBlock) + // Now start the real block production async + go func() { + log.Info("Creating new payload for sealing") + start := time.Now() + data, err := api.assembleBlock(update.HeadBlockHash, payloadAttributes) + if err != nil { + log.Error("Failed to create sealing payload", "err", err) + } + + id := computePayloadId(update.HeadBlockHash, payloadAttributes) + api.localBlocks.put(id, data) + log.Info("Created payload for sealing", "id", id, "elapsed", time.Since(start)) + }() - log.Info("Created payload for sealing", "id", id, "elapsed", time.Since(start)) return valid(&id), nil } return valid(nil), nil @@ -346,3 +356,14 @@ func (api *ConsensusAPI) assembleBlock(parentHash common.Hash, params *beacon.Pa } return beacon.BlockToExecutableData(block), nil } + +// createEmptyBlock creates a new empty block and returns the "execution +// data" required for beacon clients to process the new block. +func (api *ConsensusAPI) createEmptyBlock(parentHash common.Hash, params *beacon.PayloadAttributesV1) (*beacon.ExecutableDataV1, error) { + log.Info("Producing empty block", "parentHash", parentHash) + block, err := api.eth.Miner().GetEmptyBlock(parentHash, params.Timestamp, params.SuggestedFeeRecipient, params.Random) + if err != nil { + return nil, err + } + return beacon.BlockToExecutableData(block), nil +} diff --git a/miner/miner.go b/miner/miner.go index 20e12c240e12..1bdbc90aece8 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -246,3 +246,20 @@ func (miner *Miner) GetSealingBlock(parent common.Hash, timestamp uint64, coinba func (miner *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscription { return miner.worker.pendingLogsFeed.Subscribe(ch) } + +func (miner *Miner) GetEmptyBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash) (*types.Block, error) { + params := &generateParams{ + timestamp: timestamp, + forceTime: true, + parentHash: parent, + coinbase: coinbase, + random: random, + noUncle: true, + noExtra: true, + } + work, err := miner.worker.prepareWork(params) + if err != nil { + return nil, err + } + return miner.worker.engine.FinalizeAndAssemble(miner.worker.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts) +} From 821d245d3bdd5a488220751c73e472baf93ce8d8 Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Fri, 13 May 2022 13:22:21 +0200 Subject: [PATCH 2/7] miner: added comment, added test case --- eth/catalyst/api_test.go | 72 ++++++++++++++++++++++++++++++++++++++++ miner/miner.go | 14 ++++---- 2 files changed, 80 insertions(+), 6 deletions(-) diff --git a/eth/catalyst/api_test.go b/eth/catalyst/api_test.go index 2ae6d2cd5643..5824a81b6ada 100644 --- a/eth/catalyst/api_test.go +++ b/eth/catalyst/api_test.go @@ -527,3 +527,75 @@ func TestExchangeTransitionConfig(t *testing.T) { t.Fatalf("expected no error on valid config, got %v", err) } } + +func TestEmptyBlocks(t *testing.T) { + genesis, preMergeBlocks := generatePreMergeChain(10) + n, ethservice := startEthService(t, genesis, preMergeBlocks) + ethservice.Merger().ReachTTD() + defer n.Close() + var ( + api = NewConsensusAPI(ethservice) + parent = ethservice.BlockChain().CurrentBlock() + // This EVM code generates a log when the contract is created. + logCode = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00") + ) + for i := 0; i < 10; i++ { + statedb, _ := ethservice.BlockChain().StateAt(parent.Root()) + nonce := statedb.GetNonce(testAddr) + tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey) + ethservice.TxPool().AddLocal(tx) + + params := beacon.PayloadAttributesV1{ + Timestamp: parent.Time() + 1, + Random: crypto.Keccak256Hash([]byte{byte(i)}), + SuggestedFeeRecipient: parent.Coinbase(), + } + + fcState := beacon.ForkchoiceStateV1{ + HeadBlockHash: parent.Hash(), + SafeBlockHash: common.Hash{}, + FinalizedBlockHash: common.Hash{}, + } + resp, err := api.ForkchoiceUpdatedV1(fcState, ¶ms) + if err != nil { + t.Fatalf("error preparing payload, err=%v", err) + } + if resp.PayloadStatus.Status != beacon.VALID { + t.Fatalf("error preparing payload, invalid status: %v", resp.PayloadStatus.Status) + } + payload, err := api.GetPayloadV1(*resp.PayloadID) + if err != nil { + t.Fatalf("can't get payload: %v", err) + } + if len(payload.Transactions) != 0 { + t.Fatalf("payload should be empty") + } + time.Sleep(10 * time.Millisecond) + payload, err = api.GetPayloadV1(*resp.PayloadID) + if err != nil { + t.Fatalf("can't get payload: %v", err) + } + if len(payload.Transactions) == 0 { + t.Fatalf("payload should not be empty") + } + execResp, err := api.NewPayloadV1(*payload) + if err != nil { + t.Fatalf("can't execute payload: %v", err) + } + if execResp.Status != beacon.VALID { + t.Fatalf("invalid status: %v", execResp.Status) + } + fcState = beacon.ForkchoiceStateV1{ + HeadBlockHash: payload.BlockHash, + SafeBlockHash: payload.ParentHash, + FinalizedBlockHash: payload.ParentHash, + } + if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil { + t.Fatalf("Failed to insert block: %v", err) + } + if ethservice.BlockChain().CurrentBlock().NumberU64() != payload.Number { + t.Fatalf("Chain head should be updated") + } + parent = ethservice.BlockChain().CurrentBlock() + } +} diff --git a/miner/miner.go b/miner/miner.go index 1bdbc90aece8..76384a9ff2d7 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -235,18 +235,20 @@ func (miner *Miner) DisablePreseal() { miner.worker.disablePreseal() } -// GetSealingBlock retrieves a sealing block based on the given parameters. -// The returned block is not sealed but all other fields should be filled. -func (miner *Miner) GetSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash) (*types.Block, error) { - return miner.worker.getSealingBlock(parent, timestamp, coinbase, random) -} - // SubscribePendingLogs starts delivering logs from pending transactions // to the given channel. func (miner *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscription { return miner.worker.pendingLogsFeed.Subscribe(ch) } +// GetSealingBlock retrieves a sealing block based on the given parameters. +// The returned block is not sealed but all other fields should be filled. +func (miner *Miner) GetSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash) (*types.Block, error) { + return miner.worker.getSealingBlock(parent, timestamp, coinbase, random) +} + +// GetEmptyBlock creates an empty block based on the given parameters. +// The returned block is not sealed, but all other fields should be filled. func (miner *Miner) GetEmptyBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash) (*types.Block, error) { params := &generateParams{ timestamp: timestamp, From 00aa1c4d48074cc12e1fbe9b79fe2ec725c90fb4 Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Mon, 16 May 2022 13:42:53 +0200 Subject: [PATCH 3/7] eth/catalyst: miner: move async block production to miner --- eth/catalyst/api.go | 50 +++++----------------------- eth/catalyst/api_test.go | 29 +++++++++------- eth/catalyst/queue.go | 9 ++--- miner/miner.go | 21 +++--------- miner/worker.go | 71 +++++++++++++++++++++++++++++----------- miner/worker_test.go | 6 ++-- 6 files changed, 90 insertions(+), 96 deletions(-) diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index e9121c9ae1a2..d8ecb04be895 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -189,27 +189,13 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update beacon.ForkchoiceStateV1, pa // might replace it arbitrarily many times in between. if payloadAttributes != nil { // Create an empty block - emptyBlock, err := api.createEmptyBlock(update.HeadBlockHash, payloadAttributes) + req, err := api.eth.Miner().RequestSealingBlock(update.HeadBlockHash, payloadAttributes.Timestamp, payloadAttributes.SuggestedFeeRecipient, payloadAttributes.Random) if err != nil { log.Error("Failed to create sealing payload", "err", err) return valid(nil), err // valid setHead, invalid payload } id := computePayloadId(update.HeadBlockHash, payloadAttributes) - api.localBlocks.put(id, emptyBlock) - // Now start the real block production async - go func() { - log.Info("Creating new payload for sealing") - start := time.Now() - data, err := api.assembleBlock(update.HeadBlockHash, payloadAttributes) - if err != nil { - log.Error("Failed to create sealing payload", "err", err) - } - - id := computePayloadId(update.HeadBlockHash, payloadAttributes) - api.localBlocks.put(id, data) - log.Info("Created payload for sealing", "id", id, "elapsed", time.Since(start)) - }() - + api.localBlocks.put(id, req) return valid(&id), nil } return valid(nil), nil @@ -243,11 +229,15 @@ func (api *ConsensusAPI) ExchangeTransitionConfigurationV1(config beacon.Transit // GetPayloadV1 returns a cached payload by id. func (api *ConsensusAPI) GetPayloadV1(payloadID beacon.PayloadID) (*beacon.ExecutableDataV1, error) { log.Trace("Engine API request received", "method", "GetPayload", "id", payloadID) - data := api.localBlocks.get(payloadID) - if data == nil { + req := api.localBlocks.get(payloadID) + if req == nil { return nil, &beacon.UnknownPayload } - return data, nil + block, err := api.eth.Miner().GetSealingBlock(req) + if err != nil { + return nil, err + } + return beacon.BlockToExecutableData(block), nil } // NewPayloadV1 creates an Eth1 block, inserts it in the chain, and returns the status of the chain. @@ -345,25 +335,3 @@ func (api *ConsensusAPI) invalid(err error) beacon.PayloadStatusV1 { errorMsg := err.Error() return beacon.PayloadStatusV1{Status: beacon.INVALID, LatestValidHash: ¤tHash, ValidationError: &errorMsg} } - -// assembleBlock creates a new block and returns the "execution -// data" required for beacon clients to process the new block. -func (api *ConsensusAPI) assembleBlock(parentHash common.Hash, params *beacon.PayloadAttributesV1) (*beacon.ExecutableDataV1, error) { - log.Info("Producing block", "parentHash", parentHash) - block, err := api.eth.Miner().GetSealingBlock(parentHash, params.Timestamp, params.SuggestedFeeRecipient, params.Random) - if err != nil { - return nil, err - } - return beacon.BlockToExecutableData(block), nil -} - -// createEmptyBlock creates a new empty block and returns the "execution -// data" required for beacon clients to process the new block. -func (api *ConsensusAPI) createEmptyBlock(parentHash common.Hash, params *beacon.PayloadAttributesV1) (*beacon.ExecutableDataV1, error) { - log.Info("Producing empty block", "parentHash", parentHash) - block, err := api.eth.Miner().GetEmptyBlock(parentHash, params.Timestamp, params.SuggestedFeeRecipient, params.Random) - if err != nil { - return nil, err - } - return beacon.BlockToExecutableData(block), nil -} diff --git a/eth/catalyst/api_test.go b/eth/catalyst/api_test.go index 5824a81b6ada..81c4025a4902 100644 --- a/eth/catalyst/api_test.go +++ b/eth/catalyst/api_test.go @@ -93,7 +93,7 @@ func TestEth2AssembleBlock(t *testing.T) { blockParams := beacon.PayloadAttributesV1{ Timestamp: blocks[9].Time() + 5, } - execData, err := api.assembleBlock(blocks[9].Hash(), &blockParams) + execData, err := assembleBlock(api, blocks[9].Hash(), &blockParams) if err != nil { t.Fatalf("error producing block, err=%v", err) } @@ -114,7 +114,7 @@ func TestEth2AssembleBlockWithAnotherBlocksTxs(t *testing.T) { blockParams := beacon.PayloadAttributesV1{ Timestamp: blocks[8].Time() + 5, } - execData, err := api.assembleBlock(blocks[8].Hash(), &blockParams) + execData, err := assembleBlock(api, blocks[8].Hash(), &blockParams) if err != nil { t.Fatalf("error producing block, err=%v", err) } @@ -273,7 +273,7 @@ func TestEth2NewBlock(t *testing.T) { tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey) ethservice.TxPool().AddLocal(tx) - execData, err := api.assembleBlock(parent.Hash(), &beacon.PayloadAttributesV1{ + execData, err := assembleBlock(api, parent.Hash(), &beacon.PayloadAttributesV1{ Timestamp: parent.Time() + 5, }) if err != nil { @@ -313,7 +313,7 @@ func TestEth2NewBlock(t *testing.T) { ) parent = preMergeBlocks[len(preMergeBlocks)-1] for i := 0; i < 10; i++ { - execData, err := api.assembleBlock(parent.Hash(), &beacon.PayloadAttributesV1{ + execData, err := assembleBlock(api, parent.Hash(), &beacon.PayloadAttributesV1{ Timestamp: parent.Time() + 6, }) if err != nil { @@ -567,14 +567,6 @@ func TestEmptyBlocks(t *testing.T) { if err != nil { t.Fatalf("can't get payload: %v", err) } - if len(payload.Transactions) != 0 { - t.Fatalf("payload should be empty") - } - time.Sleep(10 * time.Millisecond) - payload, err = api.GetPayloadV1(*resp.PayloadID) - if err != nil { - t.Fatalf("can't get payload: %v", err) - } if len(payload.Transactions) == 0 { t.Fatalf("payload should not be empty") } @@ -599,3 +591,16 @@ func TestEmptyBlocks(t *testing.T) { parent = ethservice.BlockChain().CurrentBlock() } } + +func assembleBlock(api *ConsensusAPI, parentHash common.Hash, params *beacon.PayloadAttributesV1) (*beacon.ExecutableDataV1, error) { + req, err := api.eth.Miner().RequestSealingBlock(parentHash, params.Timestamp, params.SuggestedFeeRecipient, params.Random) + if err != nil { + return nil, err + } + time.Sleep(10 * time.Millisecond) + block, err := api.eth.Miner().GetSealingBlock(req) + if err != nil { + return nil, err + } + return beacon.BlockToExecutableData(block), nil +} diff --git a/eth/catalyst/queue.go b/eth/catalyst/queue.go index ffb2f56bf430..82000b8a95b5 100644 --- a/eth/catalyst/queue.go +++ b/eth/catalyst/queue.go @@ -22,6 +22,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/beacon" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/miner" ) // maxTrackedPayloads is the maximum number of prepared payloads the execution @@ -38,7 +39,7 @@ const maxTrackedHeaders = 10 // or evicted. type payloadQueueItem struct { id beacon.PayloadID - payload *beacon.ExecutableDataV1 + payload *miner.GetWorkReq } // payloadQueue tracks the latest handful of constructed payloads to be retrieved @@ -57,19 +58,19 @@ func newPayloadQueue() *payloadQueue { } // put inserts a new payload into the queue at the given id. -func (q *payloadQueue) put(id beacon.PayloadID, data *beacon.ExecutableDataV1) { +func (q *payloadQueue) put(id beacon.PayloadID, req *miner.GetWorkReq) { q.lock.Lock() defer q.lock.Unlock() copy(q.payloads[1:], q.payloads) q.payloads[0] = &payloadQueueItem{ id: id, - payload: data, + payload: req, } } // get retrieves a previously stored payload item or nil if it does not exist. -func (q *payloadQueue) get(id beacon.PayloadID) *beacon.ExecutableDataV1 { +func (q *payloadQueue) get(id beacon.PayloadID) *miner.GetWorkReq { q.lock.RLock() defer q.lock.RUnlock() diff --git a/miner/miner.go b/miner/miner.go index 76384a9ff2d7..fe3ce9abf031 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -243,25 +243,12 @@ func (miner *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscript // GetSealingBlock retrieves a sealing block based on the given parameters. // The returned block is not sealed but all other fields should be filled. -func (miner *Miner) GetSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash) (*types.Block, error) { - return miner.worker.getSealingBlock(parent, timestamp, coinbase, random) +func (miner *Miner) RequestSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash) (*GetWorkReq, error) { + return miner.worker.requestSealingBlock(parent, timestamp, coinbase, random) } // GetEmptyBlock creates an empty block based on the given parameters. // The returned block is not sealed, but all other fields should be filled. -func (miner *Miner) GetEmptyBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash) (*types.Block, error) { - params := &generateParams{ - timestamp: timestamp, - forceTime: true, - parentHash: parent, - coinbase: coinbase, - random: random, - noUncle: true, - noExtra: true, - } - work, err := miner.worker.prepareWork(params) - if err != nil { - return nil, err - } - return miner.worker.engine.FinalizeAndAssemble(miner.worker.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts) +func (miner *Miner) GetSealingBlock(req *GetWorkReq) (*types.Block, error) { + return miner.worker.getSealingBlock(req) } diff --git a/miner/worker.go b/miner/worker.go index 31022e7e10f3..6887dbfba90f 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -167,8 +167,8 @@ type newWorkReq struct { timestamp int64 } -// getWorkReq represents a request for getting a new sealing work with provided parameters. -type getWorkReq struct { +// GetWorkReq represents a request for getting a new sealing work with provided parameters. +type GetWorkReq struct { params *generateParams err error result chan *types.Block @@ -203,7 +203,7 @@ type worker struct { // Channels newWorkCh chan *newWorkReq - getWorkCh chan *getWorkReq + getWorkCh chan *GetWorkReq taskCh chan *task resultCh chan *types.Block startCh chan struct{} @@ -268,7 +268,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), newWorkCh: make(chan *newWorkReq), - getWorkCh: make(chan *getWorkReq), + getWorkCh: make(chan *GetWorkReq), taskCh: make(chan *task), resultCh: make(chan *types.Block, resultQueueSize), exitCh: make(chan struct{}), @@ -1176,29 +1176,60 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti return nil } -// getSealingBlock generates the sealing block based on the given parameters. -func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash) (*types.Block, error) { - req := &getWorkReq{ - params: &generateParams{ - timestamp: timestamp, - forceTime: true, - parentHash: parent, - coinbase: coinbase, - random: random, - noUncle: true, - noExtra: true, - }, +// requestSealingBlock starts generating the sealing block based on the given parameters. +func (w *worker) requestSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash) (*GetWorkReq, error) { + params := &generateParams{ + timestamp: timestamp, + forceTime: true, + parentHash: parent, + coinbase: coinbase, + random: random, + noUncle: true, + noExtra: true, + } + + // Do the pre-checks, discard the result + env, err := w.prepareWork(params) + if err != nil { + return nil, err + } + defer env.discard() + + req := &GetWorkReq{ + params: params, result: make(chan *types.Block, 1), } + select { case w.getWorkCh <- req: - block := <-req.result - if block == nil { - return nil, req.err - } + return req, nil + case <-w.exitCh: + return nil, errors.New("miner closed") + } +} + +func (w *worker) getSealingBlock(req *GetWorkReq) (*types.Block, error) { + if req == nil { + return nil, errors.New("invalid getWork request") + } + if req.err != nil { + return nil, req.err + } + timeout := time.NewTimer(100 * time.Millisecond) + select { + case block := <-req.result: return block, nil case <-w.exitCh: return nil, errors.New("miner closed") + case <-timeout.C: + // return an empty block if the block production times out + work, err := w.prepareWork(req.params) + if err != nil { + return nil, err + } + defer work.discard() + + return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts) } } diff --git a/miner/worker_test.go b/miner/worker_test.go index dd029433b8bf..af1fa0caded6 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -638,7 +638,8 @@ func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine co // This API should work even when the automatic sealing is not enabled for _, c := range cases { - block, err := w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random) + req, _ := w.requestSealingBlock(c.parent, timestamp, c.coinbase, c.random) + block, err := w.getSealingBlock(req) if c.expectErr { if err == nil { t.Error("Expect error but get nil") @@ -654,7 +655,8 @@ func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine co // This API should work even when the automatic sealing is enabled w.start() for _, c := range cases { - block, err := w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random) + req, _ := w.requestSealingBlock(c.parent, timestamp, c.coinbase, c.random) + block, err := w.getSealingBlock(req) if c.expectErr { if err == nil { t.Error("Expect error but get nil") From e644c230dc58ed9432721422d4071d32dcaa0e0d Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Tue, 17 May 2022 11:45:46 +0800 Subject: [PATCH 4/7] eth/catalyst, miner: support generate seal block async --- eth/catalyst/api.go | 22 ++++---- eth/catalyst/api_test.go | 9 ++-- eth/catalyst/queue.go | 52 ++++++++++++++++--- miner/miner.go | 42 +++++++++++++--- miner/worker.go | 105 +++++++++++++++------------------------ miner/worker_test.go | 12 +++-- 6 files changed, 141 insertions(+), 101 deletions(-) diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index d8ecb04be895..75653b1d476c 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -188,14 +188,20 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update beacon.ForkchoiceStateV1, pa // sealed by the beacon client. The payload will be requested later, and we // might replace it arbitrarily many times in between. if payloadAttributes != nil { - // Create an empty block - req, err := api.eth.Miner().RequestSealingBlock(update.HeadBlockHash, payloadAttributes.Timestamp, payloadAttributes.SuggestedFeeRecipient, payloadAttributes.Random) + // Create an empty block first which can be used as a fallback + empty, err := api.eth.Miner().GetSealingBlockSync(update.HeadBlockHash, payloadAttributes.Timestamp, payloadAttributes.SuggestedFeeRecipient, payloadAttributes.Random, true) + if err != nil { + return valid(nil), err + } + // Send a request to generate a full block in the background. + // The result can be obtained via the returned channel. + res, err := api.eth.Miner().GetSealingBlockAsync(update.HeadBlockHash, payloadAttributes.Timestamp, payloadAttributes.SuggestedFeeRecipient, payloadAttributes.Random, false) if err != nil { log.Error("Failed to create sealing payload", "err", err) return valid(nil), err // valid setHead, invalid payload } id := computePayloadId(update.HeadBlockHash, payloadAttributes) - api.localBlocks.put(id, req) + api.localBlocks.put(id, &payload{empty: empty, result: res}) return valid(&id), nil } return valid(nil), nil @@ -229,15 +235,11 @@ func (api *ConsensusAPI) ExchangeTransitionConfigurationV1(config beacon.Transit // GetPayloadV1 returns a cached payload by id. func (api *ConsensusAPI) GetPayloadV1(payloadID beacon.PayloadID) (*beacon.ExecutableDataV1, error) { log.Trace("Engine API request received", "method", "GetPayload", "id", payloadID) - req := api.localBlocks.get(payloadID) - if req == nil { + data := api.localBlocks.get(payloadID) + if data == nil { return nil, &beacon.UnknownPayload } - block, err := api.eth.Miner().GetSealingBlock(req) - if err != nil { - return nil, err - } - return beacon.BlockToExecutableData(block), nil + return data, nil } // NewPayloadV1 creates an Eth1 block, inserts it in the chain, and returns the status of the chain. diff --git a/eth/catalyst/api_test.go b/eth/catalyst/api_test.go index 81c4025a4902..3596c594e988 100644 --- a/eth/catalyst/api_test.go +++ b/eth/catalyst/api_test.go @@ -567,6 +567,8 @@ func TestEmptyBlocks(t *testing.T) { if err != nil { t.Fatalf("can't get payload: %v", err) } + // TODO(493456442, marius) this test can be flaky since we rely on a 100ms + // allowance for block generation internally. if len(payload.Transactions) == 0 { t.Fatalf("payload should not be empty") } @@ -593,12 +595,7 @@ func TestEmptyBlocks(t *testing.T) { } func assembleBlock(api *ConsensusAPI, parentHash common.Hash, params *beacon.PayloadAttributesV1) (*beacon.ExecutableDataV1, error) { - req, err := api.eth.Miner().RequestSealingBlock(parentHash, params.Timestamp, params.SuggestedFeeRecipient, params.Random) - if err != nil { - return nil, err - } - time.Sleep(10 * time.Millisecond) - block, err := api.eth.Miner().GetSealingBlock(req) + block, err := api.eth.Miner().GetSealingBlockSync(parentHash, params.Timestamp, params.SuggestedFeeRecipient, params.Random, false) if err != nil { return nil, err } diff --git a/eth/catalyst/queue.go b/eth/catalyst/queue.go index 82000b8a95b5..276f0bac62ce 100644 --- a/eth/catalyst/queue.go +++ b/eth/catalyst/queue.go @@ -18,11 +18,11 @@ package catalyst import ( "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/beacon" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/miner" ) // maxTrackedPayloads is the maximum number of prepared payloads the execution @@ -35,11 +35,47 @@ const maxTrackedPayloads = 10 // latest one; but have a slight wiggle room for non-ideal conditions. const maxTrackedHeaders = 10 +// payload wraps the execution payload generated by miner +type payload struct { + lock sync.Mutex + done bool + empty *types.Block + block *types.Block + result chan *types.Block +} + +// resolve extracts the generated full block from the given channel if possible +// or fallback to empty block as an alternative. +func (req *payload) resolve() *beacon.ExecutableDataV1 { + // this function can be called concurrently, prevent any + // concurrency issue in the first place. + req.lock.Lock() + defer req.lock.Unlock() + + // Try to resolve the full block first if it's not obtained + // yet. The returned block can be nil if the generation fails. + if !req.done { + select { + case req.block = <-req.result: + req.done = true + case <-time.NewTimer(time.Millisecond * 100).C: + // TODO(rjl49345642, Marius), should we keep this + // 100ms timeout allowance? Why not just use the + // default and then fallback to empty directly? + } + } + block := req.empty + if req.block != nil { + block = req.block + } + return beacon.BlockToExecutableData(block) +} + // payloadQueueItem represents an id->payload tuple to store until it's retrieved // or evicted. type payloadQueueItem struct { - id beacon.PayloadID - payload *miner.GetWorkReq + id beacon.PayloadID + data *payload } // payloadQueue tracks the latest handful of constructed payloads to be retrieved @@ -58,19 +94,19 @@ func newPayloadQueue() *payloadQueue { } // put inserts a new payload into the queue at the given id. -func (q *payloadQueue) put(id beacon.PayloadID, req *miner.GetWorkReq) { +func (q *payloadQueue) put(id beacon.PayloadID, data *payload) { q.lock.Lock() defer q.lock.Unlock() copy(q.payloads[1:], q.payloads) q.payloads[0] = &payloadQueueItem{ - id: id, - payload: req, + id: id, + data: data, } } // get retrieves a previously stored payload item or nil if it does not exist. -func (q *payloadQueue) get(id beacon.PayloadID) *miner.GetWorkReq { +func (q *payloadQueue) get(id beacon.PayloadID) *beacon.ExecutableDataV1 { q.lock.RLock() defer q.lock.RUnlock() @@ -79,7 +115,7 @@ func (q *payloadQueue) get(id beacon.PayloadID) *miner.GetWorkReq { return nil // no more items } if item.id == id { - return item.payload + return item.data.resolve() } } return nil diff --git a/miner/miner.go b/miner/miner.go index fe3ce9abf031..fb4d3d6029e7 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -241,14 +241,40 @@ func (miner *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscript return miner.worker.pendingLogsFeed.Subscribe(ch) } -// GetSealingBlock retrieves a sealing block based on the given parameters. -// The returned block is not sealed but all other fields should be filled. -func (miner *Miner) RequestSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash) (*GetWorkReq, error) { - return miner.worker.requestSealingBlock(parent, timestamp, coinbase, random) +// GetSealingBlockAsync requests to generate a sealing block according to the +// given parameters. Regardless of whether the generation is successful or not, +// there is always a result that will be returned through the result channel. +// The difference is that if the execution fails, the returned result is nil +// and the concrete error is dropped silently. +func (miner *Miner) GetSealingBlockAsync(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (chan *types.Block, error) { + ch := make(chan *blockOrError) + if err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs, ch); err != nil { + return nil, err // worker is closed + } + // Spin up a translation go-routine to feed back the result returned + // by the background generator. It's safe to do it since the result + // will always be returned in a reasonable time allowance if the request + // is accepted. + ret := make(chan *types.Block, 1) + go func() { + result := <-ch + if result.err != nil { + ret <- nil + } else { + ret <- result.block + } + }() + return ret, nil } -// GetEmptyBlock creates an empty block based on the given parameters. -// The returned block is not sealed, but all other fields should be filled. -func (miner *Miner) GetSealingBlock(req *GetWorkReq) (*types.Block, error) { - return miner.worker.getSealingBlock(req) +// GetSealingBlockSync creates a sealing block according to the given parameters. +// If the generation is failed or the underlying work is already closed, an error +// will be returned. +func (miner *Miner) GetSealingBlockSync(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (*types.Block, error) { + ch := make(chan *blockOrError, 1) + if err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs, ch); err != nil { + return nil, err + } + result := <-ch + return result.block, result.err } diff --git a/miner/worker.go b/miner/worker.go index 6887dbfba90f..fdd7c0dcf5b6 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -167,11 +167,17 @@ type newWorkReq struct { timestamp int64 } -// GetWorkReq represents a request for getting a new sealing work with provided parameters. -type GetWorkReq struct { +// blockOrError defines the result object returned by getWorkCh, error will be set +// in case block generation is failed. +type blockOrError struct { + err error + block *types.Block +} + +// getWorkReq represents a request for getting a new sealing work with provided parameters. +type getWorkReq struct { params *generateParams - err error - result chan *types.Block + result chan *blockOrError // non-blocking channel } // intervalAdjust represents a resubmitting interval adjustment. @@ -203,7 +209,7 @@ type worker struct { // Channels newWorkCh chan *newWorkReq - getWorkCh chan *GetWorkReq + getWorkCh chan *getWorkReq taskCh chan *task resultCh chan *types.Block startCh chan struct{} @@ -268,7 +274,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), newWorkCh: make(chan *newWorkReq), - getWorkCh: make(chan *GetWorkReq), + getWorkCh: make(chan *getWorkReq), taskCh: make(chan *task), resultCh: make(chan *types.Block, resultQueueSize), exitCh: make(chan struct{}), @@ -535,12 +541,11 @@ func (w *worker) mainLoop() { case req := <-w.getWorkCh: block, err := w.generateWork(req.params) - if err != nil { - req.err = err - req.result <- nil - } else { - req.result <- block - } + + req.result <- &blockOrError{ + err: err, + block: block, + } // this channel is expected to be non-blocking case ev := <-w.chainSideCh: // Short circuit for duplicate side blocks @@ -969,6 +974,7 @@ type generateParams struct { random common.Hash // The randomness generated by beacon chain, empty before the merge noUncle bool // Flag whether the uncle block inclusion is allowed noExtra bool // Flag whether the extra field assignment is allowed + noTxs bool // Flag whether an empty block without any transaction is expected } // prepareWork constructs the sealing task according to the given parameters, @@ -1090,8 +1096,9 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) { } defer work.discard() - w.fillTransactions(nil, work) - + if !params.noTxs { + w.fillTransactions(nil, work) + } return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts) } @@ -1128,7 +1135,6 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { work.discard() return } - w.commit(work.copy(), w.fullTaskHook, true, start) // Swap out the old work with the new one, terminating any leftover @@ -1176,60 +1182,29 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti return nil } -// requestSealingBlock starts generating the sealing block based on the given parameters. -func (w *worker) requestSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash) (*GetWorkReq, error) { - params := &generateParams{ - timestamp: timestamp, - forceTime: true, - parentHash: parent, - coinbase: coinbase, - random: random, - noUncle: true, - noExtra: true, - } - - // Do the pre-checks, discard the result - env, err := w.prepareWork(params) - if err != nil { - return nil, err - } - defer env.discard() - - req := &GetWorkReq{ - params: params, - result: make(chan *types.Block, 1), +// getSealingBlock generates the sealing block based on the given parameters. +// The generation result will be passed back via the given channel no matter +// the generation itself succeeds or not. The assumption is always held the +// given channel has at-least 1-size buffer for storing result. +func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool, result chan *blockOrError) error { + req := &getWorkReq{ + params: &generateParams{ + timestamp: timestamp, + forceTime: true, + parentHash: parent, + coinbase: coinbase, + random: random, + noUncle: true, + noExtra: true, + noTxs: noTxs, + }, + result: result, } - select { case w.getWorkCh <- req: - return req, nil + return nil case <-w.exitCh: - return nil, errors.New("miner closed") - } -} - -func (w *worker) getSealingBlock(req *GetWorkReq) (*types.Block, error) { - if req == nil { - return nil, errors.New("invalid getWork request") - } - if req.err != nil { - return nil, req.err - } - timeout := time.NewTimer(100 * time.Millisecond) - select { - case block := <-req.result: - return block, nil - case <-w.exitCh: - return nil, errors.New("miner closed") - case <-timeout.C: - // return an empty block if the block production times out - work, err := w.prepareWork(req.params) - if err != nil { - return nil, err - } - defer work.discard() - - return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts) + return errors.New("miner closed") } } diff --git a/miner/worker_test.go b/miner/worker_test.go index af1fa0caded6..bec207dd3115 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -638,8 +638,10 @@ func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine co // This API should work even when the automatic sealing is not enabled for _, c := range cases { - req, _ := w.requestSealingBlock(c.parent, timestamp, c.coinbase, c.random) - block, err := w.getSealingBlock(req) + ch := make(chan *blockOrError, 1) + w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random, false, ch) + result := <-ch + block, err := result.block, result.err if c.expectErr { if err == nil { t.Error("Expect error but get nil") @@ -655,8 +657,10 @@ func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine co // This API should work even when the automatic sealing is enabled w.start() for _, c := range cases { - req, _ := w.requestSealingBlock(c.parent, timestamp, c.coinbase, c.random) - block, err := w.getSealingBlock(req) + ch := make(chan *blockOrError, 1) + w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random, false, ch) + result := <-ch + block, err := result.block, result.err if c.expectErr { if err == nil { t.Error("Expect error but get nil") From 6728d020e8c9ef9bf3e81daf92208638c476f3f5 Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Tue, 17 May 2022 10:38:07 +0200 Subject: [PATCH 5/7] miner: rework GetSealingBlockAsync to use a passed channel --- eth/catalyst/api.go | 10 +++++----- miner/miner.go | 33 ++++++++++----------------------- miner/worker.go | 31 ++++++++++++++----------------- miner/worker_test.go | 16 ++++++++-------- 4 files changed, 37 insertions(+), 53 deletions(-) diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index 75653b1d476c..e38cd2b75f17 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -28,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/beacon" "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" @@ -195,13 +196,12 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update beacon.ForkchoiceStateV1, pa } // Send a request to generate a full block in the background. // The result can be obtained via the returned channel. - res, err := api.eth.Miner().GetSealingBlockAsync(update.HeadBlockHash, payloadAttributes.Timestamp, payloadAttributes.SuggestedFeeRecipient, payloadAttributes.Random, false) - if err != nil { - log.Error("Failed to create sealing payload", "err", err) - return valid(nil), err // valid setHead, invalid payload + resultChan := make(chan *types.Block, 1) + if err := api.eth.Miner().GetSealingBlockAsync(resultChan, update.HeadBlockHash, payloadAttributes.Timestamp, payloadAttributes.SuggestedFeeRecipient, payloadAttributes.Random, false); err != nil { + return valid(nil), err } id := computePayloadId(update.HeadBlockHash, payloadAttributes) - api.localBlocks.put(id, &payload{empty: empty, result: res}) + api.localBlocks.put(id, &payload{empty: empty, result: resultChan}) return valid(&id), nil } return valid(nil), nil diff --git a/miner/miner.go b/miner/miner.go index fb4d3d6029e7..8c21dbb4eb9e 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -246,35 +246,22 @@ func (miner *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscript // there is always a result that will be returned through the result channel. // The difference is that if the execution fails, the returned result is nil // and the concrete error is dropped silently. -func (miner *Miner) GetSealingBlockAsync(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (chan *types.Block, error) { - ch := make(chan *blockOrError) - if err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs, ch); err != nil { - return nil, err // worker is closed - } - // Spin up a translation go-routine to feed back the result returned - // by the background generator. It's safe to do it since the result - // will always be returned in a reasonable time allowance if the request - // is accepted. - ret := make(chan *types.Block, 1) - go func() { - result := <-ch - if result.err != nil { - ret <- nil - } else { - ret <- result.block - } - }() - return ret, nil +// The caller of this method needs to set up a non-blocking result channel. +func (miner *Miner) GetSealingBlockAsync(resultChan chan *types.Block, parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) error { + _, err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs, resultChan) + return err } // GetSealingBlockSync creates a sealing block according to the given parameters. // If the generation is failed or the underlying work is already closed, an error // will be returned. func (miner *Miner) GetSealingBlockSync(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (*types.Block, error) { - ch := make(chan *blockOrError, 1) - if err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs, ch); err != nil { + resultChan := make(chan *types.Block, 1) + errChan, err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs, resultChan) + if err != nil { return nil, err } - result := <-ch - return result.block, result.err + result := <-resultChan + err = <-errChan + return result, err } diff --git a/miner/worker.go b/miner/worker.go index fdd7c0dcf5b6..9edcfec7e6d2 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -167,17 +167,11 @@ type newWorkReq struct { timestamp int64 } -// blockOrError defines the result object returned by getWorkCh, error will be set -// in case block generation is failed. -type blockOrError struct { - err error - block *types.Block -} - // getWorkReq represents a request for getting a new sealing work with provided parameters. type getWorkReq struct { params *generateParams - result chan *blockOrError // non-blocking channel + result chan *types.Block // non-blocking channel + err chan error } // intervalAdjust represents a resubmitting interval adjustment. @@ -541,12 +535,13 @@ func (w *worker) mainLoop() { case req := <-w.getWorkCh: block, err := w.generateWork(req.params) - - req.result <- &blockOrError{ - err: err, - block: block, - } // this channel is expected to be non-blocking - + if err != nil { + req.err <- err + req.result <- nil + } else { + req.err <- nil + req.result <- block + } case ev := <-w.chainSideCh: // Short circuit for duplicate side blocks if _, exist := w.localUncles[ev.Block.Hash()]; exist { @@ -1186,7 +1181,8 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti // The generation result will be passed back via the given channel no matter // the generation itself succeeds or not. The assumption is always held the // given channel has at-least 1-size buffer for storing result. -func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool, result chan *blockOrError) error { +func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool, result chan *types.Block) (chan error, error) { + errChan := make(chan error, 1) req := &getWorkReq{ params: &generateParams{ timestamp: timestamp, @@ -1199,12 +1195,13 @@ func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase noTxs: noTxs, }, result: result, + err: errChan, } select { case w.getWorkCh <- req: - return nil + return errChan, nil case <-w.exitCh: - return errors.New("miner closed") + return errChan, errors.New("miner closed") } } diff --git a/miner/worker_test.go b/miner/worker_test.go index bec207dd3115..39db195dd42b 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -638,10 +638,10 @@ func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine co // This API should work even when the automatic sealing is not enabled for _, c := range cases { - ch := make(chan *blockOrError, 1) - w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random, false, ch) - result := <-ch - block, err := result.block, result.err + ch := make(chan *types.Block, 1) + errChan, _ := w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random, false, ch) + block := <-ch + err := <-errChan if c.expectErr { if err == nil { t.Error("Expect error but get nil") @@ -657,10 +657,10 @@ func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine co // This API should work even when the automatic sealing is enabled w.start() for _, c := range cases { - ch := make(chan *blockOrError, 1) - w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random, false, ch) - result := <-ch - block, err := result.block, result.err + ch := make(chan *types.Block, 1) + errChan, _ := w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random, false, ch) + block := <-ch + err := <-errChan if c.expectErr { if err == nil { t.Error("Expect error but get nil") From 29d68c3718236f08c874ddfd499f44af1b7ed7f5 Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Tue, 17 May 2022 18:14:19 +0200 Subject: [PATCH 6/7] miner: apply rjl's diff --- eth/catalyst/api.go | 7 +++---- miner/miner.go | 17 ++++++++--------- miner/worker.go | 16 +++++++++------- miner/worker_test.go | 10 ++++------ 4 files changed, 24 insertions(+), 26 deletions(-) diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index e38cd2b75f17..44b6abea2264 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -28,7 +28,6 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/beacon" "github.com/ethereum/go-ethereum/core/rawdb" - "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" @@ -196,12 +195,12 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update beacon.ForkchoiceStateV1, pa } // Send a request to generate a full block in the background. // The result can be obtained via the returned channel. - resultChan := make(chan *types.Block, 1) - if err := api.eth.Miner().GetSealingBlockAsync(resultChan, update.HeadBlockHash, payloadAttributes.Timestamp, payloadAttributes.SuggestedFeeRecipient, payloadAttributes.Random, false); err != nil { + resChan, err := api.eth.Miner().GetSealingBlockAsync(update.HeadBlockHash, payloadAttributes.Timestamp, payloadAttributes.SuggestedFeeRecipient, payloadAttributes.Random, false) + if err != nil { return valid(nil), err } id := computePayloadId(update.HeadBlockHash, payloadAttributes) - api.localBlocks.put(id, &payload{empty: empty, result: resultChan}) + api.localBlocks.put(id, &payload{empty: empty, result: resChan}) return valid(&id), nil } return valid(nil), nil diff --git a/miner/miner.go b/miner/miner.go index 8c21dbb4eb9e..4235b44cec50 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -246,22 +246,21 @@ func (miner *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscript // there is always a result that will be returned through the result channel. // The difference is that if the execution fails, the returned result is nil // and the concrete error is dropped silently. -// The caller of this method needs to set up a non-blocking result channel. -func (miner *Miner) GetSealingBlockAsync(resultChan chan *types.Block, parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) error { - _, err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs, resultChan) - return err +func (miner *Miner) GetSealingBlockAsync(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (chan *types.Block, error) { + resChan, _, err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs) + if err != nil { + return nil, err + } + return resChan, nil } // GetSealingBlockSync creates a sealing block according to the given parameters. // If the generation is failed or the underlying work is already closed, an error // will be returned. func (miner *Miner) GetSealingBlockSync(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (*types.Block, error) { - resultChan := make(chan *types.Block, 1) - errChan, err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs, resultChan) + resChan, errChan, err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs) if err != nil { return nil, err } - result := <-resultChan - err = <-errChan - return result, err + return <-resChan, <-errChan } diff --git a/miner/worker.go b/miner/worker.go index 9edcfec7e6d2..2a6415450e37 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1179,10 +1179,12 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti // getSealingBlock generates the sealing block based on the given parameters. // The generation result will be passed back via the given channel no matter -// the generation itself succeeds or not. The assumption is always held the -// given channel has at-least 1-size buffer for storing result. -func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool, result chan *types.Block) (chan error, error) { - errChan := make(chan error, 1) +// the generation itself succeeds or not. +func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (chan *types.Block, chan error, error) { + var ( + resChan = make(chan *types.Block, 1) + errChan = make(chan error, 1) + ) req := &getWorkReq{ params: &generateParams{ timestamp: timestamp, @@ -1194,14 +1196,14 @@ func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase noExtra: true, noTxs: noTxs, }, - result: result, + result: resChan, err: errChan, } select { case w.getWorkCh <- req: - return errChan, nil + return resChan, errChan, nil case <-w.exitCh: - return errChan, errors.New("miner closed") + return nil, nil, errors.New("miner closed") } } diff --git a/miner/worker_test.go b/miner/worker_test.go index 39db195dd42b..55361349bcca 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -638,9 +638,8 @@ func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine co // This API should work even when the automatic sealing is not enabled for _, c := range cases { - ch := make(chan *types.Block, 1) - errChan, _ := w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random, false, ch) - block := <-ch + resChan, errChan, _ := w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random, false) + block := <-resChan err := <-errChan if c.expectErr { if err == nil { @@ -657,9 +656,8 @@ func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine co // This API should work even when the automatic sealing is enabled w.start() for _, c := range cases { - ch := make(chan *types.Block, 1) - errChan, _ := w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random, false, ch) - block := <-ch + resChan, errChan, _ := w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random, false) + block := <-resChan err := <-errChan if c.expectErr { if err == nil { From 8349baf6e83548c4968bcdb4bd7c53aa398ed286 Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Wed, 18 May 2022 16:01:06 +0200 Subject: [PATCH 7/7] eth/catalyst: nitpicks --- eth/catalyst/api.go | 4 ++-- eth/catalyst/queue.go | 15 ++++++++++----- miner/miner.go | 8 ++++---- miner/worker.go | 10 +++++----- tests/testdata | 2 +- 5 files changed, 22 insertions(+), 17 deletions(-) diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index 44b6abea2264..2c793e7becd6 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -195,12 +195,12 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update beacon.ForkchoiceStateV1, pa } // Send a request to generate a full block in the background. // The result can be obtained via the returned channel. - resChan, err := api.eth.Miner().GetSealingBlockAsync(update.HeadBlockHash, payloadAttributes.Timestamp, payloadAttributes.SuggestedFeeRecipient, payloadAttributes.Random, false) + resCh, err := api.eth.Miner().GetSealingBlockAsync(update.HeadBlockHash, payloadAttributes.Timestamp, payloadAttributes.SuggestedFeeRecipient, payloadAttributes.Random, false) if err != nil { return valid(nil), err } id := computePayloadId(update.HeadBlockHash, payloadAttributes) - api.localBlocks.put(id, &payload{empty: empty, result: resChan}) + api.localBlocks.put(id, &payload{empty: empty, result: resCh}) return valid(&id), nil } return valid(nil), nil diff --git a/eth/catalyst/queue.go b/eth/catalyst/queue.go index 276f0bac62ce..ff8edc1201c4 100644 --- a/eth/catalyst/queue.go +++ b/eth/catalyst/queue.go @@ -35,7 +35,8 @@ const maxTrackedPayloads = 10 // latest one; but have a slight wiggle room for non-ideal conditions. const maxTrackedHeaders = 10 -// payload wraps the execution payload generated by miner +// payload wraps the miner's block production channel, allowing the mined block +// to be retrieved later upon the GetPayload engine API call. type payload struct { lock sync.Mutex done bool @@ -54,21 +55,25 @@ func (req *payload) resolve() *beacon.ExecutableDataV1 { // Try to resolve the full block first if it's not obtained // yet. The returned block can be nil if the generation fails. + if !req.done { + timeout := time.NewTimer(500 * time.Millisecond) + defer timeout.Stop() + select { case req.block = <-req.result: req.done = true - case <-time.NewTimer(time.Millisecond * 100).C: + case <-timeout.C: // TODO(rjl49345642, Marius), should we keep this // 100ms timeout allowance? Why not just use the // default and then fallback to empty directly? } } - block := req.empty + if req.block != nil { - block = req.block + return beacon.BlockToExecutableData(req.block) } - return beacon.BlockToExecutableData(block) + return beacon.BlockToExecutableData(req.empty) } // payloadQueueItem represents an id->payload tuple to store until it's retrieved diff --git a/miner/miner.go b/miner/miner.go index 4235b44cec50..16c3bf19d263 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -247,20 +247,20 @@ func (miner *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscript // The difference is that if the execution fails, the returned result is nil // and the concrete error is dropped silently. func (miner *Miner) GetSealingBlockAsync(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (chan *types.Block, error) { - resChan, _, err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs) + resCh, _, err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs) if err != nil { return nil, err } - return resChan, nil + return resCh, nil } // GetSealingBlockSync creates a sealing block according to the given parameters. // If the generation is failed or the underlying work is already closed, an error // will be returned. func (miner *Miner) GetSealingBlockSync(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (*types.Block, error) { - resChan, errChan, err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs) + resCh, errCh, err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs) if err != nil { return nil, err } - return <-resChan, <-errChan + return <-resCh, <-errCh } diff --git a/miner/worker.go b/miner/worker.go index 2a6415450e37..ae1b61d42411 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1182,8 +1182,8 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti // the generation itself succeeds or not. func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (chan *types.Block, chan error, error) { var ( - resChan = make(chan *types.Block, 1) - errChan = make(chan error, 1) + resCh = make(chan *types.Block, 1) + errCh = make(chan error, 1) ) req := &getWorkReq{ params: &generateParams{ @@ -1196,12 +1196,12 @@ func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase noExtra: true, noTxs: noTxs, }, - result: resChan, - err: errChan, + result: resCh, + err: errCh, } select { case w.getWorkCh <- req: - return resChan, errChan, nil + return resCh, errCh, nil case <-w.exitCh: return nil, nil, errors.New("miner closed") } diff --git a/tests/testdata b/tests/testdata index 092a8834dc44..a380655e5ffa 160000 --- a/tests/testdata +++ b/tests/testdata @@ -1 +1 @@ -Subproject commit 092a8834dc445e683103689d6f0e75a5d380a190 +Subproject commit a380655e5ffab1a5ea0f4d860224bdb19013f06a