From c7990a3eb12884072f2dbd19907900391d383981 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96mer=20Faruk=20Irmak?= Date: Tue, 21 May 2024 16:11:06 +0300 Subject: [PATCH] feat(worker): pipeline block building (#735) --- core/blockchain.go | 5 + eth/api.go | 5 - internal/web3ext/web3ext.go | 5 - miner/miner.go | 5 - miner/scroll_worker.go | 939 ++++++++++++++++++ .../{worker_test.go => scroll_worker_test.go} | 319 +----- miner/worker.go | 2 + params/version.go | 2 +- rollup/pipeline/pipeline.go | 410 ++++++++ 9 files changed, 1366 insertions(+), 326 deletions(-) create mode 100644 miner/scroll_worker.go rename miner/{worker_test.go => scroll_worker_test.go} (80%) create mode 100644 rollup/pipeline/pipeline.go diff --git a/core/blockchain.go b/core/blockchain.go index 61809163afce..e523463bbd35 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2286,3 +2286,8 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i _, err := bc.hc.InsertHeaderChain(chain, start) return 0, err } + +// Database gives access to the underlying database for convenience +func (bc *BlockChain) Database() ethdb.Database { + return bc.db +} diff --git a/eth/api.go b/eth/api.go index da85be18481e..8c2681238da2 100644 --- a/eth/api.go +++ b/eth/api.go @@ -143,11 +143,6 @@ func (api *PrivateMinerAPI) SetEtherbase(etherbase common.Address) bool { return true } -// SetRecommitInterval updates the interval for miner sealing work recommitting. -func (api *PrivateMinerAPI) SetRecommitInterval(interval int) { - api.e.Miner().SetRecommitInterval(time.Duration(interval) * time.Millisecond) -} - // PrivateAdminAPI is the collection of Ethereum full node-related APIs // exposed over the private admin endpoint. type PrivateAdminAPI struct { diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index 04fc46cb421d..65778007a28b 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -637,11 +637,6 @@ web3._extend({ params: 1, inputFormatter: [web3._extend.utils.fromDecimal] }), - new web3._extend.Method({ - name: 'setRecommitInterval', - call: 'miner_setRecommitInterval', - params: 1, - }), new web3._extend.Method({ name: 'getHashrate', call: 'miner_getHashrate' diff --git a/miner/miner.go b/miner/miner.go index 95992b90000f..4c79ef4d182e 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -188,11 +188,6 @@ func (miner *Miner) SetExtra(extra []byte) error { return nil } -// SetRecommitInterval sets the interval for sealing work resubmitting. -func (miner *Miner) SetRecommitInterval(interval time.Duration) { - miner.worker.setRecommitInterval(interval) -} - // Pending returns the currently pending block and associated state. func (miner *Miner) Pending() (*types.Block, *state.StateDB) { return miner.worker.pending() diff --git a/miner/scroll_worker.go b/miner/scroll_worker.go new file mode 100644 index 000000000000..dedc46096709 --- /dev/null +++ b/miner/scroll_worker.go @@ -0,0 +1,939 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package miner + +import ( + "bytes" + "errors" + "math" + "math/big" + "sync" + "sync/atomic" + "time" + + "github.com/scroll-tech/go-ethereum/common" + "github.com/scroll-tech/go-ethereum/consensus" + "github.com/scroll-tech/go-ethereum/consensus/misc" + "github.com/scroll-tech/go-ethereum/core" + "github.com/scroll-tech/go-ethereum/core/rawdb" + "github.com/scroll-tech/go-ethereum/core/state" + "github.com/scroll-tech/go-ethereum/core/types" + "github.com/scroll-tech/go-ethereum/event" + "github.com/scroll-tech/go-ethereum/log" + "github.com/scroll-tech/go-ethereum/metrics" + "github.com/scroll-tech/go-ethereum/params" + "github.com/scroll-tech/go-ethereum/rollup/circuitcapacitychecker" + "github.com/scroll-tech/go-ethereum/rollup/fees" + "github.com/scroll-tech/go-ethereum/rollup/pipeline" + "github.com/scroll-tech/go-ethereum/trie" +) + +const ( + // resultQueueSize is the size of channel listening to sealing result. + resultQueueSize = 10 + + // txChanSize is the size of channel listening to NewTxsEvent. + // The number is referenced from the size of tx pool. + txChanSize = 4096 + + // chainHeadChanSize is the size of channel listening to ChainHeadEvent. + chainHeadChanSize = 10 + + // chainSideChanSize is the size of channel listening to ChainSideEvent. + chainSideChanSize = 10 + + // miningLogAtDepth is the number of confirmations before logging successful mining. + miningLogAtDepth = 7 + + // minRecommitInterval is the minimal time interval to recreate the mining block with + // any newly arrived transactions. + minRecommitInterval = 1 * time.Second + + // staleThreshold is the maximum depth of the acceptable stale block. + staleThreshold = 7 +) + +var ( + // Metrics for the skipped txs + l1TxGasLimitExceededCounter = metrics.NewRegisteredCounter("miner/skipped_txs/l1/gas_limit_exceeded", nil) + l1TxRowConsumptionOverflowCounter = metrics.NewRegisteredCounter("miner/skipped_txs/l1/row_consumption_overflow", nil) + l2TxRowConsumptionOverflowCounter = metrics.NewRegisteredCounter("miner/skipped_txs/l2/row_consumption_overflow", nil) + l1TxCccUnknownErrCounter = metrics.NewRegisteredCounter("miner/skipped_txs/l1/ccc_unknown_err", nil) + l2TxCccUnknownErrCounter = metrics.NewRegisteredCounter("miner/skipped_txs/l2/ccc_unknown_err", nil) + l1TxStrangeErrCounter = metrics.NewRegisteredCounter("miner/skipped_txs/l1/strange_err", nil) + + collectL1MsgsTimer = metrics.NewRegisteredTimer("miner/collect_l1_msgs", nil) + prepareTimer = metrics.NewRegisteredTimer("miner/prepare", nil) + collectL2Timer = metrics.NewRegisteredTimer("miner/collect_l2_txns", nil) + l2CommitTimer = metrics.NewRegisteredTimer("miner/commit", nil) + resultTimer = metrics.NewRegisteredTimer("miner/result", nil) + + commitReasonCCCCounter = metrics.NewRegisteredCounter("miner/commit_reason_ccc", nil) + commitReasonDeadlineCounter = metrics.NewRegisteredCounter("miner/commit_reason_deadline", nil) + commitGasCounter = metrics.NewRegisteredCounter("miner/commit_gas", nil) +) + +// task contains all information for consensus engine sealing and result submitting. 
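+// Tasks are created by commit() once a pipeline yields a final block, keyed
+// by seal hash in pendingTasks, and then consumed by taskLoop (sealing) and
+// resultLoop (database commit and event broadcast).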
+type task struct { + receipts []*types.Receipt + state *state.StateDB + block *types.Block + createdAt time.Time + accRows *types.RowConsumption // accumulated row consumption in the circuit side + nextL1MsgIndex uint64 // next L1 queue index to be processed +} + +// newWorkReq represents a request for new sealing work submitting with relative interrupt notifier. +type newWorkReq struct { + noempty bool + timestamp int64 +} + +// prioritizedTransaction represents a single transaction that +// should be processed as the first transaction in the next block. +type prioritizedTransaction struct { + blockNumber uint64 + tx *types.Transaction +} + +// worker is the main object which takes care of submitting new work to consensus engine +// and gathering the sealing result. +type worker struct { + config *Config + chainConfig *params.ChainConfig + engine consensus.Engine + eth Backend + chain *core.BlockChain + + // Feeds + pendingLogsFeed event.Feed + + // Subscriptions + mux *event.TypeMux + txsCh chan core.NewTxsEvent + txsSub event.Subscription + chainHeadCh chan core.ChainHeadEvent + chainHeadSub event.Subscription + chainSideCh chan core.ChainSideEvent + chainSideSub event.Subscription + + // Channels + newWorkCh chan *newWorkReq + taskCh chan *task + resultCh chan *types.Block + startCh chan struct{} + exitCh chan struct{} + + wg sync.WaitGroup + + currentPipelineStart time.Time + currentPipeline *pipeline.Pipeline + + localUncles map[common.Hash]*types.Block // A set of side blocks generated locally as the possible uncle blocks. + remoteUncles map[common.Hash]*types.Block // A set of side blocks as the possible uncle blocks. + unconfirmed *unconfirmedBlocks // A set of locally mined blocks pending canonicalness confirmations. + + mu sync.RWMutex // The lock used to protect the coinbase and extra fields + coinbase common.Address + extra []byte + + pendingMu sync.RWMutex + pendingTasks map[common.Hash]*task + + snapshotMu sync.RWMutex // The lock used to protect the snapshots below + snapshotBlock *types.Block + snapshotReceipts types.Receipts + snapshotState *state.StateDB + + // atomic status counters + running int32 // The indicator whether the consensus engine is running or not. + newTxs int32 // New arrival transaction count since last sealing work submitting. + newL1Msgs int32 // New arrival L1 message count since last sealing work submitting. + + // noempty is the flag used to control whether the feature of pre-seal empty + // block is enabled. The default value is false(pre-seal is enabled by default). + // But in some special scenario the consensus engine will seal blocks instantaneously, + // in this case this feature will add all empty blocks into canonical chain + // non-stop and no real transaction will be included. + noempty uint32 + + // External functions + isLocalBlock func(block *types.Block) bool // Function used to determine whether the specified block is mined by local miner. + + circuitCapacityChecker *circuitcapacitychecker.CircuitCapacityChecker + prioritizedTx *prioritizedTransaction + + // Test hooks + newTaskHook func(*task) // Method to call upon receiving a new sealing task. + skipSealHook func(*task) bool // Method to decide whether skipping the sealing. + beforeTxHook func() // Method to call before processing a transaction. 
+} + +func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool) *worker { + worker := &worker{ + config: config, + chainConfig: chainConfig, + engine: engine, + eth: eth, + mux: mux, + chain: eth.BlockChain(), + isLocalBlock: isLocalBlock, + localUncles: make(map[common.Hash]*types.Block), + remoteUncles: make(map[common.Hash]*types.Block), + unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), + pendingTasks: make(map[common.Hash]*task), + txsCh: make(chan core.NewTxsEvent, txChanSize), + chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), + chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), + newWorkCh: make(chan *newWorkReq), + taskCh: make(chan *task), + resultCh: make(chan *types.Block, resultQueueSize), + exitCh: make(chan struct{}), + startCh: make(chan struct{}, 1), + circuitCapacityChecker: circuitcapacitychecker.NewCircuitCapacityChecker(true), + } + log.Info("created new worker", "CircuitCapacityChecker ID", worker.circuitCapacityChecker.ID) + + // Subscribe NewTxsEvent for tx pool + worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh) + + // Subscribe events for blockchain + worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) + worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh) + + // Sanitize recommit interval if the user-specified one is too short. + recommit := worker.config.Recommit + if recommit < minRecommitInterval { + log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval) + recommit = minRecommitInterval + } + + // Sanitize account fetch limit. + if worker.config.MaxAccountsNum == 0 { + log.Warn("Sanitizing miner account fetch limit", "provided", worker.config.MaxAccountsNum, "updated", math.MaxInt) + worker.config.MaxAccountsNum = math.MaxInt + } + + worker.wg.Add(4) + go worker.mainLoop() + go worker.newWorkLoop(recommit) + go worker.resultLoop() + go worker.taskLoop() + + // Submit first work to initialize pending state. + if init { + worker.startCh <- struct{}{} + } + return worker +} + +// getCCC returns a pointer to this worker's CCC instance. +// Only used in tests. +func (w *worker) getCCC() *circuitcapacitychecker.CircuitCapacityChecker { + return w.circuitCapacityChecker +} + +// setEtherbase sets the etherbase used to initialize the block coinbase field. +func (w *worker) setEtherbase(addr common.Address) { + w.mu.Lock() + defer w.mu.Unlock() + w.coinbase = addr +} + +func (w *worker) setGasCeil(ceil uint64) { + w.mu.Lock() + defer w.mu.Unlock() + w.config.GasCeil = ceil +} + +// setExtra sets the content used to initialize the block extra field. +func (w *worker) setExtra(extra []byte) { + w.mu.Lock() + defer w.mu.Unlock() + w.extra = extra +} + +// disablePreseal disables pre-sealing mining feature +func (w *worker) disablePreseal() { + atomic.StoreUint32(&w.noempty, 1) +} + +// enablePreseal enables pre-sealing mining feature +func (w *worker) enablePreseal() { + atomic.StoreUint32(&w.noempty, 0) +} + +// pending returns the pending state and corresponding block. +func (w *worker) pending() (*types.Block, *state.StateDB) { + // return a snapshot to avoid contention on currentMu mutex + w.snapshotMu.RLock() + defer w.snapshotMu.RUnlock() + if w.snapshotState == nil { + return nil, nil + } + return w.snapshotBlock, w.snapshotState.Copy() +} + +// pendingBlock returns pending block. 
+func (w *worker) pendingBlock() *types.Block { + // return a snapshot to avoid contention on currentMu mutex + w.snapshotMu.RLock() + defer w.snapshotMu.RUnlock() + return w.snapshotBlock +} + +// pendingBlockAndReceipts returns pending block and corresponding receipts. +func (w *worker) pendingBlockAndReceipts() (*types.Block, types.Receipts) { + // return a snapshot to avoid contention on currentMu mutex + w.snapshotMu.RLock() + defer w.snapshotMu.RUnlock() + return w.snapshotBlock, w.snapshotReceipts +} + +// start sets the running status as 1 and triggers new work submitting. +func (w *worker) start() { + atomic.StoreInt32(&w.running, 1) + w.startCh <- struct{}{} +} + +// stop sets the running status as 0. +func (w *worker) stop() { + atomic.StoreInt32(&w.running, 0) +} + +// isRunning returns an indicator whether worker is running or not. +func (w *worker) isRunning() bool { + return atomic.LoadInt32(&w.running) == 1 +} + +// close terminates all background threads maintained by the worker. +// Note the worker does not support being closed multiple times. +func (w *worker) close() { + atomic.StoreInt32(&w.running, 0) + close(w.exitCh) + w.wg.Wait() +} + +// newWorkLoop is a standalone goroutine to submit new mining work upon received events. +func (w *worker) newWorkLoop(recommit time.Duration) { + defer w.wg.Done() + var ( + timestamp int64 // timestamp for each round of mining. + ) + + // commit aborts in-flight transaction execution with given signal and resubmits a new one. + commit := func(noempty bool) { + select { + case w.newWorkCh <- &newWorkReq{noempty: noempty, timestamp: timestamp}: + case <-w.exitCh: + return + } + atomic.StoreInt32(&w.newTxs, 0) + atomic.StoreInt32(&w.newL1Msgs, 0) + } + // clearPending cleans the stale pending tasks. + clearPending := func(number uint64) { + w.pendingMu.Lock() + for h, t := range w.pendingTasks { + if t.block.NumberU64()+staleThreshold <= number { + delete(w.pendingTasks, h) + } + } + w.pendingMu.Unlock() + } + + for { + select { + case <-w.startCh: + clearPending(w.chain.CurrentBlock().NumberU64()) + timestamp = time.Now().Unix() + commit(false) + case head := <-w.chainHeadCh: + clearPending(head.Block.NumberU64()) + timestamp = time.Now().Unix() + commit(true) + case <-w.exitCh: + return + } + } +} + +// mainLoop is a standalone goroutine to regenerate the sealing task based on the received event. +func (w *worker) mainLoop() { + defer w.wg.Done() + defer w.txsSub.Unsubscribe() + defer w.chainHeadSub.Unsubscribe() + defer w.chainSideSub.Unsubscribe() + + deadCh := make(chan *pipeline.Result) + pipelineResultCh := func() <-chan *pipeline.Result { + if w.currentPipeline == nil { + return deadCh + } + return w.currentPipeline.ResultCh + } + + for { + select { + case req := <-w.newWorkCh: + w.startNewPipeline(req.timestamp) + case result := <-pipelineResultCh(): + w.handlePipelineResult(result) + case ev := <-w.txsCh: + // Apply transactions to the pending state + // + // Note all transactions received may not be continuous with transactions + // already included in the current mining block. These transactions will + // be automatically eliminated. 
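+			// Transactions are grouped by sender and wrapped in a price-and-nonce
+			// ordered set, so they are offered to the in-flight pipeline in the
+			// same order a fresh pipeline would apply them.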
+ if w.currentPipeline != nil { + txs := make(map[common.Address]types.Transactions) + signer := types.MakeSigner(w.chainConfig, w.currentPipeline.Header.Number) + for _, tx := range ev.Txs { + acc, _ := types.Sender(signer, tx) + txs[acc] = append(txs[acc], tx) + } + txset := types.NewTransactionsByPriceAndNonce(signer, txs, w.currentPipeline.Header.BaseFee) + if result := w.currentPipeline.TryPushTxns(txset, w.onTxFailingInPipeline); result != nil { + w.handlePipelineResult(result) + } + } + atomic.AddInt32(&w.newTxs, int32(len(ev.Txs))) + + // System stopped + case <-w.exitCh: + return + case <-w.txsSub.Err(): + return + case <-w.chainHeadSub.Err(): + return + case <-w.chainSideSub.Err(): + return + } + } +} + +// taskLoop is a standalone goroutine to fetch sealing task from the generator and +// push them to consensus engine. +func (w *worker) taskLoop() { + defer w.wg.Done() + var ( + stopCh chan struct{} + prev common.Hash + ) + + // interrupt aborts the in-flight sealing task. + interrupt := func() { + if stopCh != nil { + close(stopCh) + stopCh = nil + } + } + for { + select { + case task := <-w.taskCh: + if w.newTaskHook != nil { + w.newTaskHook(task) + } + // Reject duplicate sealing work due to resubmitting. + sealHash := w.engine.SealHash(task.block.Header()) + if sealHash == prev { + continue + } + // Interrupt previous sealing operation + interrupt() + stopCh, prev = make(chan struct{}), sealHash + + if w.skipSealHook != nil && w.skipSealHook(task) { + continue + } + w.pendingMu.Lock() + w.pendingTasks[sealHash] = task + w.pendingMu.Unlock() + + if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil { + log.Warn("Block sealing failed", "err", err) + w.pendingMu.Lock() + delete(w.pendingTasks, sealHash) + w.pendingMu.Unlock() + } + case <-w.exitCh: + interrupt() + return + } + } +} + +// resultLoop is a standalone goroutine to handle sealing result submitting +// and flush relative data to the database. +func (w *worker) resultLoop() { + defer w.wg.Done() + for { + select { + case block := <-w.resultCh: + // Short circuit when receiving empty result. + if block == nil { + continue + } + // Short circuit when receiving duplicate result caused by resubmitting. + if w.chain.HasBlock(block.Hash(), block.NumberU64()) { + continue + } + + var ( + sealhash = w.engine.SealHash(block.Header()) + hash = block.Hash() + ) + + w.pendingMu.RLock() + task, exist := w.pendingTasks[sealhash] + w.pendingMu.RUnlock() + + if !exist { + log.Error("Block found but no relative pending task", "number", block.Number(), "sealhash", sealhash, "hash", hash) + continue + } + + startTime := time.Now() + + // Different block could share same sealhash, deep copy here to prevent write-write conflict. + var ( + receipts = make([]*types.Receipt, len(task.receipts)) + logs []*types.Log + ) + for i, taskReceipt := range task.receipts { + receipt := new(types.Receipt) + receipts[i] = receipt + *receipt = *taskReceipt + + // add block location fields + receipt.BlockHash = hash + receipt.BlockNumber = block.Number() + receipt.TransactionIndex = uint(i) + + // Update the block hash in all logs since it is now available and not when the + // receipt/log of individual transactions were created. + receipt.Logs = make([]*types.Log, len(taskReceipt.Logs)) + for i, taskLog := range taskReceipt.Logs { + log := new(types.Log) + receipt.Logs[i] = log + *log = *taskLog + log.BlockHash = hash + } + logs = append(logs, receipt.Logs...) 
+ } + // It's possible that we've stored L1 queue index for this block previously, + // in this case do not overwrite it. + if index := rawdb.ReadFirstQueueIndexNotInL2Block(w.eth.ChainDb(), hash); index == nil { + // Store first L1 queue index not processed by this block. + // Note: This accounts for both included and skipped messages. This + // way, if a block only skips messages, we won't reprocess the same + // messages from the next block. + log.Trace( + "Worker WriteFirstQueueIndexNotInL2Block", + "number", block.Number(), + "hash", hash.String(), + "task.nextL1MsgIndex", task.nextL1MsgIndex, + ) + rawdb.WriteFirstQueueIndexNotInL2Block(w.eth.ChainDb(), hash, task.nextL1MsgIndex) + } else { + log.Trace( + "Worker WriteFirstQueueIndexNotInL2Block: not overwriting existing index", + "number", block.Number(), + "hash", hash.String(), + "index", *index, + "task.nextL1MsgIndex", task.nextL1MsgIndex, + ) + } + // Store circuit row consumption. + log.Trace( + "Worker write block row consumption", + "id", w.circuitCapacityChecker.ID, + "number", block.Number(), + "hash", hash.String(), + "accRows", task.accRows, + ) + rawdb.WriteBlockRowConsumption(w.eth.ChainDb(), hash, task.accRows) + // Commit block and state to database. + _, err := w.chain.WriteBlockWithState(block, receipts, logs, task.state, true) + if err != nil { + resultTimer.Update(time.Since(startTime)) + log.Error("Failed writing block to chain", "err", err) + continue + } + log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", hash, + "elapsed", common.PrettyDuration(time.Since(task.createdAt))) + + // Broadcast the block and announce chain insertion event + w.mux.Post(core.NewMinedBlockEvent{Block: block}) + + // Insert the block into the set of pending ones to resultLoop for confirmations + w.unconfirmed.Insert(block.NumberU64(), block.Hash()) + + resultTimer.Update(time.Since(startTime)) + + case <-w.exitCh: + return + } + } +} + +// updateSnapshot updates pending snapshot block and state. +// Note this function assumes the current variable is thread safe. +func (w *worker) updateSnapshot(current *pipeline.BlockCandidate) { + w.snapshotMu.Lock() + defer w.snapshotMu.Unlock() + + w.snapshotBlock = types.NewBlock( + current.Header, + current.Txs, + nil, + current.Receipts, + trie.NewStackTrie(nil), + ) + w.snapshotReceipts = copyReceipts(current.Receipts) + w.snapshotState = current.State.Copy() +} + +func (w *worker) collectPendingL1Messages(startIndex uint64) []types.L1MessageTx { + maxCount := w.chainConfig.Scroll.L1Config.NumL1MessagesPerBlock + return rawdb.ReadL1MessagesFrom(w.eth.ChainDb(), startIndex, maxCount) +} + +// startNewPipeline generates several new sealing tasks based on the parent block. 
+func (w *worker) startNewPipeline(timestamp int64) {
+
+	if w.currentPipeline != nil {
+		w.currentPipeline.Kill()
+		w.currentPipeline = nil
+	}
+
+	parent := w.chain.CurrentBlock()
+
+	num := parent.Number()
+	header := &types.Header{
+		ParentHash: parent.Hash(),
+		Number:     num.Add(num, common.Big1),
+		GasLimit:   core.CalcGasLimit(parent.GasLimit(), w.config.GasCeil),
+		Extra:      w.extra,
+		Time:       uint64(timestamp),
+	}
+	// Set baseFee if we are on an EIP-1559 chain
+	if w.chainConfig.IsCurie(header.Number) {
+		state, err := w.chain.StateAt(parent.Root())
+		if err != nil {
+			log.Error("Failed to create mining context", "err", err)
+			return
+		}
+		parentL1BaseFee := fees.GetL1BaseFee(state)
+		header.BaseFee = misc.CalcBaseFee(w.chainConfig, parent.Header(), parentL1BaseFee)
+	}
+	// Only set the coinbase if our consensus engine is running (avoid spurious block rewards)
+	if w.isRunning() {
+		if w.coinbase == (common.Address{}) {
+			log.Error("Refusing to mine without etherbase")
+			return
+		}
+		header.Coinbase = w.coinbase
+	}
+
+	common.WithTimer(prepareTimer, func() {
+		if err := w.engine.Prepare(w.chain, header); err != nil {
+			log.Error("Failed to prepare header for mining", "err", err)
+			return
+		}
+	})
+
+	// If we care about the DAO hard-fork, check whether to override the extra-data or not
+	if daoBlock := w.chainConfig.DAOForkBlock; daoBlock != nil {
+		// Check whether the block is among the fork extra-override range
+		limit := new(big.Int).Add(daoBlock, params.DAOForkExtraRange)
+		if header.Number.Cmp(daoBlock) >= 0 && header.Number.Cmp(limit) < 0 {
+			// Depending on whether we support or oppose the fork, override differently
+			if w.chainConfig.DAOForkSupport {
+				header.Extra = common.CopyBytes(params.DAOForkBlockExtra)
+			} else if bytes.Equal(header.Extra, params.DAOForkBlockExtra) {
+				header.Extra = []byte{} // If miner opposes, don't let it use the reserved extra-data
+			}
+		}
+	}
+
+	parentState, err := w.chain.StateAt(parent.Root())
+	if err != nil {
+		log.Error("failed to fetch parent state", "err", err)
+		return
+	}
+
+	// fetch l1Txs
+	var l1Messages []types.L1MessageTx
+	if w.chainConfig.Scroll.ShouldIncludeL1Messages() {
+		common.WithTimer(collectL1MsgsTimer, func() {
+			l1Messages = w.collectPendingL1Messages(*rawdb.ReadFirstQueueIndexNotInL2Block(w.eth.ChainDb(), parent.Hash()))
+		})
+	}
+
+	tidyPendingStart := time.Now()
+	// Fill the block with all available pending transactions.
+	pending := w.eth.TxPool().PendingWithMax(false, w.config.MaxAccountsNum)
+	// Split the pending transactions into locals and remotes
+	localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending
+	for _, account := range w.eth.TxPool().Locals() {
+		if txs := remoteTxs[account]; len(txs) > 0 {
+			delete(remoteTxs, account)
+			localTxs[account] = txs
+		}
+	}
+	collectL2Timer.UpdateSince(tidyPendingStart)
+
+	var nextL1MsgIndex uint64
+	if dbIndex := rawdb.ReadFirstQueueIndexNotInL2Block(w.chain.Database(), parent.Hash()); dbIndex != nil {
+		nextL1MsgIndex = *dbIndex
+	} else {
+		log.Error("failed to read nextL1MsgIndex", "parent", parent.Hash())
+		return
+	}
+
+	w.currentPipelineStart = time.Now()
+	w.currentPipeline = pipeline.NewPipeline(w.chain, w.chain.GetVMConfig(), parentState, header, nextL1MsgIndex, w.getCCC()).WithBeforeTxHook(w.beforeTxHook)
+	if err := w.currentPipeline.Start(time.Unix(int64(header.Time), 0)); err != nil {
+		log.Error("failed to start pipeline", "err", err)
+		return
+	}
+
+	// Short circuit if there are no available pending transactions.
+ // But if we disable empty precommit already, ignore it. Since + // empty block is necessary to keep the liveness of the network. + if len(localTxs) == 0 && len(remoteTxs) == 0 && len(l1Messages) == 0 && atomic.LoadUint32(&w.noempty) == 0 { + return + } + + if w.chainConfig.Scroll.ShouldIncludeL1Messages() && len(l1Messages) > 0 { + log.Trace("Processing L1 messages for inclusion", "count", len(l1Messages)) + txs, err := types.NewL1MessagesByQueueIndex(l1Messages) + if err != nil { + log.Error("Failed to create L1 message set", "l1Messages", l1Messages, "err", err) + return + } + + if result := w.currentPipeline.TryPushTxns(txs, w.onTxFailingInPipeline); result != nil { + w.handlePipelineResult(result) + return + } + } + signer := types.MakeSigner(w.chainConfig, header.Number) + + if w.prioritizedTx != nil && w.currentPipeline.Header.Number.Uint64() > w.prioritizedTx.blockNumber { + w.prioritizedTx = nil + } + if w.prioritizedTx != nil { + from, _ := types.Sender(signer, w.prioritizedTx.tx) // error already checked before + txList := map[common.Address]types.Transactions{from: []*types.Transaction{w.prioritizedTx.tx}} + txs := types.NewTransactionsByPriceAndNonce(signer, txList, header.BaseFee) + if result := w.currentPipeline.TryPushTxns(txs, w.onTxFailingInPipeline); result != nil { + w.handlePipelineResult(result) + return + } + } + + if len(localTxs) > 0 { + txs := types.NewTransactionsByPriceAndNonce(signer, localTxs, header.BaseFee) + if result := w.currentPipeline.TryPushTxns(txs, w.onTxFailingInPipeline); result != nil { + w.handlePipelineResult(result) + return + } + } + if len(remoteTxs) > 0 { + txs := types.NewTransactionsByPriceAndNonce(signer, remoteTxs, header.BaseFee) + if result := w.currentPipeline.TryPushTxns(txs, w.onTxFailingInPipeline); result != nil { + w.handlePipelineResult(result) + return + } + } +} + +func (w *worker) handlePipelineResult(res *pipeline.Result) error { + if res != nil && res.OverflowingTx != nil { + if res.FinalBlock == nil { + // first txn overflowed the circuit, skip + log.Trace("Circuit capacity limit reached for a single tx", "tx", res.OverflowingTx.Hash().String(), + "isL1Message", res.OverflowingTx.IsL1MessageTx(), "reason", res.CCCErr.Error()) + + // Store skipped transaction in local db + overflowingTrace := res.OverflowingTrace + if !w.config.StoreSkippedTxTraces { + overflowingTrace = nil + } + rawdb.WriteSkippedTransaction(w.eth.ChainDb(), res.OverflowingTx, overflowingTrace, res.CCCErr.Error(), + w.currentPipeline.Header.Number.Uint64(), nil) + + if overflowingL1MsgTx := res.OverflowingTx.AsL1MessageTx(); overflowingL1MsgTx != nil { + rawdb.WriteFirstQueueIndexNotInL2Block(w.eth.ChainDb(), w.currentPipeline.Header.ParentHash, overflowingL1MsgTx.QueueIndex+1) + } else { + w.eth.TxPool().RemoveTx(res.OverflowingTx.Hash(), true) + } + } else if !res.OverflowingTx.IsL1MessageTx() { + // prioritize overflowing L2 message as the first txn next block + // no need to prioritize L1 messages, they are fetched in order + // and processed first in every block anyways + w.prioritizedTx = &prioritizedTransaction{ + blockNumber: w.currentPipeline.Header.Number.Uint64() + 1, + tx: res.OverflowingTx, + } + } + + switch { + case res.OverflowingTx.IsL1MessageTx() && + errors.Is(res.CCCErr, circuitcapacitychecker.ErrBlockRowConsumptionOverflow): + l1TxRowConsumptionOverflowCounter.Inc(1) + case !res.OverflowingTx.IsL1MessageTx() && + errors.Is(res.CCCErr, circuitcapacitychecker.ErrBlockRowConsumptionOverflow): + l2TxRowConsumptionOverflowCounter.Inc(1) 
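+		// unknown CCC errors are likewise tracked separately for L1 and L2 origins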
+ case res.OverflowingTx.IsL1MessageTx() && + errors.Is(res.CCCErr, circuitcapacitychecker.ErrUnknown): + l1TxCccUnknownErrCounter.Inc(1) + case !res.OverflowingTx.IsL1MessageTx() && + errors.Is(res.CCCErr, circuitcapacitychecker.ErrUnknown): + l2TxCccUnknownErrCounter.Inc(1) + } + } + + if !w.isRunning() { + if res != nil && res.FinalBlock != nil { + w.updateSnapshot(res.FinalBlock) + } + w.currentPipeline = nil + return nil + } + + if res == nil || res.FinalBlock == nil { + w.startNewPipeline(time.Now().Unix()) + return nil + } + return w.commit(res) +} + +// commit runs any post-transaction state modifications, assembles the final block +// and commits new work if consensus engine is running. +func (w *worker) commit(res *pipeline.Result) error { + defer func(t0 time.Time) { + l2CommitTimer.Update(time.Since(t0)) + }(time.Now()) + + if res.CCCErr != nil { + commitReasonCCCCounter.Inc(1) + } else { + commitReasonDeadlineCounter.Inc(1) + } + commitGasCounter.Inc(int64(res.FinalBlock.Header.GasUsed)) + + block, err := w.engine.FinalizeAndAssemble(w.chain, res.FinalBlock.Header, res.FinalBlock.State, + res.FinalBlock.Txs, nil, res.FinalBlock.Receipts) + if err != nil { + return err + } + + select { + case w.taskCh <- &task{receipts: res.FinalBlock.Receipts, state: res.FinalBlock.State, block: block, createdAt: time.Now(), + accRows: res.Rows, nextL1MsgIndex: res.FinalBlock.NextL1MsgIndex}: + w.unconfirmed.Shift(block.NumberU64() - 1) + log.Info("Commit new mining work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()), + "txs", res.FinalBlock.Txs.Len(), + "gas", block.GasUsed(), "fees", totalFees(block, res.FinalBlock.Receipts), + "elapsed", common.PrettyDuration(time.Since(w.currentPipelineStart))) + case <-w.exitCh: + log.Info("Worker has exited") + } + + w.currentPipeline = nil + return nil +} + +// copyReceipts makes a deep copy of the given receipts. +func copyReceipts(receipts []*types.Receipt) []*types.Receipt { + result := make([]*types.Receipt, len(receipts)) + for i, l := range receipts { + cpy := *l + result[i] = &cpy + } + return result +} + +// postSideBlock fires a side chain event, only use it for testing. +func (w *worker) postSideBlock(event core.ChainSideEvent) { + select { + case w.chainSideCh <- event: + case <-w.exitCh: + } +} + +func (w *worker) onTxFailingInPipeline(txIndex int, tx *types.Transaction, err error) bool { + writeTrace := func() { + var trace *types.BlockTrace + var errWithTrace *pipeline.ErrorWithTrace + if w.config.StoreSkippedTxTraces && errors.As(err, &errWithTrace) { + trace = errWithTrace.Trace + } + rawdb.WriteSkippedTransaction(w.eth.ChainDb(), tx, trace, err.Error(), + w.currentPipeline.Header.Number.Uint64(), nil) + } + + switch { + case errors.Is(err, core.ErrGasLimitReached) && tx.IsL1MessageTx(): + // If this block already contains some L1 messages try again in the next block. + if txIndex > 0 { + break + } + // A single L1 message leads to out-of-gas. Skip it. 
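+		// The apply stage has already advanced nextL1MsgIndex past this message
+		// (see traceAndApplyStage), so it is recorded as skipped rather than retried.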
+ queueIndex := tx.AsL1MessageTx().QueueIndex + log.Info("Skipping L1 message", "queueIndex", queueIndex, "tx", tx.Hash().String(), "block", + w.currentPipeline.Header.Number, "reason", "gas limit exceeded") + writeTrace() + l1TxGasLimitExceededCounter.Inc(1) + + case errors.Is(err, core.ErrInsufficientFunds): + log.Trace("Skipping tx with insufficient funds", "tx", tx.Hash().String()) + w.eth.TxPool().RemoveTx(tx.Hash(), true) + + case errors.Is(err, pipeline.ErrUnexpectedL1MessageIndex): + log.Warn( + "Unexpected L1 message queue index in worker", + "got", tx.AsL1MessageTx().QueueIndex, + ) + case errors.Is(err, core.ErrGasLimitReached), errors.Is(err, core.ErrNonceTooLow), errors.Is(err, core.ErrNonceTooHigh), errors.Is(err, core.ErrTxTypeNotSupported): + break + default: + // Strange error + log.Debug("Transaction failed, account skipped", "hash", tx.Hash().String(), "err", err) + if tx.IsL1MessageTx() { + queueIndex := tx.AsL1MessageTx().QueueIndex + log.Info("Skipping L1 message", "queueIndex", queueIndex, "tx", tx.Hash().String(), "block", + w.currentPipeline.Header.Number, "reason", "strange error", "err", err) + writeTrace() + l1TxStrangeErrCounter.Inc(1) + } + } + return false +} + +// totalFees computes total consumed miner fees in ETH. Block transactions and receipts have to have the same order. +func totalFees(block *types.Block, receipts []*types.Receipt) *big.Float { + feesWei := new(big.Int) + for i, tx := range block.Transactions() { + minerFee, _ := tx.EffectiveGasTip(block.BaseFee()) + feesWei.Add(feesWei, new(big.Int).Mul(new(big.Int).SetUint64(receipts[i].GasUsed), minerFee)) + } + return new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether))) +} diff --git a/miner/worker_test.go b/miner/scroll_worker_test.go similarity index 80% rename from miner/worker_test.go rename to miner/scroll_worker_test.go index 6e099905f3e1..eadc8830daf2 100644 --- a/miner/worker_test.go +++ b/miner/scroll_worker_test.go @@ -20,7 +20,6 @@ import ( "math" "math/big" "math/rand" - "sync/atomic" "testing" "time" @@ -213,10 +212,6 @@ func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consens return w, backend } -func TestGenerateBlockAndImportEthash(t *testing.T) { - testGenerateBlockAndImport(t, false) -} - func TestGenerateBlockAndImportClique(t *testing.T) { testGenerateBlockAndImport(t, true) } @@ -235,6 +230,7 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool) { chainConfig = params.AllEthashProtocolChanges engine = ethash.NewFaker() } + chainConfig.Scroll.FeeVaultAddress = &common.Address{} chainConfig.LondonBlock = big.NewInt(0) w, b := newTestWorker(t, chainConfig, engine, db, 0) @@ -248,11 +244,6 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool) { Tracer: vm.NewStructLogger(&vm.LogConfig{EnableMemory: true, EnableReturnData: true})}, nil, nil) defer chain.Stop() - // Ignore empty commit here for less noise. - w.skipSealHook = func(task *task) bool { - return len(task.receipts) == 0 - } - // Wait for mined blocks. 
sub := w.mux.Subscribe(core.NewMinedBlockEvent{}) defer sub.Unsubscribe() @@ -278,267 +269,6 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool) { } } -func TestEmptyWorkEthash(t *testing.T) { - testEmptyWork(t, ethashChainConfig, ethash.NewFaker()) -} -func TestEmptyWorkClique(t *testing.T) { - testEmptyWork(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase())) -} - -func testEmptyWork(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) { - defer engine.Close() - - w, _ := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0) - defer w.close() - - var ( - taskIndex int - taskCh = make(chan struct{}, 2) - ) - checkEqual := func(t *testing.T, task *task, index int) { - // The first empty work without any txs included - receiptLen, balance := 0, big.NewInt(0) - if index == 1 { - // The second full work with 1 tx included - receiptLen, balance = 1, big.NewInt(1000) - } - if len(task.receipts) != receiptLen { - t.Fatalf("receipt number mismatch: have %d, want %d", len(task.receipts), receiptLen) - } - if task.state.GetBalance(testUserAddress).Cmp(balance) != 0 { - t.Fatalf("account balance mismatch: have %d, want %d", task.state.GetBalance(testUserAddress), balance) - } - } - w.newTaskHook = func(task *task) { - if task.block.NumberU64() == 1 { - checkEqual(t, task, taskIndex) - taskIndex += 1 - taskCh <- struct{}{} - } - } - w.skipSealHook = func(task *task) bool { return true } - w.fullTaskHook = func() { - time.Sleep(100 * time.Millisecond) - } - w.start() // Start mining! - for i := 0; i < 2; i += 1 { - select { - case <-taskCh: - case <-time.NewTimer(3 * time.Second).C: - t.Error("new task timeout") - } - } -} - -func TestStreamUncleBlock(t *testing.T) { - ethash := ethash.NewFaker() - defer ethash.Close() - - w, b := newTestWorker(t, ethashChainConfig, ethash, rawdb.NewMemoryDatabase(), 1) - defer w.close() - - var taskCh = make(chan struct{}) - - taskIndex := 0 - w.newTaskHook = func(task *task) { - if task.block.NumberU64() == 2 { - // The first task is an empty task, the second - // one has 1 pending tx, the third one has 1 tx - // and 1 uncle. 
- if taskIndex == 2 { - have := task.block.Header().UncleHash - want := types.CalcUncleHash([]*types.Header{b.uncleBlock.Header()}) - if have != want { - t.Errorf("uncle hash mismatch: have %s, want %s", have.Hex(), want.Hex()) - } - } - taskCh <- struct{}{} - taskIndex += 1 - } - } - w.skipSealHook = func(task *task) bool { - return true - } - w.fullTaskHook = func() { - time.Sleep(100 * time.Millisecond) - } - w.start() - - for i := 0; i < 2; i += 1 { - select { - case <-taskCh: - case <-time.NewTimer(time.Second).C: - t.Error("new task timeout") - } - } - - w.postSideBlock(core.ChainSideEvent{Block: b.uncleBlock}) - - select { - case <-taskCh: - case <-time.NewTimer(time.Second).C: - t.Error("new task timeout") - } -} - -func TestRegenerateMiningBlockEthash(t *testing.T) { - testRegenerateMiningBlock(t, ethashChainConfig, ethash.NewFaker()) -} - -func TestRegenerateMiningBlockClique(t *testing.T) { - testRegenerateMiningBlock(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase())) -} - -func testRegenerateMiningBlock(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) { - defer engine.Close() - - w, b := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0) - defer w.close() - - var taskCh = make(chan struct{}) - - taskIndex := 0 - w.newTaskHook = func(task *task) { - if task.block.NumberU64() == 1 { - // The first task is an empty task, the second - // one has 1 pending tx, the third one has 2 txs - if taskIndex == 2 { - receiptLen, balance := 2, big.NewInt(2000) - if len(task.receipts) != receiptLen { - t.Errorf("receipt number mismatch: have %d, want %d", len(task.receipts), receiptLen) - } - if task.state.GetBalance(testUserAddress).Cmp(balance) != 0 { - t.Errorf("account balance mismatch: have %d, want %d", task.state.GetBalance(testUserAddress), balance) - } - } - taskCh <- struct{}{} - taskIndex += 1 - } - } - w.skipSealHook = func(task *task) bool { - return true - } - w.fullTaskHook = func() { - time.Sleep(100 * time.Millisecond) - } - - w.start() - // Ignore the first two works - for i := 0; i < 2; i += 1 { - select { - case <-taskCh: - case <-time.NewTimer(time.Second).C: - t.Error("new task timeout") - } - } - b.txPool.AddLocals(newTxs) - time.Sleep(time.Second) - - select { - case <-taskCh: - case <-time.NewTimer(time.Second).C: - t.Error("new task timeout") - } -} - -func TestAdjustIntervalEthash(t *testing.T) { - testAdjustInterval(t, ethashChainConfig, ethash.NewFaker()) -} - -func TestAdjustIntervalClique(t *testing.T) { - testAdjustInterval(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase())) -} - -func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) { - defer engine.Close() - - w, _ := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0) - defer w.close() - - w.skipSealHook = func(task *task) bool { - return true - } - w.fullTaskHook = func() { - time.Sleep(100 * time.Millisecond) - } - var ( - progress = make(chan struct{}, 10) - result = make([]float64, 0, 10) - index = 0 - start uint32 - ) - w.resubmitHook = func(minInterval time.Duration, recommitInterval time.Duration) { - // Short circuit if interval checking hasn't started. 
- if atomic.LoadUint32(&start) == 0 { - return - } - var wantMinInterval, wantRecommitInterval time.Duration - - switch index { - case 0: - wantMinInterval, wantRecommitInterval = 3*time.Second, 3*time.Second - case 1: - origin := float64(3 * time.Second.Nanoseconds()) - estimate := origin*(1-intervalAdjustRatio) + intervalAdjustRatio*(origin/0.8+intervalAdjustBias) - wantMinInterval, wantRecommitInterval = 3*time.Second, time.Duration(estimate)*time.Nanosecond - case 2: - estimate := result[index-1] - min := float64(3 * time.Second.Nanoseconds()) - estimate = estimate*(1-intervalAdjustRatio) + intervalAdjustRatio*(min-intervalAdjustBias) - wantMinInterval, wantRecommitInterval = 3*time.Second, time.Duration(estimate)*time.Nanosecond - case 3: - wantMinInterval, wantRecommitInterval = time.Second, time.Second - } - - // Check interval - if minInterval != wantMinInterval { - t.Errorf("resubmit min interval mismatch: have %v, want %v ", minInterval, wantMinInterval) - } - if recommitInterval != wantRecommitInterval { - t.Errorf("resubmit interval mismatch: have %v, want %v", recommitInterval, wantRecommitInterval) - } - result = append(result, float64(recommitInterval.Nanoseconds())) - index += 1 - progress <- struct{}{} - } - w.start() - - time.Sleep(time.Second) // Ensure two tasks have been summitted due to start opt - atomic.StoreUint32(&start, 1) - - w.setRecommitInterval(3 * time.Second) - select { - case <-progress: - case <-time.NewTimer(time.Second).C: - t.Error("interval reset timeout") - } - - w.resubmitAdjustCh <- &intervalAdjust{inc: true, ratio: 0.8} - select { - case <-progress: - case <-time.NewTimer(time.Second).C: - t.Error("interval reset timeout") - } - - w.resubmitAdjustCh <- &intervalAdjust{inc: false} - select { - case <-progress: - case <-time.NewTimer(time.Second).C: - t.Error("interval reset timeout") - } - - w.setRecommitInterval(500 * time.Millisecond) - select { - case <-progress: - case <-time.NewTimer(time.Second).C: - t.Error("interval reset timeout") - } -} - -func TestGenerateBlockWithL1MsgEthash(t *testing.T) { - testGenerateBlockWithL1Msg(t, false) -} - func TestGenerateBlockWithL1MsgClique(t *testing.T) { testGenerateBlockWithL1Msg(t, true) } @@ -566,6 +296,7 @@ func testGenerateBlockWithL1Msg(t *testing.T, isClique bool) { chainConfig.Scroll.L1Config = ¶ms.L1Config{ NumL1MessagesPerBlock: 1, } + chainConfig.Scroll.FeeVaultAddress = &common.Address{} chainConfig.LondonBlock = big.NewInt(0) w, b := newTestWorker(t, chainConfig, engine, db, 0) @@ -578,11 +309,6 @@ func testGenerateBlockWithL1Msg(t *testing.T, isClique bool) { Tracer: vm.NewStructLogger(&vm.LogConfig{EnableMemory: true, EnableReturnData: true})}, nil, nil) defer chain.Stop() - // Ignore empty commit here for less noise. - w.skipSealHook = func(task *task) bool { - return len(task.receipts) == 0 - } - // Wait for mined blocks. sub := w.mux.Subscribe(core.NewMinedBlockEvent{}) defer sub.Unsubscribe() @@ -618,6 +344,7 @@ func TestAcceptableTxlimit(t *testing.T) { ) chainConfig = params.AllCliqueProtocolChanges chainConfig.Clique = ¶ms.CliqueConfig{Period: 1, Epoch: 30000} + chainConfig.Scroll.FeeVaultAddress = &common.Address{} engine = clique.New(chainConfig.Clique, db) // Set maxTxPerBlock = 4, which >= non-l1msg + non-skipped l1msg txs @@ -645,11 +372,6 @@ func TestAcceptableTxlimit(t *testing.T) { Tracer: vm.NewStructLogger(&vm.LogConfig{EnableMemory: true, EnableReturnData: true})}, nil, nil) defer chain.Stop() - // Ignore empty commit here for less noise. 
- w.skipSealHook = func(task *task) bool { - return len(task.receipts) == 0 - } - // Wait for mined blocks. sub := w.mux.Subscribe(core.NewMinedBlockEvent{}) defer sub.Unsubscribe() @@ -682,6 +404,7 @@ func TestUnacceptableTxlimit(t *testing.T) { ) chainConfig = params.AllCliqueProtocolChanges chainConfig.Clique = ¶ms.CliqueConfig{Period: 1, Epoch: 30000} + chainConfig.Scroll.FeeVaultAddress = &common.Address{} engine = clique.New(chainConfig.Clique, db) // Set maxTxPerBlock = 3, which < non-l1msg + l1msg txs @@ -708,11 +431,6 @@ func TestUnacceptableTxlimit(t *testing.T) { Tracer: vm.NewStructLogger(&vm.LogConfig{EnableMemory: true, EnableReturnData: true})}, nil, nil) defer chain.Stop() - // Ignore empty commit here for less noise. - w.skipSealHook = func(task *task) bool { - return len(task.receipts) == 0 - } - // Wait for mined blocks. sub := w.mux.Subscribe(core.NewMinedBlockEvent{}) defer sub.Unsubscribe() @@ -745,6 +463,7 @@ func TestL1MsgCorrectOrder(t *testing.T) { ) chainConfig = params.AllCliqueProtocolChanges chainConfig.Clique = ¶ms.CliqueConfig{Period: 1, Epoch: 30000} + chainConfig.Scroll.FeeVaultAddress = &common.Address{} engine = clique.New(chainConfig.Clique, db) maxTxPerBlock := 4 @@ -771,11 +490,6 @@ func TestL1MsgCorrectOrder(t *testing.T) { Tracer: vm.NewStructLogger(&vm.LogConfig{EnableMemory: true, EnableReturnData: true})}, nil, nil) defer chain.Stop() - // Ignore empty commit here for less noise. - w.skipSealHook = func(task *task) bool { - return len(task.receipts) == 0 - } - // Wait for mined blocks. sub := w.mux.Subscribe(core.NewMinedBlockEvent{}) defer sub.Unsubscribe() @@ -833,11 +547,6 @@ func l1MessageTest(t *testing.T, msgs []types.L1MessageTx, withL2Tx bool, callba Tracer: vm.NewStructLogger(&vm.LogConfig{EnableMemory: true, EnableReturnData: true})}, nil, nil) defer chain.Stop() - // Ignore empty commit here for less noise. - w.skipSealHook = func(task *task) bool { - return len(task.receipts) == 0 - } - // Wait for mined blocks. sub := w.mux.Subscribe(core.NewMinedBlockEvent{}) defer sub.Unsubscribe() @@ -1124,11 +833,12 @@ func TestPrioritizeOverflowTx(t *testing.T) { var ( chainConfig = params.AllCliqueProtocolChanges db = rawdb.NewMemoryDatabase() - engine = clique.New(chainConfig.Clique, db) ) chainConfig.Clique = ¶ms.CliqueConfig{Period: 1, Epoch: 30000} chainConfig.LondonBlock = big.NewInt(0) + chainConfig.Scroll.FeeVaultAddress = &common.Address{} + engine := clique.New(chainConfig.Clique, db) w, b := newTestWorker(t, chainConfig, engine, db, 0) defer w.close() @@ -1141,11 +851,6 @@ func TestPrioritizeOverflowTx(t *testing.T) { Tracer: vm.NewStructLogger(&vm.LogConfig{EnableMemory: true, EnableReturnData: true})}, nil, nil) defer chain.Stop() - // Ignore empty commit here for less noise. - w.skipSealHook = func(task *task) bool { - return len(task.receipts) == 0 - } - // Wait for mined blocks. sub := w.mux.Subscribe(core.NewMinedBlockEvent{}) defer sub.Unsubscribe() @@ -1165,7 +870,6 @@ func TestPrioritizeOverflowTx(t *testing.T) { select { case ev := <-sub.Chan(): - w.stop() block := ev.Data.(core.NewMinedBlockEvent).Block assert.Equal(1, len(block.Transactions())) assert.Equal(tx0.Hash(), block.Transactions()[0].Hash()) @@ -1179,7 +883,6 @@ func TestPrioritizeOverflowTx(t *testing.T) { // Process 2 transactions with gas order: tx2 > tx1, // but we will prioritize tx1. 
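 	// (tx1 overflowed the circuit in the previous block, so the worker carried
 	// it over as prioritizedTx and should include it before the higher-priced tx2.)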
b.txPool.AddRemotesSync([]*types.Transaction{tx2}) - w.start() select { case ev := <-sub.Chan(): @@ -1230,7 +933,7 @@ func TestSkippedTransactionDatabaseEntries(t *testing.T) { assert.NotNil(stx) assert.True(stx.Tx.IsL1MessageTx()) assert.Equal(uint64(0), stx.Tx.AsL1MessageTx().QueueIndex) - assert.Equal("gas limit exceeded", stx.Reason) + assert.Equal("gas limit reached", stx.Reason) assert.Equal(block.NumberU64(), stx.BlockNumber) assert.Nil(stx.BlockHash) @@ -1266,6 +969,7 @@ func TestSealBlockAfterCliquePeriod(t *testing.T) { ) chainConfig = params.AllCliqueProtocolChanges chainConfig.Clique = ¶ms.CliqueConfig{Period: 1, Epoch: 30000} + chainConfig.Scroll.FeeVaultAddress = &common.Address{} engine = clique.New(chainConfig.Clique, db) w, b := newTestWorker(t, chainConfig, engine, db, 0) defer w.close() @@ -1277,11 +981,6 @@ func TestSealBlockAfterCliquePeriod(t *testing.T) { Tracer: vm.NewStructLogger(&vm.LogConfig{EnableMemory: true, EnableReturnData: true})}, nil, nil) defer chain.Stop() - // Ignore empty commit here for less noise. - w.skipSealHook = func(task *task) bool { - return len(task.receipts) == 0 - } - // Add artificial delay to transaction processing. w.beforeTxHook = func() { time.Sleep(time.Duration(chainConfig.Clique.Period) * 1 * time.Second) diff --git a/miner/worker.go b/miner/worker.go index b351906f0064..40292c3d1d8d 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -16,6 +16,7 @@ package miner +/* import ( "bytes" "errors" @@ -1643,3 +1644,4 @@ func totalFees(block *types.Block, receipts []*types.Receipt) *big.Float { } return new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether))) } +*/ diff --git a/params/version.go b/params/version.go index e2b3df804e56..e3902e1d18bb 100644 --- a/params/version.go +++ b/params/version.go @@ -24,7 +24,7 @@ import ( const ( VersionMajor = 5 // Major version component of the current release VersionMinor = 3 // Minor version component of the current release - VersionPatch = 17 // Patch version component of the current release + VersionPatch = 18 // Patch version component of the current release VersionMeta = "mainnet" // Version metadata to append to the version string ) diff --git a/rollup/pipeline/pipeline.go b/rollup/pipeline/pipeline.go new file mode 100644 index 000000000000..fffdd21a12a5 --- /dev/null +++ b/rollup/pipeline/pipeline.go @@ -0,0 +1,410 @@ +package pipeline + +import ( + "errors" + "time" + + "github.com/scroll-tech/go-ethereum/common" + "github.com/scroll-tech/go-ethereum/core" + "github.com/scroll-tech/go-ethereum/core/state" + "github.com/scroll-tech/go-ethereum/core/types" + "github.com/scroll-tech/go-ethereum/core/vm" + "github.com/scroll-tech/go-ethereum/log" + "github.com/scroll-tech/go-ethereum/metrics" + "github.com/scroll-tech/go-ethereum/params" + "github.com/scroll-tech/go-ethereum/rollup/circuitcapacitychecker" + "github.com/scroll-tech/go-ethereum/rollup/tracing" +) + +type ErrorWithTrace struct { + Trace *types.BlockTrace + err error +} + +func (e *ErrorWithTrace) Error() string { + return e.err.Error() +} + +func (e *ErrorWithTrace) Unwrap() error { + return e.err +} + +var ( + ErrApplyStageDone = errors.New("apply stage is done") + ErrUnexpectedL1MessageIndex = errors.New("unexpected L1 message index") + + lifetimeTimer = metrics.NewRegisteredTimer("pipeline/lifetime", nil) + applyTimer = metrics.NewRegisteredTimer("pipeline/apply", nil) + applyIdleTimer = metrics.NewRegisteredTimer("pipeline/apply_idle", nil) + applyStallTimer = 
metrics.NewRegisteredTimer("pipeline/apply_stall", nil) + cccTimer = metrics.NewRegisteredTimer("pipeline/ccc", nil) + cccIdleTimer = metrics.NewRegisteredTimer("pipeline/ccc_idle", nil) +) + +type Pipeline struct { + chain *core.BlockChain + vmConfig *vm.Config + parent *types.Block + start time.Time + + // accumalators + ccc *circuitcapacitychecker.CircuitCapacityChecker + Header types.Header + state *state.StateDB + nextL1MsgIndex uint64 + blockSize common.StorageSize + txs types.Transactions + coalescedLogs []*types.Log + receipts types.Receipts + gasPool *core.GasPool + + // com channels + txnQueue chan *types.Transaction + applyStageRespCh <-chan error + ResultCh <-chan *Result + + // Test hooks + beforeTxHook func() // Method to call before processing a transaction. +} + +func NewPipeline( + chain *core.BlockChain, + vmConfig *vm.Config, + state *state.StateDB, + + header *types.Header, + nextL1MsgIndex uint64, + ccc *circuitcapacitychecker.CircuitCapacityChecker, +) *Pipeline { + return &Pipeline{ + chain: chain, + vmConfig: vmConfig, + parent: chain.GetBlock(header.ParentHash, header.Number.Uint64()-1), + nextL1MsgIndex: nextL1MsgIndex, + Header: *header, + ccc: ccc, + state: state, + gasPool: new(core.GasPool).AddGas(header.GasLimit), + } +} + +func (p *Pipeline) WithBeforeTxHook(beforeTxHook func()) *Pipeline { + p.beforeTxHook = beforeTxHook + return p +} + +func (p *Pipeline) Start(deadline time.Time) error { + p.start = time.Now() + p.txnQueue = make(chan *types.Transaction) + applyStageRespCh, candidateCh, err := p.traceAndApplyStage(p.txnQueue) + if err != nil { + log.Error("Failed starting traceAndApplyStage", "err", err) + return err + } + p.applyStageRespCh = applyStageRespCh + p.ResultCh = p.cccStage(candidateCh, deadline) + return nil +} + +func (p *Pipeline) TryPushTxns(txs types.OrderedTransactionSet, onFailingTxn func(txnIndex int, tx *types.Transaction, err error) bool) *Result { + for { + tx := txs.Peek() + if tx == nil { + break + } + + result, err := p.TryPushTxn(tx) + if result != nil { + return result + } + + switch { + case err == nil, errors.Is(err, core.ErrNonceTooLow): + txs.Shift() + default: + if errors.Is(err, ErrApplyStageDone) || onFailingTxn(p.txs.Len(), tx, err) { + close(p.txnQueue) + p.txnQueue = nil + return nil + } + + if tx.IsL1MessageTx() { + txs.Shift() + } else { + txs.Pop() + } + } + } + + return nil +} + +func (p *Pipeline) TryPushTxn(tx *types.Transaction) (*Result, error) { + if p.txnQueue == nil { + return nil, ErrApplyStageDone + } + + select { + case p.txnQueue <- tx: + case res := <-p.ResultCh: + return res, nil + } + + select { + case err, valid := <-p.applyStageRespCh: + if !valid { + return nil, ErrApplyStageDone + } + return nil, err + case res := <-p.ResultCh: + return res, nil + } +} + +func (p *Pipeline) Kill() { + if p.txnQueue != nil { + close(p.txnQueue) + } + + select { + case <-p.applyStageRespCh: + <-p.ResultCh + case <-p.ResultCh: + <-p.applyStageRespCh + } +} + +type BlockCandidate struct { + LastTrace *types.BlockTrace + NextL1MsgIndex uint64 + + // accumulated state + Header *types.Header + State *state.StateDB + Txs types.Transactions + Receipts types.Receipts + CoalescedLogs []*types.Log +} + +func (p *Pipeline) traceAndApplyStage(txsIn <-chan *types.Transaction) (<-chan error, <-chan *BlockCandidate, error) { + p.state.StartPrefetcher("miner") + newCandidateCh := make(chan *BlockCandidate) + resCh := make(chan error) + go func() { + defer func() { + close(newCandidateCh) + close(resCh) + p.state.StopPrefetcher() + 
}() + + var tx *types.Transaction + for { + applyIdleTimer.Time(func() { + tx = <-txsIn + }) + if tx == nil { + return + } + + applyStart := time.Now() + + // If we don't have enough gas for any further transactions then we're done + if p.gasPool.Gas() < params.TxGas { + return + } + + // If we have collected enough transactions then we're done + // Originally we only limit l2txs count, but now strictly limit total txs number. + if !p.chain.Config().Scroll.IsValidTxCount(p.txs.Len() + 1) { + return + } + + if tx.IsL1MessageTx() && tx.AsL1MessageTx().QueueIndex != p.nextL1MsgIndex { + // Continue, we might still be able to include some L2 messages + resCh <- ErrUnexpectedL1MessageIndex + continue + } + + if !tx.IsL1MessageTx() && !p.chain.Config().Scroll.IsValidBlockSize(p.blockSize+tx.Size()) { + // can't fit this txn in this block, silently ignore and continue looking for more txns + resCh <- nil + continue + } + + // Start executing the transaction + p.state.SetTxContext(tx.Hash(), p.txs.Len()) + receipt, trace, err := p.traceAndApply(tx) + + if p.txs.Len() == 0 && tx.IsL1MessageTx() && err != nil { + // L1 message errored as the first txn, skip + p.nextL1MsgIndex = tx.AsL1MessageTx().QueueIndex + 1 + } + + if err == nil { + // Everything ok, collect the logs and shift in the next transaction from the same account + p.coalescedLogs = append(p.coalescedLogs, receipt.Logs...) + p.txs = append(p.txs, tx) + p.receipts = append(p.receipts, receipt) + + if !tx.IsL1MessageTx() { + // only consider block size limit for L2 transactions + p.blockSize += tx.Size() + } else { + p.nextL1MsgIndex = tx.AsL1MessageTx().QueueIndex + 1 + } + + stallStart := time.Now() + select { + case newCandidateCh <- &BlockCandidate{ + LastTrace: trace, + NextL1MsgIndex: p.nextL1MsgIndex, + + Header: types.CopyHeader(&p.Header), + State: p.state.Copy(), + Txs: p.txs, + Receipts: p.receipts, + CoalescedLogs: p.coalescedLogs, + }: + case tx = <-txsIn: + if tx != nil { + panic("shouldn't have happened") + } + // next stage terminated and caller terminated us as well + return + } + applyStallTimer.UpdateSince(stallStart) + } + if err != nil && trace != nil { + err = &ErrorWithTrace{ + Trace: trace, + err: err, + } + } + applyTimer.UpdateSince(applyStart) + resCh <- err + } + }() + return resCh, newCandidateCh, nil +} + +type Result struct { + OverflowingTx *types.Transaction + OverflowingTrace *types.BlockTrace + CCCErr error + + Rows *types.RowConsumption + FinalBlock *BlockCandidate +} + +func (p *Pipeline) cccStage(candidates <-chan *BlockCandidate, deadline time.Time) <-chan *Result { + p.ccc.Reset() + resultCh := make(chan *Result) + var lastCandidate *BlockCandidate + var lastAccRows *types.RowConsumption + var deadlineReached bool + + go func() { + defer func() { + close(resultCh) + lifetimeTimer.UpdateSince(p.start) + }() + for { + idleStart := time.Now() + select { + case <-time.After(time.Until(deadline)): + cccIdleTimer.UpdateSince(idleStart) + // note: currently we don't allow empty blocks, but if we ever do; make sure to CCC check it first + if lastCandidate != nil { + resultCh <- &Result{ + Rows: lastAccRows, + FinalBlock: lastCandidate, + } + return + } + deadlineReached = true + // avoid deadline case being triggered again and again + deadline = time.Now().Add(time.Hour) + case candidate := <-candidates: + cccIdleTimer.UpdateSince(idleStart) + cccStart := time.Now() + var accRows *types.RowConsumption + var err error + if candidate != nil { + accRows, err = p.ccc.ApplyTransaction(candidate.LastTrace) + 
lastTxn := candidate.Txs[candidate.Txs.Len()-1]
+					cccTimer.UpdateSince(cccStart)
+					if err != nil {
+						resultCh <- &Result{
+							OverflowingTx:    lastTxn,
+							OverflowingTrace: candidate.LastTrace,
+							CCCErr:           err,
+							Rows:             lastAccRows,
+							FinalBlock:       lastCandidate,
+						}
+						return
+					}
+
+					lastCandidate = candidate
+					lastAccRows = accRows
+				}
+
+				// immediately close the block if the deadline is reached or the apply stage is done
+				if candidate == nil || deadlineReached {
+					resultCh <- &Result{
+						Rows:       lastAccRows,
+						FinalBlock: lastCandidate,
+					}
+					return
+				}
+			}
+		}
+	}()
+	return resultCh
+}
+
+func (p *Pipeline) traceAndApply(tx *types.Transaction) (*types.Receipt, *types.BlockTrace, error) {
+	var trace *types.BlockTrace
+	var err error
+
+	if p.beforeTxHook != nil {
+		p.beforeTxHook()
+	}
+
+	// do the gas limit check up-front and do not run CCC if it fails
+	if p.gasPool.Gas() < tx.Gas() {
+		return nil, nil, core.ErrGasLimitReached
+	}
+
+	// don't commit the state during tracing for the circuit capacity checker, otherwise we cannot revert.
+	// even if we don't commit the state, the `refund` value will still be correct, as explained below
+	commitStateAfterApply := false
+	snap := p.state.Snapshot()
+
+	// 1. we have to check circuit capacity before `core.ApplyTransaction`,
+	// because if the tx can be successfully executed but circuit capacity overflows, it will be inconvenient to revert.
+	// 2. even if we don't commit to the state during the tracing (which means `clearJournalAndRefund` is not called during the tracing),
+	// the `refund` value will still be correct, because:
+	// 2.1 when we start handling the first tx, `state.refund` is 0 by default,
+	// 2.2 after tracing, the state is either committed in `core.ApplyTransaction` or reverted, so `state.refund` is cleared,
+	// 2.3 when we start handling the following txs, `state.refund` is 0 again
+	trace, err = tracing.NewTracerWrapper().CreateTraceEnvAndGetBlockTrace(p.chain.Config(), p.chain, p.chain.Engine(), p.chain.Database(),
+		p.state, p.parent, types.NewBlockWithHeader(&p.Header).WithBody([]*types.Transaction{tx}, nil), commitStateAfterApply)
+	// the trace env's state and `p.state` share the same underlying pointer, so we only need to revert `p.state`.
+	// revert to the snapshot before calling `core.ApplyMessage` again (both `traceEnv.GetBlockTrace` and `core.ApplyTransaction` call `core.ApplyMessage`)
+	p.state.RevertToSnapshot(snap)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	// create a new snapshot for `core.ApplyTransaction`
+	snap = p.state.Snapshot()
+
+	var receipt *types.Receipt
+	receipt, err = core.ApplyTransaction(p.chain.Config(), p.chain, nil /* coinbase will default to chainConfig.Scroll.FeeVaultAddress */, p.gasPool,
+		p.state, &p.Header, tx, &p.Header.GasUsed, *p.vmConfig)
+	if err != nil {
+		p.state.RevertToSnapshot(snap)
+		return nil, trace, err
+	}
+	return receipt, trace, nil
+}
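The two select statements in TryPushTxn form a small handshake: the caller offers a transaction to the apply stage while staying receptive to a final Result, then waits for the per-transaction verdict the same way, so neither side can deadlock when the pipeline finishes a block mid-push. Below is a minimal, self-contained sketch of that pattern; the names (`pipe`, `tryPush`, the integer "transactions") are invented for illustration and are not part of this patch.

```go
package main

import (
	"errors"
	"fmt"
)

var errStageDone = errors.New("apply stage is done")

type pipe struct {
	queue  chan int    // plays the role of txnQueue
	respCh chan error  // plays the role of applyStageRespCh
	result chan string // plays the role of ResultCh
}

func newPipe() *pipe {
	p := &pipe{
		queue:  make(chan int),
		respCh: make(chan error),
		result: make(chan string),
	}
	go func() {
		defer close(p.respCh) // a closed respCh means "stage done" to the caller
		sum := 0
		for v := range p.queue {
			if sum+v > 10 { // stand-in for "block is full"
				p.result <- fmt.Sprintf("sealed with sum=%d", sum)
				return
			}
			sum += v
			p.respCh <- nil // per-item acknowledgement
		}
	}()
	return p
}

// tryPush mirrors TryPushTxn: offer work while watching for a final
// result, then collect the per-item response while doing the same.
func (p *pipe) tryPush(v int) (string, error) {
	select {
	case p.queue <- v:
	case res := <-p.result:
		return res, nil
	}
	select {
	case err, ok := <-p.respCh:
		if !ok {
			return "", errStageDone
		}
		return "", err
	case res := <-p.result:
		return res, nil
	}
}

func main() {
	p := newPipe()
	for i := 1; ; i++ {
		if res, err := p.tryPush(i); res != "" || err != nil {
			fmt.Println("result:", res, "err:", err)
			return
		}
	}
}
```

The same two-way select also explains why Kill drains both applyStageRespCh and ResultCh: the worker goroutine may be blocked sending on either channel, and receiving from both in either order guarantees it can exit.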