diff --git a/cmd/geth/main.go b/cmd/geth/main.go index f8929c016817..f3e0336f061c 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -131,7 +131,8 @@ var ( utils.MinerExtraDataFlag, utils.MinerRecommitIntervalFlag, utils.MinerNoVerifyFlag, - utils.MinerMaxMergedBundles, + utils.MinerMaxMergedBundlesFlag, + utils.MinerTrustedRelaysFlag, utils.NATFlag, utils.NoDiscoverFlag, utils.DiscoveryV5Flag, diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index a3d1ced632b0..cfb29eea29d1 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -189,7 +189,8 @@ var AppHelpFlagGroups = []flags.FlagGroup{ utils.MinerExtraDataFlag, utils.MinerRecommitIntervalFlag, utils.MinerNoVerifyFlag, - utils.MinerMaxMergedBundles, + utils.MinerMaxMergedBundlesFlag, + utils.MinerTrustedRelaysFlag, }, }, { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index b37d347fadaa..2368d3cf3851 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -474,11 +474,16 @@ var ( Usage: "Time interval to recreate the block being mined", Value: ethconfig.Defaults.Miner.Recommit, } - MinerMaxMergedBundles = cli.IntFlag{ + MinerMaxMergedBundlesFlag = cli.IntFlag{ Name: "miner.maxmergedbundles", Usage: "flashbots - The maximum amount of bundles to merge. The miner will run this many workers in parallel to calculate if the full block is more profitable with these additional bundles.", Value: 3, } + MinerTrustedRelaysFlag = cli.StringFlag{ + Name: "miner.trustedrelays", + Usage: "flashbots - The Ethereum addresses of trusted relays for signature verification. The miner will accept signed bundles and other tasks from the relay, being reasonably certain about DDoS safety.", + Value: "0x870e2734DdBe2Fba9864f33f3420d59Bc641f2be", + } MinerNoVerifyFlag = cli.BoolFlag{ Name: "miner.noverify", Usage: "Disable remote sealing verification", @@ -1359,6 +1364,15 @@ func setTxPool(ctx *cli.Context, cfg *core.TxPoolConfig) { if ctx.GlobalIsSet(TxPoolLifetimeFlag.Name) { cfg.Lifetime = ctx.GlobalDuration(TxPoolLifetimeFlag.Name) } + + addresses := strings.Split(ctx.GlobalString(MinerTrustedRelaysFlag.Name), ",") + for _, address := range addresses { + if trimmed := strings.TrimSpace(address); !common.IsHexAddress(trimmed) { + Fatalf("Invalid account in --miner.trustedrelays: %s", trimmed) + } else { + cfg.TrustedRelays = append(cfg.TrustedRelays, common.HexToAddress(trimmed)) + } + } } func setEthash(ctx *cli.Context, cfg *ethconfig.Config) { @@ -1412,7 +1426,17 @@ func setMiner(ctx *cli.Context, cfg *miner.Config) { log.Warn("The generic --miner.gastarget flag is deprecated and will be removed in the future!") } - cfg.MaxMergedBundles = ctx.GlobalInt(MinerMaxMergedBundles.Name) + cfg.MaxMergedBundles = ctx.GlobalInt(MinerMaxMergedBundlesFlag.Name) + + addresses := strings.Split(ctx.GlobalString(MinerTrustedRelaysFlag.Name), ",") + for _, address := range addresses { + if trimmed := strings.TrimSpace(address); !common.IsHexAddress(trimmed) { + Fatalf("Invalid account in --miner.trustedrelays: %s", trimmed) + } else { + cfg.TrustedRelays = append(cfg.TrustedRelays, common.HexToAddress(trimmed)) + } + } + log.Info("Trusted relays set as", "addresses", cfg.TrustedRelays) } func setWhitelist(ctx *cli.Context, cfg *ethconfig.Config) { diff --git a/core/tx_pool.go b/core/tx_pool.go index 51312d3e78b6..75b5ac101949 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -165,6 +165,8 @@ type TxPoolConfig struct { GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts Lifetime time.Duration // Maximum amount of time non-executable transaction are queued + + TrustedRelays []common.Address // Trusted relay addresses. Duplicated from the miner config. } // DefaultTxPoolConfig contains the default configurations for the transaction @@ -251,12 +253,13 @@ type TxPool struct { locals *accountSet // Set of local transaction to exempt from eviction rules journal *txJournal // Journal of local transaction to back up to disk - pending map[common.Address]*txList // All currently processable transactions - queue map[common.Address]*txList // Queued but non-processable transactions - beats map[common.Address]time.Time // Last heartbeat from each known account - mevBundles []types.MevBundle - all *txLookup // All transactions to allow lookups - priced *txPricedList // All transactions sorted by price + pending map[common.Address]*txList // All currently processable transactions + queue map[common.Address]*txList // Queued but non-processable transactions + beats map[common.Address]time.Time // Last heartbeat from each known account + mevBundles []types.MevBundle + megabundles map[common.Address]types.MevBundle // One megabundle per each trusted relay + all *txLookup // All transactions to allow lookups + priced *txPricedList // All transactions sorted by price chainHeadCh chan ChainHeadEvent chainHeadSub event.Subscription @@ -290,6 +293,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block pending: make(map[common.Address]*txList), queue: make(map[common.Address]*txList), beats: make(map[common.Address]time.Time), + megabundles: make(map[common.Address]types.MevBundle), all: newTxLookup(), chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), reqResetCh: make(chan *txpoolResetRequest), @@ -611,6 +615,52 @@ func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, m return nil } +// AddMegaBundle adds a megabundle to the pool. Assumes the relay signature has been verified already. +func (pool *TxPool) AddMegabundle(relayAddr common.Address, txs types.Transactions, blockNumber *big.Int, minTimestamp, maxTimestamp uint64, revertingTxHashes []common.Hash) error { + pool.mu.Lock() + defer pool.mu.Unlock() + + fromTrustedRelay := false + for _, trustedAddr := range pool.config.TrustedRelays { + if relayAddr == trustedAddr { + fromTrustedRelay = true + } + } + if !fromTrustedRelay { + return errors.New("megabundle from non-trusted address") + } + + pool.megabundles[relayAddr] = types.MevBundle{ + Txs: txs, + BlockNumber: blockNumber, + MinTimestamp: minTimestamp, + MaxTimestamp: maxTimestamp, + RevertingTxHashes: revertingTxHashes, + } + return nil +} + +// GetMegabundle returns the latest megabundle submitted by a given relay. +func (pool *TxPool) GetMegabundle(relayAddr common.Address, blockNumber *big.Int, blockTimestamp uint64) (types.MevBundle, error) { + pool.mu.Lock() + defer pool.mu.Unlock() + + megabundle, ok := pool.megabundles[relayAddr] + if !ok { + return types.MevBundle{}, errors.New("No megabundle found") + } + if megabundle.BlockNumber.Cmp(blockNumber) != 0 { + return types.MevBundle{}, errors.New("Megabundle does not fit blockNumber constraints") + } + if megabundle.MinTimestamp != 0 && megabundle.MinTimestamp > blockTimestamp { + return types.MevBundle{}, errors.New("Megabundle does not fit minTimestamp constraints") + } + if megabundle.MaxTimestamp != 0 && megabundle.MaxTimestamp < blockTimestamp { + return types.MevBundle{}, errors.New("Megabundle does not fit maxTimestamp constraints") + } + return megabundle, nil +} + // Locals retrieves the accounts currently considered local by the pool. func (pool *TxPool) Locals() []common.Address { pool.mu.Lock() diff --git a/eth/api_backend.go b/eth/api_backend.go index 8454c0afe701..4582f84d1b98 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -243,6 +243,10 @@ func (b *EthAPIBackend) SendBundle(ctx context.Context, txs types.Transactions, return b.eth.txPool.AddMevBundle(txs, big.NewInt(blockNumber.Int64()), minTimestamp, maxTimestamp, revertingTxHashes) } +func (b *EthAPIBackend) SendMegabundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash, relayAddr common.Address) error { + return b.eth.txPool.AddMegabundle(relayAddr, txs, big.NewInt(blockNumber.Int64()), minTimestamp, maxTimestamp, revertingTxHashes) +} + func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) { pending := b.eth.txPool.Pending(false) var txs types.Transactions diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 9e05179bda1e..f1081bd18918 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -2096,7 +2096,7 @@ func NewPrivateTxBundleAPI(b Backend) *PrivateTxBundleAPI { return &PrivateTxBundleAPI{b} } -// SendBundleArgs represents the arguments for a call. +// SendBundleArgs represents the arguments for a SendBundle call. type SendBundleArgs struct { Txs []hexutil.Bytes `json:"txs"` BlockNumber rpc.BlockNumber `json:"blockNumber"` @@ -2105,6 +2105,25 @@ type SendBundleArgs struct { RevertingTxHashes []common.Hash `json:"revertingTxHashes"` } +// SendMegabundleArgs represents the arguments for a SendMegabundle call. +type SendMegabundleArgs struct { + Txs []hexutil.Bytes `json:"txs"` + BlockNumber uint64 `json:"blockNumber"` + MinTimestamp *uint64 `json:"minTimestamp"` + MaxTimestamp *uint64 `json:"maxTimestamp"` + RevertingTxHashes []common.Hash `json:"revertingTxHashes"` + RelaySignature hexutil.Bytes `json:"relaySignature"` +} + +// UnsignedMegabundle is used for serialization and subsequent digital signing. +type UnsignedMegabundle struct { + Txs []hexutil.Bytes + BlockNumber uint64 + MinTimestamp uint64 + MaxTimestamp uint64 + RevertingTxHashes []common.Hash +} + // SendBundle will add the signed transaction to the transaction pool. // The sender is responsible for signing the transaction and using the correct nonce and ensuring validity func (s *PrivateTxBundleAPI) SendBundle(ctx context.Context, args SendBundleArgs) error { @@ -2134,3 +2153,58 @@ func (s *PrivateTxBundleAPI) SendBundle(ctx context.Context, args SendBundleArgs return s.b.SendBundle(ctx, txs, args.BlockNumber, minTimestamp, maxTimestamp, args.RevertingTxHashes) } + +// Recovers the Ethereum address of the trusted relay that signed the megabundle. +func RecoverRelayAddress(args SendMegabundleArgs) (common.Address, error) { + megabundle := UnsignedMegabundle{Txs: args.Txs, BlockNumber: args.BlockNumber, RevertingTxHashes: args.RevertingTxHashes} + if args.MinTimestamp != nil { + megabundle.MinTimestamp = *args.MinTimestamp + } else { + megabundle.MinTimestamp = 0 + } + if args.MaxTimestamp != nil { + megabundle.MaxTimestamp = *args.MaxTimestamp + } else { + megabundle.MaxTimestamp = 0 + } + rlpEncoding, _ := rlp.EncodeToBytes(megabundle) + signature := args.RelaySignature + signature[64] -= 27 // account for Ethereum V + recoveredPubkey, err := crypto.SigToPub(accounts.TextHash(rlpEncoding), args.RelaySignature) + if err != nil { + return common.Address{}, err + } + return crypto.PubkeyToAddress(*recoveredPubkey), nil +} + +// SendMegabundle will add the signed megabundle to one of the workers for evaluation. +func (s *PrivateTxBundleAPI) SendMegabundle(ctx context.Context, args SendMegabundleArgs) error { + log.Info("Received a Megabundle request", "signature", args.RelaySignature) + var txs types.Transactions + if len(args.Txs) == 0 { + return errors.New("megabundle missing txs") + } + if args.BlockNumber == 0 { + return errors.New("megabundle missing blockNumber") + } + for _, encodedTx := range args.Txs { + tx := new(types.Transaction) + if err := tx.UnmarshalBinary(encodedTx); err != nil { + return err + } + txs = append(txs, tx) + } + var minTimestamp, maxTimestamp uint64 + if args.MinTimestamp != nil { + minTimestamp = *args.MinTimestamp + } + if args.MaxTimestamp != nil { + maxTimestamp = *args.MaxTimestamp + } + relayAddr, err := RecoverRelayAddress(args) + log.Info("Megabundle", "relayAddr", relayAddr, "err", err) + if err != nil { + return err + } + return s.b.SendMegabundle(ctx, txs, rpc.BlockNumber(args.BlockNumber), minTimestamp, maxTimestamp, args.RevertingTxHashes, relayAddr) +} diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index bcdccf2bd9d6..58c8f0bf04e1 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -75,6 +75,7 @@ type Backend interface { // Transaction pool API SendTx(ctx context.Context, signedTx *types.Transaction) error SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error + SendMegabundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash, relayAddr common.Address) error GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) GetPoolTransactions() (types.Transactions, error) GetPoolTransaction(txHash common.Hash) *types.Transaction diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index 9eeada1a9fa1..eab8cb5ea171 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -581,6 +581,11 @@ web3._extend({ call: 'eth_sendBundle', params: 1 }), + new web3._extend.Method({ + name: 'sendMegabundle', + call: 'eth_sendMegabundle', + params: 1 + }), ], properties: [ new web3._extend.Property({ diff --git a/les/api_backend.go b/les/api_backend.go index 9bb08c79f6a7..e1ebb51326f4 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -198,10 +198,15 @@ func (b *LesApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction) func (b *LesApiBackend) RemoveTx(txHash common.Hash) { b.eth.txPool.RemoveTx(txHash) } + func (b *LesApiBackend) SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error { return b.eth.txPool.AddMevBundle(txs, big.NewInt(blockNumber.Int64()), minTimestamp, maxTimestamp, revertingTxHashes) } +func (b *LesApiBackend) SendMegabundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash, relayAddr common.Address) error { + return nil +} + func (b *LesApiBackend) GetPoolTransactions() (types.Transactions, error) { return b.eth.txPool.GetTransactions() } diff --git a/miner/miner.go b/miner/miner.go index 03304c9548a4..5be144abc396 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -53,6 +53,7 @@ type Config struct { Recommit time.Duration // The time interval for miner to re-create mining work. Noverify bool // Disable remote mining solution verification(only useful in ethash). MaxMergedBundles int + TrustedRelays []common.Address `toml:",omitempty"` // Trusted relay addresses to receive tasks from. } // Miner creates blocks and searches for proof-of-work values. diff --git a/miner/multi_worker.go b/miner/multi_worker.go index f5947bb0d238..3452fb0f214b 100644 --- a/miner/multi_worker.go +++ b/miner/multi_worker.go @@ -98,13 +98,24 @@ func newMultiWorker(config *Config, chainConfig *params.ChainConfig, engine cons for i := 1; i <= config.MaxMergedBundles; i++ { workers = append(workers, newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, merger, &flashbotsData{ - isFlashbots: true, - queue: queue, - maxMergedBundles: i, + isFlashbots: true, + isMegabundleWorker: false, + queue: queue, + maxMergedBundles: i, })) } - log.Info("creating multi worker", "config.MaxMergedBundles", config.MaxMergedBundles, "worker", len(workers)) + for i := 0; i < len(config.TrustedRelays); i++ { + workers = append(workers, + newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, merger, &flashbotsData{ + isFlashbots: true, + isMegabundleWorker: true, + queue: queue, + relayAddr: config.TrustedRelays[i], + })) + } + + log.Info("creating multi worker", "config.MaxMergedBundles", config.MaxMergedBundles, "config.TrustedRelays", config.TrustedRelays, "worker", len(workers)) return &multiWorker{ regularWorker: regularWorker, workers: workers, @@ -112,7 +123,9 @@ func newMultiWorker(config *Config, chainConfig *params.ChainConfig, engine cons } type flashbotsData struct { - isFlashbots bool - queue chan *task - maxMergedBundles int + isFlashbots bool + isMegabundleWorker bool + queue chan *task + maxMergedBundles int + relayAddr common.Address } diff --git a/miner/worker.go b/miner/worker.go index 111710539430..26be048daeca 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -102,9 +102,10 @@ type task struct { block *types.Block createdAt time.Time - profit *big.Int - isFlashbots bool - worker int + profit *big.Int + isFlashbots bool + worker int + isMegabundle bool } const ( @@ -633,7 +634,7 @@ func (w *worker) taskLoop() { // Interrupt previous sealing operation interrupt() stopCh, prev = make(chan struct{}), sealHash - log.Info("Proposed miner block", "blockNumber", task.block.Number(), "profit", ethIntToFloat(prevProfit), "isFlashbots", task.isFlashbots, "sealhash", sealHash, "parentHash", prevParentHash, "worker", task.worker) + log.Info("Proposed miner block", "blockNumber", task.block.Number(), "profit", ethIntToFloat(prevProfit), "isFlashbots", task.isFlashbots, "sealhash", sealHash, "parentHash", prevParentHash, "worker", task.worker, "isMegabundle", task.isMegabundle) if w.skipSealHook != nil && w.skipSealHook(task) { continue } @@ -1182,7 +1183,7 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) if w.flashbots.isFlashbots && len(w.eth.TxPool().AllMevBundles()) > 0 { noBundles = false } - if len(pending) == 0 && atomic.LoadUint32(&w.noempty) == 0 && noBundles { + if len(pending) == 0 && atomic.LoadUint32(&w.noempty) == 0 && noBundles && !w.flashbots.isMegabundleWorker { w.updateSnapshot() return } @@ -1194,13 +1195,13 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) localTxs[account] = txs } } - if w.flashbots.isFlashbots { + + if w.flashbots.isFlashbots && !w.flashbots.isMegabundleWorker { bundles, err := w.eth.TxPool().MevBundles(header.Number, header.Time) if err != nil { log.Error("Failed to fetch pending transactions", "err", err) return } - bundleTxs, bundle, numBundles, err := w.generateFlashbotsBundle(bundles, w.coinbase, parent, header, pending) if err != nil { log.Error("Failed to generate flashbots bundle", "err", err) @@ -1213,8 +1214,42 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) if w.commitBundle(bundleTxs, w.coinbase, interrupt) { return } - w.current.profit.Add(w.current.profit, bundle.totalEth) + w.current.profit.Add(w.current.profit, bundle.ethSentToCoinbase) } + if w.flashbots.isMegabundleWorker { + megabundle, err := w.eth.TxPool().GetMegabundle(w.flashbots.relayAddr, header.Number, header.Time) + log.Info("Starting to process a Megabundle", "relay", w.flashbots.relayAddr, "megabundle", megabundle, "error", err) + if err != nil { + return // no valid megabundle for this relay, nothing to do + } + // Flashbots bundle merging duplicates work by simulating TXes and then committing them once more. + // Megabundles API focuses on speed and runs everything in one cycle. + coinbaseBalanceBefore := w.current.state.GetBalance(w.coinbase) + if w.commitBundle(megabundle.Txs, w.coinbase, interrupt) { + log.Info("Could not commit a Megabundle", "relay", w.flashbots.relayAddr, "megabundle", megabundle) + return + } + var txStatuses = map[common.Hash]bool{} + for _, receipt := range w.current.receipts { + txStatuses[receipt.TxHash] = receipt.Status == types.ReceiptStatusSuccessful + } + for _, tx := range megabundle.Txs { + status, ok := txStatuses[tx.Hash()] + if !ok { + log.Error("No TX receipt after megabundle simulation", "TxHash", tx.Hash()) + return + } + if !status && !containsHash(megabundle.RevertingTxHashes, tx.Hash()) { + log.Info("Ignoring megabundle because of failing TX", "relay", w.flashbots.relayAddr, "TxHash", tx.Hash()) + return + } + } + coinbaseBalanceAfter := w.current.state.GetBalance(w.coinbase) + coinbaseDelta := big.NewInt(0).Sub(coinbaseBalanceAfter, coinbaseBalanceBefore) + w.current.profit = coinbaseDelta + log.Info("Megabundle processed", "relay", w.flashbots.relayAddr, "totalProfit", ethIntToFloat(w.current.profit)) + } + if len(localTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(w.current.signer, localTxs, header.BaseFee) if w.commitTransactions(txs, w.coinbase, interrupt) { @@ -1250,13 +1285,13 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st return nil } select { - case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now(), profit: w.current.profit, isFlashbots: w.flashbots.isFlashbots, worker: w.flashbots.maxMergedBundles}: + case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now(), profit: w.current.profit, isFlashbots: w.flashbots.isFlashbots, worker: w.flashbots.maxMergedBundles, isMegabundle: w.flashbots.isMegabundleWorker}: w.unconfirmed.Shift(block.NumberU64() - 1) log.Info("Commit new mining work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()), "uncles", len(uncles), "txs", w.current.tcount, "gas", block.GasUsed(), "fees", totalFees(block, receipts), "profit", ethIntToFloat(w.current.profit), "elapsed", common.PrettyDuration(time.Since(start)), - "isFlashbots", w.flashbots.isFlashbots, "worker", w.flashbots.maxMergedBundles) + "isFlashbots", w.flashbots.isFlashbots, "worker", w.flashbots.maxMergedBundles, "isMegabundle", w.flashbots.isMegabundleWorker) case <-w.exitCh: log.Info("Worker has exited")