-
Notifications
You must be signed in to change notification settings - Fork 271
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(txpool): only try demoting txns from accounts that were active #1050
base: develop
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -280,7 +280,6 @@ type TxPool struct { | |||||
queueTxEventCh chan *types.Transaction | ||||||
reorgDoneCh chan chan struct{} | ||||||
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop | ||||||
reorgPauseCh chan bool // requests to pause scheduleReorgLoop | ||||||
realTxActivityShutdownCh chan struct{} | ||||||
wg sync.WaitGroup // tracks loop, scheduleReorgLoop | ||||||
initDoneCh chan struct{} // is closed once the pool is initialized (for tests) | ||||||
|
@@ -317,7 +316,6 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block | |||||
reorgDoneCh: make(chan chan struct{}), | ||||||
reorgShutdownCh: make(chan struct{}), | ||||||
realTxActivityShutdownCh: make(chan struct{}), | ||||||
reorgPauseCh: make(chan bool), | ||||||
initDoneCh: make(chan struct{}), | ||||||
gasPrice: new(big.Int).SetUint64(config.PriceLimit), | ||||||
} | ||||||
|
@@ -1229,14 +1227,13 @@ func (pool *TxPool) scheduleReorgLoop() { | |||||
curDone chan struct{} // non-nil while runReorg is active | ||||||
nextDone = make(chan struct{}) | ||||||
launchNextRun bool | ||||||
reorgsPaused bool | ||||||
reset *txpoolResetRequest | ||||||
dirtyAccounts *accountSet | ||||||
queuedEvents = make(map[common.Address]*txSortedMap) | ||||||
) | ||||||
for { | ||||||
// Launch next background reorg if needed | ||||||
if curDone == nil && launchNextRun && !reorgsPaused { | ||||||
if curDone == nil && launchNextRun { | ||||||
// Run the background reorg and announcements | ||||||
go pool.runReorg(nextDone, reset, dirtyAccounts, queuedEvents) | ||||||
|
||||||
|
@@ -1288,7 +1285,6 @@ func (pool *TxPool) scheduleReorgLoop() { | |||||
} | ||||||
close(nextDone) | ||||||
return | ||||||
case reorgsPaused = <-pool.reorgPauseCh: | ||||||
} | ||||||
} | ||||||
} | ||||||
|
@@ -1308,9 +1304,10 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt | |||||
promoteAddrs = dirtyAccounts.flatten() | ||||||
} | ||||||
pool.mu.Lock() | ||||||
var affectedAccounts map[common.Address]bool | ||||||
if reset != nil { | ||||||
// Reset from the old head to the new, rescheduling any reorged transactions | ||||||
pool.reset(reset.oldHead, reset.newHead) | ||||||
affectedAccounts = pool.reset(reset.oldHead, reset.newHead) | ||||||
|
||||||
// Nonces were reset, discard any events that became stale | ||||||
for addr := range events { | ||||||
|
@@ -1332,7 +1329,7 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt | |||||
// remove any transaction that has been included in the block or was invalidated | ||||||
// because of another transaction (e.g. higher gas price). | ||||||
if reset != nil { | ||||||
pool.demoteUnexecutables() | ||||||
pool.demoteUnexecutables(affectedAccounts) | ||||||
if reset.newHead != nil && pool.chainconfig.IsCurie(new(big.Int).Add(reset.newHead.Number, big.NewInt(1))) { | ||||||
l1BaseFee := fees.GetL1BaseFee(pool.currentState) | ||||||
pendingBaseFee := misc.CalcBaseFee(pool.chainconfig, reset.newHead, l1BaseFee) | ||||||
|
@@ -1380,9 +1377,18 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt | |||||
|
||||||
// reset retrieves the current state of the blockchain and ensures the content | ||||||
// of the transaction pool is valid with regard to the chain state. | ||||||
func (pool *TxPool) reset(oldHead, newHead *types.Header) { | ||||||
func (pool *TxPool) reset(oldHead, newHead *types.Header) map[common.Address]bool { | ||||||
// If we're reorging an old state, reinject all dropped transactions | ||||||
var reinject types.Transactions | ||||||
affectedAccounts := make(map[common.Address]bool) | ||||||
collectAffectedAccounts := func(txs types.Transactions) { | ||||||
if affectedAccounts != nil { | ||||||
for _, tx := range txs { | ||||||
addr, _ := types.Sender(pool.signer, tx) | ||||||
affectedAccounts[addr] = true | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
if oldHead != nil && oldHead.Hash() != newHead.ParentHash { | ||||||
// If the reorg is too deep, avoid doing it (will happen during fast sync) | ||||||
|
@@ -1391,6 +1397,7 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) { | |||||
|
||||||
if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 { | ||||||
log.Debug("Skipping deep transaction reorg", "depth", depth) | ||||||
affectedAccounts = nil // do a deep txPool reorg | ||||||
} else { | ||||||
// Reorg seems shallow enough to pull in all transactions into memory | ||||||
var discarded, included types.Transactions | ||||||
|
@@ -1407,7 +1414,7 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) { | |||||
// If we reorged to a same or higher number, then it's not a case of setHead | ||||||
log.Warn("Transaction pool reset with missing oldhead", | ||||||
"old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum) | ||||||
return | ||||||
return nil | ||||||
} | ||||||
// If the reorg ended up on a lower number, it's indicative of setHead being the cause | ||||||
log.Debug("Skipping transaction reset caused by setHead", | ||||||
|
@@ -1418,44 +1425,48 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) { | |||||
discarded = append(discarded, rem.Transactions()...) | ||||||
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { | ||||||
log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash()) | ||||||
return | ||||||
return nil | ||||||
} | ||||||
} | ||||||
for add.NumberU64() > rem.NumberU64() { | ||||||
included = append(included, add.Transactions()...) | ||||||
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { | ||||||
log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) | ||||||
return | ||||||
return nil | ||||||
} | ||||||
} | ||||||
for rem.Hash() != add.Hash() { | ||||||
discarded = append(discarded, rem.Transactions()...) | ||||||
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { | ||||||
log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash()) | ||||||
return | ||||||
return nil | ||||||
} | ||||||
included = append(included, add.Transactions()...) | ||||||
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { | ||||||
log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) | ||||||
return | ||||||
return nil | ||||||
} | ||||||
} | ||||||
reinject = types.TxDifference(discarded, included) | ||||||
collectAffectedAccounts(discarded) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. set {reinject + included} = set {discarded + included}, while |
||||||
collectAffectedAccounts(included) | ||||||
} | ||||||
} | ||||||
} | ||||||
// Initialize the internal state to the current head | ||||||
if newHead == nil { | ||||||
affectedAccounts = nil | ||||||
newHead = pool.chain.CurrentBlock().Header() // Special case during testing | ||||||
} | ||||||
statedb, err := pool.chain.StateAt(newHead.Root) | ||||||
if err != nil { | ||||||
log.Error("Failed to reset txpool state", "err", err) | ||||||
return | ||||||
return nil | ||||||
} | ||||||
pool.currentState = statedb | ||||||
pool.pendingNonces = newTxNoncer(statedb) | ||||||
pool.currentMaxGas = newHead.GasLimit | ||||||
collectAffectedAccounts(pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64()).Transactions()) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If calculate the |
||||||
|
||||||
// Inject any transactions discarded due to reorgs | ||||||
log.Debug("Reinjecting stale transactions", "count", len(reinject)) | ||||||
|
@@ -1472,6 +1483,7 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) { | |||||
|
||||||
// Update current head | ||||||
pool.currentHead = next | ||||||
return affectedAccounts | ||||||
} | ||||||
|
||||||
// promoteExecutables moves transactions that have become processable from the | ||||||
|
@@ -1706,9 +1718,14 @@ func (pool *TxPool) truncateQueue() { | |||||
// Note: transactions are not marked as removed in the priced list because re-heaping | ||||||
// is always explicitly triggered by SetBaseFee and it would be unnecessary and wasteful | ||||||
// to trigger a re-heap is this function | ||||||
func (pool *TxPool) demoteUnexecutables() { | ||||||
func (pool *TxPool) demoteUnexecutables(affectedAccounts map[common.Address]bool) { | ||||||
log.Info("Demoting unexecutable transactions", "affected", len(affectedAccounts)) | ||||||
// Iterate over all accounts and demote any non-executable transactions | ||||||
for addr, list := range pool.pending { | ||||||
if affectedAccounts != nil && !affectedAccounts[addr] { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is true that only these accounts might have higher nonce or lower balance. But it is possible that a transaction has sufficient balance before, but it's balance is not sufficient anymore, because of L1 data fee. Would not processing these accounts here cause any problem? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point, I missed that. In that case, those txns will linger in the txpool until worker tries to execute them. At that point, worker will encounter an ErrInsufficientFunds error and remove it from the pool manually. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. but it is fine imo. Since L1DataFee increasing is not in control of any user, this should not be a viable attack vector There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. or could we just randomly trigger the fallback demoteUnexecutables()? e.g. 1000 times with one that loops all accounts. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so the if true, maybe |
||||||
continue | ||||||
} | ||||||
|
||||||
nonce := pool.currentState.GetNonce(addr) | ||||||
|
||||||
// Drop all transactions that are deemed too old (low nonce) | ||||||
|
@@ -1772,24 +1789,6 @@ func (pool *TxPool) calculateTxsLifecycle(txs types.Transactions, t time.Time) { | |||||
} | ||||||
} | ||||||
|
||||||
// PauseReorgs stops any new reorg jobs to be started but doesn't interrupt any existing ones that are in flight | ||||||
// Keep in mind this function might block, although it is not expected to block for any significant amount of time | ||||||
func (pool *TxPool) PauseReorgs() { | ||||||
select { | ||||||
case pool.reorgPauseCh <- true: | ||||||
case <-pool.reorgShutdownCh: | ||||||
} | ||||||
} | ||||||
|
||||||
// ResumeReorgs allows new reorg jobs to be started. | ||||||
// Keep in mind this function might block, although it is not expected to block for any significant amount of time | ||||||
func (pool *TxPool) ResumeReorgs() { | ||||||
select { | ||||||
case pool.reorgPauseCh <- false: | ||||||
case <-pool.reorgShutdownCh: | ||||||
} | ||||||
} | ||||||
|
||||||
// addressByHeartbeat is an account address tagged with its last activity timestamp. | ||||||
type addressByHeartbeat struct { | ||||||
address common.Address | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -569,9 +569,6 @@ func (w *worker) processTxPool() (bool, error) { | |
// Fill the block with all available pending transactions. | ||
pending := w.eth.TxPool().PendingWithMax(false, w.config.MaxAccountsNum) | ||
|
||
// Allow txpool to be reorged as we build current block | ||
w.eth.TxPool().ResumeReorgs() | ||
|
||
// Split the pending transactions into locals and remotes | ||
localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending | ||
for _, account := range w.eth.TxPool().Locals() { | ||
|
@@ -892,10 +889,6 @@ func (w *worker) commit() (common.Hash, error) { | |
} | ||
} | ||
|
||
// A new block event will trigger a reorg in the txpool, pause reorgs to defer this until we fetch txns for next block. | ||
// We may end up trying to process txns that we already included in the previous block, but they will all fail the nonce check | ||
w.eth.TxPool().PauseReorgs() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removing as I don't think we will be needing this hack after this |
||
|
||
// Commit block and state to database. | ||
_, err = w.chain.WriteBlockWithState(block, w.current.receipts, w.current.coalescedLogs, w.current.state, true) | ||
if err != nil { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could save one byte. and in the meanwhile, we don't care about the case that some element is "false" in this map.