Skip to content

Commit

Permalink
add TTL for pending txs (#174)
Browse files Browse the repository at this point in the history
  • Loading branch information
codchen authored and stevenlanders committed Jan 3, 2024
1 parent 9ce7a58 commit 47ff212
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 2 deletions.
6 changes: 4 additions & 2 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,8 @@ func (txmp *TxMempool) Update(
}
}

txmp.handlePendingTransactions()
txmp.purgeExpiredTxs(blockHeight)
txmp.handlePendingTransactions()

// If there any uncommitted transactions left in the mempool, we either
// initiate re-CheckTx per remaining transaction or notify that remaining
Expand Down Expand Up @@ -844,7 +844,7 @@ func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) {

// purgeExpiredTxs removes all transactions that have exceeded their respective
// height- and/or time-based TTLs from their respective indexes. Every expired
// transaction will be removed from the mempool, but preserved in the cache.
// transaction will be removed from the mempool, but preserved in the cache (except for pending txs).
//
// NOTE: purgeExpiredTxs must only be called during TxMempool#Update in which
// the caller has a write-lock on the mempool and so we can safely iterate over
Expand Down Expand Up @@ -890,6 +890,8 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
for _, wtx := range expiredTxs {
txmp.removeTx(wtx, false)
}

txmp.pendingTxs.PurgeExpired(txmp.config.TTLNumBlocks, blockHeight, txmp.config.TTLDuration, now, txmp.cache.Remove)
}

func (txmp *TxMempool) notifyTxsAvailable() {
Expand Down
38 changes: 38 additions & 0 deletions internal/mempool/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,3 +372,41 @@ func (p *PendingTxs) Size() int {
defer p.mtx.RUnlock()
return len(p.txs)
}

func (p *PendingTxs) PurgeExpired(ttlNumBlock int64, blockHeight int64, ttlDuration time.Duration, now time.Time, cb func(types.Tx)) {
p.mtx.Lock()
defer p.mtx.Unlock()

if len(p.txs) == 0 {
return
}

// txs retains the ordering of insertion
if ttlNumBlock > 0 {
idxFirstNotExpiredTx := len(p.txs)
for i, ptx := range p.txs {
if (blockHeight - ptx.tx.height) <= ttlNumBlock {
idxFirstNotExpiredTx = i
} else {
cb(ptx.tx.tx)
}
}
p.txs = p.txs[idxFirstNotExpiredTx:]
}

if len(p.txs) == 0 {
return
}

if ttlDuration > 0 {
idxFirstNotExpiredTx := len(p.txs)
for i, ptx := range p.txs {
if now.Sub(ptx.tx.timestamp) <= ttlDuration {
idxFirstNotExpiredTx = i
} else {
cb(ptx.tx.tx)
}
}
p.txs = p.txs[idxFirstNotExpiredTx:]
}
}

0 comments on commit 47ff212

Please sign in to comment.