Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 50 additions & 65 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,12 @@ type LegacyPool struct {
currentState *state.StateDB // Current state in the blockchain head
pendingNonces *noncer // Pending state tracking virtual nonces

reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools
pending map[common.Address]*list // All currently processable transactions
queue map[common.Address]*list // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
all *lookup // All transactions to allow lookups
priced *pricedList // All transactions sorted by price
auths map[common.Address][]*types.Transaction // All accounts with a pooled authorization
reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools
pending map[common.Address]*list // All currently processable transactions
queue map[common.Address]*list // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
all *lookup // All transactions to allow lookups
priced *pricedList // All transactions sorted by price

reqResetCh chan *txpoolResetRequest
reqPromoteCh chan *accountSet
Expand Down Expand Up @@ -248,7 +247,6 @@ func New(config Config, chain BlockChain) *LegacyPool {
pending: make(map[common.Address]*list),
queue: make(map[common.Address]*list),
beats: make(map[common.Address]time.Time),
auths: make(map[common.Address][]*types.Transaction),
all: newLookup(),
reqResetCh: make(chan *txpoolResetRequest),
reqPromoteCh: make(chan *accountSet),
Expand Down Expand Up @@ -592,7 +590,7 @@ func (pool *LegacyPool) validateTx(tx *types.Transaction) error {
KnownConflicts: func(from common.Address, auths []common.Address) []common.Address {
var conflicts []common.Address
// The transaction sender cannot have an in-flight authorization.
if _, ok := pool.auths[from]; ok {
if _, ok := pool.all.auths[from]; ok {
conflicts = append(conflicts, from)
}
// Authorities cannot conflict with any pending or queued transactions.
Expand Down Expand Up @@ -728,13 +726,11 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) {
if old != nil {
pool.all.Remove(old.Hash())
pool.priced.Removed(1)
pool.removeAuthorities(old)
pendingReplaceMeter.Mark(1)
}
pool.all.Add(tx)
pool.priced.Put(tx)
pool.queueTxEvent(tx)
pool.addAuthorities(tx)
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())

// Successful promotion, bump the heartbeat
Expand All @@ -746,7 +742,6 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) {
if err != nil {
return false, err
}
pool.addAuthorities(tx)

log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
return replaced, nil
Expand Down Expand Up @@ -794,7 +789,6 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, addAl
// Discard any previous transaction and mark this
if old != nil {
pool.all.Remove(old.Hash())
pool.removeAuthorities(old)
pool.priced.Removed(1)
queuedReplaceMeter.Mark(1)
} else {
Expand All @@ -814,7 +808,6 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, addAl
if _, exist := pool.beats[from]; !exist {
pool.beats[from] = time.Now()
}
pool.addAuthorities(tx)
return old != nil, nil
}

Expand All @@ -833,15 +826,13 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ
if !inserted {
// An older transaction was better, discard this
pool.all.Remove(hash)
pool.removeAuthorities(tx)
pool.priced.Removed(1)
pendingDiscardMeter.Mark(1)
return false
}
// Otherwise discard any previous transaction and mark this
if old != nil {
pool.all.Remove(old.Hash())
pool.removeAuthorities(old)
pool.priced.Removed(1)
pendingReplaceMeter.Mark(1)
} else {
Expand Down Expand Up @@ -1033,9 +1024,6 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
if outofbound {
pool.priced.Removed(1)
}
// Remove any authorities the pool was tracking.
pool.removeAuthorities(tx)

// Remove the transaction from the pending lists and reset the account nonce
if pending := pool.pending[addr]; pending != nil {
if removed, invalids := pending.Remove(tx); removed {
Expand Down Expand Up @@ -1069,43 +1057,6 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
return 0
}

// addAuthorities tracks the supplied tx in relation to each authority it
// specifies.
func (pool *LegacyPool) addAuthorities(tx *types.Transaction) {
for _, addr := range tx.Authorities() {
list, ok := pool.auths[addr]
if !ok {
list = []*types.Transaction{}
}
if slices.Contains(list, tx) {
// Don't add duplicates.
continue
}
list = append(list, tx)
pool.auths[addr] = list
}
}

// removeAuthorities stops tracking the supplied tx in relation to its
// authorities.
func (pool *LegacyPool) removeAuthorities(tx *types.Transaction) {
for _, addr := range tx.Authorities() {
// Remove tx from tracker.
list := pool.auths[addr]
if i := slices.Index(list, tx); i >= 0 {
list = append(list[:i], list[i+1:]...)
} else {
log.Error("Authority with untracked tx", "addr", addr, "hash", tx.Hash())
}
if len(list) == 0 {
// If list is newly empty, delete it entirely.
delete(pool.auths, addr)
continue
}
pool.auths[addr] = list
}
}

// requestReset requests a pool reset to the new head block.
// The returned channel is closed when the reset has occurred.
func (pool *LegacyPool) requestReset(oldHead *types.Header, newHead *types.Header) chan struct{} {
Expand Down Expand Up @@ -1406,14 +1357,12 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
forwards := list.Forward(pool.currentState.GetNonce(addr))
for _, tx := range forwards {
pool.all.Remove(tx.Hash())
pool.removeAuthorities(tx)
}
log.Trace("Removed old queued transactions", "count", len(forwards))
// Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.Filter(pool.currentState.GetBalance(addr), gasLimit)
for _, tx := range drops {
pool.all.Remove(tx.Hash())
pool.removeAuthorities(tx)
}
log.Trace("Removed unpayable queued transactions", "count", len(drops))
queuedNofundsMeter.Mark(int64(len(drops)))
Expand All @@ -1434,7 +1383,6 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
for _, tx := range caps {
hash := tx.Hash()
pool.all.Remove(hash)
pool.removeAuthorities(tx)
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
}
queuedRateLimitMeter.Mark(int64(len(caps)))
Expand Down Expand Up @@ -1497,7 +1445,6 @@ func (pool *LegacyPool) truncatePending() {
// Drop the transaction from the global pools too
hash := tx.Hash()
pool.all.Remove(hash)
pool.removeAuthorities(tx)

// Update the account nonce to the dropped transaction
pool.pendingNonces.setIfLower(offenders[i], tx.Nonce())
Expand All @@ -1523,7 +1470,6 @@ func (pool *LegacyPool) truncatePending() {
// Drop the transaction from the global pools too
hash := tx.Hash()
pool.all.Remove(hash)
pool.removeAuthorities(tx)

// Update the account nonce to the dropped transaction
pool.pendingNonces.setIfLower(addr, tx.Nonce())
Expand Down Expand Up @@ -1599,15 +1545,13 @@ func (pool *LegacyPool) demoteUnexecutables() {
for _, tx := range olds {
hash := tx.Hash()
pool.all.Remove(hash)
pool.removeAuthorities(tx)
log.Trace("Removed old pending transaction", "hash", hash)
}
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
drops, invalids := list.Filter(pool.currentState.GetBalance(addr), gasLimit)
for _, tx := range drops {
hash := tx.Hash()
pool.all.Remove(hash)
pool.removeAuthorities(tx)
log.Trace("Removed unpayable pending transaction", "hash", hash)
}
pendingNofundsMeter.Mark(int64(len(drops)))
Expand Down Expand Up @@ -1717,12 +1661,15 @@ type lookup struct {
slots int
lock sync.RWMutex
txs map[common.Hash]*types.Transaction

auths map[common.Address][]*types.Transaction // All accounts with a pooled authorization
}

// newLookup returns a new lookup structure.
func newLookup() *lookup {
return &lookup{
txs: make(map[common.Hash]*types.Transaction),
txs: make(map[common.Hash]*types.Transaction),
auths: make(map[common.Address][]*types.Transaction),
}
}

Expand Down Expand Up @@ -1773,6 +1720,7 @@ func (t *lookup) Add(tx *types.Transaction) {
slotsGauge.Update(int64(t.slots))

t.txs[tx.Hash()] = tx
t.addAuthorities(tx)
}

// Remove removes a transaction from the lookup.
Expand All @@ -1787,6 +1735,7 @@ func (t *lookup) Remove(hash common.Hash) {
}
t.slots -= numSlots(tx)
slotsGauge.Update(int64(t.slots))
t.removeAuthorities(tx)

delete(t.txs, hash)
}
Expand All @@ -1803,6 +1752,43 @@ func (t *lookup) TxsBelowTip(threshold *big.Int) types.Transactions {
return found
}

// addAuthorities tracks the supplied tx in relation to each authority it
// specifies.
func (t *lookup) addAuthorities(tx *types.Transaction) {
for _, addr := range tx.Authorities() {
list, ok := t.auths[addr]
if !ok {
list = []*types.Transaction{}
}
if slices.Contains(list, tx) {
// Don't add duplicates.
continue
}
list = append(list, tx)
t.auths[addr] = list
}
}

// removeAuthorities stops tracking the supplied tx in relation to its
// authorities.
func (t *lookup) removeAuthorities(tx *types.Transaction) {
for _, addr := range tx.Authorities() {
// Remove tx from tracker.
list := t.auths[addr]
if i := slices.Index(list, tx); i >= 0 {
list = append(list[:i], list[i+1:]...)
} else {
log.Error("Authority with untracked tx", "addr", addr, "hash", tx.Hash())
}
if len(list) == 0 {
// If list is newly empty, delete it entirely.
delete(t.auths, addr)
continue
}
t.auths[addr] = list
}
}

// numSlots calculates the number of slots needed for a single transaction.
func numSlots(tx *types.Transaction) int {
return int((tx.Size() + txSlotSize - 1) / txSlotSize)
Expand Down Expand Up @@ -1837,5 +1823,4 @@ func (pool *LegacyPool) Clear() {
pool.pending = make(map[common.Address]*list)
pool.queue = make(map[common.Address]*list)
pool.pendingNonces = newNoncer(pool.currentState)
pool.auths = make(map[common.Address][]*types.Transaction)
}