Skip to content

Commit

Permalink
Flashbots changes v0.3 to v0.4
Browse files Browse the repository at this point in the history
  • Loading branch information
bogatyy authored and Ruteri committed Jan 6, 2022
1 parent cbdfcf2 commit 8677391
Show file tree
Hide file tree
Showing 12 changed files with 242 additions and 28 deletions.
3 changes: 2 additions & 1 deletion cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ var (
utils.MinerExtraDataFlag,
utils.MinerRecommitIntervalFlag,
utils.MinerNoVerifyFlag,
utils.MinerMaxMergedBundles,
utils.MinerMaxMergedBundlesFlag,
utils.MinerTrustedRelaysFlag,
utils.NATFlag,
utils.NoDiscoverFlag,
utils.DiscoveryV5Flag,
Expand Down
3 changes: 2 additions & 1 deletion cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,8 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.MinerExtraDataFlag,
utils.MinerRecommitIntervalFlag,
utils.MinerNoVerifyFlag,
utils.MinerMaxMergedBundles,
utils.MinerMaxMergedBundlesFlag,
utils.MinerTrustedRelaysFlag,
},
},
{
Expand Down
28 changes: 26 additions & 2 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,11 +474,16 @@ var (
Usage: "Time interval to recreate the block being mined",
Value: ethconfig.Defaults.Miner.Recommit,
}
MinerMaxMergedBundles = cli.IntFlag{
MinerMaxMergedBundlesFlag = cli.IntFlag{
Name: "miner.maxmergedbundles",
Usage: "flashbots - The maximum amount of bundles to merge. The miner will run this many workers in parallel to calculate if the full block is more profitable with these additional bundles.",
Value: 3,
}
MinerTrustedRelaysFlag = cli.StringFlag{
Name: "miner.trustedrelays",
Usage: "flashbots - The Ethereum addresses of trusted relays for signature verification. The miner will accept signed bundles and other tasks from the relay, being reasonably certain about DDoS safety.",
Value: "0x870e2734DdBe2Fba9864f33f3420d59Bc641f2be",
}
MinerNoVerifyFlag = cli.BoolFlag{
Name: "miner.noverify",
Usage: "Disable remote sealing verification",
Expand Down Expand Up @@ -1359,6 +1364,15 @@ func setTxPool(ctx *cli.Context, cfg *core.TxPoolConfig) {
if ctx.GlobalIsSet(TxPoolLifetimeFlag.Name) {
cfg.Lifetime = ctx.GlobalDuration(TxPoolLifetimeFlag.Name)
}

addresses := strings.Split(ctx.GlobalString(MinerTrustedRelaysFlag.Name), ",")
for _, address := range addresses {
if trimmed := strings.TrimSpace(address); !common.IsHexAddress(trimmed) {
Fatalf("Invalid account in --miner.trustedrelays: %s", trimmed)
} else {
cfg.TrustedRelays = append(cfg.TrustedRelays, common.HexToAddress(trimmed))
}
}
}

func setEthash(ctx *cli.Context, cfg *ethconfig.Config) {
Expand Down Expand Up @@ -1412,7 +1426,17 @@ func setMiner(ctx *cli.Context, cfg *miner.Config) {
log.Warn("The generic --miner.gastarget flag is deprecated and will be removed in the future!")
}

cfg.MaxMergedBundles = ctx.GlobalInt(MinerMaxMergedBundles.Name)
cfg.MaxMergedBundles = ctx.GlobalInt(MinerMaxMergedBundlesFlag.Name)

addresses := strings.Split(ctx.GlobalString(MinerTrustedRelaysFlag.Name), ",")
for _, address := range addresses {
if trimmed := strings.TrimSpace(address); !common.IsHexAddress(trimmed) {
Fatalf("Invalid account in --miner.trustedrelays: %s", trimmed)
} else {
cfg.TrustedRelays = append(cfg.TrustedRelays, common.HexToAddress(trimmed))
}
}
log.Info("Trusted relays set as", "addresses", cfg.TrustedRelays)
}

func setWhitelist(ctx *cli.Context, cfg *ethconfig.Config) {
Expand Down
62 changes: 56 additions & 6 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ type TxPoolConfig struct {
GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts

Lifetime time.Duration // Maximum amount of time non-executable transaction are queued

TrustedRelays []common.Address // Trusted relay addresses. Duplicated from the miner config.
}

// DefaultTxPoolConfig contains the default configurations for the transaction
Expand Down Expand Up @@ -251,12 +253,13 @@ type TxPool struct {
locals *accountSet // Set of local transaction to exempt from eviction rules
journal *txJournal // Journal of local transaction to back up to disk

pending map[common.Address]*txList // All currently processable transactions
queue map[common.Address]*txList // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
mevBundles []types.MevBundle
all *txLookup // All transactions to allow lookups
priced *txPricedList // All transactions sorted by price
pending map[common.Address]*txList // All currently processable transactions
queue map[common.Address]*txList // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
mevBundles []types.MevBundle
megabundles map[common.Address]types.MevBundle // One megabundle per each trusted relay
all *txLookup // All transactions to allow lookups
priced *txPricedList // All transactions sorted by price

chainHeadCh chan ChainHeadEvent
chainHeadSub event.Subscription
Expand Down Expand Up @@ -290,6 +293,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
beats: make(map[common.Address]time.Time),
megabundles: make(map[common.Address]types.MevBundle),
all: newTxLookup(),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
reqResetCh: make(chan *txpoolResetRequest),
Expand Down Expand Up @@ -611,6 +615,52 @@ func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, m
return nil
}

// AddMegaBundle adds a megabundle to the pool. Assumes the relay signature has been verified already.
func (pool *TxPool) AddMegabundle(relayAddr common.Address, txs types.Transactions, blockNumber *big.Int, minTimestamp, maxTimestamp uint64, revertingTxHashes []common.Hash) error {
pool.mu.Lock()
defer pool.mu.Unlock()

fromTrustedRelay := false
for _, trustedAddr := range pool.config.TrustedRelays {
if relayAddr == trustedAddr {
fromTrustedRelay = true
}
}
if !fromTrustedRelay {
return errors.New("megabundle from non-trusted address")
}

pool.megabundles[relayAddr] = types.MevBundle{
Txs: txs,
BlockNumber: blockNumber,
MinTimestamp: minTimestamp,
MaxTimestamp: maxTimestamp,
RevertingTxHashes: revertingTxHashes,
}
return nil
}

// GetMegabundle returns the latest megabundle submitted by a given relay.
func (pool *TxPool) GetMegabundle(relayAddr common.Address, blockNumber *big.Int, blockTimestamp uint64) (types.MevBundle, error) {
pool.mu.Lock()
defer pool.mu.Unlock()

megabundle, ok := pool.megabundles[relayAddr]
if !ok {
return types.MevBundle{}, errors.New("No megabundle found")
}
if megabundle.BlockNumber.Cmp(blockNumber) != 0 {
return types.MevBundle{}, errors.New("Megabundle does not fit blockNumber constraints")
}
if megabundle.MinTimestamp != 0 && megabundle.MinTimestamp > blockTimestamp {
return types.MevBundle{}, errors.New("Megabundle does not fit minTimestamp constraints")
}
if megabundle.MaxTimestamp != 0 && megabundle.MaxTimestamp < blockTimestamp {
return types.MevBundle{}, errors.New("Megabundle does not fit maxTimestamp constraints")
}
return megabundle, nil
}

// Locals retrieves the accounts currently considered local by the pool.
func (pool *TxPool) Locals() []common.Address {
pool.mu.Lock()
Expand Down
4 changes: 4 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ func (b *EthAPIBackend) SendBundle(ctx context.Context, txs types.Transactions,
return b.eth.txPool.AddMevBundle(txs, big.NewInt(blockNumber.Int64()), minTimestamp, maxTimestamp, revertingTxHashes)
}

func (b *EthAPIBackend) SendMegabundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash, relayAddr common.Address) error {
return b.eth.txPool.AddMegabundle(relayAddr, txs, big.NewInt(blockNumber.Int64()), minTimestamp, maxTimestamp, revertingTxHashes)
}

func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
pending := b.eth.txPool.Pending(false)
var txs types.Transactions
Expand Down
76 changes: 75 additions & 1 deletion internal/ethapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2096,7 +2096,7 @@ func NewPrivateTxBundleAPI(b Backend) *PrivateTxBundleAPI {
return &PrivateTxBundleAPI{b}
}

// SendBundleArgs represents the arguments for a call.
// SendBundleArgs represents the arguments for a SendBundle call.
type SendBundleArgs struct {
Txs []hexutil.Bytes `json:"txs"`
BlockNumber rpc.BlockNumber `json:"blockNumber"`
Expand All @@ -2105,6 +2105,25 @@ type SendBundleArgs struct {
RevertingTxHashes []common.Hash `json:"revertingTxHashes"`
}

// SendMegabundleArgs represents the arguments for a SendMegabundle call.
type SendMegabundleArgs struct {
Txs []hexutil.Bytes `json:"txs"`
BlockNumber uint64 `json:"blockNumber"`
MinTimestamp *uint64 `json:"minTimestamp"`
MaxTimestamp *uint64 `json:"maxTimestamp"`
RevertingTxHashes []common.Hash `json:"revertingTxHashes"`
RelaySignature hexutil.Bytes `json:"relaySignature"`
}

// UnsignedMegabundle is used for serialization and subsequent digital signing.
type UnsignedMegabundle struct {
Txs []hexutil.Bytes
BlockNumber uint64
MinTimestamp uint64
MaxTimestamp uint64
RevertingTxHashes []common.Hash
}

// SendBundle will add the signed transaction to the transaction pool.
// The sender is responsible for signing the transaction and using the correct nonce and ensuring validity
func (s *PrivateTxBundleAPI) SendBundle(ctx context.Context, args SendBundleArgs) error {
Expand Down Expand Up @@ -2134,3 +2153,58 @@ func (s *PrivateTxBundleAPI) SendBundle(ctx context.Context, args SendBundleArgs

return s.b.SendBundle(ctx, txs, args.BlockNumber, minTimestamp, maxTimestamp, args.RevertingTxHashes)
}

// Recovers the Ethereum address of the trusted relay that signed the megabundle.
func RecoverRelayAddress(args SendMegabundleArgs) (common.Address, error) {
megabundle := UnsignedMegabundle{Txs: args.Txs, BlockNumber: args.BlockNumber, RevertingTxHashes: args.RevertingTxHashes}
if args.MinTimestamp != nil {
megabundle.MinTimestamp = *args.MinTimestamp
} else {
megabundle.MinTimestamp = 0
}
if args.MaxTimestamp != nil {
megabundle.MaxTimestamp = *args.MaxTimestamp
} else {
megabundle.MaxTimestamp = 0
}
rlpEncoding, _ := rlp.EncodeToBytes(megabundle)
signature := args.RelaySignature
signature[64] -= 27 // account for Ethereum V
recoveredPubkey, err := crypto.SigToPub(accounts.TextHash(rlpEncoding), args.RelaySignature)
if err != nil {
return common.Address{}, err
}
return crypto.PubkeyToAddress(*recoveredPubkey), nil
}

// SendMegabundle will add the signed megabundle to one of the workers for evaluation.
func (s *PrivateTxBundleAPI) SendMegabundle(ctx context.Context, args SendMegabundleArgs) error {
log.Info("Received a Megabundle request", "signature", args.RelaySignature)
var txs types.Transactions
if len(args.Txs) == 0 {
return errors.New("megabundle missing txs")
}
if args.BlockNumber == 0 {
return errors.New("megabundle missing blockNumber")
}
for _, encodedTx := range args.Txs {
tx := new(types.Transaction)
if err := tx.UnmarshalBinary(encodedTx); err != nil {
return err
}
txs = append(txs, tx)
}
var minTimestamp, maxTimestamp uint64
if args.MinTimestamp != nil {
minTimestamp = *args.MinTimestamp
}
if args.MaxTimestamp != nil {
maxTimestamp = *args.MaxTimestamp
}
relayAddr, err := RecoverRelayAddress(args)
log.Info("Megabundle", "relayAddr", relayAddr, "err", err)
if err != nil {
return err
}
return s.b.SendMegabundle(ctx, txs, rpc.BlockNumber(args.BlockNumber), minTimestamp, maxTimestamp, args.RevertingTxHashes, relayAddr)
}
1 change: 1 addition & 0 deletions internal/ethapi/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type Backend interface {
// Transaction pool API
SendTx(ctx context.Context, signedTx *types.Transaction) error
SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error
SendMegabundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash, relayAddr common.Address) error
GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error)
GetPoolTransactions() (types.Transactions, error)
GetPoolTransaction(txHash common.Hash) *types.Transaction
Expand Down
5 changes: 5 additions & 0 deletions internal/web3ext/web3ext.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,11 @@ web3._extend({
call: 'eth_sendBundle',
params: 1
}),
new web3._extend.Method({
name: 'sendMegabundle',
call: 'eth_sendMegabundle',
params: 1
}),
],
properties: [
new web3._extend.Property({
Expand Down
5 changes: 5 additions & 0 deletions les/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,15 @@ func (b *LesApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction)
func (b *LesApiBackend) RemoveTx(txHash common.Hash) {
b.eth.txPool.RemoveTx(txHash)
}

func (b *LesApiBackend) SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error {
return b.eth.txPool.AddMevBundle(txs, big.NewInt(blockNumber.Int64()), minTimestamp, maxTimestamp, revertingTxHashes)
}

func (b *LesApiBackend) SendMegabundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash, relayAddr common.Address) error {
return nil
}

func (b *LesApiBackend) GetPoolTransactions() (types.Transactions, error) {
return b.eth.txPool.GetTransactions()
}
Expand Down
1 change: 1 addition & 0 deletions miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type Config struct {
Recommit time.Duration // The time interval for miner to re-create mining work.
Noverify bool // Disable remote mining solution verification(only useful in ethash).
MaxMergedBundles int
TrustedRelays []common.Address `toml:",omitempty"` // Trusted relay addresses to receive tasks from.
}

// Miner creates blocks and searches for proof-of-work values.
Expand Down
27 changes: 20 additions & 7 deletions miner/multi_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,21 +98,34 @@ func newMultiWorker(config *Config, chainConfig *params.ChainConfig, engine cons
for i := 1; i <= config.MaxMergedBundles; i++ {
workers = append(workers,
newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, merger, &flashbotsData{
isFlashbots: true,
queue: queue,
maxMergedBundles: i,
isFlashbots: true,
isMegabundleWorker: false,
queue: queue,
maxMergedBundles: i,
}))
}

log.Info("creating multi worker", "config.MaxMergedBundles", config.MaxMergedBundles, "worker", len(workers))
for i := 0; i < len(config.TrustedRelays); i++ {
workers = append(workers,
newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, merger, &flashbotsData{
isFlashbots: true,
isMegabundleWorker: true,
queue: queue,
relayAddr: config.TrustedRelays[i],
}))
}

log.Info("creating multi worker", "config.MaxMergedBundles", config.MaxMergedBundles, "config.TrustedRelays", config.TrustedRelays, "worker", len(workers))
return &multiWorker{
regularWorker: regularWorker,
workers: workers,
}
}

type flashbotsData struct {
isFlashbots bool
queue chan *task
maxMergedBundles int
isFlashbots bool
isMegabundleWorker bool
queue chan *task
maxMergedBundles int
relayAddr common.Address
}
Loading

0 comments on commit 8677391

Please sign in to comment.