diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 5a20c3ce5a62..b1966905a8f5 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -298,8 +298,9 @@ func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transac // minimums will need to be done only starting at the swapped in/out nonce // and leading up to the first no-change. type BlobPool struct { - config Config // Pool configuration - reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools + config Config // Pool configuration + reserver *txpool.Reserver // Address reserver to ensure exclusivity across subpools + hasPendingAuth func(common.Address) bool // Determine whether the specified address has a pending 7702-auth store billy.Database // Persistent data store for the tx metadata and blobs stored uint64 // Useful data size of all transactions on disk @@ -329,13 +330,14 @@ type BlobPool struct { // New creates a new blob transaction pool to gather, sort and filter inbound // blob transactions from the network. -func New(config Config, chain BlockChain) *BlobPool { +func New(config Config, chain BlockChain, hasPendingAuth func(common.Address) bool) *BlobPool { // Sanitize the input to ensure no vulnerable gas prices are set config = (&config).sanitize() // Create the transaction pool with its initial settings return &BlobPool{ config: config, + hasPendingAuth: hasPendingAuth, signer: types.LatestSigner(chain.Config()), chain: chain, lookup: newLookup(), @@ -353,8 +355,8 @@ func (p *BlobPool) Filter(tx *types.Transaction) bool { // Init sets the gas price needed to keep a transaction in the pool and the chain // head to allow balance / nonce checks. The transaction journal will be loaded // from disk and filtered based on the provided starting settings. -func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserve txpool.AddressReserver) error { - p.reserve = reserve +func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver *txpool.Reserver) error { + p.reserver = reserver var ( queuedir string @@ -499,7 +501,7 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error { return err } if _, ok := p.index[sender]; !ok { - if err := p.reserve(sender, true); err != nil { + if err := p.reserver.Hold(sender); err != nil { return err } p.index[sender] = []*blobTxMeta{} @@ -554,7 +556,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 if inclusions != nil { // only during reorgs will the heap be initialized heap.Remove(p.evict, p.evict.index[addr]) } - p.reserve(addr, false) + p.reserver.Release(addr) if gapped { log.Warn("Dropping dangling blob transactions", "from", addr, "missing", next, "drop", nonces, "ids", ids) @@ -707,7 +709,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 if inclusions != nil { // only during reorgs will the heap be initialized heap.Remove(p.evict, p.evict.index[addr]) } - p.reserve(addr, false) + p.reserver.Release(addr) } else { p.index[addr] = txs } @@ -1006,7 +1008,7 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error { // Update the indices and metrics meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx) if _, ok := p.index[addr]; !ok { - if err := p.reserve(addr, true); err != nil { + if err := p.reserver.Hold(addr); err != nil { log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err) return err } @@ -1066,7 +1068,7 @@ func (p *BlobPool) SetGasTip(tip *big.Int) { delete(p.spent, addr) heap.Remove(p.evict, p.evict.index[addr]) - p.reserve(addr, false) + p.reserver.Release(addr) } // Clear out the transactions from the data store log.Warn("Dropping underpriced blob transaction", "from", addr, "rejected", tx.nonce, "tip", tx.execTipCap, "want", tip, "drop", nonces, "ids", ids) @@ -1101,6 +1103,39 @@ func (p *BlobPool) ValidateTxBasics(tx *types.Transaction) error { return txpool.ValidateTransaction(tx, p.head, p.signer, opts) } +// checkDelegationLimit determines if the tx sender is delegated or has a +// pending delegation, and if so, ensures they have at most one in-flight +// **executable** transaction, e.g. disallow stacked and gapped transactions +// from the account. +func (p *BlobPool) checkDelegationLimit(tx *types.Transaction) error { + from, _ := types.Sender(p.signer, tx) // validated + + // Short circuit if the sender has neither delegation nor pending delegation. + if p.state.GetCodeHash(from) == types.EmptyCodeHash { + // Because there is no exclusive lock held between different subpools + // when processing transactions, a blob transaction may be accepted + // while other SetCode transactions with pending authorities from the + // same address are also accepted simultaneously. + // + // This scenario is considered acceptable, as the rule primarily ensures + // that attackers cannot easily and endlessly stack blob transactions + // with a delegated or pending delegated sender. + if p.hasPendingAuth == nil || !p.hasPendingAuth(from) { + return nil + } + } + // Allow a single in-flight pending transaction. + pending := p.index[from] + if len(pending) == 0 { + return nil + } + // If account already has a pending transaction, allow replacement only. + if len(pending) == 1 && pending[0].nonce == tx.Nonce() { + return nil + } + return txpool.ErrInflightTxLimitReached +} + // validateTx checks whether a transaction is valid according to the consensus // rules and adheres to some heuristic limits of the local node (price and size). func (p *BlobPool) validateTx(tx *types.Transaction) error { @@ -1141,6 +1176,9 @@ func (p *BlobPool) validateTx(tx *types.Transaction) error { if err := txpool.ValidateTransactionWithState(tx, p.signer, stateOpts); err != nil { return err } + if err := p.checkDelegationLimit(tx); err != nil { + return err + } // If the transaction replaces an existing one, ensure that price bumps are // adhered to. var ( @@ -1369,7 +1407,7 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) { // only by this subpool until all transactions are evicted from, _ := types.Sender(p.signer, tx) // already validated above if _, ok := p.index[from]; !ok { - if err := p.reserve(from, true); err != nil { + if err := p.reserver.Hold(from); err != nil { addNonExclusiveMeter.Mark(1) return err } @@ -1381,7 +1419,7 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) { // by a return statement before running deferred methods. Take care with // removing or subscoping err as it will break this clause. if err != nil { - p.reserve(from, false) + p.reserver.Release(from) } }() } @@ -1513,7 +1551,7 @@ func (p *BlobPool) drop() { if last { delete(p.index, from) delete(p.spent, from) - p.reserve(from, false) + p.reserver.Release(from) } else { txs[len(txs)-1] = nil txs = txs[:len(txs)-1] @@ -1789,7 +1827,7 @@ func (p *BlobPool) Clear() { // can't happen until Clear releases the reservation lock. Clear cannot // acquire the subpool lock until the transaction addition is completed. for acct := range p.index { - p.reserve(acct, false) + p.reserver.Release(acct) } p.lookup = newLookup() p.index = make(map[common.Address][]*blobTxMeta) diff --git a/core/txpool/blobpool/blobpool_test.go b/core/txpool/blobpool/blobpool_test.go index b7c6cfa51e8d..4dfba3b52b13 100644 --- a/core/txpool/blobpool/blobpool_test.go +++ b/core/txpool/blobpool/blobpool_test.go @@ -26,7 +26,6 @@ import ( "math/big" "os" "path/filepath" - "sync" "testing" "github.com/ethereum/go-ethereum/common" @@ -168,33 +167,6 @@ func (bc *testBlockChain) StateAt(common.Hash) (*state.StateDB, error) { return bc.statedb, nil } -// makeAddressReserver is a utility method to sanity check that accounts are -// properly reserved by the blobpool (no duplicate reserves or unreserves). -func makeAddressReserver() txpool.AddressReserver { - var ( - reserved = make(map[common.Address]struct{}) - lock sync.Mutex - ) - return func(addr common.Address, reserve bool) error { - lock.Lock() - defer lock.Unlock() - - _, exists := reserved[addr] - if reserve { - if exists { - panic("already reserved") - } - reserved[addr] = struct{}{} - return nil - } - if !exists { - panic("not reserved") - } - delete(reserved, addr) - return nil - } -} - // makeTx is a utility method to construct a random blob transaction and sign it // with a valid key, only setting the interesting fields from the perspective of // the blob pool. @@ -433,6 +405,10 @@ func verifyBlobRetrievals(t *testing.T, pool *BlobPool) { } } +func newReserver() *txpool.Reserver { + return txpool.NewReservationTracker().NewHandle(42) +} + // Tests that transactions can be loaded from disk on startup and that they are // correctly discarded if invalid. // @@ -699,8 +675,8 @@ func TestOpenDrops(t *testing.T) { blobfee: uint256.NewInt(params.BlobTxMinBlobGasprice), statedb: statedb, } - pool := New(Config{Datadir: storage}, chain) - if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil { + pool := New(Config{Datadir: storage}, chain, nil) + if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil { t.Fatalf("failed to create blob pool: %v", err) } defer pool.Close() @@ -817,8 +793,8 @@ func TestOpenIndex(t *testing.T) { blobfee: uint256.NewInt(params.BlobTxMinBlobGasprice), statedb: statedb, } - pool := New(Config{Datadir: storage}, chain) - if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil { + pool := New(Config{Datadir: storage}, chain, nil) + if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil { t.Fatalf("failed to create blob pool: %v", err) } defer pool.Close() @@ -918,8 +894,8 @@ func TestOpenHeap(t *testing.T) { blobfee: uint256.NewInt(105), statedb: statedb, } - pool := New(Config{Datadir: storage}, chain) - if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil { + pool := New(Config{Datadir: storage}, chain, nil) + if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil { t.Fatalf("failed to create blob pool: %v", err) } defer pool.Close() @@ -997,8 +973,8 @@ func TestOpenCap(t *testing.T) { blobfee: uint256.NewInt(105), statedb: statedb, } - pool := New(Config{Datadir: storage, Datacap: datacap}, chain) - if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil { + pool := New(Config{Datadir: storage, Datacap: datacap}, chain, nil) + if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil { t.Fatalf("failed to create blob pool: %v", err) } // Verify that enough transactions have been dropped to get the pool's size @@ -1098,8 +1074,8 @@ func TestChangingSlotterSize(t *testing.T) { blobfee: uint256.NewInt(105), statedb: statedb, } - pool := New(Config{Datadir: storage}, chain) - if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil { + pool := New(Config{Datadir: storage}, chain, nil) + if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil { t.Fatalf("failed to create blob pool: %v", err) } @@ -1541,8 +1517,8 @@ func TestAdd(t *testing.T) { blobfee: uint256.NewInt(105), statedb: statedb, } - pool := New(Config{Datadir: storage}, chain) - if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil { + pool := New(Config{Datadir: storage}, chain, nil) + if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil { t.Fatalf("test %d: failed to create blob pool: %v", i, err) } verifyPoolInternals(t, pool) @@ -1638,10 +1614,10 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) { blobfee: uint256.NewInt(blobfee), statedb: statedb, } - pool = New(Config{Datadir: ""}, chain) + pool = New(Config{Datadir: ""}, chain, nil) ) - if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil { + if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil { b.Fatalf("failed to create blob pool: %v", err) } // Make the pool not use disk (just drop everything). This test never reads diff --git a/core/txpool/errors.go b/core/txpool/errors.go index c38644857e33..02f5703b6cae 100644 --- a/core/txpool/errors.go +++ b/core/txpool/errors.go @@ -56,4 +56,8 @@ var ( // input transaction of non-blob type when a blob transaction from this sender // remains pending (and vice-versa). ErrAlreadyReserved = errors.New("address already reserved") + + // ErrInflightTxLimitReached is returned when the maximum number of in-flight + // transactions is reached for specific accounts. + ErrInflightTxLimitReached = errors.New("in-flight transaction limit reached for delegated accounts") ) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 9066f3e16b2d..278ad0791f0c 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -63,10 +63,6 @@ var ( // another remote transaction. ErrTxPoolOverflow = errors.New("txpool is full") - // ErrInflightTxLimitReached is returned when the maximum number of in-flight - // transactions is reached for specific accounts. - ErrInflightTxLimitReached = errors.New("in-flight transaction limit reached for delegated accounts") - // ErrOutOfOrderTxFromDelegated is returned when the transaction with gapped // nonce received from the accounts with delegation or pending delegation. ErrOutOfOrderTxFromDelegated = errors.New("gapped-nonce tx from delegated accounts") @@ -241,8 +237,8 @@ type LegacyPool struct { currentHead atomic.Pointer[types.Header] // Current head of the blockchain currentState *state.StateDB // Current state in the blockchain head pendingNonces *noncer // Pending state tracking virtual nonces + reserver *txpool.Reserver // Address reserver to ensure exclusivity across subpools - reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools pending map[common.Address]*list // All currently processable transactions queue map[common.Address]*list // Queued but non-processable transactions beats map[common.Address]time.Time // Last heartbeat from each known account @@ -306,9 +302,9 @@ func (pool *LegacyPool) Filter(tx *types.Transaction) bool { // Init sets the gas price needed to keep a transaction in the pool and the chain // head to allow balance / nonce checks. The internal // goroutines will be spun up and the pool deemed operational afterwards. -func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.AddressReserver) error { +func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserver *txpool.Reserver) error { // Set the address reserver to request exclusive access to pooled accounts - pool.reserve = reserve + pool.reserver = reserver // Set the basic pool parameters pool.gasTip.Store(uint256.NewInt(gasTip)) @@ -618,7 +614,7 @@ func (pool *LegacyPool) checkDelegationLimit(tx *types.Transaction) error { from, _ := types.Sender(pool.signer, tx) // validated // Short circuit if the sender has neither delegation nor pending delegation. - if pool.currentState.GetCodeHash(from) == types.EmptyCodeHash && pool.all.delegationTxsCount(from) == 0 { + if pool.currentState.GetCodeHash(from) == types.EmptyCodeHash && !pool.all.hasAuth(from) { return nil } pending := pool.pending[from] @@ -633,7 +629,7 @@ func (pool *LegacyPool) checkDelegationLimit(tx *types.Transaction) error { if pending.Contains(tx.Nonce()) { return nil } - return ErrInflightTxLimitReached + return txpool.ErrInflightTxLimitReached } // validateAuth verifies that the transaction complies with code authorization @@ -644,12 +640,24 @@ func (pool *LegacyPool) validateAuth(tx *types.Transaction) error { if err := pool.checkDelegationLimit(tx); err != nil { return err } - // Authorities cannot conflict with any pending or queued transactions. + // Authorities must not conflict with any pending or queued transactions, + // nor with addresses that have already been reserved. if auths := tx.SetCodeAuthorities(); len(auths) > 0 { for _, auth := range auths { if pool.pending[auth] != nil || pool.queue[auth] != nil { return ErrAuthorityReserved } + // Because there is no exclusive lock held between different subpools + // when processing transactions, the SetCode transaction may be accepted + // while other transactions with the same sender address are also + // accepted simultaneously in the other pools. + // + // This scenario is considered acceptable, as the rule primarily ensures + // that attackers cannot easily stack a SetCode transaction when the sender + // is reserved by other pools. + if pool.reserver.Has(auth) { + return ErrAuthorityReserved + } } } return nil @@ -683,7 +691,7 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) { _, hasQueued = pool.queue[from] ) if !hasPending && !hasQueued { - if err := pool.reserve(from, true); err != nil { + if err := pool.reserver.Hold(from); err != nil { return false, err } defer func() { @@ -694,7 +702,7 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) { // by a return statement before running deferred methods. Take care with // removing or subscoping err as it will break this clause. if err != nil { - pool.reserve(from, false) + pool.reserver.Release(from) } }() } @@ -1087,7 +1095,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo _, hasQueued = pool.queue[addr] ) if !hasPending && !hasQueued { - pool.reserve(addr, false) + pool.reserver.Release(addr) } }() } @@ -1467,7 +1475,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T delete(pool.queue, addr) delete(pool.beats, addr) if _, ok := pool.pending[addr]; !ok { - pool.reserve(addr, false) + pool.reserver.Release(addr) } } } @@ -1653,7 +1661,7 @@ func (pool *LegacyPool) demoteUnexecutables() { if list.Empty() { delete(pool.pending, addr) if _, ok := pool.queue[addr]; !ok { - pool.reserve(addr, false) + pool.reserver.Release(addr) } } } @@ -1862,11 +1870,13 @@ func (t *lookup) removeAuthorities(tx *types.Transaction) { } } -// delegationTxsCount returns the number of pending authorizations for the specified address. -func (t *lookup) delegationTxsCount(addr common.Address) int { +// hasAuth returns a flag indicating whether there are pending authorizations +// from the specified address. +func (t *lookup) hasAuth(addr common.Address) bool { t.lock.RLock() defer t.lock.RUnlock() - return len(t.auths[addr]) + + return len(t.auths[addr]) > 0 } // numSlots calculates the number of slots needed for a single transaction. @@ -1880,8 +1890,8 @@ func (pool *LegacyPool) Clear() { pool.mu.Lock() defer pool.mu.Unlock() - // unreserve each tracked account. Ideally, we could just clear the - // reservation map in the parent txpool context. However, if we clear in + // unreserve each tracked account. Ideally, we could just clear the + // reservation map in the parent txpool context. However, if we clear in // parent context, to avoid exposing the subpool lock, we have to lock the // reservations and then lock each subpool. // @@ -1892,11 +1902,11 @@ func (pool *LegacyPool) Clear() { // * TxPool.Clear attempts to lock subpool mutex // // The transaction addition may attempt to reserve the sender addr which - // can't happen until Clear releases the reservation lock. Clear cannot + // can't happen until Clear releases the reservation lock. Clear cannot // acquire the subpool lock until the transaction addition is completed. for _, tx := range pool.all.txs { senderAddr, _ := types.Sender(pool.signer, tx) - pool.reserve(senderAddr, false) + pool.reserver.Release(senderAddr) } pool.all = newLookup() pool.priced = newPricedList(pool.all) @@ -1904,3 +1914,9 @@ func (pool *LegacyPool) Clear() { pool.queue = make(map[common.Address]*list) pool.pendingNonces = newNoncer(pool.currentState) } + +// HasPendingAuth returns a flag indicating whether there are pending +// authorizations from the specific address cached in the pool. +func (pool *LegacyPool) HasPendingAuth(addr common.Address) bool { + return pool.all.hasAuth(addr) +} diff --git a/core/txpool/legacypool/legacypool2_test.go b/core/txpool/legacypool/legacypool2_test.go index d55e85d74f7b..3f210e3d1b93 100644 --- a/core/txpool/legacypool/legacypool2_test.go +++ b/core/txpool/legacypool/legacypool2_test.go @@ -86,7 +86,7 @@ func TestTransactionFutureAttack(t *testing.T) { config.GlobalQueue = 100 config.GlobalSlots = 100 pool := New(config, blockchain) - pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() fillPool(t, pool) pending, _ := pool.Stats() @@ -120,7 +120,7 @@ func TestTransactionFuture1559(t *testing.T) { statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting()) blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Create a number of test accounts, fund them and make transactions @@ -153,7 +153,7 @@ func TestTransactionZAttack(t *testing.T) { statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting()) blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Create a number of test accounts, fund them and make transactions fillPool(t, pool) @@ -224,7 +224,7 @@ func BenchmarkFutureAttack(b *testing.B) { config.GlobalQueue = 100 config.GlobalSlots = 100 pool := New(config, blockchain) - pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() fillPool(b, pool) diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index 3f269bd69ec1..c47a65520493 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -24,7 +24,6 @@ import ( "math/big" "math/rand" "slices" - "sync" "sync/atomic" "testing" "time" @@ -168,42 +167,21 @@ func pricedSetCodeTxWithAuth(nonce uint64, gaslimit uint64, gasFee, tip *uint256 }) } -func makeAddressReserver() txpool.AddressReserver { - var ( - reserved = make(map[common.Address]struct{}) - lock sync.Mutex - ) - return func(addr common.Address, reserve bool) error { - lock.Lock() - defer lock.Unlock() - - _, exists := reserved[addr] - if reserve { - if exists { - panic("already reserved") - } - reserved[addr] = struct{}{} - return nil - } - if !exists { - panic("not reserved") - } - delete(reserved, addr) - return nil - } -} - func setupPool() (*LegacyPool, *ecdsa.PrivateKey) { return setupPoolWithConfig(params.TestChainConfig) } +func newReserver() *txpool.Reserver { + return txpool.NewReservationTracker().NewHandle(42) +} + func setupPoolWithConfig(config *params.ChainConfig) (*LegacyPool, *ecdsa.PrivateKey) { statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting()) blockchain := newTestBlockChain(config, 10000000, statedb, new(event.Feed)) key, _ := crypto.GenerateKey() pool := New(testTxPoolConfig, blockchain) - if err := pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()); err != nil { + if err := pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()); err != nil { panic(err) } // wait for the pool to initialize @@ -336,7 +314,7 @@ func TestStateChangeDuringReset(t *testing.T) { tx1 := transaction(1, 100000, key) pool := New(testTxPoolConfig, blockchain) - pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() nonce := pool.Nonce(address) @@ -753,7 +731,7 @@ func TestPostponing(t *testing.T) { blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Create two test accounts to produce different gap profiles with @@ -963,7 +941,7 @@ func TestQueueGlobalLimiting(t *testing.T) { config.GlobalQueue = config.AccountQueue*3 - 1 // reduce the queue limits to shorten test time (-1 to make it non divisible) pool := New(config, blockchain) - pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Create a number of test accounts and fund them (last one will be the local) @@ -1015,7 +993,7 @@ func TestQueueTimeLimiting(t *testing.T) { config.Lifetime = time.Second pool := New(config, blockchain) - pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Create a test account to ensure remotes expire @@ -1176,7 +1154,7 @@ func TestPendingGlobalLimiting(t *testing.T) { config.GlobalSlots = config.AccountSlots * 10 pool := New(config, blockchain) - pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Create a number of test accounts and fund them @@ -1275,7 +1253,7 @@ func TestCapClearsFromAll(t *testing.T) { config.GlobalSlots = 8 pool := New(config, blockchain) - pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Create a number of test accounts and fund them @@ -1308,7 +1286,7 @@ func TestPendingMinimumAllowance(t *testing.T) { config.GlobalSlots = 1 pool := New(config, blockchain) - pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Create a number of test accounts and fund them @@ -1352,7 +1330,7 @@ func TestRepricing(t *testing.T) { blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Keep track of transaction events to ensure all executables get announced @@ -1457,7 +1435,7 @@ func TestMinGasPriceEnforced(t *testing.T) { txPoolConfig := DefaultConfig txPoolConfig.NoLocals = true pool := New(txPoolConfig, blockchain) - pool.Init(txPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(txPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() key, _ := crypto.GenerateKey() @@ -1606,7 +1584,7 @@ func TestUnderpricing(t *testing.T) { config.GlobalQueue = 2 pool := New(config, blockchain) - pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Keep track of transaction events to ensure all executables get announced @@ -1696,7 +1674,7 @@ func TestStableUnderpricing(t *testing.T) { config.GlobalQueue = 0 pool := New(config, blockchain) - pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Keep track of transaction events to ensure all executables get announced @@ -1899,7 +1877,7 @@ func TestDeduplication(t *testing.T) { blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Create a test account to add transactions with @@ -1966,7 +1944,7 @@ func TestReplacement(t *testing.T) { blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Keep track of transaction events to ensure all executables get announced @@ -2157,7 +2135,7 @@ func TestStatusCheck(t *testing.T) { blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Create the test accounts to check various transaction statuses with @@ -2230,7 +2208,7 @@ func TestSetCodeTransactions(t *testing.T) { blockchain := newTestBlockChain(params.MergedTestChainConfig, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Create the test accounts @@ -2271,12 +2249,12 @@ func TestSetCodeTransactions(t *testing.T) { if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1), keyA)); err != nil { t.Fatalf("%s: failed to add remote transaction: %v", name, err) } - if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1), keyA)); !errors.Is(err, ErrInflightTxLimitReached) { - t.Fatalf("%s: error mismatch: want %v, have %v", name, ErrInflightTxLimitReached, err) + if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1), keyA)); !errors.Is(err, txpool.ErrInflightTxLimitReached) { + t.Fatalf("%s: error mismatch: want %v, have %v", name, txpool.ErrInflightTxLimitReached, err) } // Check gapped transaction again. - if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(1), keyA)); !errors.Is(err, ErrInflightTxLimitReached) { - t.Fatalf("%s: error mismatch: want %v, have %v", name, ErrInflightTxLimitReached, err) + if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(1), keyA)); !errors.Is(err, txpool.ErrInflightTxLimitReached) { + t.Fatalf("%s: error mismatch: want %v, have %v", name, txpool.ErrInflightTxLimitReached, err) } // Replace by fee. if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(10), keyA)); err != nil { @@ -2310,8 +2288,8 @@ func TestSetCodeTransactions(t *testing.T) { t.Fatalf("%s: failed to add with pending delegatio: %v", name, err) } // Also check gapped transaction is rejected. - if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1), keyC)); !errors.Is(err, ErrInflightTxLimitReached) { - t.Fatalf("%s: error mismatch: want %v, have %v", name, ErrInflightTxLimitReached, err) + if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1), keyC)); !errors.Is(err, txpool.ErrInflightTxLimitReached) { + t.Fatalf("%s: error mismatch: want %v, have %v", name, txpool.ErrInflightTxLimitReached, err) } }, }, @@ -2386,7 +2364,7 @@ func TestSetCodeTransactions(t *testing.T) { if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1000), keyC)); err != nil { t.Fatalf("%s: failed to added single pooled for account with pending delegation: %v", name, err) } - if err, want := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1000), keyC)), ErrInflightTxLimitReached; !errors.Is(err, want) { + if err, want := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1000), keyC)), txpool.ErrInflightTxLimitReached; !errors.Is(err, want) { t.Fatalf("%s: error mismatch: want %v, have %v", name, want, err) } }, @@ -2454,7 +2432,7 @@ func TestSetCodeTransactionsReorg(t *testing.T) { blockchain := newTestBlockChain(params.MergedTestChainConfig, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Create the test accounts @@ -2489,8 +2467,8 @@ func TestSetCodeTransactionsReorg(t *testing.T) { t.Fatalf("failed to add with remote setcode transaction: %v", err) } // Try to add a transactions in - if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(1000), keyA)); !errors.Is(err, ErrInflightTxLimitReached) { - t.Fatalf("unexpected error %v, expecting %v", err, ErrInflightTxLimitReached) + if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(1000), keyA)); !errors.Is(err, txpool.ErrInflightTxLimitReached) { + t.Fatalf("unexpected error %v, expecting %v", err, txpool.ErrInflightTxLimitReached) } // Simulate the chain moving blockchain.statedb.SetNonce(addrA, 2, tracing.NonceChangeAuthorization) diff --git a/core/txpool/reserver.go b/core/txpool/reserver.go new file mode 100644 index 000000000000..76ead0f3bb0b --- /dev/null +++ b/core/txpool/reserver.go @@ -0,0 +1,124 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package txpool + +import ( + "errors" + "fmt" + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" +) + +var ( + // reservationsGaugeName is the prefix of a per-subpool address reservation + // metric. + // + // This is mostly a sanity metric to ensure there's no bug that would make + // some subpool hog all the reservations due to mis-accounting. + reservationsGaugeName = "txpool/reservations" +) + +// ReservationTracker is a struct shared between different subpools. It is used to reserve +// the account and ensure that one address cannot initiate transactions, authorizations, +// and other state-changing behaviors in different pools at the same time. +type ReservationTracker struct { + accounts map[common.Address]int + lock sync.RWMutex +} + +// NewReservationTracker initializes the account reservation tracker. +func NewReservationTracker() *ReservationTracker { + return &ReservationTracker{ + accounts: make(map[common.Address]int), + } +} + +// NewHandle creates a named handle on the ReservationTracker. The handle +// identifies the subpool so ownership of reservations can be determined. +func (r *ReservationTracker) NewHandle(id int) *Reserver { + return &Reserver{r, id} +} + +// Reserver is a named handle on ReservationTracker. It is held by subpools to +// make reservations for accounts it is tracking. The id is used to determine +// which pool owns an address and disallows non-owners to hold or release +// addresses it doesn't own. +type Reserver struct { + tracker *ReservationTracker + id int +} + +// Hold attempts to reserve the specified account address for the given pool. +// Returns an error if the account is already reserved. +func (h *Reserver) Hold(addr common.Address) error { + h.tracker.lock.Lock() + defer h.tracker.lock.Unlock() + + // Double reservations are forbidden even from the same pool to + // avoid subtle bugs in the long term. + owner, exists := h.tracker.accounts[addr] + if exists { + if owner == h.id { + log.Error("pool attempted to reserve already-owned address", "address", addr) + return nil // Ignore fault to give the pool a chance to recover while the bug gets fixed + } + return ErrAlreadyReserved + } + h.tracker.accounts[addr] = h.id + if metrics.Enabled() { + m := fmt.Sprintf("%s/%d", reservationsGaugeName, h.id) + metrics.GetOrRegisterGauge(m, nil).Inc(1) + } + return nil +} + +// Release attempts to release the reservation for the specified account. +// Returns an error if the address is not reserved or is reserved by another pool. +func (h *Reserver) Release(addr common.Address) error { + h.tracker.lock.Lock() + defer h.tracker.lock.Unlock() + + // Ensure subpools only attempt to unreserve their own owned addresses, + // otherwise flag as a programming error. + owner, exists := h.tracker.accounts[addr] + if !exists { + log.Error("pool attempted to unreserve non-reserved address", "address", addr) + return errors.New("address not reserved") + } + if owner != h.id { + log.Error("pool attempted to unreserve non-owned address", "address", addr) + return errors.New("address not owned") + } + delete(h.tracker.accounts, addr) + if metrics.Enabled() { + m := fmt.Sprintf("%s/%d", reservationsGaugeName, h.id) + metrics.GetOrRegisterGauge(m, nil).Dec(1) + } + return nil +} + +// Has returns a flag indicating if the address has been reserved or not. +func (h *Reserver) Has(address common.Address) bool { + h.tracker.lock.RLock() + defer h.tracker.lock.RUnlock() + + _, exists := h.tracker.accounts[address] + return exists +} diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index f5cb852d8fff..2cb510387540 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -67,10 +67,6 @@ type LazyResolver interface { Get(hash common.Hash) *types.Transaction } -// AddressReserver is passed by the main transaction pool to subpools, so they -// may request (and relinquish) exclusive access to certain addresses. -type AddressReserver func(addr common.Address, reserve bool) error - // PendingFilter is a collection of filter rules to allow retrieving a subset // of transactions for announcement or mining. // @@ -109,7 +105,7 @@ type SubPool interface { // These should not be passed as a constructor argument - nor should the pools // start by themselves - in order to keep multiple subpools in lockstep with // one another. - Init(gasTip uint64, head *types.Header, reserve AddressReserver) error + Init(gasTip uint64, head *types.Header, reserver *Reserver) error // Close terminates any background processing threads and releases any held // resources. diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 083aac92c66f..47d83e03d4b4 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -29,7 +29,6 @@ import ( "github.com/ethereum/go-ethereum/crypto/kzg4844" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" ) @@ -43,15 +42,6 @@ const ( TxStatusIncluded ) -var ( - // reservationsGaugeName is the prefix of a per-subpool address reservation - // metric. - // - // This is mostly a sanity metric to ensure there's no bug that would make - // some subpool hog all the reservations due to mis-accounting. - reservationsGaugeName = "txpool/reservations" -) - // BlockChain defines the minimal set of methods needed to back a tx pool with // a chain. Exists to allow mocking the live chain out of tests. type BlockChain interface { @@ -81,9 +71,6 @@ type TxPool struct { stateLock sync.RWMutex // The lock for protecting state instance state *state.StateDB // Current state at the blockchain head - reservations map[common.Address]SubPool // Map with the account to pool reservations - reserveLock sync.Mutex // Lock protecting the account reservations - subs event.SubscriptionScope // Subscription scope to unsubscribe all on shutdown quit chan chan error // Quit channel to tear down the head updater term chan struct{} // Termination channel to detect a closed pool @@ -110,17 +97,17 @@ func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) { return nil, err } pool := &TxPool{ - subpools: subpools, - chain: chain, - signer: types.LatestSigner(chain.Config()), - state: statedb, - reservations: make(map[common.Address]SubPool), - quit: make(chan chan error), - term: make(chan struct{}), - sync: make(chan chan error), + subpools: subpools, + chain: chain, + signer: types.LatestSigner(chain.Config()), + state: statedb, + quit: make(chan chan error), + term: make(chan struct{}), + sync: make(chan chan error), } + reserver := NewReservationTracker() for i, subpool := range subpools { - if err := subpool.Init(gasTip, head, pool.reserver(i, subpool)); err != nil { + if err := subpool.Init(gasTip, head, reserver.NewHandle(i)); err != nil { for j := i - 1; j >= 0; j-- { subpools[j].Close() } @@ -131,52 +118,6 @@ func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) { return pool, nil } -// reserver is a method to create an address reservation callback to exclusively -// assign/deassign addresses to/from subpools. This can ensure that at any point -// in time, only a single subpool is able to manage an account, avoiding cross -// subpool eviction issues and nonce conflicts. -func (p *TxPool) reserver(id int, subpool SubPool) AddressReserver { - return func(addr common.Address, reserve bool) error { - p.reserveLock.Lock() - defer p.reserveLock.Unlock() - - owner, exists := p.reservations[addr] - if reserve { - // Double reservations are forbidden even from the same pool to - // avoid subtle bugs in the long term. - if exists { - if owner == subpool { - log.Error("pool attempted to reserve already-owned address", "address", addr) - return nil // Ignore fault to give the pool a chance to recover while the bug gets fixed - } - return ErrAlreadyReserved - } - p.reservations[addr] = subpool - if metrics.Enabled() { - m := fmt.Sprintf("%s/%d", reservationsGaugeName, id) - metrics.GetOrRegisterGauge(m, nil).Inc(1) - } - return nil - } - // Ensure subpools only attempt to unreserve their own owned addresses, - // otherwise flag as a programming error. - if !exists { - log.Error("pool attempted to unreserve non-reserved address", "address", addr) - return errors.New("address not reserved") - } - if subpool != owner { - log.Error("pool attempted to unreserve non-owned address", "address", addr) - return errors.New("address not owned") - } - delete(p.reservations, addr) - if metrics.Enabled() { - m := fmt.Sprintf("%s/%d", reservationsGaugeName, id) - metrics.GetOrRegisterGauge(m, nil).Dec(1) - } - return nil - } -} - // Close terminates the transaction pool and all its subpools. func (p *TxPool) Close() error { var errs []error diff --git a/eth/backend.go b/eth/backend.go index 79759710b61d..6716a77562b4 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -260,16 +260,16 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { eth.filterMaps = filtermaps.NewFilterMaps(chainDb, chainView, historyCutoff, finalBlock, filtermaps.DefaultParams, fmConfig) eth.closeFilterMaps = make(chan chan struct{}) - if config.BlobPool.Datadir != "" { - config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir) - } - blobPool := blobpool.New(config.BlobPool, eth.blockchain) - if config.TxPool.Journal != "" { config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal) } legacyPool := legacypool.New(config.TxPool, eth.blockchain) + if config.BlobPool.Datadir != "" { + config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir) + } + blobPool := blobpool.New(config.BlobPool, eth.blockchain, legacyPool.HasPendingAuth) + eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, []txpool.SubPool{legacyPool, blobPool}) if err != nil { return nil, err diff --git a/eth/protocols/eth/handler_test.go b/eth/protocols/eth/handler_test.go index bbbd888701d2..fa031d989970 100644 --- a/eth/protocols/eth/handler_test.go +++ b/eth/protocols/eth/handler_test.go @@ -135,7 +135,7 @@ func newTestBackendWithGenerator(blocks int, shanghai bool, cancun bool, generat storage, _ := os.MkdirTemp("", "blobpool-") defer os.RemoveAll(storage) - blobPool := blobpool.New(blobpool.Config{Datadir: storage}, chain) + blobPool := blobpool.New(blobpool.Config{Datadir: storage}, chain, nil) legacyPool := legacypool.New(txconfig, chain) txpool, _ := txpool.New(txconfig.PriceLimit, chain, []txpool.SubPool{legacyPool, blobPool})