From 90dfc2a7ca643d7dfcbdd95670b0cf978924e1e6 Mon Sep 17 00:00:00 2001 From: lightclient Date: Tue, 4 Feb 2025 11:51:17 -0700 Subject: [PATCH] txpool: move authorizations to lookup obj Co-authored-by: Marius van der Wijden Co-authored-by: lightclient --- core/txpool/legacypool/legacypool.go | 115 ++++++++++++--------------- 1 file changed, 50 insertions(+), 65 deletions(-) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index d3dba3d8d77d..23fd7a06fd3b 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -210,13 +210,12 @@ type LegacyPool struct { currentState *state.StateDB // Current state in the blockchain head pendingNonces *noncer // Pending state tracking virtual nonces - reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools - pending map[common.Address]*list // All currently processable transactions - queue map[common.Address]*list // Queued but non-processable transactions - beats map[common.Address]time.Time // Last heartbeat from each known account - all *lookup // All transactions to allow lookups - priced *pricedList // All transactions sorted by price - auths map[common.Address][]*types.Transaction // All accounts with a pooled authorization + reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools + pending map[common.Address]*list // All currently processable transactions + queue map[common.Address]*list // Queued but non-processable transactions + beats map[common.Address]time.Time // Last heartbeat from each known account + all *lookup // All transactions to allow lookups + priced *pricedList // All transactions sorted by price reqResetCh chan *txpoolResetRequest reqPromoteCh chan *accountSet @@ -248,7 +247,6 @@ func New(config Config, chain BlockChain) *LegacyPool { pending: make(map[common.Address]*list), queue: make(map[common.Address]*list), beats: make(map[common.Address]time.Time), - auths: make(map[common.Address][]*types.Transaction), all: newLookup(), reqResetCh: make(chan *txpoolResetRequest), reqPromoteCh: make(chan *accountSet), @@ -592,7 +590,7 @@ func (pool *LegacyPool) validateTx(tx *types.Transaction) error { KnownConflicts: func(from common.Address, auths []common.Address) []common.Address { var conflicts []common.Address // The transaction sender cannot have an in-flight authorization. - if _, ok := pool.auths[from]; ok { + if _, ok := pool.all.auths[from]; ok { conflicts = append(conflicts, from) } // Authorities cannot conflict with any pending or queued transactions. @@ -728,13 +726,11 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) { if old != nil { pool.all.Remove(old.Hash()) pool.priced.Removed(1) - pool.removeAuthorities(old) pendingReplaceMeter.Mark(1) } pool.all.Add(tx) pool.priced.Put(tx) pool.queueTxEvent(tx) - pool.addAuthorities(tx) log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) // Successful promotion, bump the heartbeat @@ -746,7 +742,6 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) { if err != nil { return false, err } - pool.addAuthorities(tx) log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To()) return replaced, nil @@ -794,7 +789,6 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, addAl // Discard any previous transaction and mark this if old != nil { pool.all.Remove(old.Hash()) - pool.removeAuthorities(old) pool.priced.Removed(1) queuedReplaceMeter.Mark(1) } else { @@ -814,7 +808,6 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, addAl if _, exist := pool.beats[from]; !exist { pool.beats[from] = time.Now() } - pool.addAuthorities(tx) return old != nil, nil } @@ -833,7 +826,6 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ if !inserted { // An older transaction was better, discard this pool.all.Remove(hash) - pool.removeAuthorities(tx) pool.priced.Removed(1) pendingDiscardMeter.Mark(1) return false @@ -841,7 +833,6 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ // Otherwise discard any previous transaction and mark this if old != nil { pool.all.Remove(old.Hash()) - pool.removeAuthorities(old) pool.priced.Removed(1) pendingReplaceMeter.Mark(1) } else { @@ -1033,9 +1024,6 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo if outofbound { pool.priced.Removed(1) } - // Remove any authorities the pool was tracking. - pool.removeAuthorities(tx) - // Remove the transaction from the pending lists and reset the account nonce if pending := pool.pending[addr]; pending != nil { if removed, invalids := pending.Remove(tx); removed { @@ -1069,43 +1057,6 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo return 0 } -// addAuthorities tracks the supplied tx in relation to each authority it -// specifies. -func (pool *LegacyPool) addAuthorities(tx *types.Transaction) { - for _, addr := range tx.Authorities() { - list, ok := pool.auths[addr] - if !ok { - list = []*types.Transaction{} - } - if slices.Contains(list, tx) { - // Don't add duplicates. - continue - } - list = append(list, tx) - pool.auths[addr] = list - } -} - -// removeAuthorities stops tracking the supplied tx in relation to its -// authorities. -func (pool *LegacyPool) removeAuthorities(tx *types.Transaction) { - for _, addr := range tx.Authorities() { - // Remove tx from tracker. - list := pool.auths[addr] - if i := slices.Index(list, tx); i >= 0 { - list = append(list[:i], list[i+1:]...) - } else { - log.Error("Authority with untracked tx", "addr", addr, "hash", tx.Hash()) - } - if len(list) == 0 { - // If list is newly empty, delete it entirely. - delete(pool.auths, addr) - continue - } - pool.auths[addr] = list - } -} - // requestReset requests a pool reset to the new head block. // The returned channel is closed when the reset has occurred. func (pool *LegacyPool) requestReset(oldHead *types.Header, newHead *types.Header) chan struct{} { @@ -1406,14 +1357,12 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T forwards := list.Forward(pool.currentState.GetNonce(addr)) for _, tx := range forwards { pool.all.Remove(tx.Hash()) - pool.removeAuthorities(tx) } log.Trace("Removed old queued transactions", "count", len(forwards)) // Drop all transactions that are too costly (low balance or out of gas) drops, _ := list.Filter(pool.currentState.GetBalance(addr), gasLimit) for _, tx := range drops { pool.all.Remove(tx.Hash()) - pool.removeAuthorities(tx) } log.Trace("Removed unpayable queued transactions", "count", len(drops)) queuedNofundsMeter.Mark(int64(len(drops))) @@ -1434,7 +1383,6 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T for _, tx := range caps { hash := tx.Hash() pool.all.Remove(hash) - pool.removeAuthorities(tx) log.Trace("Removed cap-exceeding queued transaction", "hash", hash) } queuedRateLimitMeter.Mark(int64(len(caps))) @@ -1497,7 +1445,6 @@ func (pool *LegacyPool) truncatePending() { // Drop the transaction from the global pools too hash := tx.Hash() pool.all.Remove(hash) - pool.removeAuthorities(tx) // Update the account nonce to the dropped transaction pool.pendingNonces.setIfLower(offenders[i], tx.Nonce()) @@ -1523,7 +1470,6 @@ func (pool *LegacyPool) truncatePending() { // Drop the transaction from the global pools too hash := tx.Hash() pool.all.Remove(hash) - pool.removeAuthorities(tx) // Update the account nonce to the dropped transaction pool.pendingNonces.setIfLower(addr, tx.Nonce()) @@ -1599,7 +1545,6 @@ func (pool *LegacyPool) demoteUnexecutables() { for _, tx := range olds { hash := tx.Hash() pool.all.Remove(hash) - pool.removeAuthorities(tx) log.Trace("Removed old pending transaction", "hash", hash) } // Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later @@ -1607,7 +1552,6 @@ func (pool *LegacyPool) demoteUnexecutables() { for _, tx := range drops { hash := tx.Hash() pool.all.Remove(hash) - pool.removeAuthorities(tx) log.Trace("Removed unpayable pending transaction", "hash", hash) } pendingNofundsMeter.Mark(int64(len(drops))) @@ -1717,12 +1661,15 @@ type lookup struct { slots int lock sync.RWMutex txs map[common.Hash]*types.Transaction + + auths map[common.Address][]*types.Transaction // All accounts with a pooled authorization } // newLookup returns a new lookup structure. func newLookup() *lookup { return &lookup{ - txs: make(map[common.Hash]*types.Transaction), + txs: make(map[common.Hash]*types.Transaction), + auths: make(map[common.Address][]*types.Transaction), } } @@ -1773,6 +1720,7 @@ func (t *lookup) Add(tx *types.Transaction) { slotsGauge.Update(int64(t.slots)) t.txs[tx.Hash()] = tx + t.addAuthorities(tx) } // Remove removes a transaction from the lookup. @@ -1787,6 +1735,7 @@ func (t *lookup) Remove(hash common.Hash) { } t.slots -= numSlots(tx) slotsGauge.Update(int64(t.slots)) + t.removeAuthorities(tx) delete(t.txs, hash) } @@ -1803,6 +1752,43 @@ func (t *lookup) TxsBelowTip(threshold *big.Int) types.Transactions { return found } +// addAuthorities tracks the supplied tx in relation to each authority it +// specifies. +func (t *lookup) addAuthorities(tx *types.Transaction) { + for _, addr := range tx.Authorities() { + list, ok := t.auths[addr] + if !ok { + list = []*types.Transaction{} + } + if slices.Contains(list, tx) { + // Don't add duplicates. + continue + } + list = append(list, tx) + t.auths[addr] = list + } +} + +// removeAuthorities stops tracking the supplied tx in relation to its +// authorities. +func (t *lookup) removeAuthorities(tx *types.Transaction) { + for _, addr := range tx.Authorities() { + // Remove tx from tracker. + list := t.auths[addr] + if i := slices.Index(list, tx); i >= 0 { + list = append(list[:i], list[i+1:]...) + } else { + log.Error("Authority with untracked tx", "addr", addr, "hash", tx.Hash()) + } + if len(list) == 0 { + // If list is newly empty, delete it entirely. + delete(t.auths, addr) + continue + } + t.auths[addr] = list + } +} + // numSlots calculates the number of slots needed for a single transaction. func numSlots(tx *types.Transaction) int { return int((tx.Size() + txSlotSize - 1) / txSlotSize) @@ -1837,5 +1823,4 @@ func (pool *LegacyPool) Clear() { pool.pending = make(map[common.Address]*list) pool.queue = make(map[common.Address]*list) pool.pendingNonces = newNoncer(pool.currentState) - pool.auths = make(map[common.Address][]*types.Transaction) }