diff --git a/README.md b/README.md index 8fe7df9e4255..4f87877d7494 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ See [here](https://docs.flashbots.net) for Flashbots documentation. | Version | Spec | | ------- | ------------------------------------------------------------------------------------------- | +| v0.5 | [MEV-Geth Spec v0.5](https://docs.flashbots.net/flashbots-auction/miners/mev-geth-spec/v05) | | v0.4 | [MEV-Geth Spec v0.4](https://docs.flashbots.net/flashbots-auction/miners/mev-geth-spec/v04) | | v0.3 | [MEV-Geth Spec v0.3](https://docs.flashbots.net/flashbots-auction/miners/mev-geth-spec/v03) | | v0.2 | [MEV-Geth Spec v0.2](https://docs.flashbots.net/flashbots-auction/miners/mev-geth-spec/v02) | diff --git a/cmd/evm/internal/t8ntool/block.go b/cmd/evm/internal/t8ntool/block.go index d4edd33bdeb7..ceb2388cd468 100644 --- a/cmd/evm/internal/t8ntool/block.go +++ b/cmd/evm/internal/t8ntool/block.go @@ -188,7 +188,7 @@ func (i *bbInput) sealEthash(block *types.Block) (*types.Block, error) { // If the testmode is used, the sealer will return quickly, and complain // "Sealing result is not read by miner" if it cannot write the result. results := make(chan *types.Block, 1) - if err := engine.Seal(nil, block, results, nil); err != nil { + if err := engine.Seal(nil, block, nil, results, nil); err != nil { panic(fmt.Sprintf("failed to seal block: %v", err)) } found := <-results diff --git a/cmd/geth/consolecmd_test.go b/cmd/geth/consolecmd_test.go index 845ede2f9cbd..e4e4cd8ca1a2 100644 --- a/cmd/geth/consolecmd_test.go +++ b/cmd/geth/consolecmd_test.go @@ -31,7 +31,7 @@ import ( ) const ( - ipcAPIs = "admin:1.0 debug:1.0 eth:1.0 ethash:1.0 miner:1.0 net:1.0 personal:1.0 rpc:1.0 txpool:1.0 web3:1.0" + ipcAPIs = "admin:1.0 debug:1.0 eth:1.0 ethash:1.0 flashbots:1.0 miner:1.0 net:1.0 personal:1.0 rpc:1.0 txpool:1.0 web3:1.0" httpAPIs = "eth:1.0 net:1.0 rpc:1.0 web3:1.0" ) diff --git a/consensus/beacon/consensus.go b/consensus/beacon/consensus.go index 1fd7deb872fb..a3c71c84b3ff 100644 --- a/consensus/beacon/consensus.go +++ b/consensus/beacon/consensus.go @@ -294,9 +294,9 @@ func (beacon *Beacon) FinalizeAndAssemble(chain consensus.ChainHeaderReader, hea // // Note, the method returns immediately and will send the result async. More // than one result may also be returned depending on the consensus algorithm. -func (beacon *Beacon) Seal(chain consensus.ChainHeaderReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error { +func (beacon *Beacon) Seal(chain consensus.ChainHeaderReader, block *types.Block, profit *big.Int, results chan<- *types.Block, stop <-chan struct{}) error { if !beacon.IsPoSHeader(block.Header()) { - return beacon.ethone.Seal(chain, block, results, stop) + return beacon.ethone.Seal(chain, block, profit, results, stop) } // The seal verification is done by the external consensus engine, // return directly without pushing any block back. In another word diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index 685186817d2d..576d14f3c3a1 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -589,7 +589,7 @@ func (c *Clique) Authorize(signer common.Address, signFn SignerFn) { // Seal implements consensus.Engine, attempting to create a sealed block using // the local signing credentials. 
-func (c *Clique) Seal(chain consensus.ChainHeaderReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error { +func (c *Clique) Seal(chain consensus.ChainHeaderReader, block *types.Block, profit *big.Int, results chan<- *types.Block, stop <-chan struct{}) error { header := block.Header() // Sealing the genesis block is not supported diff --git a/consensus/consensus.go b/consensus/consensus.go index af8ce98ff3be..540c78209ff0 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -105,7 +105,7 @@ type Engine interface { // // Note, the method returns immediately and will send the result async. More // than one result may also be returned depending on the consensus algorithm. - Seal(chain ChainHeaderReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error + Seal(chain ChainHeaderReader, block *types.Block, profit *big.Int, results chan<- *types.Block, stop <-chan struct{}) error // SealHash returns the hash of a block prior to it being sealed. SealHash(header *types.Header) common.Hash diff --git a/consensus/ethash/api.go b/consensus/ethash/api.go index f4d3802e0b37..8aece9c7bb89 100644 --- a/consensus/ethash/api.go +++ b/consensus/ethash/api.go @@ -44,7 +44,7 @@ func (api *API) GetWork() ([4]string, error) { } var ( - workCh = make(chan [4]string, 1) + workCh = make(chan [5]string, 1) errc = make(chan error, 1) ) select { @@ -53,7 +53,10 @@ func (api *API) GetWork() ([4]string, error) { return [4]string{}, errEthashStopped } select { - case work := <-workCh: + case fullWork := <-workCh: + var work [4]string + copy(work[:], fullWork[:4]) + return work, nil case err := <-errc: return [4]string{}, err diff --git a/consensus/ethash/ethash.go b/consensus/ethash/ethash.go index 4e33d99c8dde..ad8dd1c34760 100644 --- a/consensus/ethash/ethash.go +++ b/consensus/ethash/ethash.go @@ -683,6 +683,12 @@ func (ethash *Ethash) APIs(chain consensus.ChainHeaderReader) []rpc.API { Service: &API{ethash}, Public: true, }, + { + Namespace: "flashbots", + Version: "1.0", + Service: &FlashbotsAPI{ethash}, + Public: true, + }, } } diff --git a/consensus/ethash/ethash_test.go b/consensus/ethash/ethash_test.go index 382eefeecf12..0b1c40572611 100644 --- a/consensus/ethash/ethash_test.go +++ b/consensus/ethash/ethash_test.go @@ -38,7 +38,7 @@ func TestTestMode(t *testing.T) { defer ethash.Close() results := make(chan *types.Block) - err := ethash.Seal(nil, types.NewBlockWithHeader(header), results, nil) + err := ethash.Seal(nil, types.NewBlockWithHeader(header), nil, results, nil) if err != nil { t.Fatalf("failed to seal block: %v", err) } @@ -111,7 +111,7 @@ func TestRemoteSealer(t *testing.T) { // Push new work. 
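The consensus.Engine interface above now threads a block profit value into Seal; every in-tree engine (ethash, clique, beacon) is updated, and call sites that do not compute a profit pass nil, as the adjusted tests and t8ntool do. A minimal sketch of what an out-of-tree engine would change — a hypothetical wrapper, not part of this patch — assuming the widened signature from consensus/consensus.go:

```go
package enginewrap // hypothetical illustration, not part of this patch

import (
	"math/big"

	"github.com/ethereum/go-ethereum/consensus"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/log"
)

// ProfitLoggingEngine embeds any consensus.Engine and overrides only Seal to
// show the widened signature. Callers that do not track profit pass nil.
type ProfitLoggingEngine struct {
	consensus.Engine
}

func (e ProfitLoggingEngine) Seal(chain consensus.ChainHeaderReader, block *types.Block, profit *big.Int, results chan<- *types.Block, stop <-chan struct{}) error {
	if profit != nil {
		log.Info("Sealing block", "number", block.Number(), "profit", profit)
	}
	return e.Engine.Seal(chain, block, profit, results, stop)
}
```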
results := make(chan *types.Block) - ethash.Seal(nil, block, results, nil) + ethash.Seal(nil, block, nil, results, nil) var ( work [4]string @@ -128,7 +128,7 @@ func TestRemoteSealer(t *testing.T) { header = &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(1000)} block = types.NewBlockWithHeader(header) sealhash = ethash.SealHash(header) - ethash.Seal(nil, block, results, nil) + ethash.Seal(nil, block, nil, results, nil) if work, err = api.GetWork(); err != nil || work[0] != sealhash.Hex() { t.Error("expect to return the latest pushed work") diff --git a/consensus/ethash/flashbots_api.go b/consensus/ethash/flashbots_api.go new file mode 100644 index 000000000000..527d2a44352e --- /dev/null +++ b/consensus/ethash/flashbots_api.go @@ -0,0 +1,38 @@ +package ethash + +import "errors" + +// FlashbotsAPI exposes Flashbots related methods for the RPC interface. +type FlashbotsAPI struct { + ethash *Ethash +} + +// GetWork returns a work package for external miner. +// +// The work package consists of 5 strings: +// result[0] - 32 bytes hex encoded current block header pow-hash +// result[1] - 32 bytes hex encoded seed hash used for DAG +// result[2] - 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty +// result[3] - hex encoded block number +// result[4] - hex encoded profit generated from this block +func (api *FlashbotsAPI) GetWork() ([5]string, error) { + if api.ethash.remote == nil { + return [5]string{}, errors.New("not supported") + } + + var ( + workCh = make(chan [5]string, 1) + errc = make(chan error, 1) + ) + select { + case api.ethash.remote.fetchWorkCh <- &sealWork{errc: errc, res: workCh}: + case <-api.ethash.remote.exitCh: + return [5]string{}, errEthashStopped + } + select { + case work := <-workCh: + return work, nil + case err := <-errc: + return [5]string{}, err + } +} diff --git a/consensus/ethash/sealer.go b/consensus/ethash/sealer.go index 6fa60ef6a8bb..d2b9253e5c34 100644 --- a/consensus/ethash/sealer.go +++ b/consensus/ethash/sealer.go @@ -48,7 +48,7 @@ var ( // Seal implements consensus.Engine, attempting to find a nonce that satisfies // the block's difficulty requirements. 
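The new flashbots namespace (registered in the ethash APIs hunk above) exposes the five-element work package, with the hex-encoded profit in result[4]. A client-side sketch — hypothetical endpoint, assuming the node exposes the flashbots namespace over the chosen transport — using the go-ethereum rpc client:

```go
package main // hypothetical external-miner sketch

import (
	"fmt"
	"log"

	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/rpc"
)

func main() {
	client, err := rpc.Dial("http://localhost:8545")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// flashbots_getWork mirrors eth_getWork but appends the block profit.
	var work [5]string
	if err := client.Call(&work, "flashbots_getWork"); err != nil {
		log.Fatal(err)
	}
	fmt.Println("pow-hash:", work[0])
	fmt.Println("seed hash:", work[1])
	fmt.Println("target:", work[2])
	fmt.Println("block number:", work[3])

	// result[4] is only set when the sealing task carried a profit value.
	if work[4] != "" {
		profit, err := hexutil.DecodeBig(work[4])
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println("profit (wei):", profit)
	}
}
```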
-func (ethash *Ethash) Seal(chain consensus.ChainHeaderReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error { +func (ethash *Ethash) Seal(chain consensus.ChainHeaderReader, block *types.Block, profit *big.Int, results chan<- *types.Block, stop <-chan struct{}) error { // If we're running a fake PoW, simply return a 0 nonce immediately if ethash.config.PowMode == ModeFake || ethash.config.PowMode == ModeFullFake { header := block.Header() @@ -62,7 +62,7 @@ func (ethash *Ethash) Seal(chain consensus.ChainHeaderReader, block *types.Block } // If we're running a shared PoW, delegate sealing to it if ethash.shared != nil { - return ethash.shared.Seal(chain, block, results, stop) + return ethash.shared.Seal(chain, block, profit, results, stop) } // Create a runner and the multiple search threads it directs abort := make(chan struct{}) @@ -86,7 +86,7 @@ func (ethash *Ethash) Seal(chain consensus.ChainHeaderReader, block *types.Block } // Push new work to remote sealer if ethash.remote != nil { - ethash.remote.workCh <- &sealTask{block: block, results: results} + ethash.remote.workCh <- &sealTask{block: block, profit: profit, results: results} } var ( pend sync.WaitGroup @@ -117,7 +117,7 @@ func (ethash *Ethash) Seal(chain consensus.ChainHeaderReader, block *types.Block case <-ethash.update: // Thread count was changed on user request, restart close(abort) - if err := ethash.Seal(chain, block, results, stop); err != nil { + if err := ethash.Seal(chain, block, profit, results, stop); err != nil { ethash.config.Log.Error("Failed to restart sealing after update", "err", err) } } @@ -194,7 +194,7 @@ type remoteSealer struct { works map[common.Hash]*types.Block rates map[common.Hash]hashrate currentBlock *types.Block - currentWork [4]string + currentWork [5]string notifyCtx context.Context cancelNotify context.CancelFunc // cancels all notification requests reqWG sync.WaitGroup // tracks notification request goroutines @@ -215,6 +215,7 @@ type remoteSealer struct { // sealTask wraps a seal block with relative result channel for remote sealer thread. type sealTask struct { block *types.Block + profit *big.Int results chan<- *types.Block } @@ -239,7 +240,7 @@ type hashrate struct { // sealWork wraps a seal work package for remote sealer. type sealWork struct { errc chan error - res chan [4]string + res chan [5]string } func startRemoteSealer(ethash *Ethash, urls []string, noverify bool) *remoteSealer { @@ -281,7 +282,7 @@ func (s *remoteSealer) loop() { // Update current work with new received block. // Note same work can be past twice, happens when changing CPU threads. s.results = work.results - s.makeWork(work.block) + s.makeWork(work.block, work.profit) s.notifyWork() case work := <-s.fetchWorkCh: @@ -338,18 +339,23 @@ func (s *remoteSealer) loop() { // makeWork creates a work package for external miner. 
// -// The work package consists of 3 strings: +// The work package consists of 5 strings: // result[0], 32 bytes hex encoded current block header pow-hash // result[1], 32 bytes hex encoded seed hash used for DAG // result[2], 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty // result[3], hex encoded block number -func (s *remoteSealer) makeWork(block *types.Block) { +// result[4], hex encoded profit generated from this block, if present +func (s *remoteSealer) makeWork(block *types.Block, profit *big.Int) { hash := s.ethash.SealHash(block.Header()) s.currentWork[0] = hash.Hex() s.currentWork[1] = common.BytesToHash(SeedHash(block.NumberU64())).Hex() s.currentWork[2] = common.BytesToHash(new(big.Int).Div(two256, block.Difficulty()).Bytes()).Hex() s.currentWork[3] = hexutil.EncodeBig(block.Number()) + if profit != nil { + s.currentWork[4] = hexutil.EncodeBig(profit) + } + // Trace the seal work fetched by remote sealer. s.currentBlock = block s.works[hash] = block @@ -375,7 +381,7 @@ func (s *remoteSealer) notifyWork() { } } -func (s *remoteSealer) sendNotification(ctx context.Context, url string, json []byte, work [4]string) { +func (s *remoteSealer) sendNotification(ctx context.Context, url string, json []byte, work [5]string) { defer s.reqWG.Done() req, err := http.NewRequest("POST", url, bytes.NewReader(json)) diff --git a/consensus/ethash/sealer_test.go b/consensus/ethash/sealer_test.go index c34e76aec243..bcab88f4d74f 100644 --- a/consensus/ethash/sealer_test.go +++ b/consensus/ethash/sealer_test.go @@ -57,7 +57,7 @@ func TestRemoteNotify(t *testing.T) { header := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} block := types.NewBlockWithHeader(header) - ethash.Seal(nil, block, nil, nil) + ethash.Seal(nil, block, nil, nil, nil) select { case work := <-sink: if want := ethash.SealHash(header).Hex(); work[0] != want { @@ -105,7 +105,7 @@ func TestRemoteNotifyFull(t *testing.T) { header := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} block := types.NewBlockWithHeader(header) - ethash.Seal(nil, block, nil, nil) + ethash.Seal(nil, block, nil, nil, nil) select { case work := <-sink: if want := "0x" + strconv.FormatUint(header.Number.Uint64(), 16); work["number"] != want { @@ -151,7 +151,7 @@ func TestRemoteMultiNotify(t *testing.T) { for i := 0; i < cap(sink); i++ { header := &types.Header{Number: big.NewInt(int64(i)), Difficulty: big.NewInt(100)} block := types.NewBlockWithHeader(header) - ethash.Seal(nil, block, results, nil) + ethash.Seal(nil, block, nil, results, nil) } for i := 0; i < cap(sink); i++ { @@ -200,7 +200,7 @@ func TestRemoteMultiNotifyFull(t *testing.T) { for i := 0; i < cap(sink); i++ { header := &types.Header{Number: big.NewInt(int64(i)), Difficulty: big.NewInt(100)} block := types.NewBlockWithHeader(header) - ethash.Seal(nil, block, results, nil) + ethash.Seal(nil, block, nil, results, nil) } for i := 0; i < cap(sink); i++ { @@ -266,7 +266,7 @@ func TestStaleSubmission(t *testing.T) { for id, c := range testcases { for _, h := range c.headers { - ethash.Seal(nil, types.NewBlockWithHeader(h), results, nil) + ethash.Seal(nil, types.NewBlockWithHeader(h), nil, results, nil) } if res := api.SubmitWork(fakeNonce, ethash.SealHash(c.headers[c.submitIndex]), fakeDigest); res != c.submitRes { t.Errorf("case %d submit result mismatch, want %t, get %t", id+1, c.submitRes, res) diff --git a/core/state_processor.go b/core/state_processor.go index d4c77ae41042..05064a27cb21 100644 --- a/core/state_processor.go +++ 
b/core/state_processor.go @@ -137,6 +137,51 @@ func applyTransaction(msg types.Message, config *params.ChainConfig, bc ChainCon return receipt, err } +func applyTransactionWithResult(msg types.Message, config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, evm *vm.EVM) (*types.Receipt, *ExecutionResult, error) { + // Create a new context to be used in the EVM environment. + txContext := NewEVMTxContext(msg) + evm.Reset(txContext, statedb) + + // Apply the transaction to the current state (included in the env). + result, err := ApplyMessage(evm, msg, gp) + if err != nil { + return nil, nil, err + } + + // Update the state with pending changes. + var root []byte + if config.IsByzantium(header.Number) { + statedb.Finalise(true) + } else { + root = statedb.IntermediateRoot(config.IsEIP158(header.Number)).Bytes() + } + *usedGas += result.UsedGas + + // Create a new receipt for the transaction, storing the intermediate root and gas used + // by the tx. + receipt := &types.Receipt{Type: tx.Type(), PostState: root, CumulativeGasUsed: *usedGas} + if result.Failed() { + receipt.Status = types.ReceiptStatusFailed + } else { + receipt.Status = types.ReceiptStatusSuccessful + } + receipt.TxHash = tx.Hash() + receipt.GasUsed = result.UsedGas + + // If the transaction created a contract, store the creation address in the receipt. + if msg.To() == nil { + receipt.ContractAddress = crypto.CreateAddress(evm.TxContext.Origin, tx.Nonce()) + } + + // Set the receipt logs and create the bloom filter. + receipt.Logs = statedb.GetLogs(tx.Hash(), header.Hash()) + receipt.Bloom = types.CreateBloom(types.Receipts{receipt}) + receipt.BlockHash = header.Hash() + receipt.BlockNumber = header.Number + receipt.TransactionIndex = uint(statedb.TxIndex()) + return receipt, result, err +} + // ApplyTransaction attempts to apply a transaction to the given state database // and uses the input parameters for its environment. 
It returns the receipt // for the transaction, gas used and an error if the transaction failed, @@ -151,3 +196,14 @@ func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *commo vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, config, cfg) return applyTransaction(msg, config, bc, author, gp, statedb, header.Number, header.Hash(), tx, usedGas, vmenv) } + +func ApplyTransactionWithResult(config *params.ChainConfig, bc ChainContext, author *common.Address, gp *GasPool, statedb *state.StateDB, header *types.Header, tx *types.Transaction, usedGas *uint64, cfg vm.Config) (*types.Receipt, *ExecutionResult, error) { + msg, err := tx.AsMessage(types.MakeSigner(config, header.Number), header.BaseFee) + if err != nil { + return nil, nil, err + } + // Create a new context to be used in the EVM environment + blockContext := NewEVMBlockContext(header, bc, author) + vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, config, cfg) + return applyTransactionWithResult(msg, config, bc, author, gp, statedb, header, tx, usedGas, vmenv) +} diff --git a/core/tx_pool.go b/core/tx_pool.go index 75b5ac101949..ee507a5b7055 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -253,13 +253,14 @@ type TxPool struct { locals *accountSet // Set of local transaction to exempt from eviction rules journal *txJournal // Journal of local transaction to back up to disk - pending map[common.Address]*txList // All currently processable transactions - queue map[common.Address]*txList // Queued but non-processable transactions - beats map[common.Address]time.Time // Last heartbeat from each known account - mevBundles []types.MevBundle - megabundles map[common.Address]types.MevBundle // One megabundle per each trusted relay - all *txLookup // All transactions to allow lookups - priced *txPricedList // All transactions sorted by price + pending map[common.Address]*txList // All currently processable transactions + queue map[common.Address]*txList // Queued but non-processable transactions + beats map[common.Address]time.Time // Last heartbeat from each known account + mevBundles []types.MevBundle + megabundles map[common.Address]types.MevBundle // One megabundle per each trusted relay + NewMegabundleHooks []func(common.Address, *types.MevBundle) + all *txLookup // All transactions to allow lookups + priced *txPricedList // All transactions sorted by price chainHeadCh chan ChainHeadEvent chainHeadSub event.Subscription @@ -630,13 +631,20 @@ func (pool *TxPool) AddMegabundle(relayAddr common.Address, txs types.Transactio return errors.New("megabundle from non-trusted address") } - pool.megabundles[relayAddr] = types.MevBundle{ + megabundle := types.MevBundle{ Txs: txs, BlockNumber: blockNumber, MinTimestamp: minTimestamp, MaxTimestamp: maxTimestamp, RevertingTxHashes: revertingTxHashes, } + + pool.megabundles[relayAddr] = megabundle + + for _, hook := range pool.NewMegabundleHooks { + go hook(relayAddr, &megabundle) + } + return nil } diff --git a/eth/backend.go b/eth/backend.go index 22535e0e2289..1dbeebdd9718 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -292,7 +292,7 @@ func makeExtraData(extra []byte) []byte { // APIs return the collection of RPC services the ethereum package offers. // NOTE, some of these services probably need to be moved to somewhere else. 
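AddMegabundle now stores the megabundle and then fires every registered NewMegabundleHooks callback in its own goroutine; in this patch the only consumer is the multi-worker further down, which forwards the bundle to the matching relay worker. A minimal sketch of registering such a hook — a hypothetical helper, shown only to illustrate the hook shape:

```go
package megabundlehook // hypothetical helper, not part of this patch

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/log"
)

// RegisterLogger appends a hook that runs whenever a trusted relay delivers a
// new megabundle. AddMegabundle invokes each hook asynchronously, so the
// callback must be safe to run concurrently with the pool.
func RegisterLogger(pool *core.TxPool) {
	pool.NewMegabundleHooks = append(pool.NewMegabundleHooks, func(relay common.Address, bundle *types.MevBundle) {
		log.Info("Megabundle received", "relay", relay, "txs", len(bundle.Txs), "targetBlock", bundle.BlockNumber)
	})
}
```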
func (s *Ethereum) APIs() []rpc.API { - apis := ethapi.GetAPIs(s.APIBackend) + apis := ethapi.GetAPIs(s.APIBackend, s.BlockChain()) // Append any APIs exposed explicitly by the consensus engine apis = append(apis, s.engine.APIs(s.BlockChain())...) diff --git a/infra/Dockerfile.node b/infra/Dockerfile.node index db8e99ac937e..7868453eba7f 100644 --- a/infra/Dockerfile.node +++ b/infra/Dockerfile.node @@ -4,7 +4,7 @@ FROM golang:1.15-alpine as builder RUN apk add --no-cache make gcc musl-dev linux-headers git ADD . /go-ethereum -RUN cd /go-ethereum && make geth +RUN cd /go-ethereum && GO111MODULE=on go run build/ci.go install ./cmd/geth # Pull Geth into a second stage deploy alpine container FROM alpine:latest diff --git a/infra/Dockerfile.updater b/infra/Dockerfile.updater index d3099d19ce1a..808f55aa2b9c 100644 --- a/infra/Dockerfile.updater +++ b/infra/Dockerfile.updater @@ -4,7 +4,7 @@ FROM golang:1.15-alpine as builder RUN apk add --no-cache make gcc musl-dev linux-headers git ADD . /go-ethereum -RUN cd /go-ethereum && make geth +RUN cd /go-ethereum && GO111MODULE=on go run build/ci.go install ./cmd/geth # Pull Geth into a second stage deploy alpine container FROM alpine:latest diff --git a/infra/mev-geth-nodes-x86-64.yaml b/infra/mev-geth-nodes-x86-64.yaml index bf7a196caa52..f93c19153394 100644 --- a/infra/mev-geth-nodes-x86-64.yaml +++ b/infra/mev-geth-nodes-x86-64.yaml @@ -45,11 +45,11 @@ Parameters: InstanceType: Type: String - Default: i3en.large + Default: i3en.xlarge MemoryLimit: Type: Number - Default: 6144 + Default: 20000 KeyPair: Type: AWS::EC2::KeyPair::KeyName @@ -82,6 +82,14 @@ Parameters: Type: AWS::SSM::Parameter::Value Default: /aws/service/ecs/optimized-ami/amazon-linux-2/recommended/image_id + PrivateZoneName: + Type: String + Default: geth.internal + + ServiceName: + Type: String + Default: node + # SNS Parameters SNSSubscriptionEndpoint: @@ -172,6 +180,10 @@ Metadata: default: "The name of the node ECS Task" ECSAMI: default: "The ECS AMI ID populated from SSM." + PrivateZoneName: + default: "The DNS zone that should be used for service discovery records." + ServiceName: + default: "The service name prefix that should be used for service discovery records." 
Network: default: "The Ethereum network you will be connecting to" SyncMode: @@ -211,6 +223,8 @@ Metadata: - NodeDesiredCount - NodeTaskName - ECSAMI + - PrivateZoneName + - ServiceName - Label: default: Mev-Geth Configuration Parameters: @@ -238,6 +252,8 @@ Mappings: us-east-2: mainnet: mev-geth-updater-fast-chainbucket-17p2xhnhcydlz goerli: mev-geth-updater-fast-goerli-chainbucket-j6dujg8apbna + eu-west-1: + mainnet: mev-geth-updater-chainbucket-11hs3dhhz7k0s #us-west-2: # mainnet: # goerli: @@ -284,6 +300,14 @@ Resources: FromPort: !Ref NetPort ToPort: !Ref NetPort CidrIpv6: ::/0 + - IpProtocol: tcp + FromPort: !Ref RpcPort + ToPort: !Ref RpcPort + CidrIp: 172.31.0.0/16 + - IpProtocol: tcp + FromPort: !Ref WsPort + ToPort: !Ref WsPort + CidrIp: 172.31.0.0/16 ECSAutoScalingGroup: Type: AWS::AutoScaling::AutoScalingGroup @@ -629,7 +653,7 @@ Resources: Properties: Cluster: !Ref Cluster DesiredCount: !Ref NodeDesiredCount - HealthCheckGracePeriodSeconds: 3600 + HealthCheckGracePeriodSeconds: 14400 TaskDefinition: !Ref NodeTaskDefinition LaunchType: EC2 DeploymentConfiguration: @@ -642,6 +666,11 @@ Resources: - ContainerName: !Ref NodeTaskName ContainerPort: !Ref WsPort TargetGroupArn: !Ref NodeWsTargetGroup + ServiceName: !Sub ${ServiceName}-${Network}-${SyncMode} + ServiceRegistries: + - RegistryArn: !GetAtt DiscoveryService.Arn + ContainerName: !Ref NodeTaskName + ContainerPort: !Ref RpcPort NodeTaskDefinition: Type: AWS::ECS::TaskDefinition @@ -688,14 +717,34 @@ Resources: awslogs-region: !Ref AWS::Region awslogs-group: !Ref NodeLogGroup awslogs-stream-prefix: !Ref AWS::StackName - #HealthCheck: - # Command: - # - CMD-SHELL - # - '[ `echo "eth.syncing.highestBlock - eth.syncing.currentBlock"|geth attach|head -10|tail -1` -lt 200 ] || exit 1' - # Interval: 300 - # Timeout: 60 - # Retries: 10 - # StartPeriod: 300 + HealthCheck: + Command: + - CMD-SHELL + - '[ `echo "eth.syncing" | geth attach | head -10 | tail -1` = "false" ] || exit 1' + Interval: 300 + Timeout: 60 + Retries: 10 + StartPeriod: 300 + + PrivateNamespace: + Type: AWS::ServiceDiscovery::PrivateDnsNamespace + Properties: + Name: !Ref PrivateZoneName + Vpc: !Ref VPC + + DiscoveryService: + Type: AWS::ServiceDiscovery::Service + Properties: + Description: Discovery service for nodes + DnsConfig: + RoutingPolicy: MULTIVALUE + DnsRecords: + - TTL: 60 + Type: SRV + HealthCheckCustomConfig: + FailureThreshold: 1 + Name: !Sub ${ServiceName}-${Network}-${SyncMode} + NamespaceId: !Ref PrivateNamespace # CodePipeline Resources @@ -969,4 +1018,4 @@ Outputs: Value: !Ref NodeTargetGroup NodeServiceUrl: Description: URL of the load balancer for the node service. - Value: !Sub http://${NodeLoadBalancer.DNSName} + Value: !Sub http://${NodeLoadBalancer.DNSName} \ No newline at end of file diff --git a/infra/start-mev-geth-node.sh b/infra/start-mev-geth-node.sh index 05ad50c61003..45ef0c519734 100755 --- a/infra/start-mev-geth-node.sh +++ b/infra/start-mev-geth-node.sh @@ -33,6 +33,7 @@ start_node() { --ws.api eth,net,web3 \ --ws.origins '*' \ --syncmode $syncmode \ + --gcmode archive \ --cache 4096 \ --maxpeers $connections \ --goerli @@ -41,7 +42,7 @@ start_node() { echo "Node failed to start; exiting." exit 1 fi - else + else geth \ --port $netport \ --http \ @@ -59,7 +60,9 @@ start_node() { --ws.api eth,net,web3 \ --ws.origins '*' \ --syncmode $syncmode \ + --gcmode archive \ --cache 4096 \ + --snapshot=false \ --maxpeers $connections if [ $? 
-ne 0 ] then diff --git a/infra/start-mev-geth-updater.sh b/infra/start-mev-geth-updater.sh index 11a6a533aa14..abad72fab9bb 100755 --- a/infra/start-mev-geth-updater.sh +++ b/infra/start-mev-geth-updater.sh @@ -18,6 +18,7 @@ start_node() { --port $netport \ --syncmode $syncmode \ --cache 4096 \ + --gcmode archive \ --maxpeers $connections \ --goerli & if [ $? -ne 0 ] @@ -30,6 +31,7 @@ start_node() { --port $netport \ --syncmode $syncmode \ --cache 4096 \ + --gcmode archive \ --maxpeers $connections & if [ $? -ne 0 ] then diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 63b046c90a2f..4497361674b0 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -18,6 +18,8 @@ package ethapi import ( "context" + "crypto/rand" + "encoding/hex" "errors" "fmt" "math/big" @@ -47,6 +49,7 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" "github.com/tyler-smith/go-bip39" + "golang.org/x/crypto/sha3" ) // PublicEthereumAPI provides an API to access Ethereum related information. @@ -2207,3 +2210,310 @@ func (s *PrivateTxBundleAPI) SendMegabundle(ctx context.Context, args SendMegabu } return s.b.SendMegabundle(ctx, txs, rpc.BlockNumber(args.BlockNumber), minTimestamp, maxTimestamp, args.RevertingTxHashes, relayAddr) } + +// BundleAPI offers an API for accepting bundled transactions +type BundleAPI struct { + b Backend + chain *core.BlockChain +} + +// NewBundleAPI creates a new Tx Bundle API instance. +func NewBundleAPI(b Backend, chain *core.BlockChain) *BundleAPI { + return &BundleAPI{b, chain} +} + +// CallBundleArgs represents the arguments for a call. +type CallBundleArgs struct { + Txs []hexutil.Bytes `json:"txs"` + BlockNumber rpc.BlockNumber `json:"blockNumber"` + StateBlockNumberOrHash rpc.BlockNumberOrHash `json:"stateBlockNumber"` + Coinbase *string `json:"coinbase"` + Timestamp *uint64 `json:"timestamp"` + Timeout *int64 `json:"timeout"` + GasLimit *uint64 `json:"gasLimit"` + Difficulty *big.Int `json:"difficulty"` + BaseFee *big.Int `json:"baseFee"` +} + +// CallBundle will simulate a bundle of transactions at the top of a given block +// number with the state of another (or the same) block. This can be used to +// simulate future blocks with the current state, or it can be used to simulate +// a past block. 
+// The sender is responsible for signing the transactions and using the correct +// nonce and ensuring validity +func (s *BundleAPI) CallBundle(ctx context.Context, args CallBundleArgs) (map[string]interface{}, error) { + if len(args.Txs) == 0 { + return nil, errors.New("bundle missing txs") + } + if args.BlockNumber == 0 { + return nil, errors.New("bundle missing blockNumber") + } + + var txs types.Transactions + + for _, encodedTx := range args.Txs { + tx := new(types.Transaction) + if err := tx.UnmarshalBinary(encodedTx); err != nil { + return nil, err + } + txs = append(txs, tx) + } + defer func(start time.Time) { log.Debug("Executing EVM call finished", "runtime", time.Since(start)) }(time.Now()) + + timeoutMilliSeconds := int64(5000) + if args.Timeout != nil { + timeoutMilliSeconds = *args.Timeout + } + timeout := time.Millisecond * time.Duration(timeoutMilliSeconds) + state, parent, err := s.b.StateAndHeaderByNumberOrHash(ctx, args.StateBlockNumberOrHash) + if state == nil || err != nil { + return nil, err + } + blockNumber := big.NewInt(int64(args.BlockNumber)) + + timestamp := parent.Time + 1 + if args.Timestamp != nil { + timestamp = *args.Timestamp + } + coinbase := parent.Coinbase + if args.Coinbase != nil { + coinbase = common.HexToAddress(*args.Coinbase) + } + difficulty := parent.Difficulty + if args.Difficulty != nil { + difficulty = args.Difficulty + } + gasLimit := parent.GasLimit + if args.GasLimit != nil { + gasLimit = *args.GasLimit + } + var baseFee *big.Int + if args.BaseFee != nil { + baseFee = args.BaseFee + } else if s.b.ChainConfig().IsLondon(big.NewInt(args.BlockNumber.Int64())) { + baseFee = misc.CalcBaseFee(s.b.ChainConfig(), parent) + } + header := &types.Header{ + ParentHash: parent.Hash(), + Number: blockNumber, + GasLimit: gasLimit, + Time: timestamp, + Difficulty: difficulty, + Coinbase: coinbase, + BaseFee: baseFee, + } + + // Setup context so it may be cancelled the call has completed + // or, in case of unmetered gas, setup a context with a timeout. + var cancel context.CancelFunc + if timeout > 0 { + ctx, cancel = context.WithTimeout(ctx, timeout) + } else { + ctx, cancel = context.WithCancel(ctx) + } + // Make sure the context is cancelled when the call has completed + // this makes sure resources are cleaned up. + defer cancel() + + vmconfig := vm.Config{} + + // Setup the gas pool (also for unmetered requests) + // and apply the message. 
+ gp := new(core.GasPool).AddGas(math.MaxUint64) + + results := []map[string]interface{}{} + coinbaseBalanceBefore := state.GetBalance(coinbase) + + bundleHash := sha3.NewLegacyKeccak256() + signer := types.MakeSigner(s.b.ChainConfig(), blockNumber) + var totalGasUsed uint64 + gasFees := new(big.Int) + for i, tx := range txs { + coinbaseBalanceBeforeTx := state.GetBalance(coinbase) + state.Prepare(tx.Hash(), i) + + receipt, result, err := core.ApplyTransactionWithResult(s.b.ChainConfig(), s.chain, &coinbase, gp, state, header, tx, &header.GasUsed, vmconfig) + if err != nil { + return nil, fmt.Errorf("err: %w; txhash %s", err, tx.Hash()) + } + + txHash := tx.Hash().String() + from, err := types.Sender(signer, tx) + if err != nil { + return nil, fmt.Errorf("err: %w; txhash %s", err, tx.Hash()) + } + to := "0x" + if tx.To() != nil { + to = tx.To().String() + } + jsonResult := map[string]interface{}{ + "txHash": txHash, + "gasUsed": receipt.GasUsed, + "fromAddress": from.String(), + "toAddress": to, + } + totalGasUsed += receipt.GasUsed + gasPrice, err := tx.EffectiveGasTip(header.BaseFee) + if err != nil { + return nil, fmt.Errorf("err: %w; txhash %s", err, tx.Hash()) + } + gasFeesTx := new(big.Int).Mul(big.NewInt(int64(receipt.GasUsed)), gasPrice) + gasFees.Add(gasFees, gasFeesTx) + bundleHash.Write(tx.Hash().Bytes()) + if result.Err != nil { + jsonResult["error"] = result.Err.Error() + revert := result.Revert() + if len(revert) > 0 { + jsonResult["revert"] = string(revert) + } + } else { + dst := make([]byte, hex.EncodedLen(len(result.Return()))) + hex.Encode(dst, result.Return()) + jsonResult["value"] = "0x" + string(dst) + } + coinbaseDiffTx := new(big.Int).Sub(state.GetBalance(coinbase), coinbaseBalanceBeforeTx) + jsonResult["coinbaseDiff"] = coinbaseDiffTx.String() + jsonResult["gasFees"] = gasFeesTx.String() + jsonResult["ethSentToCoinbase"] = new(big.Int).Sub(coinbaseDiffTx, gasFeesTx).String() + jsonResult["gasPrice"] = new(big.Int).Div(coinbaseDiffTx, big.NewInt(int64(receipt.GasUsed))).String() + jsonResult["gasUsed"] = receipt.GasUsed + results = append(results, jsonResult) + } + + ret := map[string]interface{}{} + ret["results"] = results + coinbaseDiff := new(big.Int).Sub(state.GetBalance(coinbase), coinbaseBalanceBefore) + ret["coinbaseDiff"] = coinbaseDiff.String() + ret["gasFees"] = gasFees.String() + ret["ethSentToCoinbase"] = new(big.Int).Sub(coinbaseDiff, gasFees).String() + ret["bundleGasPrice"] = new(big.Int).Div(coinbaseDiff, big.NewInt(int64(totalGasUsed))).String() + ret["totalGasUsed"] = totalGasUsed + ret["stateBlockNumber"] = parent.Number.Int64() + + ret["bundleHash"] = "0x" + common.Bytes2Hex(bundleHash.Sum(nil)) + return ret, nil +} + +// EstimateGasBundleArgs represents the arguments for a call +type EstimateGasBundleArgs struct { + Txs []TransactionArgs `json:"txs"` + BlockNumber rpc.BlockNumber `json:"blockNumber"` + StateBlockNumberOrHash rpc.BlockNumberOrHash `json:"stateBlockNumber"` + Coinbase *string `json:"coinbase"` + Timestamp *uint64 `json:"timestamp"` + Timeout *int64 `json:"timeout"` +} + +func (s *BundleAPI) EstimateGasBundle(ctx context.Context, args EstimateGasBundleArgs) (map[string]interface{}, error) { + if len(args.Txs) == 0 { + return nil, errors.New("bundle missing txs") + } + if args.BlockNumber == 0 { + return nil, errors.New("bundle missing blockNumber") + } + + timeoutMS := int64(5000) + if args.Timeout != nil { + timeoutMS = *args.Timeout + } + timeout := time.Millisecond * time.Duration(timeoutMS) + + state, parent, err := 
s.b.StateAndHeaderByNumberOrHash(ctx, args.StateBlockNumberOrHash) + if state == nil || err != nil { + return nil, err + } + blockNumber := big.NewInt(int64(args.BlockNumber)) + timestamp := parent.Time + 1 + if args.Timestamp != nil { + timestamp = *args.Timestamp + } + coinbase := parent.Coinbase + if args.Coinbase != nil { + coinbase = common.HexToAddress(*args.Coinbase) + } + + header := &types.Header{ + ParentHash: parent.Hash(), + Number: blockNumber, + GasLimit: parent.GasLimit, + Time: timestamp, + Difficulty: parent.Difficulty, + Coinbase: coinbase, + BaseFee: parent.BaseFee, + } + + // Setup context so it may be cancelled when the call + // has completed or, in case of unmetered gas, setup + // a context with a timeout + var cancel context.CancelFunc + if timeout > 0 { + ctx, cancel = context.WithTimeout(ctx, timeout) + } else { + ctx, cancel = context.WithCancel(ctx) + } + + // Make sure the context is cancelled when the call has completed + // This makes sure resources are cleaned up + defer cancel() + + // RPC Call gas cap + globalGasCap := s.b.RPCGasCap() + + // Results + results := []map[string]interface{}{} + + // Copy the original db so we don't modify it + statedb := state.Copy() + + // Gas pool + gp := new(core.GasPool).AddGas(math.MaxUint64) + + // Block context + blockContext := core.NewEVMBlockContext(header, s.chain, &coinbase) + + // Feed each of the transactions into the VM ctx + // And try and estimate the gas used + for i, txArgs := range args.Txs { + // Since its a txCall we'll just prepare the + // state with a random hash + var randomHash common.Hash + rand.Read(randomHash[:]) + + // New random hash since its a call + statedb.Prepare(randomHash, i) + + // Convert tx args to msg to apply state transition + msg, err := txArgs.ToMessage(globalGasCap, header.BaseFee) + if err != nil { + return nil, err + } + + // Prepare the hashes + txContext := core.NewEVMTxContext(msg) + + // Get EVM Environment + vmenv := vm.NewEVM(blockContext, txContext, statedb, s.b.ChainConfig(), vm.Config{NoBaseFee: true}) + + // Apply state transition + result, err := core.ApplyMessage(vmenv, msg, gp) + if err != nil { + return nil, err + } + + // Modifications are committed to the state + // Only delete empty objects if EIP158/161 (a.k.a Spurious Dragon) is in effect + statedb.Finalise(vmenv.ChainConfig().IsEIP158(blockNumber)) + + // Append result + jsonResult := map[string]interface{}{ + "gasUsed": result.UsedGas, + } + results = append(results, jsonResult) + } + + // Return results + ret := map[string]interface{}{} + ret["results"] = results + + return ret, nil +} diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 58c8f0bf04e1..783b46fbd296 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -97,7 +97,7 @@ type Backend interface { Engine() consensus.Engine } -func GetAPIs(apiBackend Backend) []rpc.API { +func GetAPIs(apiBackend Backend, chain *core.BlockChain) []rpc.API { nonceLock := new(AddrLocker) return []rpc.API{ { @@ -144,6 +144,11 @@ func GetAPIs(apiBackend Backend) []rpc.API { Version: "1.0", Service: NewPrivateTxBundleAPI(apiBackend), Public: true, + }, { + Namespace: "eth", + Version: "1.0", + Service: NewBundleAPI(apiBackend, chain), + Public: true, }, } } diff --git a/les/client.go b/les/client.go index 43207f3443ec..922c51627824 100644 --- a/les/client.go +++ b/les/client.go @@ -282,7 +282,7 @@ func (s *LightDummyAPI) Mining() bool { // APIs returns the collection of RPC services the ethereum package offers. 
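NewBundleAPI is registered under the existing eth namespace (see the GetAPIs hunk above), so callers reach the simulation endpoints as eth_callBundle and eth_estimateGasBundle. A client-side sketch of CallBundle — endpoint, target block number, and raw transactions are placeholders:

```go
package main // hypothetical searcher-side sketch

import (
	"fmt"
	"log"

	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/rpc"
)

func main() {
	client, err := rpc.Dial("http://localhost:8545")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Signed, binary-encoded transactions to simulate (placeholders; supply real ones).
	rawTxs := []hexutil.Bytes{}

	var result map[string]interface{}
	err = client.Call(&result, "eth_callBundle", map[string]interface{}{
		"txs":              rawTxs,
		"blockNumber":      "0x100",  // block the bundle targets (placeholder)
		"stateBlockNumber": "latest", // simulate on top of the latest state
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("bundleHash:", result["bundleHash"])
	fmt.Println("bundleGasPrice:", result["bundleGasPrice"])
	fmt.Println("coinbaseDiff:", result["coinbaseDiff"])
	fmt.Println("totalGasUsed:", result["totalGasUsed"])
}
```

EstimateGasBundle takes the same blockNumber/stateBlockNumber shape but accepts unsigned transaction call objects in txs and returns only a per-transaction gasUsed list.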
// NOTE, some of these services probably need to be moved to somewhere else. func (s *LightEthereum) APIs() []rpc.API { - apis := ethapi.GetAPIs(s.ApiBackend) + apis := ethapi.GetAPIs(s.ApiBackend, nil) apis = append(apis, s.engine.APIs(s.BlockChain().HeaderChain())...) return append(apis, []rpc.API{ { diff --git a/miner/multi_worker.go b/miner/multi_worker.go index 050ea38af4e5..da1471fa3e75 100644 --- a/miner/multi_worker.go +++ b/miner/multi_worker.go @@ -105,16 +105,31 @@ func newMultiWorker(config *Config, chainConfig *params.ChainConfig, engine cons })) } + relayWorkerMap := make(map[common.Address]*worker) + for i := 0; i < len(config.TrustedRelays); i++ { - workers = append(workers, - newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, &flashbotsData{ - isFlashbots: true, - isMegabundleWorker: true, - queue: queue, - relayAddr: config.TrustedRelays[i], - })) + relayWorker := newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, &flashbotsData{ + isFlashbots: true, + isMegabundleWorker: true, + queue: queue, + relayAddr: config.TrustedRelays[i], + }) + workers = append(workers, relayWorker) + relayWorkerMap[config.TrustedRelays[i]] = relayWorker } + eth.TxPool().NewMegabundleHooks = append(eth.TxPool().NewMegabundleHooks, func(relayAddr common.Address, megabundle *types.MevBundle) { + worker, found := relayWorkerMap[relayAddr] + if !found { + return + } + + select { + case worker.newMegabundleCh <- megabundle: + default: + } + }) + log.Info("creating multi worker", "config.MaxMergedBundles", config.MaxMergedBundles, "config.TrustedRelays", config.TrustedRelays, "worker", len(workers)) return &multiWorker{ regularWorker: regularWorker, diff --git a/miner/worker.go b/miner/worker.go index 2205f4fa71da..3f1d7c2dfc2b 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -213,6 +213,7 @@ type worker struct { exitCh chan struct{} resubmitIntervalCh chan time.Duration resubmitAdjustCh chan *intervalAdjust + newMegabundleCh chan *types.MevBundle wg sync.WaitGroup @@ -295,16 +296,18 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus txsCh: make(chan core.NewTxsEvent, txChanSize), chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), - newWorkCh: make(chan *newWorkReq), + newWorkCh: make(chan *newWorkReq, 1), getWorkCh: make(chan *getWorkReq), taskCh: taskCh, resultCh: make(chan *types.Block, resultQueueSize), exitCh: exitCh, startCh: make(chan struct{}, 1), + newMegabundleCh: make(chan *types.MevBundle), resubmitIntervalCh: make(chan time.Duration), resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), flashbots: flashbots, } + // Subscribe NewTxsEvent for tx pool worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh) // Subscribe events for blockchain @@ -450,26 +453,38 @@ func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) t func (w *worker) newWorkLoop(recommit time.Duration) { defer w.wg.Done() var ( - interrupt *int32 - minRecommit = recommit // minimal resubmit interval specified by user. - timestamp int64 // timestamp for each round of sealing. + runningInterrupt *int32 // Running task interrupt + queuedInterrupt *int32 // Queued task interrupt + minRecommit = recommit // minimal resubmit interval specified by user. + timestamp int64 // timestamp for each round of sealing. 
) timer := time.NewTimer(0) defer timer.Stop() <-timer.C // discard the initial tick - // commit aborts in-flight transaction execution with given signal and resubmits a new one. + // commit aborts in-flight transaction execution with highest seen signal and resubmits a new one commit := func(noempty bool, s int32) { - if interrupt != nil { - atomic.StoreInt32(interrupt, s) - } - interrupt = new(int32) select { - case w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}: case <-w.exitCh: return + case queuedRequest := <-w.newWorkCh: + // Previously queued request wasn't started yet, update the request and resubmit + queuedRequest.noempty = queuedRequest.noempty || noempty + queuedRequest.timestamp = timestamp + w.newWorkCh <- queuedRequest // guaranteed to be nonblocking + default: + // Previously queued request has already started, cycle interrupt pointer and submit new work + runningInterrupt = queuedInterrupt + queuedInterrupt = new(int32) + + w.newWorkCh <- &newWorkReq{interrupt: queuedInterrupt, noempty: noempty, timestamp: timestamp} // guaranteed to be nonblocking } + + if runningInterrupt != nil && s > atomic.LoadInt32(runningInterrupt) { + atomic.StoreInt32(runningInterrupt, s) + } + timer.Reset(recommit) atomic.StoreInt32(&w.newTxs, 0) } @@ -496,6 +511,11 @@ func (w *worker) newWorkLoop(recommit time.Duration) { timestamp = time.Now().Unix() commit(false, commitInterruptNewHead) + case <-w.newMegabundleCh: + if w.isRunning() { + commit(true, commitInterruptNone) + } + case <-timer.C: // If sealing is running resubmit a new work cycle periodically to pull in // higher priced transactions. Disable this overhead for pending blocks. @@ -564,7 +584,10 @@ func (w *worker) mainLoop() { for { select { case req := <-w.newWorkCh: - w.commitWork(req.interrupt, req.noempty, req.timestamp) + // Don't start if the work has already been interrupted + if req.interrupt == nil || atomic.LoadInt32(req.interrupt) == commitInterruptNone { + w.commitWork(req.interrupt, req.noempty, req.timestamp) + } case req := <-w.getWorkCh: block, err := w.generateWork(req.params) @@ -711,7 +734,7 @@ func (w *worker) taskLoop() { w.pendingTasks[sealHash] = task w.pendingMu.Unlock() - if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil { + if err := w.engine.Seal(w.chain, task.block, task.profit, w.resultCh, stopCh); err != nil { log.Warn("Block sealing failed", "err", err) w.pendingMu.Lock() delete(w.pendingTasks, sealHash) @@ -929,6 +952,7 @@ func (w *worker) commitBundle(env *environment, txs types.Transactions, interrup inc: true, } } + return atomic.LoadInt32(interrupt) == commitInterruptNewHead } // If we don't have enough gas for any further transactions then we're done @@ -1037,6 +1061,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP inc: true, } } + return atomic.LoadInt32(interrupt) == commitInterruptNewHead } // If we don't have enough gas for any further transactions then we're done @@ -1258,6 +1283,7 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) { if err != nil { return // no valid megabundle for this relay, nothing to do } + // Flashbots bundle merging duplicates work by simulating TXes and then committing them once more. // Megabundles API focuses on speed and runs everything in one cycle. coinbaseBalanceBefore := env.state.GetBalance(env.coinbase)
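The reworked commit helper above relies on newWorkCh now being buffered with capacity 1: a new request either merges into the one still queued or replaces it after rotating the queued/running interrupt pointers, so the producer never blocks and already-interrupted work is skipped in mainLoop. A standalone sketch of that coalescing pattern — hypothetical names, assuming a single producer goroutine as in newWorkLoop:

```go
package coalesce // hypothetical illustration of the one-slot coalescing pattern

type workReq struct {
	noempty   bool
	timestamp int64
}

// submit never blocks, provided newWorkCh has a buffer of 1 and only one
// goroutine calls submit (as newWorkLoop does for the worker's newWorkCh).
func submit(newWorkCh chan *workReq, noempty bool, timestamp int64) {
	select {
	case queued := <-newWorkCh:
		// The previous request has not been picked up yet: fold the new
		// parameters into it and put it back (the slot is now free).
		queued.noempty = queued.noempty || noempty
		queued.timestamp = timestamp
		newWorkCh <- queued
	default:
		// The slot is empty (the consumer already took the last request),
		// so a fresh request can be enqueued without blocking.
		newWorkCh <- &workReq{noempty: noempty, timestamp: timestamp}
	}
}
```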