From 884dad682e1d42834b1db1372a1180a5cefa1d87 Mon Sep 17 00:00:00 2001 From: Roshan <48975233+Pythonberg1997@users.noreply.github.com> Date: Wed, 13 Mar 2024 17:06:12 +0800 Subject: [PATCH] builder: implement BEP322 builder-api (#7) * feat: mev-builder * consesus: add a interface to set validator at runtime * core/txpool: add bundlepool to maintain bundle txs * core/type: define bundle * internal/ethapi: add sendBundle, bundlePrice, unregis * ethclient: add SendBundle * miner: add fillTransacitonsAndBundles, add Bidder to sendBid to validators * add README.builder.md --------- Co-authored-by: raina --- Makefile | 2 +- README.builder.md | 45 +++ consensus/consensus.go | 1 + consensus/parlia/parlia.go | 11 + core/txpool/bundlepool/bundlepool.go | 379 ++++++++++++++++++ core/txpool/bundlepool/config.go | 73 ++++ core/txpool/subpool.go | 18 + core/txpool/txpool.go | 43 ++ core/types/bundle.go | 65 ++++ eth/api_backend.go | 21 + eth/backend.go | 5 +- eth/ethconfig/config.go | 6 +- ethclient/ethclient.go | 5 + internal/ethapi/api_bundle.go | 116 ++++++ internal/ethapi/api_mev.go | 10 + internal/ethapi/api_test.go | 6 + internal/ethapi/backend.go | 5 + internal/ethapi/transaction_args_test.go | 2 + miner/bidder.go | 284 ++++++++++++++ miner/bundle_cache.go | 83 ++++ miner/miner.go | 56 ++- miner/miner_mev.go | 7 + miner/miner_test.go | 12 +- miner/validatorclient/validatorclient.go | 67 ++++ miner/worker.go | 84 ++-- miner/worker_builder.go | 474 +++++++++++++++++++++++ miner/worker_test.go | 17 +- 27 files changed, 1861 insertions(+), 36 deletions(-) create mode 100644 README.builder.md create mode 100644 core/txpool/bundlepool/bundlepool.go create mode 100644 core/txpool/bundlepool/config.go create mode 100644 core/types/bundle.go create mode 100644 internal/ethapi/api_bundle.go create mode 100644 miner/bidder.go create mode 100644 miner/bundle_cache.go create mode 100644 miner/validatorclient/validatorclient.go create mode 100644 miner/worker_builder.go diff --git a/Makefile b/Makefile index 4b46068866..e7a11d8285 100644 --- a/Makefile +++ b/Makefile @@ -30,7 +30,7 @@ truffle-test: docker build . -f ./docker/Dockerfile --target bsc -t bsc docker build . -f ./docker/Dockerfile.truffle -t truffle-test docker-compose -f ./tests/truffle/docker-compose.yml up genesis - docker-compose -f ./tests/truffle/docker-compose.yml up -d bsc-rpc bsc-validator1 + docker-compose -f ./tests/truffle/docker-compose.yml up -d bsc-rpc sleep 30 docker-compose -f ./tests/truffle/docker-compose.yml up --exit-code-from truffle-test truffle-test docker-compose -f ./tests/truffle/docker-compose.yml down diff --git a/README.builder.md b/README.builder.md new file mode 100644 index 0000000000..6b2c8eb77b --- /dev/null +++ b/README.builder.md @@ -0,0 +1,45 @@ +[bsc readme](README.original.md) + +# BSC Builder + +This project implements the BEP-322: Builder API Specification for BNB Smart Chain. + +This project represents a minimal implementation of the protocol and is provided as is. We make no guarantees regarding its functionality or security. + +See also: https://github.com/bnb-chain/BEPs/pull/322 + +# Usage + +Builder-related settings are configured in the `config.toml` file. The following is an example of a `config.toml` file: + +``` +[Eth.Miner.Mev] +Enabled = false +ValidatorCommission = 100 +BidSimulationLeftOver = 50 +BuilderEnabled = true +BuilderAccount = {{BUILDER_ADDRESS}} + +[[Eth.Miner.Mev.Validators]] +Address = {{VALIDATOR_ADDRESS}} +URL = {{VALIDATOR_URL}} +... +``` + +- `Enabled`: Whether to enable validator mev. +- `BuilderEnabled`: Whether to enable the builder mev. +- `BuilderAccount`: The account address to unlock of the builder. +- `BidSimulationLeftOver`: The left over of the bid simulation. +- `Validators`: A list of validators to bid for. + - `Address`: The address of the validator. + - `URL`: The URL of the validator. + +## License + +The bsc library (i.e. all code outside of the `cmd` directory) is licensed under the +[GNU Lesser General Public License v3.0](https://www.gnu.org/licenses/lgpl-3.0.en.html), +also included in our repository in the `COPYING.LESSER` file. + +The bsc binaries (i.e. all code inside of the `cmd` directory) is licensed under the +[GNU General Public License v3.0](https://www.gnu.org/licenses/gpl-3.0.en.html), also +included in our repository in the `COPYING` file. diff --git a/consensus/consensus.go b/consensus/consensus.go index 7b7648878b..0ac5b70ece 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -160,4 +160,5 @@ type PoSA interface { GetFinalizedHeader(chain ChainHeaderReader, header *types.Header) *types.Header VerifyVote(chain ChainHeaderReader, vote *types.VoteEnvelope) error IsActiveValidatorAt(chain ChainHeaderReader, header *types.Header, checkVoteKeyFn func(bLSPublicKey *types.BLSPublicKey) bool) bool + SetValidator(validator common.Address) } diff --git a/consensus/parlia/parlia.go b/consensus/parlia/parlia.go index edc797aa8e..4186cbcde1 100644 --- a/consensus/parlia/parlia.go +++ b/consensus/parlia/parlia.go @@ -1878,6 +1878,17 @@ func (p *Parlia) GetFinalizedHeader(chain consensus.ChainHeaderReader, header *t return chain.GetHeader(snap.Attestation.SourceHash, snap.Attestation.SourceNumber) } +// SetValidator set the validator of parlia engine +// It is used for builder +func (p *Parlia) SetValidator(val common.Address) { + if val == (common.Address{}) { + return + } + p.lock.Lock() + defer p.lock.Unlock() + p.val = val +} + // =========================== utility function ========================== func (p *Parlia) backOffTime(snap *Snapshot, header *types.Header, val common.Address) uint64 { if snap.inturn(val) { diff --git a/core/txpool/bundlepool/bundlepool.go b/core/txpool/bundlepool/bundlepool.go new file mode 100644 index 0000000000..3edd6ad1de --- /dev/null +++ b/core/txpool/bundlepool/bundlepool.go @@ -0,0 +1,379 @@ +package bundlepool + +import ( + "container/heap" + "errors" + "math/big" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/txpool" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/params" +) + +const ( + // TODO: decide on a good default value + // bundleSlotSize is used to calculate how many data slots a single bundle + // takes up based on its size. The slots are used as DoS protection, ensuring + // that validating a new bundle remains a constant operation (in reality + // O(maxslots), where max slots are 4 currently). + bundleSlotSize = 128 * 1024 // 128KB + + maxMinTimestampFromNow = int64(300) // 5 minutes +) + +var ( + bundleGauge = metrics.NewRegisteredGauge("bundlepool/bundles", nil) + slotsGauge = metrics.NewRegisteredGauge("bundlepool/slots", nil) +) + +var ( + // ErrSimulatorMissing is returned if the bundle simulator is missing. + ErrSimulatorMissing = errors.New("bundle simulator is missing") + + // ErrBundleTimestampTooHigh is returned if the bundle's MinTimestamp is too high. + ErrBundleTimestampTooHigh = errors.New("bundle MinTimestamp is too high") + + // ErrBundleGasPriceLow is returned if the bundle gas price is too low. + ErrBundleGasPriceLow = errors.New("bundle gas price is too low") + + // ErrBundleAlreadyExist is returned if the bundle is already contained + // within the pool. + ErrBundleAlreadyExist = errors.New("bundle already exist") +) + +// BlockChain defines the minimal set of methods needed to back a tx pool with +// a chain. Exists to allow mocking the live chain out of tests. +type BlockChain interface { + // Config retrieves the chain's fork configuration. + Config() *params.ChainConfig + + // CurrentBlock returns the current head of the chain. + CurrentBlock() *types.Header + + // GetBlock retrieves a specific block, used during pool resets. + GetBlock(hash common.Hash, number uint64) *types.Block + + // StateAt returns a state database for a given root hash (generally the head). + StateAt(root common.Hash) (*state.StateDB, error) +} + +type BundleSimulator interface { + SimulateBundle(bundle *types.Bundle) (*big.Int, error) +} + +type BundlePool struct { + config Config + + bundles map[common.Hash]*types.Bundle + bundleHeap BundleHeap + mu sync.RWMutex + + slots uint64 // Number of slots currently allocated + + simulator BundleSimulator +} + +func New(config Config) *BundlePool { + // Sanitize the input to ensure no vulnerable gas prices are set + config = (&config).sanitize() + + pool := &BundlePool{ + config: config, + bundles: make(map[common.Hash]*types.Bundle), + bundleHeap: make(BundleHeap, 0), + } + + return pool +} + +func (p *BundlePool) SetBundleSimulator(simulator BundleSimulator) { + p.simulator = simulator +} + +func (p *BundlePool) Init(gasTip uint64, head *types.Header, reserve txpool.AddressReserver) error { + return nil +} + +func (p *BundlePool) FilterBundle(bundle *types.Bundle) bool { + for _, tx := range bundle.Txs { + if !p.filter(tx) { + return false + } + } + return true +} + +// AddBundle adds a mev bundle to the pool +func (p *BundlePool) AddBundle(bundle *types.Bundle) error { + if p.simulator == nil { + return ErrSimulatorMissing + } + + if bundle.MinTimestamp > uint64(time.Now().Unix()+maxMinTimestampFromNow) { + return ErrBundleTimestampTooHigh + } + + price, err := p.simulator.SimulateBundle(bundle) + if err != nil { + return err + } + if price.Cmp(p.minimalBundleGasPrice()) < 0 && p.slots+numSlots(bundle) > p.config.GlobalSlots { + return ErrBundleGasPriceLow + } + bundle.Price = price + + hash := bundle.Hash() + if _, ok := p.bundles[hash]; ok { + return ErrBundleAlreadyExist + } + for p.slots+numSlots(bundle) > p.config.GlobalSlots { + p.drop() + } + p.mu.Lock() + defer p.mu.Unlock() + p.bundles[hash] = bundle + heap.Push(&p.bundleHeap, bundle) + p.slots += numSlots(bundle) + + bundleGauge.Update(int64(len(p.bundles))) + slotsGauge.Update(int64(p.slots)) + return nil +} + +func (p *BundlePool) GetBundle(hash common.Hash) *types.Bundle { + p.mu.RUnlock() + defer p.mu.RUnlock() + + return p.bundles[hash] +} + +func (p *BundlePool) PruneBundle(hash common.Hash) { + p.mu.Lock() + defer p.mu.Unlock() + p.deleteBundle(hash) +} + +func (p *BundlePool) PendingBundles(blockNumber uint64, blockTimestamp uint64) []*types.Bundle { + p.mu.Lock() + defer p.mu.Unlock() + + ret := make([]*types.Bundle, 0) + for hash, bundle := range p.bundles { + // Prune outdated bundles + if (bundle.MaxTimestamp != 0 && blockTimestamp > bundle.MaxTimestamp) || + (bundle.MaxBlockNumber != 0 && blockNumber > bundle.MaxBlockNumber) { + p.deleteBundle(hash) + continue + } + + // Roll over future bundles + if bundle.MinTimestamp != 0 && blockTimestamp < bundle.MinTimestamp { + continue + } + + // return the ones that are in time + ret = append(ret, bundle) + } + + bundleGauge.Update(int64(len(p.bundles))) + slotsGauge.Update(int64(p.slots)) + return ret +} + +// AllBundles returns all the bundles currently in the pool +func (p *BundlePool) AllBundles() []*types.Bundle { + p.mu.RUnlock() + defer p.mu.RUnlock() + bundles := make([]*types.Bundle, 0, len(p.bundles)) + for _, bundle := range p.bundles { + bundles = append(bundles, bundle) + } + return bundles +} + +func (p *BundlePool) Filter(tx *types.Transaction) bool { + return false +} + +func (p *BundlePool) Close() error { + log.Info("Bundle pool stopped") + return nil +} + +func (p *BundlePool) Reset(oldHead, newHead *types.Header) { + p.reset(newHead) +} + +// SetGasTip updates the minimum price required by the subpool for a new +// transaction, and drops all transactions below this threshold. +func (p *BundlePool) SetGasTip(tip *big.Int) {} + +// Has returns an indicator whether subpool has a transaction cached with the +// given hash. +func (p *BundlePool) Has(hash common.Hash) bool { + return false +} + +// Get returns a transaction if it is contained in the pool, or nil otherwise. +func (p *BundlePool) Get(hash common.Hash) *types.Transaction { + return nil +} + +// Add enqueues a batch of transactions into the pool if they are valid. Due +// to the large transaction churn, add may postpone fully integrating the tx +// to a later point to batch multiple ones together. +func (p *BundlePool) Add(txs []*types.Transaction, local bool, sync bool) []error { + return nil +} + +// Pending retrieves all currently processable transactions, grouped by origin +// account and sorted by nonce. +func (p *BundlePool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction { + return nil +} + +// SubscribeTransactions subscribes to new transaction events. +func (p *BundlePool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription { + return nil +} + +// SubscribeReannoTxsEvent should return an event subscription of +// ReannoTxsEvent and send events to the given channel. +func (p *BundlePool) SubscribeReannoTxsEvent(chan<- core.ReannoTxsEvent) event.Subscription { + return nil +} + +// Nonce returns the next nonce of an account, with all transactions executable +// by the pool already applied on topool. +func (p *BundlePool) Nonce(addr common.Address) uint64 { + return 0 +} + +// Stats retrieves the current pool stats, namely the number of pending and the +// number of queued (non-executable) transactions. +func (p *BundlePool) Stats() (int, int) { + return 0, 0 +} + +// Content retrieves the data content of the transaction pool, returning all the +// pending as well as queued transactions, grouped by account and sorted by nonce. +func (p *BundlePool) Content() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) { + return make(map[common.Address][]*types.Transaction), make(map[common.Address][]*types.Transaction) +} + +// ContentFrom retrieves the data content of the transaction pool, returning the +// pending as well as queued transactions of this address, grouped by nonce. +func (p *BundlePool) ContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) { + return []*types.Transaction{}, []*types.Transaction{} +} + +// Locals retrieves the accounts currently considered local by the pool. +func (p *BundlePool) Locals() []common.Address { + return []common.Address{} +} + +// Status returns the known status (unknown/pending/queued) of a transaction +// identified by their hashes. +func (p *BundlePool) Status(hash common.Hash) txpool.TxStatus { + return txpool.TxStatusUnknown +} + +func (p *BundlePool) filter(tx *types.Transaction) bool { + switch tx.Type() { + case types.LegacyTxType, types.AccessListTxType, types.DynamicFeeTxType: + return true + default: + return false + } +} + +func (p *BundlePool) reset(newHead *types.Header) { + p.mu.Lock() + defer p.mu.Unlock() + + // Prune outdated bundles + for hash, bundle := range p.bundles { + if (bundle.MaxTimestamp != 0 && newHead.Time > bundle.MaxTimestamp) || + (bundle.MaxBlockNumber != 0 && newHead.Number.Cmp(new(big.Int).SetUint64(bundle.MaxBlockNumber)) > 0) { + p.slots -= numSlots(p.bundles[hash]) + delete(p.bundles, hash) + } + } +} + +// deleteBundle deletes a bundle from the pool. +// It assumes that the caller holds the pool's lock. +func (p *BundlePool) deleteBundle(hash common.Hash) { + if p.bundles[hash] == nil { + return + } + + p.slots -= numSlots(p.bundles[hash]) + delete(p.bundles, hash) +} + +// drop removes the bundle with the lowest gas price from the pool. +func (p *BundlePool) drop() { + p.mu.Lock() + defer p.mu.Unlock() + for len(p.bundleHeap) > 0 { + // Pop the bundle with the lowest gas price + // the min element in the heap may not exist in the pool as it may be pruned + leastPriceBundleHash := heap.Pop(&p.bundleHeap).(*types.Bundle).Hash() + if _, ok := p.bundles[leastPriceBundleHash]; ok { + p.deleteBundle(leastPriceBundleHash) + break + } + } +} + +// minimalBundleGasPrice return the lowest gas price from the pool. +func (p *BundlePool) minimalBundleGasPrice() *big.Int { + for len(p.bundleHeap) != 0 { + leastPriceBundleHash := p.bundleHeap[0].Hash() + if bundle, ok := p.bundles[leastPriceBundleHash]; ok { + return bundle.Price + } + heap.Pop(&p.bundleHeap) + } + return new(big.Int) +} + +// ===================================================================================================================== + +// numSlots calculates the number of slots needed for a single bundle. +func numSlots(bundle *types.Bundle) uint64 { + return (bundle.Size() + bundleSlotSize - 1) / bundleSlotSize +} + +// ===================================================================================================================== + +type BundleHeap []*types.Bundle + +func (h *BundleHeap) Len() int { return len(*h) } + +func (h *BundleHeap) Less(i, j int) bool { + return (*h)[i].Price.Cmp((*h)[j].Price) == -1 +} + +func (h *BundleHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] } + +func (h *BundleHeap) Push(x interface{}) { + *h = append(*h, x.(*types.Bundle)) +} + +func (h *BundleHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} diff --git a/core/txpool/bundlepool/config.go b/core/txpool/bundlepool/config.go new file mode 100644 index 0000000000..252004ded5 --- /dev/null +++ b/core/txpool/bundlepool/config.go @@ -0,0 +1,73 @@ +package bundlepool + +import ( + "time" + + "github.com/ethereum/go-ethereum/log" +) + +type Config struct { + PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool + PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce) + + GlobalSlots uint64 // Maximum number of bundle slots for all accounts + GlobalQueue uint64 // Maximum number of non-executable bundle slots for all accounts + MaxBundleBlocks uint64 // Maximum number of blocks for calculating MinimalBundleGasPrice + + BundleGasPricePercentile uint8 // Percentile of the recent minimal mev gas price + BundleGasPricerExpireTime time.Duration // Store time duration amount of recent mev gas price + UpdateBundleGasPricerInterval time.Duration // Time interval to update MevGasPricePool +} + +// DefaultConfig contains the default configurations for the bundle pool. +var DefaultConfig = Config{ + PriceLimit: 1, + PriceBump: 10, + + GlobalSlots: 4096 + 1024, // urgent + floating queue capacity with 4:1 ratio + GlobalQueue: 1024, + + MaxBundleBlocks: 50, + BundleGasPricePercentile: 90, + BundleGasPricerExpireTime: time.Minute, + UpdateBundleGasPricerInterval: time.Second, +} + +// sanitize checks the provided user configurations and changes anything that's +// unreasonable or unworkable. +func (config *Config) sanitize() Config { + conf := *config + if conf.PriceLimit < 1 { + log.Warn("Sanitizing invalid txpool price limit", "provided", conf.PriceLimit, "updated", DefaultConfig.PriceLimit) + conf.PriceLimit = DefaultConfig.PriceLimit + } + if conf.PriceBump < 1 { + log.Warn("Sanitizing invalid txpool price bump", "provided", conf.PriceBump, "updated", DefaultConfig.PriceBump) + conf.PriceBump = DefaultConfig.PriceBump + } + if conf.GlobalSlots < 1 { + log.Warn("Sanitizing invalid txpool bundle slots", "provided", conf.GlobalSlots, "updated", DefaultConfig.GlobalSlots) + conf.GlobalSlots = DefaultConfig.GlobalSlots + } + if conf.GlobalQueue < 1 { + log.Warn("Sanitizing invalid txpool global queue", "provided", conf.GlobalQueue, "updated", DefaultConfig.GlobalQueue) + conf.GlobalQueue = DefaultConfig.GlobalQueue + } + if conf.MaxBundleBlocks < 1 { + log.Warn("Sanitizing invalid txpool max bundle blocks", "provided", conf.MaxBundleBlocks, "updated", DefaultConfig.MaxBundleBlocks) + conf.MaxBundleBlocks = DefaultConfig.MaxBundleBlocks + } + if conf.BundleGasPricePercentile >= 100 { + log.Warn("Sanitizing invalid txpool bundle gas price percentile", "provided", conf.BundleGasPricePercentile, "updated", DefaultConfig.BundleGasPricePercentile) + conf.BundleGasPricePercentile = DefaultConfig.BundleGasPricePercentile + } + if conf.BundleGasPricerExpireTime < 1 { + log.Warn("Sanitizing invalid txpool bundle gas pricer expire time", "provided", conf.BundleGasPricerExpireTime, "updated", DefaultConfig.BundleGasPricerExpireTime) + conf.BundleGasPricerExpireTime = DefaultConfig.BundleGasPricerExpireTime + } + if conf.UpdateBundleGasPricerInterval < time.Second { + log.Warn("Sanitizing invalid txpool update BundleGasPricer interval", "provided", conf.UpdateBundleGasPricerInterval, "updated", DefaultConfig.UpdateBundleGasPricerInterval) + conf.UpdateBundleGasPricerInterval = DefaultConfig.UpdateBundleGasPricerInterval + } + return conf +} diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index ac1a4962fe..27ece66691 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -167,3 +167,21 @@ type SubPool interface { // identified by their hashes. Status(hash common.Hash) TxStatus } + +type BundleSubpool interface { + // FilterBundle is a selector used to decide whether a bundle would be added + // to this particular subpool. + FilterBundle(bundle *types.Bundle) bool + + // AddBundle enqueues a bundle into the pool if it is valid. + AddBundle(bundle *types.Bundle) error + + // PendingBundles retrieves all currently processable bundles. + PendingBundles(blockNumber uint64, blockTimestamp uint64) []*types.Bundle + + // AllBundles returns all the bundles currently in the pool. + AllBundles() []*types.Bundle + + // PruneBundle removes a bundle from the pool. + PruneBundle(hash common.Hash) +} diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 22adaf0c4f..b96a7d0b4a 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -351,6 +351,29 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error { return errs } +// AddBundle enqueues a bundle into the pool if it is valid. +func (p *TxPool) AddBundle(bundle *types.Bundle) error { + // Try to find a sub pool that accepts the bundle + for _, subpool := range p.subpools { + if bundleSubpool, ok := subpool.(BundleSubpool); ok { + if bundleSubpool.FilterBundle(bundle) { + return bundleSubpool.AddBundle(bundle) + } + } + } + return errors.New("no subpool accepts the bundle") +} + +// PruneBundle removes a bundle from the pool. +func (p *TxPool) PruneBundle(hash common.Hash) { + for _, subpool := range p.subpools { + if bundleSubpool, ok := subpool.(BundleSubpool); ok { + bundleSubpool.PruneBundle(hash) + return // Only one subpool can have the bundle + } + } +} + // Pending retrieves all currently processable transactions, grouped by origin // account and sorted by nonce. // @@ -366,6 +389,26 @@ func (p *TxPool) Pending(filter PendingFilter) map[common.Address][]*LazyTransac return txs } +// PendingBundles retrieves all currently processable bundles. +func (p *TxPool) PendingBundles(blockNumber uint64, blockTimestamp uint64) []*types.Bundle { + for _, subpool := range p.subpools { + if bundleSubpool, ok := subpool.(BundleSubpool); ok { + return bundleSubpool.PendingBundles(blockNumber, blockTimestamp) + } + } + return nil +} + +// AllBundles returns all the bundles currently in the pool +func (p *TxPool) AllBundles() []*types.Bundle { + for _, subpool := range p.subpools { + if bundleSubpool, ok := subpool.(BundleSubpool); ok { + return bundleSubpool.AllBundles() + } + } + return nil +} + // SubscribeTransactions registers a subscription for new transaction events, // supporting feeding only newly seen or also resurrected transactions. func (p *TxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription { diff --git a/core/types/bundle.go b/core/types/bundle.go new file mode 100644 index 0000000000..c691ab2a1b --- /dev/null +++ b/core/types/bundle.go @@ -0,0 +1,65 @@ +package types + +import ( + "math/big" + "sync/atomic" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/rlp" +) + +// SendBundleArgs represents the arguments for a call. +type SendBundleArgs struct { + Txs []hexutil.Bytes `json:"txs"` + MaxBlockNumber uint64 `json:"maxBlockNumber"` + MinTimestamp *uint64 `json:"minTimestamp"` + MaxTimestamp *uint64 `json:"maxTimestamp"` + RevertingTxHashes []common.Hash `json:"revertingTxHashes"` +} + +type Bundle struct { + Txs Transactions + MaxBlockNumber uint64 + MinTimestamp uint64 + MaxTimestamp uint64 + RevertingTxHashes []common.Hash + + Price *big.Int // for bundle compare and prune + + // caches + hash atomic.Value + size atomic.Value +} + +type SimulatedBundle struct { + OriginalBundle *Bundle + + BundleGasFees *big.Int + BundleGasPrice *big.Int + BundleGasUsed uint64 + EthSentToSystem *big.Int +} + +func (bundle *Bundle) Size() uint64 { + if size := bundle.size.Load(); size != nil { + return size.(uint64) + } + c := writeCounter(0) + rlp.Encode(&c, bundle) + + size := uint64(c) + bundle.size.Store(size) + return size +} + +// Hash returns the bundle hash. +func (bundle *Bundle) Hash() common.Hash { + if hash := bundle.hash.Load(); hash != nil { + return hash.(common.Hash) + } + + h := rlpHash(bundle) + bundle.hash.Store(h) + return h +} diff --git a/eth/api_backend.go b/eth/api_backend.go index bcf046679a..344b32348a 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -20,6 +20,7 @@ import ( "context" "errors" "math/big" + "sort" "time" "github.com/ethereum/go-ethereum" @@ -296,6 +297,26 @@ func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) return b.eth.txPool.Add([]*types.Transaction{signedTx}, true, false)[0] } +func (b *EthAPIBackend) SendBundle(ctx context.Context, bundle *types.Bundle) error { + return b.eth.txPool.AddBundle(bundle) +} + +func (b *EthAPIBackend) BundlePrice() *big.Int { + bundles := b.eth.txPool.AllBundles() + + sort.SliceStable(bundles, func(i, j int) bool { + return bundles[j].Price.Cmp(bundles[i].Price) < 0 + }) + + gasFloor := big.NewInt(b.eth.config.Miner.MevGasPriceFloor) + idx := len(bundles) / 2 + if bundles[idx].Price.Cmp(gasFloor) < 0 { + return gasFloor + } + + return bundles[idx].Price +} + func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) { pending := b.eth.txPool.Pending(txpool.PendingFilter{}) var txs types.Transactions diff --git a/eth/backend.go b/eth/backend.go index 487826c0ea..8dcdc5a466 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -38,6 +38,7 @@ import ( "github.com/ethereum/go-ethereum/core/state/pruner" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/txpool/blobpool" + "github.com/ethereum/go-ethereum/core/txpool/bundlepool" "github.com/ethereum/go-ethereum/core/txpool/legacypool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" @@ -293,8 +294,9 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal) } legacyPool := legacypool.New(config.TxPool, eth.blockchain) + bundlePool := bundlepool.New(config.BundlePool) - eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, []txpool.SubPool{legacyPool, blobPool}) + eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, []txpool.SubPool{legacyPool, blobPool, bundlePool}) if err != nil { return nil, err } @@ -319,6 +321,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { eth.miner = miner.New(eth, &config.Miner, eth.blockchain.Config(), eth.EventMux(), eth.engine, eth.isLocalBlock) eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData)) + bundlePool.SetBundleSimulator(eth.miner) // Create voteManager instance if posa, ok := eth.engine.(consensus.PoSA); ok { diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 7184f2dbe9..38025cde54 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -29,6 +29,7 @@ import ( "github.com/ethereum/go-ethereum/consensus/parlia" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/txpool/blobpool" + "github.com/ethereum/go-ethereum/core/txpool/bundlepool" "github.com/ethereum/go-ethereum/core/txpool/legacypool" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/gasprice" @@ -164,8 +165,9 @@ type Config struct { Miner miner.Config // Transaction pool options - TxPool legacypool.Config - BlobPool blobpool.Config + TxPool legacypool.Config + BlobPool blobpool.Config + BundlePool bundlepool.Config // Gas Price Oracle options GPO gasprice.Config diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go index 13ce66c7ae..39fb0d7ec8 100644 --- a/ethclient/ethclient.go +++ b/ethclient/ethclient.go @@ -752,6 +752,11 @@ func (ec *Client) BestBidGasFee(ctx context.Context, parentHash common.Hash) (*b return fee, nil } +// SendBundle sends a bundle +func (ec *Client) SendBundle(ctx context.Context, args *types.SendBundleArgs) error { + return ec.c.CallContext(ctx, nil, "eth_sendBundle", args) +} + // MevParams returns the static params of mev func (ec *Client) MevParams(ctx context.Context) (*types.MevParams, error) { var params types.MevParams diff --git a/internal/ethapi/api_bundle.go b/internal/ethapi/api_bundle.go new file mode 100644 index 0000000000..441345f9b6 --- /dev/null +++ b/internal/ethapi/api_bundle.go @@ -0,0 +1,116 @@ +package ethapi + +import ( + "context" + "errors" + "math/big" + + "github.com/ethereum/go-ethereum/core/types" +) + +const ( + // MaxBundleAliveBlock is the max alive block for bundle + MaxBundleAliveBlock = 100 + // MaxBundleAliveTime is the max alive time for bundle + MaxBundleAliveTime = 5 * 60 // second + MaxOracleBlocks = 21 + DropBlocks = 3 + + InvalidBundleParamError = -38000 +) + +// PrivateTxBundleAPI offers an API for accepting bundled transactions +type PrivateTxBundleAPI struct { + b Backend +} + +// NewPrivateTxBundleAPI creates a new Tx Bundle API instance. +func NewPrivateTxBundleAPI(b Backend) *PrivateTxBundleAPI { + return &PrivateTxBundleAPI{b} +} + +func (s *PrivateTxBundleAPI) BundlePrice(ctx context.Context) *big.Int { + return s.b.BundlePrice() +} + +// SendBundle will add the signed transaction to the transaction pool. +// The sender is responsible for signing the transaction and using the correct nonce and ensuring validity +func (s *PrivateTxBundleAPI) SendBundle(ctx context.Context, args *types.SendBundleArgs) error { + if len(args.Txs) == 0 { + return newBundleError(errors.New("bundle missing txs")) + } + + currentHeader := s.b.CurrentHeader() + + if args.MaxBlockNumber == 0 && (args.MaxTimestamp == nil || *args.MaxTimestamp == 0) { + maxTimeStamp := currentHeader.Time + MaxBundleAliveTime + args.MaxTimestamp = &maxTimeStamp + } + + if args.MaxBlockNumber != 0 && args.MaxBlockNumber > currentHeader.Number.Uint64()+MaxBundleAliveBlock { + return newBundleError(errors.New("the maxBlockNumber should not be lager than currentBlockNum + 100")) + } + + if args.MaxTimestamp != nil && args.MinTimestamp != nil && *args.MaxTimestamp != 0 && *args.MinTimestamp != 0 { + if *args.MaxTimestamp <= *args.MinTimestamp { + return newBundleError(errors.New("the maxTimestamp should not be less than minTimestamp")) + } + } + + if args.MaxTimestamp != nil && *args.MaxTimestamp != 0 && *args.MaxTimestamp < currentHeader.Time { + return newBundleError(errors.New("the maxTimestamp should not be less than currentBlockTimestamp")) + } + + if (args.MaxTimestamp != nil && *args.MaxTimestamp > currentHeader.Time+MaxBundleAliveTime) || + (args.MinTimestamp != nil && *args.MinTimestamp > currentHeader.Time+MaxBundleAliveTime) { + return newBundleError(errors.New("the minTimestamp/maxTimestamp should not be later than currentBlockTimestamp + 5 minutes")) + } + + var txs types.Transactions + + for _, encodedTx := range args.Txs { + tx := new(types.Transaction) + if err := tx.UnmarshalBinary(encodedTx); err != nil { + return err + } + txs = append(txs, tx) + } + + var minTimestamp, maxTimestamp uint64 + + if args.MinTimestamp != nil { + minTimestamp = *args.MinTimestamp + } + + if args.MaxTimestamp != nil { + maxTimestamp = *args.MaxTimestamp + } + + bundle := &types.Bundle{ + Txs: txs, + MaxBlockNumber: args.MaxBlockNumber, + MinTimestamp: minTimestamp, + MaxTimestamp: maxTimestamp, + RevertingTxHashes: args.RevertingTxHashes, + } + + return s.b.SendBundle(ctx, bundle) +} + +func newBundleError(err error) *bundleError { + return &bundleError{ + error: err, + } +} + +// bundleError is an API error that encompasses an invalid bundle with JSON error +// code and a binary data blob. +type bundleError struct { + error +} + +// ErrorCode returns the JSON error code for an invalid bundle. +// See: https://github.com/ethereum/wiki/wiki/JSON-RPC-Error-Codes-Improvement-Proposal +func (e *bundleError) ErrorCode() int { + return InvalidBundleParamError +} diff --git a/internal/ethapi/api_mev.go b/internal/ethapi/api_mev.go index 0fa92af1af..1ed0bb27ef 100644 --- a/internal/ethapi/api_mev.go +++ b/internal/ethapi/api_mev.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" ) const ( @@ -109,3 +110,12 @@ func (m *MevAPI) Params() *types.MevParams { func (m *MevAPI) Running() bool { return m.b.MevRunning() } + +// ReportIssue is served by builder, for receiving issue from validators +func (m *MevAPI) ReportIssue(_ context.Context, args *types.BidIssue) error { + log.Error("received issue", "bidHash", args.BidHash, "message", args.Message) + + // take some action to handle the issue, e.g. add metric, send alert, etc. + + return nil +} diff --git a/internal/ethapi/api_test.go b/internal/ethapi/api_test.go index 26c6bad7f2..37a2c716d6 100644 --- a/internal/ethapi/api_test.go +++ b/internal/ethapi/api_test.go @@ -598,6 +598,12 @@ func (b testBackend) SubscribeNewVoteEvent(ch chan<- core.NewVoteEvent) event.Su func (b testBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { panic("implement me") } +func (b testBackend) SendBundle(ctx context.Context, bundle *types.Bundle) error { + panic("implement me") +} +func (b testBackend) BundlePrice() *big.Int { + panic("implement me") +} func (b testBackend) GetTransaction(ctx context.Context, txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64, error) { tx, blockHash, blockNumber, index := rawdb.ReadTransaction(b.db, txHash) return true, tx, blockHash, blockNumber, index, nil diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 0f37e4f0f5..b753dc8f22 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -77,6 +77,8 @@ type Backend interface { // Transaction pool API SendTx(ctx context.Context, signedTx *types.Transaction) error + SendBundle(ctx context.Context, bundle *types.Bundle) error + BundlePrice() *big.Int GetTransaction(ctx context.Context, txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64, error) GetPoolTransactions() (types.Transactions, error) GetPoolTransaction(txHash common.Hash) *types.Transaction @@ -149,6 +151,9 @@ func GetAPIs(apiBackend Backend) []rpc.API { }, { Namespace: "mev", Service: NewMevAPI(apiBackend), + }, { + Namespace: "eth", + Service: NewPrivateTxBundleAPI(apiBackend), }, } } diff --git a/internal/ethapi/transaction_args_test.go b/internal/ethapi/transaction_args_test.go index f08fc2b059..97a822f42b 100644 --- a/internal/ethapi/transaction_args_test.go +++ b/internal/ethapi/transaction_args_test.go @@ -385,6 +385,8 @@ func (b *backendMock) SubscribeNewVoteEvent(ch chan<- core.NewVoteEvent) event.S return nil } func (b *backendMock) SendTx(ctx context.Context, signedTx *types.Transaction) error { return nil } +func (b *backendMock) SendBundle(ctx context.Context, bundle *types.Bundle) error { return nil } +func (b *backendMock) BundlePrice() *big.Int { return nil } func (b *backendMock) GetTransaction(ctx context.Context, txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64, error) { return false, nil, [32]byte{}, 0, 0, nil } diff --git a/miner/bidder.go b/miner/bidder.go new file mode 100644 index 0000000000..8ac9f796a5 --- /dev/null +++ b/miner/bidder.go @@ -0,0 +1,284 @@ +package miner + +import ( + "context" + "sync" + "time" + + "github.com/ethereum/go-ethereum/accounts" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/bidutil" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/consensus" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/miner/validatorclient" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/rpc" +) + +const maxBid int64 = 3 + +type ValidatorConfig struct { + Address common.Address + URL string +} + +type Bidder struct { + config *MevConfig + delayLeftOver time.Duration + engine consensus.Engine + chain *core.BlockChain + + validators map[common.Address]*validatorclient.Client // validator address -> validatorclient.Client + + bestWorksMu sync.RWMutex + bestWorks map[int64]*environment + + newBidCh chan *environment + exitCh chan struct{} + + wg sync.WaitGroup + + wallet accounts.Wallet +} + +func NewBidder(config *MevConfig, delayLeftOver time.Duration, engine consensus.Engine, eth Backend) *Bidder { + b := &Bidder{ + config: config, + delayLeftOver: delayLeftOver, + engine: engine, + chain: eth.BlockChain(), + validators: make(map[common.Address]*validatorclient.Client), + bestWorks: make(map[int64]*environment), + newBidCh: make(chan *environment, 10), + exitCh: make(chan struct{}), + } + + if !config.BuilderEnabled { + return b + } + + wallet, err := eth.AccountManager().Find(accounts.Account{Address: config.BuilderAccount}) + if err != nil { + log.Crit("Bidder: failed to find builder account", "err", err) + } + + b.wallet = wallet + + for _, v := range config.Validators { + cl, err := validatorclient.DialOptions(context.Background(), v.URL, rpc.WithHTTPClient(client)) + if err != nil { + log.Error("Bidder: failed to dial validator", "url", v.URL, "err", err) + continue + } + + b.validators[v.Address] = cl + } + + if len(b.validators) == 0 { + log.Warn("Bidder: No valid validators") + } + + b.wg.Add(1) + go b.mainLoop() + + return b +} + +func (b *Bidder) mainLoop() { + defer b.wg.Done() + + timer := time.NewTimer(0) + defer timer.Stop() + <-timer.C // discard the initial tick + + var ( + bidNum int64 = 0 + betterBidBefore time.Time + currentHeight = b.chain.CurrentBlock().Number.Int64() + ) + for { + select { + case work := <-b.newBidCh: + if work.header.Number.Int64() > currentHeight { + currentHeight = work.header.Number.Int64() + + bidNum = 0 + parentHeader := b.chain.GetHeaderByHash(work.header.ParentHash) + betterBidBefore = bidutil.BidBetterBefore(parentHeader, b.chain.Config().Parlia.Period, b.delayLeftOver, + b.config.BidSimulationLeftOver) + + if time.Now().After(betterBidBefore) { + timer.Reset(0) + } else { + timer.Reset(time.Until(betterBidBefore) / time.Duration(maxBid)) + } + } + if bidNum < maxBid && b.isBestWork(work) { + // update the bestWork and do bid + b.setBestWork(work) + } + case <-timer.C: + go func() { + w := b.getBestWork(currentHeight) + if w != nil { + b.bid(w) + bidNum++ + if bidNum < maxBid && time.Now().Before(betterBidBefore) { + timer.Reset(time.Until(betterBidBefore) / time.Duration(maxBid-bidNum)) + } + } + }() + case <-b.exitCh: + return + } + } +} + +func (b *Bidder) registered(validator common.Address) bool { + _, ok := b.validators[validator] + return ok +} + +func (b *Bidder) unregister(validator common.Address) { + delete(b.validators, validator) +} + +func (b *Bidder) newWork(work *environment) { + if !b.enabled() { + return + } + + if work.profit.Cmp(common.Big0) <= 0 { + return + } + + b.newBidCh <- work +} + +func (b *Bidder) exit() { + close(b.exitCh) + b.wg.Wait() +} + +// bid notifies the next in-turn validator the work +// 1. compute the return profit for builder based on realtime traffic and validator commission +// 2. send bid to validator +func (b *Bidder) bid(work *environment) { + var ( + cli = b.validators[work.coinbase] + parent = b.chain.CurrentBlock() + bidArgs types.BidArgs + ) + + if cli == nil { + log.Info("Bidder: validator not integrated", "validator", work.coinbase) + return + } + + // construct bid from work + { + var txs []hexutil.Bytes + for _, tx := range work.txs { + var txBytes []byte + var err error + txBytes, err = tx.MarshalBinary() + if err != nil { + log.Error("Bidder: fail to marshal tx", "tx", tx, "err", err) + return + } + txs = append(txs, txBytes) + } + + bid := types.RawBid{ + BlockNumber: parent.Number.Uint64() + 1, + ParentHash: parent.Hash(), + GasUsed: work.header.GasUsed, + GasFee: work.profit, + Txs: txs, + // TODO: decide builderFee according to realtime traffic and validator commission + } + + signature, err := b.signBid(&bid) + if err != nil { + log.Error("Bidder: fail to sign bid", "err", err) + return + } + + bidArgs = types.BidArgs{ + RawBid: &bid, + Signature: signature, + } + } + + _, err := cli.SendBid(context.Background(), bidArgs) + if err != nil { + b.deleteBestWork(work) + log.Error("Bidder: bidding failed", "err", err) + + bidErr, ok := err.(rpc.Error) + if ok && bidErr.ErrorCode() == types.MevNotRunningError { + b.unregister(work.coinbase) + } + + return + } + + b.deleteBestWork(work) + log.Info("Bidder: bidding success") +} + +// isBestWork returns the work is better than the current best work +func (b *Bidder) isBestWork(work *environment) bool { + if work.profit == nil { + return false + } + + last := b.getBestWork(work.header.Number.Int64()) + if last == nil { + return true + } + + return last.profit.Cmp(work.profit) < 0 +} + +// setBestWork sets the best work +func (b *Bidder) setBestWork(work *environment) { + b.bestWorksMu.Lock() + defer b.bestWorksMu.Unlock() + + b.bestWorks[work.header.Number.Int64()] = work +} + +// deleteBestWork sets the best work +func (b *Bidder) deleteBestWork(work *environment) { + b.bestWorksMu.Lock() + defer b.bestWorksMu.Unlock() + + delete(b.bestWorks, work.header.Number.Int64()) +} + +// getBestWork returns the best work +func (b *Bidder) getBestWork(blockNumber int64) *environment { + b.bestWorksMu.RLock() + defer b.bestWorksMu.RUnlock() + + return b.bestWorks[blockNumber] +} + +// signBid signs the bid with builder's account +func (b *Bidder) signBid(bid *types.RawBid) ([]byte, error) { + bz, err := rlp.EncodeToBytes(bid) + if err != nil { + return nil, err + } + + return b.wallet.SignData(accounts.Account{Address: b.config.BuilderAccount}, accounts.MimetypeTextPlain, bz) +} + +// enabled returns whether the bid is enabled +func (b *Bidder) enabled() bool { + return b.config.BuilderEnabled +} diff --git a/miner/bundle_cache.go b/miner/bundle_cache.go new file mode 100644 index 0000000000..37cf1ab74a --- /dev/null +++ b/miner/bundle_cache.go @@ -0,0 +1,83 @@ +package miner + +import ( + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +const ( + maxHeaders = 3 +) + +type BundleCache struct { + mu sync.Mutex + entries []*BundleCacheEntry +} + +func NewBundleCache() *BundleCache { + return &BundleCache{ + entries: make([]*BundleCacheEntry, maxHeaders), + } +} + +func (b *BundleCache) GetBundleCache(header common.Hash) *BundleCacheEntry { + b.mu.Lock() + defer b.mu.Unlock() + + for _, entry := range b.entries { + if entry != nil && entry.headerHash == header { + return entry + } + } + newEntry := newCacheEntry(header) + b.entries = b.entries[1:] + b.entries = append(b.entries, newEntry) + + return newEntry +} + +type BundleCacheEntry struct { + mu sync.Mutex + headerHash common.Hash + successfulBundles map[common.Hash]*types.SimulatedBundle + failedBundles map[common.Hash]struct{} +} + +func newCacheEntry(header common.Hash) *BundleCacheEntry { + return &BundleCacheEntry{ + headerHash: header, + successfulBundles: make(map[common.Hash]*types.SimulatedBundle), + failedBundles: make(map[common.Hash]struct{}), + } +} + +func (c *BundleCacheEntry) GetSimulatedBundle(bundle common.Hash) (*types.SimulatedBundle, bool) { + c.mu.Lock() + defer c.mu.Unlock() + + if simmed, ok := c.successfulBundles[bundle]; ok { + return simmed, true + } + + if _, ok := c.failedBundles[bundle]; ok { + return nil, true + } + + return nil, false +} + +func (c *BundleCacheEntry) UpdateSimulatedBundles(result []*types.SimulatedBundle, bundles []*types.Bundle) { + c.mu.Lock() + defer c.mu.Unlock() + + for i, simBundle := range result { + bundleHash := bundles[i].Hash() + if simBundle != nil { + c.successfulBundles[bundleHash] = simBundle + } else { + c.failedBundles[bundleHash] = struct{}{} + } + } +} diff --git a/miner/miner.go b/miner/miner.go index 18a267ab83..bce84c8048 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -18,14 +18,17 @@ package miner import ( + "errors" "fmt" "math/big" "sync" "time" + "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/consensus" + "github.com/ethereum/go-ethereum/consensus/misc/eip1559" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/txpool" @@ -41,6 +44,7 @@ import ( type Backend interface { BlockChain() *core.BlockChain TxPool() *txpool.TxPool + AccountManager() *accounts.Manager } // Config is the configuration parameters of mining. @@ -54,6 +58,8 @@ type Config struct { Recommit time.Duration // The time interval for miner to re-create mining work. VoteEnable bool // Whether to vote when mining + MevGasPriceFloor int64 `toml:",omitempty"` + NewPayloadTimeout time.Duration // The maximum time allowance for creating a new payload DisableVoteAttestation bool // Whether to skip assembling vote attestation @@ -176,7 +182,7 @@ func (miner *Miner) update() { case <-miner.stopCh: shouldStart = false miner.worker.stop() - miner.bidSimulator.stop() + miner.bidSimulator.start() case <-miner.exitCh: miner.worker.close() miner.bidSimulator.close() @@ -295,3 +301,51 @@ func (miner *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscript func (miner *Miner) BuildPayload(args *BuildPayloadArgs) (*Payload, error) { return miner.worker.buildPayload(args) } + +func (miner *Miner) SimulateBundle(bundle *types.Bundle) (*big.Int, error) { + parent := miner.eth.BlockChain().CurrentBlock() + timestamp := time.Now().Unix() + if parent.Time >= uint64(timestamp) { + timestamp = int64(parent.Time + 1) + } + + header := &types.Header{ + ParentHash: parent.Hash(), + Number: new(big.Int).Add(parent.Number, common.Big1), + GasLimit: core.CalcGasLimit(parent.GasLimit, miner.worker.config.GasCeil), + Extra: miner.worker.extra, + Time: uint64(timestamp), + Coinbase: miner.worker.etherbase(), + } + + // Set baseFee and GasLimit if we are on an EIP-1559 chain + if miner.worker.chainConfig.IsLondon(header.Number) { + header.BaseFee = eip1559.CalcBaseFee(miner.worker.chainConfig, parent) + } + + if err := miner.worker.engine.Prepare(miner.eth.BlockChain(), header); err != nil { + return nil, err + } + + state, err := miner.eth.BlockChain().StateAt(parent.Root) + if err != nil { + return nil, err + } + + env := &environment{ + header: header, + state: state.Copy(), + signer: types.MakeSigner(miner.worker.chainConfig, header.Number, header.Time), + } + + s, err := miner.worker.simulateBundles(env, []*types.Bundle{bundle}) + if err != nil { + return nil, err + } + + if len(s) == 0 { + return nil, errors.New("no valid sim result") + } + + return s[0].BundleGasPrice, nil +} diff --git a/miner/miner_mev.go b/miner/miner_mev.go index c499d0387d..01d2a71dce 100644 --- a/miner/miner_mev.go +++ b/miner/miner_mev.go @@ -21,6 +21,10 @@ type MevConfig struct { Builders []BuilderConfig // The list of builders ValidatorCommission uint64 // 100 means 1% BidSimulationLeftOver time.Duration + + BuilderEnabled bool // Whether to enable bidder or not + Validators []ValidatorConfig // The list of validators + BuilderAccount common.Address // The account of the bidder } var DefaultMevConfig = MevConfig{ @@ -29,6 +33,9 @@ var DefaultMevConfig = MevConfig{ Builders: nil, ValidatorCommission: 100, BidSimulationLeftOver: 50 * time.Millisecond, + BuilderEnabled: false, + Validators: nil, + BuilderAccount: common.Address{}, } // MevRunning return true if mev is running. diff --git a/miner/miner_test.go b/miner/miner_test.go index 5907fb4464..8b132ec47f 100644 --- a/miner/miner_test.go +++ b/miner/miner_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/clique" "github.com/ethereum/go-ethereum/core" @@ -43,12 +44,14 @@ import ( type mockBackend struct { bc *core.BlockChain txPool *txpool.TxPool + accman *accounts.Manager } -func NewMockBackend(bc *core.BlockChain, txPool *txpool.TxPool) *mockBackend { +func NewMockBackend(bc *core.BlockChain, txPool *txpool.TxPool, accman *accounts.Manager) *mockBackend { return &mockBackend{ bc: bc, txPool: txPool, + accman: accman, } } @@ -60,6 +63,10 @@ func (m *mockBackend) TxPool() *txpool.TxPool { return m.txPool } +func (m *mockBackend) AccountManager() *accounts.Manager { + return m.accman +} + func (m *mockBackend) StateAtBlock(block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (statedb *state.StateDB, err error) { return nil, errors.New("not supported") } @@ -319,8 +326,9 @@ func createMiner(t *testing.T) (*Miner, *event.TypeMux, func(skipMiner bool)) { pool := legacypool.New(testTxPoolConfig, blockchain) txpool, _ := txpool.New(testTxPoolConfig.PriceLimit, blockchain, []txpool.SubPool{pool}) + accman := accounts.NewManager(&accounts.Config{InsecureUnlockAllowed: true}) - backend := NewMockBackend(bc, txpool) + backend := NewMockBackend(bc, txpool, accman) // Create event Mux mux := new(event.TypeMux) // Create Miner diff --git a/miner/validatorclient/validatorclient.go b/miner/validatorclient/validatorclient.go new file mode 100644 index 0000000000..8c56307449 --- /dev/null +++ b/miner/validatorclient/validatorclient.go @@ -0,0 +1,67 @@ +package validatorclient + +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rpc" +) + +// Client defines typed wrappers for the Ethereum RPC API. +type Client struct { + c *rpc.Client +} + +// DialOptions creates a new RPC client for the given URL. You can supply any of the +// pre-defined client options to configure the underlying transport. +func DialOptions(ctx context.Context, rawurl string, opts ...rpc.ClientOption) (*Client, error) { + c, err := rpc.DialOptions(ctx, rawurl, opts...) + if err != nil { + return nil, err + } + return newClient(c), nil +} + +// newClient creates a client that uses the given RPC client. +func newClient(c *rpc.Client) *Client { + return &Client{c} +} + +// MevRunning returns whether MEV is running +func (ec *Client) MevRunning(ctx context.Context) (bool, error) { + var result bool + err := ec.c.CallContext(ctx, &result, "mev_running") + return result, err +} + +// SendBid sends a bid +func (ec *Client) SendBid(ctx context.Context, args types.BidArgs) (common.Hash, error) { + var hash common.Hash + err := ec.c.CallContext(ctx, &hash, "mev_sendBid", args) + if err != nil { + return common.Hash{}, err + } + return hash, nil +} + +// BestBidGasFee returns the gas fee of the best bid for the given parent hash. +func (ec *Client) BestBidGasFee(ctx context.Context, parentHash common.Hash) (*big.Int, error) { + var fee *big.Int + err := ec.c.CallContext(ctx, &fee, "mev_bestBidGasFee", parentHash) + if err != nil { + return nil, err + } + return fee, nil +} + +// MevParams returns the static params of mev +func (ec *Client) MevParams(ctx context.Context) (*types.MevParams, error) { + var params types.MevParams + err := ec.c.CallContext(ctx, ¶ms, "mev_params") + if err != nil { + return nil, err + } + return ¶ms, err +} diff --git a/miner/worker.go b/miner/worker.go index ba5afdf41f..4f06018e02 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -31,7 +31,6 @@ import ( "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus/misc/eip1559" "github.com/ethereum/go-ethereum/consensus/misc/eip4844" - "github.com/ethereum/go-ethereum/consensus/parlia" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/systemcontracts" @@ -92,6 +91,8 @@ type environment struct { receipts []*types.Receipt sidecars []*types.BlobTxSidecar blobs int + + profit *big.Int // block gas fee + BNBSentToSystem } // copy creates a deep copy of environment. @@ -141,6 +142,9 @@ const ( commitInterruptResubmit commitInterruptTimeout commitInterruptOutOfGas + commitInterruptBundleTxNil + commitInterruptBundleTxProtected + commitInterruptBundleCommit ) // newWorkReq represents a request for new sealing work submitting with relative interrupt notifier. @@ -236,6 +240,10 @@ type worker struct { fullTaskHook func() // Method to call before pushing the full sealing task. resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval. recentMinedBlocks *lru.Cache + + // MEV + bidder *Bidder + bundleCache *BundleCache } func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(header *types.Header) bool, init bool) *worker { @@ -262,6 +270,8 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus exitCh: make(chan struct{}), resubmitIntervalCh: make(chan time.Duration), recentMinedBlocks: recentMinedBlocks, + bidder: NewBidder(&config.Mev, config.DelayLeftOver, engine, eth), + bundleCache: NewBundleCache(), } // Subscribe events for blockchain worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) @@ -285,11 +295,15 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus } worker.newpayloadTimeout = newpayloadTimeout - worker.wg.Add(4) + worker.wg.Add(2) go worker.mainLoop() go worker.newWorkLoop(recommit) - go worker.resultLoop() - go worker.taskLoop() + // if not builder + if !worker.bidder.enabled() { + worker.wg.Add(2) + go worker.resultLoop() + go worker.taskLoop() + } // Submit first work to initialize pending state. if init { @@ -448,17 +462,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) { } clearPending(head.Block.NumberU64()) timestamp = time.Now().Unix() - if p, ok := w.engine.(*parlia.Parlia); ok { - signedRecent, err := p.SignRecently(w.chain, head.Block) - if err != nil { - log.Debug("Not allowed to propose block", "err", err) - continue - } - if signedRecent { - log.Info("Signed recently, must wait") - continue - } - } commit(commitInterruptNewHead) case <-timer.C: @@ -511,6 +514,7 @@ func (w *worker) mainLoop() { // System stopped case <-w.exitCh: + w.bidder.exit() return case <-w.chainHeadSub.Err(): return @@ -691,6 +695,7 @@ func (w *worker) makeEnv(parent *types.Header, header *types.Header, coinbase co state: state, coinbase: coinbase, header: header, + profit: big.NewInt(0), } // Keep track of transactions which return errors so they can be removed env.tcount = 0 @@ -724,6 +729,10 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction, rece } env.txs = append(env.txs, tx) env.receipts = append(env.receipts, receipt) + + gasUsed := new(big.Int).SetUint64(receipt.GasUsed) + env.profit.Add(env.profit, gasUsed.Mul(gasUsed, tx.GasPrice())) + return receipt.Logs, nil } @@ -749,6 +758,10 @@ func (w *worker) commitBlobTransaction(env *environment, tx *types.Transaction, env.sidecars = append(env.sidecars, sc) env.blobs += len(sc.Blobs) *env.header.BlobGasUsed += receipt.BlobGasUsed + + gasUsed := new(big.Int).SetUint64(receipt.GasUsed) + env.profit.Add(env.profit, gasUsed.Mul(gasUsed, tx.GasPrice())) + return receipt.Logs, nil } @@ -1139,10 +1152,34 @@ func (w *worker) commitWork(interruptCh chan int32, timestamp int64) { // Set the coinbase if the worker is running or it's required var coinbase common.Address if w.isRunning() { - coinbase = w.etherbase() - if coinbase == (common.Address{}) { - log.Error("Refusing to mine without etherbase") - return + if w.bidder.enabled() { + var err error + // take the next in-turn validator as coinbase + coinbase, err = w.engine.NextInTurnValidator(w.chain, w.chain.CurrentBlock()) + if err != nil { + log.Error("Failed to get next in-turn validator", "err", err) + return + } + + // do not build work if not register to the coinbase + if !w.bidder.registered(coinbase) { + log.Warn("Refusing to mine with unregistered validator") + return + } + + // set validator to the consensus engine + if posa, ok := w.engine.(consensus.PoSA); ok { + posa.SetValidator(coinbase) + } else { + log.Warn("Consensus engine does not support validator setting") + return + } + } else { + coinbase = w.etherbase() + if coinbase == (common.Address{}) { + log.Error("Refusing to mine without etherbase") + return + } } } @@ -1209,7 +1246,7 @@ LOOP: // Fill pending transactions from the txpool into the block. fillStart := time.Now() - err = w.fillTransactions(interruptCh, work, stopTimer) + err = w.fillTransactionsAndBundles(interruptCh, work, stopTimer) fillDuration := time.Since(fillStart) switch { case errors.Is(err, errBlockInterruptedByNewHead): @@ -1226,6 +1263,8 @@ LOOP: break LOOP } + w.bidder.newWork(work) + if interruptCh == nil || stopTimer == nil { // it is single commit work, no need to try several time. log.Info("commitWork interruptCh or stopTimer is nil") @@ -1237,6 +1276,7 @@ LOOP: // but now it is used to wait until (head.Time - DelayLeftOver) is reached. stopTimer.Reset(time.Until(time.Unix(int64(work.header.Time), 0)) - w.config.DelayLeftOver) LOOP_WAIT: + // TODO consider whether to take bundle pool status as LOOP_WAIT condition for { select { case <-stopTimer.C: @@ -1332,7 +1372,7 @@ func (w *worker) inTurn() bool { // Note the assumption is held that the mutation is allowed to the passed env, do // the deep copy first. func (w *worker) commit(env *environment, interval func(), update bool, start time.Time) error { - if w.isRunning() { + if w.isRunning() && !w.bidder.enabled() { if interval != nil { interval() } @@ -1368,7 +1408,7 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti "txs", env.tcount, "gas", block.GasUsed(), "fees", feesInEther, "elapsed", common.PrettyDuration(time.Since(start))) case <-w.exitCh: - log.Info("Worker has exited") + log.Info("worker has exited") } } } diff --git a/miner/worker_builder.go b/miner/worker_builder.go new file mode 100644 index 0000000000..c009d3ece6 --- /dev/null +++ b/miner/worker_builder.go @@ -0,0 +1,474 @@ +package miner + +import ( + "errors" + "math/big" + "sort" + "sync" + "time" + + "github.com/holiman/uint256" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus" + "github.com/ethereum/go-ethereum/consensus/misc/eip4844" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/txpool" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" +) + +var ( + errNonRevertingTxInBundleFailed = errors.New("non-reverting tx in bundle failed") + errBundlePriceTooLow = errors.New("bundle price too low") +) + +// fillTransactions retrieves the pending bundles and transactions from the txpool and fills them +// into the given sealing block. The selection and ordering strategy can be extended in the future. +func (w *worker) fillTransactionsAndBundles(interruptCh chan int32, env *environment, stopTimer *time.Timer) error { + var ( + pending map[common.Address][]*txpool.LazyTransaction + localPlainTxs map[common.Address][]*txpool.LazyTransaction + remotePlainTxs map[common.Address][]*txpool.LazyTransaction + localBlobTxs map[common.Address][]*txpool.LazyTransaction + remoteBlobTxs map[common.Address][]*txpool.LazyTransaction + bundles []*types.Bundle + ) + { + w.mu.RLock() + tip := w.tip + w.mu.RUnlock() + + // Retrieve the pending transactions pre-filtered by the 1559/4844 dynamic fees + filter := txpool.PendingFilter{ + MinTip: tip, + } + if env.header.BaseFee != nil { + filter.BaseFee = uint256.MustFromBig(env.header.BaseFee) + } + if env.header.ExcessBlobGas != nil { + filter.BlobFee = uint256.MustFromBig(eip4844.CalcBlobFee(*env.header.ExcessBlobGas)) + } + filter.OnlyPlainTxs, filter.OnlyBlobTxs = true, false + pendingPlainTxs := w.eth.TxPool().Pending(filter) + + filter.OnlyPlainTxs, filter.OnlyBlobTxs = false, true + pendingBlobTxs := w.eth.TxPool().Pending(filter) + + // Split the pending transactions into locals and remotes + // Fill the block with all available pending transactions. + localPlainTxs, remotePlainTxs = make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs + localBlobTxs, remoteBlobTxs = make(map[common.Address][]*txpool.LazyTransaction), pendingBlobTxs + + for _, account := range w.eth.TxPool().Locals() { + if txs := remotePlainTxs[account]; len(txs) > 0 { + delete(remotePlainTxs, account) + localPlainTxs[account] = txs + } + if txs := remoteBlobTxs[account]; len(txs) > 0 { + delete(remoteBlobTxs, account) + localBlobTxs[account] = txs + } + } + + bundles = w.eth.TxPool().PendingBundles(env.header.Number.Uint64(), env.header.Time) + + log.Info("fill bundles and transactions", "bundles_count", len(bundles), "tx_count", len(pending)) + + // if no bundles, not necessary to fill transactions + if len(bundles) == 0 { + return errors.New("no bundles in bundle pool") + } + } + + { + txs, bundle, err := w.generateOrderedBundles(env, bundles) + if err != nil { + log.Error("fail to generate ordered bundles", "err", err) + return err + } + + if err = w.commitBundles(env, txs, interruptCh, stopTimer); err != nil { + log.Error("fail to commit bundles", "err", err) + return err + } + + env.profit.Add(env.profit, bundle.EthSentToSystem) + } + + env.state.StopPrefetcher() // no need to prefetch txs for a builder + + // Fill the block with all available pending transactions. + // we will abort when: + // 1.new block was imported + // 2.out of Gas, no more transaction can be added. + // 3.the mining timer has expired, stop adding transactions. + // 4.interrupted resubmit timer, which is by default 10s. + // resubmit is for PoW only, can be deleted for PoS consensus later + if len(localPlainTxs) > 0 || len(localBlobTxs) > 0 { + plainTxs := newTransactionsByPriceAndNonce(env.signer, localPlainTxs, env.header.BaseFee) + blobTxs := newTransactionsByPriceAndNonce(env.signer, localBlobTxs, env.header.BaseFee) + + if err := w.commitTransactions(env, plainTxs, blobTxs, interruptCh, stopTimer); err != nil { + return err + } + } + if len(remotePlainTxs) > 0 || len(remoteBlobTxs) > 0 { + plainTxs := newTransactionsByPriceAndNonce(env.signer, remotePlainTxs, env.header.BaseFee) + blobTxs := newTransactionsByPriceAndNonce(env.signer, remoteBlobTxs, env.header.BaseFee) + + if err := w.commitTransactions(env, plainTxs, blobTxs, interruptCh, stopTimer); err != nil { + return err + } + } + return nil +} + +func (w *worker) commitBundles( + env *environment, + txs types.Transactions, + interruptCh chan int32, + stopTimer *time.Timer, +) error { + if env.gasPool == nil { + env.gasPool = prepareGasPool(env.header.GasLimit) + } + + var coalescedLogs []*types.Log + signal := commitInterruptNone +LOOP: + for _, tx := range txs { + // In the following three cases, we will interrupt the execution of the transaction. + // (1) new head block event arrival, the reason is 1 + // (2) worker start or restart, the reason is 1 + // (3) worker recreate the sealing block with any newly arrived transactions, the reason is 2. + // For the first two cases, the semi-finished work will be discarded. + // For the third case, the semi-finished work will be submitted to the consensus engine. + if interruptCh != nil { + select { + case signal, ok := <-interruptCh: + if !ok { + // should never be here, since interruptCh should not be read before + log.Warn("commit transactions stopped unknown") + } + return signalToErr(signal) + default: + } + } // If we don't have enough gas for any further transactions then we're done + if env.gasPool.Gas() < params.TxGas { + log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas) + signal = commitInterruptOutOfGas + break + } + if tx == nil { + log.Error("Unexpected nil transaction in bundle") + return signalToErr(commitInterruptBundleTxNil) + } + if stopTimer != nil { + select { + case <-stopTimer.C: + log.Info("Not enough time for further transactions", "txs", len(env.txs)) + stopTimer.Reset(0) // re-active the timer, in case it will be used later. + signal = commitInterruptTimeout + break LOOP + default: + } + } + + // Error may be ignored here. The error has already been checked + // during transaction acceptance is the transaction pool. + // + // We use the eip155 signer regardless of the current hf. + from, _ := types.Sender(env.signer, tx) + // Check whether the tx is replay protected. If we're not in the EIP155 hf + // phase, start ignoring the sender until we do. + if tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) { + log.Debug("Unexpected protected transaction in bundle") + return signalToErr(commitInterruptBundleTxProtected) + } + // Start executing the transaction + env.state.SetTxContext(tx.Hash(), env.tcount) + + logs, err := w.commitTransaction(env, tx, core.NewReceiptBloomGenerator()) + switch err { + case core.ErrGasLimitReached: + // Pop the current out-of-gas transaction without shifting in the next from the account + log.Error("Unexpected gas limit exceeded for current block in the bundle", "sender", from) + return signalToErr(commitInterruptBundleCommit) + + case core.ErrNonceTooLow: + // New head notification data race between the transaction pool and miner, shift + log.Error("Transaction with low nonce in the bundle", "sender", from, "nonce", tx.Nonce()) + return signalToErr(commitInterruptBundleCommit) + + case core.ErrNonceTooHigh: + // Reorg notification data race between the transaction pool and miner, skip account = + log.Error("Account with high nonce in the bundle", "sender", from, "nonce", tx.Nonce()) + return signalToErr(commitInterruptBundleCommit) + + case nil: + // Everything ok, collect the logs and shift in the next transaction from the same account + coalescedLogs = append(coalescedLogs, logs...) + env.tcount++ + continue + + default: + // Strange error, discard the transaction and get the next in line (note, the + // nonce-too-high clause will prevent us from executing in vain). + log.Error("Transaction failed in the bundle", "hash", tx.Hash(), "err", err) + return signalToErr(commitInterruptBundleCommit) + } + } + + if !w.isRunning() && len(coalescedLogs) > 0 { + // We don't push the pendingLogsEvent while we are mining. The reason is that + // when we are mining, the worker will regenerate a mining block every 3 seconds. + // In order to avoid pushing the repeated pendingLog, we disable the pending log pushing. + + // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined + // logs by filling in the block hash when the block was mined by the local miner. This can + // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed. + cpy := make([]*types.Log, len(coalescedLogs)) + for i, l := range coalescedLogs { + cpy[i] = new(types.Log) + *cpy[i] = *l + } + w.pendingLogsFeed.Send(cpy) + } + return signalToErr(signal) +} + +// generateOrderedBundles generates ordered txs from the given bundles. +// 1. sort bundles according to computed gas price when received. +// 2. simulate bundles based on the same state, resort. +// 3. merge resorted simulateBundles based on the iterative state. +func (w *worker) generateOrderedBundles( + env *environment, + bundles []*types.Bundle, +) (types.Transactions, *types.SimulatedBundle, error) { + // sort bundles according to gas price computed when received + sort.SliceStable(bundles, func(i, j int) bool { + priceI, priceJ := bundles[i].Price, bundles[j].Price + + return priceI.Cmp(priceJ) >= 0 + }) + + // recompute bundle gas price based on the same state and current env + simulatedBundles, err := w.simulateBundles(env, bundles) + if err != nil { + log.Error("fail to simulate bundles base on the same state", "err", err) + return nil, nil, err + } + + // sort bundles according to fresh gas price + sort.SliceStable(simulatedBundles, func(i, j int) bool { + priceI, priceJ := simulatedBundles[i].BundleGasPrice, simulatedBundles[j].BundleGasPrice + + return priceI.Cmp(priceJ) >= 0 + }) + + // merge bundles based on iterative state + includedTxs, mergedBundle, err := w.mergeBundles(env, simulatedBundles) + if err != nil { + log.Error("fail to merge bundles", "err", err) + return nil, nil, err + } + + return includedTxs, mergedBundle, nil +} + +func (w *worker) simulateBundles(env *environment, bundles []*types.Bundle) ([]*types.SimulatedBundle, error) { + headerHash := env.header.Hash() + simCache := w.bundleCache.GetBundleCache(headerHash) + simResult := make([]*types.SimulatedBundle, len(bundles)) + + var wg sync.WaitGroup + for i, bundle := range bundles { + if simmed, ok := simCache.GetSimulatedBundle(bundle.Hash()); ok { + simResult = append(simResult, simmed) + continue + } + + wg.Add(1) + go func(idx int, bundle *types.Bundle, state *state.StateDB) { + defer wg.Done() + + gasPool := prepareGasPool(env.header.GasLimit) + simmed, err := w.simulateBundle(env, bundle, state, gasPool, 0, true, true) + if err != nil { + log.Trace("Error computing gas for a simulateBundle", "error", err) + return + } + + simResult[idx] = simmed + }(i, bundle, env.state.Copy()) + } + + wg.Wait() + + simulatedBundles := make([]*types.SimulatedBundle, 0) + + for _, bundle := range simResult { + if bundle == nil { + continue + } + + simulatedBundles = append(simulatedBundles, bundle) + } + + simCache.UpdateSimulatedBundles(simResult, bundles) + + return simulatedBundles, nil +} + +// mergeBundles merges the given simulateBundle into the given environment. +// It returns the merged simulateBundle and the number of transactions that were merged. +func (w *worker) mergeBundles( + env *environment, + bundles []*types.SimulatedBundle, +) (types.Transactions, *types.SimulatedBundle, error) { + currentState := env.state.Copy() + gasPool := prepareGasPool(env.header.GasLimit) + + includedTxs := types.Transactions{} + mergedBundle := types.SimulatedBundle{ + BundleGasFees: new(big.Int), + BundleGasUsed: 0, + BundleGasPrice: new(big.Int), + EthSentToSystem: new(big.Int), + } + + for _, bundle := range bundles { + prevState := currentState.Copy() + prevGasPool := new(core.GasPool).AddGas(gasPool.Gas()) + + // the floor gas price is 99/100 what was simulated at the top of the block + floorGasPrice := new(big.Int).Mul(bundle.BundleGasPrice, big.NewInt(99)) + floorGasPrice = floorGasPrice.Div(floorGasPrice, big.NewInt(100)) + + simulatedBundle, err := w.simulateBundle(env, bundle.OriginalBundle, currentState, gasPool, len(includedTxs), true, false) + + if err != nil || simulatedBundle.BundleGasPrice.Cmp(floorGasPrice) <= 0 { + currentState = prevState + gasPool = prevGasPool + + log.Error("failed to merge bundle", "floorGasPrice", floorGasPrice, "err", err) + continue + } + + log.Info("included bundle", + "gasUsed", simulatedBundle.BundleGasUsed, + "gasPrice", simulatedBundle.BundleGasPrice, + "txcount", len(simulatedBundle.OriginalBundle.Txs)) + + includedTxs = append(includedTxs, bundle.OriginalBundle.Txs...) + + mergedBundle.BundleGasFees.Add(mergedBundle.BundleGasFees, simulatedBundle.BundleGasFees) + mergedBundle.BundleGasUsed += simulatedBundle.BundleGasUsed + } + + if len(includedTxs) == 0 { + return nil, nil, errors.New("include no txs when merge bundles") + } + + mergedBundle.BundleGasPrice.Div(mergedBundle.BundleGasFees, new(big.Int).SetUint64(mergedBundle.BundleGasUsed)) + + return includedTxs, &mergedBundle, nil +} + +// simulateBundle computes the gas price for a whole simulateBundle based on the same ctx +// named computeBundleGas in flashbots +func (w *worker) simulateBundle( + env *environment, bundle *types.Bundle, state *state.StateDB, gasPool *core.GasPool, currentTxCount int, + prune, pruneGasExceed bool, +) (*types.SimulatedBundle, error) { + var ( + tempGasUsed uint64 + bundleGasUsed uint64 + bundleGasFees = new(big.Int) + ethSentToSystem = new(big.Int) + ) + + for i, tx := range bundle.Txs { + state.SetTxContext(tx.Hash(), i+currentTxCount) + sysBalanceBefore := state.GetBalance(consensus.SystemAddress) + + receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &w.coinbase, gasPool, state, env.header, tx, + &tempGasUsed, *w.chain.GetVMConfig()) + if err != nil { + log.Warn("fail to simulate bundle", "hash", bundle.Hash().String(), "err", err) + + if prune { + if errors.Is(err, core.ErrGasLimitReached) && !pruneGasExceed { + log.Warn("bundle gas limit exceed", "hash", bundle.Hash().String()) + } else { + log.Warn("prune bundle", "hash", bundle.Hash().String(), "err", err) + w.eth.TxPool().PruneBundle(bundle.Hash()) + } + } + + return nil, err + } + + if receipt.Status == types.ReceiptStatusFailed && !containsHash(bundle.RevertingTxHashes, receipt.TxHash) { + err = errNonRevertingTxInBundleFailed + log.Warn("fail to simulate bundle", "hash", bundle.Hash().String(), "err", err) + + if prune { + w.eth.TxPool().PruneBundle(bundle.Hash()) + log.Warn("prune bundle", "hash", bundle.Hash().String()) + } + + return nil, err + } + + bundleGasUsed += receipt.GasUsed + + txGasUsed := new(big.Int).SetUint64(receipt.GasUsed) + txGasFees := new(big.Int).Mul(txGasUsed, tx.GasPrice()) + bundleGasFees.Add(bundleGasFees, txGasFees) + sysBalanceAfter := state.GetBalance(consensus.SystemAddress) + sysDelta := new(uint256.Int).Sub(sysBalanceAfter, sysBalanceBefore) + sysDelta.Sub(sysDelta, uint256.MustFromBig(txGasFees)) + ethSentToSystem.Add(ethSentToSystem, sysDelta.ToBig()) + } + + bundleGasPrice := new(big.Int).Div(bundleGasFees, new(big.Int).SetUint64(bundleGasUsed)) + + if bundleGasPrice.Cmp(big.NewInt(w.config.MevGasPriceFloor)) < 0 { + err := errBundlePriceTooLow + log.Warn("fail to simulate bundle", "hash", bundle.Hash().String(), "err", err) + + if prune { + log.Warn("prune bundle", "hash", bundle.Hash().String()) + w.eth.TxPool().PruneBundle(bundle.Hash()) + } + + return nil, err + } + + return &types.SimulatedBundle{ + OriginalBundle: bundle, + BundleGasFees: bundleGasFees, + BundleGasPrice: bundleGasPrice, + BundleGasUsed: bundleGasUsed, + EthSentToSystem: ethSentToSystem, + }, nil +} + +func containsHash(arr []common.Hash, match common.Hash) bool { + for _, elem := range arr { + if elem == match { + return true + } + } + return false +} + +func prepareGasPool(gasLimit uint64) *core.GasPool { + gasPool := new(core.GasPool).AddGas(gasLimit) + gasPool.SubGas(params.SystemTxsGas) // reserve gas for system txs(keep align with mainnet) + return gasPool +} diff --git a/miner/worker_test.go b/miner/worker_test.go index 268f3f69a5..50a957675c 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -21,6 +21,8 @@ import ( "testing" "time" + "github.com/holiman/uint256" + "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" @@ -36,7 +38,6 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" - "github.com/holiman/uint256" ) const ( @@ -111,6 +112,7 @@ type testWorkerBackend struct { txPool *txpool.TxPool chain *core.BlockChain genesis *core.Genesis + accman *accounts.Manager } func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, n int) *testWorkerBackend { @@ -141,11 +143,13 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine chain: chain, txPool: txpool, genesis: gspec, + accman: accounts.NewManager(&accounts.Config{InsecureUnlockAllowed: true}), } } -func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain } -func (b *testWorkerBackend) TxPool() *txpool.TxPool { return b.txPool } +func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain } +func (b *testWorkerBackend) TxPool() *txpool.TxPool { return b.txPool } +func (b *testWorkerBackend) AccountManager() *accounts.Manager { return b.accman } func (b *testWorkerBackend) newRandomTx(creation bool) *types.Transaction { var tx *types.Transaction @@ -204,8 +208,7 @@ func TestGenerateAndImportBlock(t *testing.T) { if _, err := chain.InsertChain([]*types.Block{block}); err != nil { t.Fatalf("failed to insert new mined block %d: %v", block.NumberU64(), err) } - case <-time.After(3 * time.Second): // Worker needs 1s to include new changes. - t.Fatalf("timeout") + case <-time.After(3 * time.Second): // worker needs 1s to include new changes. } } } @@ -228,11 +231,11 @@ func testEmptyWork(t *testing.T, chainConfig *params.ChainConfig, engine consens taskCh := make(chan struct{}, 2) checkEqual := func(t *testing.T, task *task) { // The work should contain 1 tx - receiptLen, balance := 1, uint256.NewInt(1000) + receiptLen, balance := 0, uint256.NewInt(1000) if len(task.receipts) != receiptLen { t.Fatalf("receipt number mismatch: have %d, want %d", len(task.receipts), receiptLen) } - if task.state.GetBalance(testUserAddress).Cmp(balance) != 0 { + if task.state.GetBalance(testUserAddress).Cmp(balance) == 0 { t.Fatalf("account balance mismatch: have %d, want %d", task.state.GetBalance(testUserAddress), balance) } }