Skip to content

Commit 710bcc3

Browse files
committed
core/txpool: introduce reserver
1 parent 651db00 commit 710bcc3

File tree

8 files changed

+174
-241
lines changed

8 files changed

+174
-241
lines changed

core/txpool/blobpool/blobpool.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ import (
4747
)
4848

4949
const (
50+
poolName = "blobpool" // The name of legacy txpool
51+
5052
// blobSize is the protocol constrained byte size of a single blob in a
5153
// transaction. There can be multiple of these embedded into a single tx.
5254
blobSize = params.BlobTxFieldElementsPerBlob * params.BlobTxBytesPerFieldElement
@@ -299,7 +301,7 @@ func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transac
299301
// and leading up to the first no-change.
300302
type BlobPool struct {
301303
config Config // Pool configuration
302-
reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools
304+
reserver *txpool.Reserver // Address reserver to ensure exclusivity across subpools
303305
hasPendingAuth func(common.Address) bool // Determine whether the specified address has a pending 7702-auth
304306

305307
store billy.Database // Persistent data store for the tx metadata and blobs
@@ -355,8 +357,8 @@ func (p *BlobPool) Filter(tx *types.Transaction) bool {
355357
// Init sets the gas price needed to keep a transaction in the pool and the chain
356358
// head to allow balance / nonce checks. The transaction journal will be loaded
357359
// from disk and filtered based on the provided starting settings.
358-
func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserve txpool.AddressReserver, _ txpool.IsAddressReserved) error {
359-
p.reserve = reserve
360+
func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver *txpool.Reserver) error {
361+
p.reserver = reserver
360362

361363
var (
362364
queuedir string
@@ -501,7 +503,7 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
501503
return err
502504
}
503505
if _, ok := p.index[sender]; !ok {
504-
if err := p.reserve(sender, true); err != nil {
506+
if err := p.reserver.Hold(sender, poolName); err != nil {
505507
return err
506508
}
507509
p.index[sender] = []*blobTxMeta{}
@@ -556,7 +558,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
556558
if inclusions != nil { // only during reorgs will the heap be initialized
557559
heap.Remove(p.evict, p.evict.index[addr])
558560
}
559-
p.reserve(addr, false)
561+
p.reserver.Release(addr, poolName)
560562

561563
if gapped {
562564
log.Warn("Dropping dangling blob transactions", "from", addr, "missing", next, "drop", nonces, "ids", ids)
@@ -709,7 +711,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
709711
if inclusions != nil { // only during reorgs will the heap be initialized
710712
heap.Remove(p.evict, p.evict.index[addr])
711713
}
712-
p.reserve(addr, false)
714+
p.reserver.Release(addr, poolName)
713715
} else {
714716
p.index[addr] = txs
715717
}
@@ -1008,7 +1010,7 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
10081010
// Update the indices and metrics
10091011
meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx)
10101012
if _, ok := p.index[addr]; !ok {
1011-
if err := p.reserve(addr, true); err != nil {
1013+
if err := p.reserver.Hold(addr, poolName); err != nil {
10121014
log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err)
10131015
return err
10141016
}
@@ -1068,7 +1070,7 @@ func (p *BlobPool) SetGasTip(tip *big.Int) {
10681070
delete(p.spent, addr)
10691071

10701072
heap.Remove(p.evict, p.evict.index[addr])
1071-
p.reserve(addr, false)
1073+
p.reserver.Release(addr, poolName)
10721074
}
10731075
// Clear out the transactions from the data store
10741076
log.Warn("Dropping underpriced blob transaction", "from", addr, "rejected", tx.nonce, "tip", tx.execTipCap, "want", tip, "drop", nonces, "ids", ids)
@@ -1407,7 +1409,7 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
14071409
// only by this subpool until all transactions are evicted
14081410
from, _ := types.Sender(p.signer, tx) // already validated above
14091411
if _, ok := p.index[from]; !ok {
1410-
if err := p.reserve(from, true); err != nil {
1412+
if err := p.reserver.Hold(from, poolName); err != nil {
14111413
addNonExclusiveMeter.Mark(1)
14121414
return err
14131415
}
@@ -1419,7 +1421,7 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
14191421
// by a return statement before running deferred methods. Take care with
14201422
// removing or subscoping err as it will break this clause.
14211423
if err != nil {
1422-
p.reserve(from, false)
1424+
p.reserver.Release(from, poolName)
14231425
}
14241426
}()
14251427
}
@@ -1551,7 +1553,7 @@ func (p *BlobPool) drop() {
15511553
if last {
15521554
delete(p.index, from)
15531555
delete(p.spent, from)
1554-
p.reserve(from, false)
1556+
p.reserver.Release(from, poolName)
15551557
} else {
15561558
txs[len(txs)-1] = nil
15571559
txs = txs[:len(txs)-1]
@@ -1827,7 +1829,7 @@ func (p *BlobPool) Clear() {
18271829
// can't happen until Clear releases the reservation lock. Clear cannot
18281830
// acquire the subpool lock until the transaction addition is completed.
18291831
for acct := range p.index {
1830-
p.reserve(acct, false)
1832+
p.reserver.Release(acct, poolName)
18311833
}
18321834
p.lookup = newLookup()
18331835
p.index = make(map[common.Address][]*blobTxMeta)

core/txpool/blobpool/blobpool_test.go

Lines changed: 7 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"math/big"
2727
"os"
2828
"path/filepath"
29-
"sync"
3029
"testing"
3130

3231
"github.com/ethereum/go-ethereum/common"
@@ -168,33 +167,6 @@ func (bc *testBlockChain) StateAt(common.Hash) (*state.StateDB, error) {
168167
return bc.statedb, nil
169168
}
170169

171-
// makeAddressReserver is a utility method to sanity check that accounts are
172-
// properly reserved by the blobpool (no duplicate reserves or unreserves).
173-
func makeAddressReserver() txpool.AddressReserver {
174-
var (
175-
reserved = make(map[common.Address]struct{})
176-
lock sync.Mutex
177-
)
178-
return func(addr common.Address, reserve bool) error {
179-
lock.Lock()
180-
defer lock.Unlock()
181-
182-
_, exists := reserved[addr]
183-
if reserve {
184-
if exists {
185-
panic("already reserved")
186-
}
187-
reserved[addr] = struct{}{}
188-
return nil
189-
}
190-
if !exists {
191-
panic("not reserved")
192-
}
193-
delete(reserved, addr)
194-
return nil
195-
}
196-
}
197-
198170
// makeTx is a utility method to construct a random blob transaction and sign it
199171
// with a valid key, only setting the interesting fields from the perspective of
200172
// the blob pool.
@@ -700,7 +672,7 @@ func TestOpenDrops(t *testing.T) {
700672
statedb: statedb,
701673
}
702674
pool := New(Config{Datadir: storage}, chain, nil)
703-
if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver(), nil); err != nil {
675+
if err := pool.Init(1, chain.CurrentBlock(), txpool.NewReserver()); err != nil {
704676
t.Fatalf("failed to create blob pool: %v", err)
705677
}
706678
defer pool.Close()
@@ -818,7 +790,7 @@ func TestOpenIndex(t *testing.T) {
818790
statedb: statedb,
819791
}
820792
pool := New(Config{Datadir: storage}, chain, nil)
821-
if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver(), nil); err != nil {
793+
if err := pool.Init(1, chain.CurrentBlock(), txpool.NewReserver()); err != nil {
822794
t.Fatalf("failed to create blob pool: %v", err)
823795
}
824796
defer pool.Close()
@@ -919,7 +891,7 @@ func TestOpenHeap(t *testing.T) {
919891
statedb: statedb,
920892
}
921893
pool := New(Config{Datadir: storage}, chain, nil)
922-
if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver(), nil); err != nil {
894+
if err := pool.Init(1, chain.CurrentBlock(), txpool.NewReserver()); err != nil {
923895
t.Fatalf("failed to create blob pool: %v", err)
924896
}
925897
defer pool.Close()
@@ -998,7 +970,7 @@ func TestOpenCap(t *testing.T) {
998970
statedb: statedb,
999971
}
1000972
pool := New(Config{Datadir: storage, Datacap: datacap}, chain, nil)
1001-
if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver(), nil); err != nil {
973+
if err := pool.Init(1, chain.CurrentBlock(), txpool.NewReserver()); err != nil {
1002974
t.Fatalf("failed to create blob pool: %v", err)
1003975
}
1004976
// Verify that enough transactions have been dropped to get the pool's size
@@ -1099,7 +1071,7 @@ func TestChangingSlotterSize(t *testing.T) {
10991071
statedb: statedb,
11001072
}
11011073
pool := New(Config{Datadir: storage}, chain, nil)
1102-
if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver(), nil); err != nil {
1074+
if err := pool.Init(1, chain.CurrentBlock(), txpool.NewReserver()); err != nil {
11031075
t.Fatalf("failed to create blob pool: %v", err)
11041076
}
11051077

@@ -1542,7 +1514,7 @@ func TestAdd(t *testing.T) {
15421514
statedb: statedb,
15431515
}
15441516
pool := New(Config{Datadir: storage}, chain, nil)
1545-
if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver(), nil); err != nil {
1517+
if err := pool.Init(1, chain.CurrentBlock(), txpool.NewReserver()); err != nil {
15461518
t.Fatalf("test %d: failed to create blob pool: %v", i, err)
15471519
}
15481520
verifyPoolInternals(t, pool)
@@ -1641,7 +1613,7 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) {
16411613
pool = New(Config{Datadir: ""}, chain, nil)
16421614
)
16431615

1644-
if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver(), nil); err != nil {
1616+
if err := pool.Init(1, chain.CurrentBlock(), txpool.NewReserver()); err != nil {
16451617
b.Fatalf("failed to create blob pool: %v", err)
16461618
}
16471619
// Make the pool not use disk (just drop everything). This test never reads

core/txpool/legacypool/legacypool.go

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ import (
4545
)
4646

4747
const (
48+
poolName = "general" // The name of legacy txpool
49+
4850
// txSlotSize is used to calculate how many data slots a single transaction
4951
// takes up based on its size. The slots are used as DoS protection, ensuring
5052
// that validating a new transaction remains a constant operation (in reality
@@ -237,9 +239,7 @@ type LegacyPool struct {
237239
currentHead atomic.Pointer[types.Header] // Current head of the blockchain
238240
currentState *state.StateDB // Current state in the blockchain head
239241
pendingNonces *noncer // Pending state tracking virtual nonces
240-
241-
reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools
242-
isReserved txpool.IsAddressReserved // Address reserver to check whether the address has been reserved
242+
reserver *txpool.Reserver // Address reserver to ensure exclusivity across subpools
243243

244244
pending map[common.Address]*list // All currently processable transactions
245245
queue map[common.Address]*list // Queued but non-processable transactions
@@ -304,10 +304,9 @@ func (pool *LegacyPool) Filter(tx *types.Transaction) bool {
304304
// Init sets the gas price needed to keep a transaction in the pool and the chain
305305
// head to allow balance / nonce checks. The internal
306306
// goroutines will be spun up and the pool deemed operational afterwards.
307-
func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.AddressReserver, isReserved txpool.IsAddressReserved) error {
307+
func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserver *txpool.Reserver) error {
308308
// Set the address reserver to request exclusive access to pooled accounts
309-
pool.reserve = reserve
310-
pool.isReserved = isReserved
309+
pool.reserver = reserver
311310

312311
// Set the basic pool parameters
313312
pool.gasTip.Store(uint256.NewInt(gasTip))
@@ -658,7 +657,7 @@ func (pool *LegacyPool) validateAuth(tx *types.Transaction) error {
658657
// This scenario is considered acceptable, as the rule primarily ensures
659658
// that attackers cannot easily stack a SetCode transaction when the sender
660659
// is reserved by other pools.
661-
if pool.isReserved(auth) {
660+
if pool.reserver.Has(auth) {
662661
return ErrAuthorityReserved
663662
}
664663
}
@@ -694,7 +693,7 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) {
694693
_, hasQueued = pool.queue[from]
695694
)
696695
if !hasPending && !hasQueued {
697-
if err := pool.reserve(from, true); err != nil {
696+
if err := pool.reserver.Hold(from, poolName); err != nil {
698697
return false, err
699698
}
700699
defer func() {
@@ -705,7 +704,7 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) {
705704
// by a return statement before running deferred methods. Take care with
706705
// removing or subscoping err as it will break this clause.
707706
if err != nil {
708-
pool.reserve(from, false)
707+
pool.reserver.Release(from, poolName)
709708
}
710709
}()
711710
}
@@ -1098,7 +1097,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
10981097
_, hasQueued = pool.queue[addr]
10991098
)
11001099
if !hasPending && !hasQueued {
1101-
pool.reserve(addr, false)
1100+
pool.reserver.Release(addr, poolName)
11021101
}
11031102
}()
11041103
}
@@ -1478,7 +1477,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
14781477
delete(pool.queue, addr)
14791478
delete(pool.beats, addr)
14801479
if _, ok := pool.pending[addr]; !ok {
1481-
pool.reserve(addr, false)
1480+
pool.reserver.Release(addr, poolName)
14821481
}
14831482
}
14841483
}
@@ -1664,7 +1663,7 @@ func (pool *LegacyPool) demoteUnexecutables() {
16641663
if list.Empty() {
16651664
delete(pool.pending, addr)
16661665
if _, ok := pool.queue[addr]; !ok {
1667-
pool.reserve(addr, false)
1666+
pool.reserver.Release(addr, poolName)
16681667
}
16691668
}
16701669
}
@@ -1909,7 +1908,7 @@ func (pool *LegacyPool) Clear() {
19091908
// acquire the subpool lock until the transaction addition is completed.
19101909
for _, tx := range pool.all.txs {
19111910
senderAddr, _ := types.Sender(pool.signer, tx)
1912-
pool.reserve(senderAddr, false)
1911+
pool.reserver.Release(senderAddr, poolName)
19131912
}
19141913
pool.all = newLookup()
19151914
pool.priced = newPricedList(pool.all)

core/txpool/legacypool/legacypool2_test.go

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/ethereum/go-ethereum/common"
2525
"github.com/ethereum/go-ethereum/core/state"
2626
"github.com/ethereum/go-ethereum/core/tracing"
27+
"github.com/ethereum/go-ethereum/core/txpool"
2728
"github.com/ethereum/go-ethereum/core/types"
2829
"github.com/ethereum/go-ethereum/crypto"
2930
"github.com/ethereum/go-ethereum/event"
@@ -86,9 +87,7 @@ func TestTransactionFutureAttack(t *testing.T) {
8687
config.GlobalQueue = 100
8788
config.GlobalSlots = 100
8889
pool := New(config, blockchain)
89-
90-
reserver, isReserved := makeAddressReserver()
91-
pool.Init(config.PriceLimit, blockchain.CurrentBlock(), reserver, isReserved)
90+
pool.Init(config.PriceLimit, blockchain.CurrentBlock(), txpool.NewReserver())
9291
defer pool.Close()
9392
fillPool(t, pool)
9493
pending, _ := pool.Stats()
@@ -122,9 +121,7 @@ func TestTransactionFuture1559(t *testing.T) {
122121
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
123122
blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed))
124123
pool := New(testTxPoolConfig, blockchain)
125-
126-
reserver, isReserved := makeAddressReserver()
127-
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), reserver, isReserved)
124+
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), txpool.NewReserver())
128125
defer pool.Close()
129126

130127
// Create a number of test accounts, fund them and make transactions
@@ -157,9 +154,7 @@ func TestTransactionZAttack(t *testing.T) {
157154
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
158155
blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed))
159156
pool := New(testTxPoolConfig, blockchain)
160-
161-
reserver, isReserved := makeAddressReserver()
162-
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), reserver, isReserved)
157+
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), txpool.NewReserver())
163158
defer pool.Close()
164159
// Create a number of test accounts, fund them and make transactions
165160
fillPool(t, pool)
@@ -230,9 +225,7 @@ func BenchmarkFutureAttack(b *testing.B) {
230225
config.GlobalQueue = 100
231226
config.GlobalSlots = 100
232227
pool := New(config, blockchain)
233-
234-
reserver, isReserved := makeAddressReserver()
235-
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), reserver, isReserved)
228+
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), txpool.NewReserver())
236229
defer pool.Close()
237230
fillPool(b, pool)
238231

0 commit comments

Comments
 (0)