diff --git a/config/toml.go b/config/toml.go
index eee6fe008..ae0667010 100644
--- a/config/toml.go
+++ b/config/toml.go
@@ -409,7 +409,7 @@
 pending-size = {{ .Mempool.PendingSize }}

 max-pending-txs-bytes = {{ .Mempool.MaxPendingTxsBytes }}

-pending-ttl-duration = {{ .Mempool.PendingTTLDuration }}
+pending-ttl-duration = "{{ .Mempool.PendingTTLDuration }}"

 pending-ttl-num-blocks = {{ .Mempool.PendingTTLNumBlocks }}
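The quoting fix matters because a `time.Duration` renders as e.g. `30s`, which is not a legal bare TOML value; only the quoted string form round-trips through a TOML parser. A quick standalone check (a sketch using the BurntSushi/toml parser for illustration, not the node's actual config loader):

```go
package main

import (
	"fmt"
	"time"

	"github.com/BurntSushi/toml"
)

func main() {
	d := 30 * time.Second

	// Unquoted, the rendered value `30s` is not a valid TOML bare value...
	bad := fmt.Sprintf("pending-ttl-duration = %v\n", d)
	// ...while the quoted form is an ordinary TOML string.
	good := fmt.Sprintf("pending-ttl-duration = %q\n", d.String())

	var cfg map[string]interface{}
	if _, err := toml.Decode(bad, &cfg); err != nil {
		fmt.Println("unquoted form fails to parse:", err)
	}
	if _, err := toml.Decode(good, &cfg); err == nil {
		fmt.Println("quoted form parses as:", cfg["pending-ttl-duration"])
	}
}
```

diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go
index 8504c7cd7..96b214a33 100644
--- a/internal/mempool/mempool.go
+++ b/internal/mempool/mempool.go
@@ -127,7 +127,7 @@ func NewTxMempool(
 		timestampIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
 			return wtx1.timestamp.After(wtx2.timestamp) || wtx1.timestamp.Equal(wtx2.timestamp)
 		}),
-		pendingTxs:          NewPendingTxs(),
+		pendingTxs:          NewPendingTxs(cfg),
 		failedCheckTxCounts: map[types.NodeID]uint64{},
 		peerManager:         peerManager,
 	}
@@ -340,7 +340,9 @@ func (txmp *TxMempool) CheckTx(
 				return err
 			}
-			atomic.AddInt64(&txmp.pendingSizeBytes, int64(wtx.Size()))
-			txmp.pendingTxs.Insert(wtx, res, txInfo)
+			if err := txmp.pendingTxs.Insert(wtx, res, txInfo); err != nil {
+				return err
+			}
+			atomic.AddInt64(&txmp.pendingSizeBytes, int64(wtx.Size()))
 		}
 	}
@@ -362,7 +364,7 @@ func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error {
 	// remove the committed transaction from the transaction store and indexes
 	if wtx := txmp.txStore.GetTxByHash(txKey); wtx != nil {
-		txmp.removeTx(wtx, false)
+		txmp.removeTx(wtx, false, true)
 		return nil
 	}
@@ -401,7 +403,7 @@ func (txmp *TxMempool) Flush() {
 	txmp.timestampIndex.Reset()

 	for _, wtx := range txmp.txStore.GetAllTxs() {
-		txmp.removeTx(wtx, false)
+		txmp.removeTx(wtx, false, false)
 	}

 	atomic.SwapInt64(&txmp.sizeBytes, 0)
@@ -513,7 +515,7 @@ func (txmp *TxMempool) Update(
 		// remove the committed transaction from the transaction store and indexes
 		if wtx := txmp.txStore.GetTxByHash(tx.Key()); wtx != nil {
-			txmp.removeTx(wtx, false)
+			txmp.removeTx(wtx, false, false)
 		}
 	}
@@ -634,7 +636,7 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck
 	//   - The transaction, toEvict, can be removed while a concurrent
 	//     reCheckTx callback is being executed for the same transaction.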
 	for _, toEvict := range evictTxs {
-		txmp.removeTx(toEvict, true)
+		txmp.removeTx(toEvict, true, true)
 		txmp.logger.Debug(
 			"evicted existing good transaction; mempool full",
 			"old_tx", fmt.Sprintf("%X", toEvict.tx.Hash()),
@@ -745,7 +747,7 @@ func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckT
 			panic("corrupted reCheckTx cursor")
 		}

-		txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache)
+		txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache, true)
 	}
 }
@@ -871,13 +873,13 @@ func (txmp *TxMempool) insertTx(wtx *WrappedTx) bool {
 	return true
 }

-func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) {
+func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool, shouldReenqueue bool) {
 	if txmp.txStore.IsTxRemoved(wtx.hash) {
 		return
 	}

 	txmp.txStore.RemoveTx(wtx)
-	txmp.priorityIndex.RemoveTx(wtx)
+	toBeReenqueued := txmp.priorityIndex.RemoveTx(wtx, shouldReenqueue)
 	txmp.heightIndex.Remove(wtx)
 	txmp.timestampIndex.Remove(wtx)
@@ -889,6 +891,20 @@ func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool, shouldReen
 	atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size()))

 	wtx.removeHandler(removeFromCache)
+
+	if shouldReenqueue {
+		for _, reenqueue := range toBeReenqueued {
+			txmp.removeTx(reenqueue, removeFromCache, false)
+		}
+		for _, reenqueue := range toBeReenqueued {
+			rtx := reenqueue.tx
+			go func() {
+				if err := txmp.CheckTx(context.Background(), rtx, nil, TxInfo{}); err != nil {
+					txmp.logger.Error(fmt.Sprintf("failed to reenqueue transaction %X due to %s", rtx.Hash(), err))
+				}
+			}()
+		}
+	}
 }

 func (txmp *TxMempool) expire(blockHeight int64, wtx *WrappedTx) {
@@ -967,7 +983,7 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
 	}

 	// remove pending txs that have expired
-	txmp.pendingTxs.PurgeExpired(txmp.config.PendingTTLNumBlocks, blockHeight, txmp.config.PendingTTLDuration, now, func(wtx *WrappedTx) {
+	txmp.pendingTxs.PurgeExpired(blockHeight, now, func(wtx *WrappedTx) {
 		atomic.AddInt64(&txmp.pendingSizeBytes, int64(-wtx.Size()))
 		txmp.expire(blockHeight, wtx)
 	})
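The new `shouldReenqueue` path exists because EVM queues are nonce-ordered: removing a transaction from the middle of a sender's queue leaves the later nonces unexecutable in order, so they are pulled out and re-run through `CheckTx` asynchronously. A toy model of that split (illustrative names, not the real mempool types):

```go
package main

import "fmt"

type wrappedTx struct {
	nonce    uint64
	priority int64
}

// removeAt models removeQueuedEvmTxUnsafe plus the reenqueue slice in
// RemoveTx: deleting the tx with the given nonce splits the sender's
// nonce-ordered queue into the part that stays queued and the part that
// must be re-run through CheckTx, because the gap invalidates its ordering.
func removeAt(queue []wrappedTx, nonce uint64) (remaining, reenqueue []wrappedTx) {
	for i, tx := range queue {
		if tx.nonce == nonce {
			return queue[:i], queue[i+1:]
		}
	}
	return queue, nil
}

func main() {
	queue := []wrappedTx{{0, 5}, {1, 7}, {2, 9}, {3, 4}}
	remaining, reenqueue := removeAt(queue, 1)
	fmt.Println(remaining) // [{0 5}]
	fmt.Println(reenqueue) // [{2 9} {3 4}]
}
```

Re-running the followers through `CheckTx`, rather than re-inserting them directly, lets the application decide afresh whether each one is now a pending transaction.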
diff --git a/internal/mempool/mempool_test.go b/internal/mempool/mempool_test.go
index dd4874417..9b7d56888 100644
--- a/internal/mempool/mempool_test.go
+++ b/internal/mempool/mempool_test.go
@@ -30,6 +30,8 @@ import (
 // transaction priority based on the value in the key/value pair.
 type application struct {
 	*kvstore.Application
+
+	occupiedNonces map[string][]uint64
 }

 type testTx struct {
@@ -38,6 +40,7 @@ type testTx struct {
 }

 func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTxV2, error) {
+
 	var (
 		priority int64
 		sender   string
@@ -58,7 +61,7 @@ func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a
 			GasWanted: 1,
 		}}, nil
 	}
-	nonce, err := strconv.ParseInt(string(parts[3]), 10, 64)
+	nonce, err := strconv.ParseUint(string(parts[3]), 10, 64)
 	if err != nil {
 		// could not parse
 		return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{
@@ -67,15 +70,50 @@ func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a
 			GasWanted: 1,
 		}}, nil
 	}
+	if app.occupiedNonces == nil {
+		app.occupiedNonces = make(map[string][]uint64)
+	}
+	if _, exists := app.occupiedNonces[account]; !exists {
+		app.occupiedNonces[account] = []uint64{}
+	}
+	active := true
+	for i := uint64(0); i < nonce; i++ {
+		found := false
+		for _, occ := range app.occupiedNonces[account] {
+			if occ == i {
+				found = true
+				break
+			}
+		}
+		if !found {
+			active = false
+			break
+		}
+	}
+	app.occupiedNonces[account] = append(app.occupiedNonces[account], nonce)
 	return &abci.ResponseCheckTxV2{
 		ResponseCheckTx: &abci.ResponseCheckTx{
 			Priority:  v,
 			Code:      code.CodeTypeOK,
 			GasWanted: 1,
 		},
-		EVMNonce:         uint64(nonce),
-		EVMSenderAddress: account,
-		IsEVM:            true,
+		EVMNonce:             nonce,
+		EVMSenderAddress:     account,
+		IsEVM:                true,
+		IsPendingTransaction: !active,
+		Checker:              func() abci.PendingTxCheckerResponse { return abci.Pending },
+		ExpireTxHandler: func() {
+			idx := -1
+			for i, n := range app.occupiedNonces[account] {
+				if n == nonce {
+					idx = i
+					break
+				}
+			}
+			if idx >= 0 {
+				app.occupiedNonces[account] = append(app.occupiedNonces[account][:idx], app.occupiedNonces[account][idx+1:]...)
+			}
+		},
 	}, nil
 }
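The test application now tracks occupied nonces per sender and marks a transaction pending whenever there is a gap below its nonce; `ExpireTxHandler` releases the nonce again on expiry. The gap check reduces to this (a simplified, self-contained restatement of the loop above):

```go
package main

import "fmt"

// isActive mirrors the gap check in the test application: a tx with nonce n
// may enter the priority queue only if every nonce below n is already
// occupied for that sender; otherwise it is reported as pending.
func isActive(occupied []uint64, nonce uint64) bool {
	seen := make(map[uint64]bool, len(occupied))
	for _, n := range occupied {
		seen[n] = true
	}
	for i := uint64(0); i < nonce; i++ {
		if !seen[i] {
			return false // a gap below this nonce: the tx must wait
		}
	}
	return true
}

func main() {
	fmt.Println(isActive([]uint64{0}, 1))    // true: nonce 1 follows 0
	fmt.Println(isActive([]uint64{0, 1}, 3)) // false: nonce 2 is missing
}
```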
@@ -470,12 +508,14 @@ func TestTxMempool_Prioritization(t *testing.T) {
 	txs := [][]byte{
 		[]byte(fmt.Sprintf("sender-0-1=peer=%d", 9)),
 		[]byte(fmt.Sprintf("sender-1-1=peer=%d", 8)),
-		[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 7, 0)),
-		[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 9, 1)),
 		[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address2, 6, 0)),
 		[]byte(fmt.Sprintf("sender-2-1=peer=%d", 5)),
 		[]byte(fmt.Sprintf("sender-3-1=peer=%d", 4)),
 	}
+	evmTxs := [][]byte{
+		[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 7, 0)),
+		[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 9, 1)),
+	}

 	// copy the slice of txs and shuffle the order randomly
 	txsCopy := make([][]byte, len(txs))
@@ -484,6 +524,16 @@ func TestTxMempool_Prioritization(t *testing.T) {
 	rng.Shuffle(len(txsCopy), func(i, j int) {
 		txsCopy[i], txsCopy[j] = txsCopy[j], txsCopy[i]
 	})
+	txs = [][]byte{
+		[]byte(fmt.Sprintf("sender-0-1=peer=%d", 9)),
+		[]byte(fmt.Sprintf("sender-1-1=peer=%d", 8)),
+		[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 7, 0)),
+		[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 9, 1)),
+		[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address2, 6, 0)),
+		[]byte(fmt.Sprintf("sender-2-1=peer=%d", 5)),
+		[]byte(fmt.Sprintf("sender-3-1=peer=%d", 4)),
+	}
+	txsCopy = append(txsCopy, evmTxs...)

 	for i := range txsCopy {
 		require.NoError(t, txmp.CheckTx(ctx, txsCopy[i], nil, TxInfo{SenderID: peerID}))
@@ -504,6 +554,71 @@ func TestTxMempool_Prioritization(t *testing.T) {
 	}
 }

+func TestTxMempool_PendingStoreSize(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
+	if err := client.Start(ctx); err != nil {
+		t.Fatal(err)
+	}
+	t.Cleanup(client.Wait)
+
+	txmp := setup(t, client, 100)
+	txmp.config.PendingSize = 1
+	peerID := uint16(1)
+
+	address1 := "0xeD23B3A9DE15e92B9ef9540E587B3661E15A12fA"
+
+	require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 1, 1)), nil, TxInfo{SenderID: peerID}))
+	err := txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 1, 2)), nil, TxInfo{SenderID: peerID})
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "mempool pending set is full")
+}
+
+func TestTxMempool_EVMEviction(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
+	if err := client.Start(ctx); err != nil {
+		t.Fatal(err)
+	}
+	t.Cleanup(client.Wait)
+
+	txmp := setup(t, client, 100)
+	txmp.config.Size = 1
+	peerID := uint16(1)
+
+	address1 := "0xeD23B3A9DE15e92B9ef9540E587B3661E15A12fA"
+	address2 := "0xfD23B3A9DE15e92B9ef9540E587B3661E15A12fA"
+
+	require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 1, 0)), nil, TxInfo{SenderID: peerID}))
+	// this should evict the previous tx
+	require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 2, 0)), nil, TxInfo{SenderID: peerID}))
+	require.Equal(t, 1, txmp.priorityIndex.NumTxs())
+	require.Equal(t, int64(2), txmp.priorityIndex.txs[0].priority)
+
+	txmp.config.Size = 2
+	require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 3, 1)), nil, TxInfo{SenderID: peerID}))
+	require.Equal(t, 0, txmp.pendingTxs.Size())
+	require.Equal(t, 2, txmp.priorityIndex.NumTxs())
+	// this would evict the tx with priority 2 and cause the tx with priority 3 to go pending
+	require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address2, 4, 0)), nil, TxInfo{SenderID: peerID}))
+	time.Sleep(1 * time.Second) // reenqueue is async
+	require.Equal(t, 1, txmp.priorityIndex.NumTxs())
+	tx := txmp.priorityIndex.txs[0]
+	require.Equal(t, 1, txmp.pendingTxs.Size())
+
+	require.NoError(t, txmp.CheckTx(ctx, []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address2, 5, 1)), nil, TxInfo{SenderID: peerID}))
+	require.Equal(t, 2, txmp.priorityIndex.NumTxs())
+	txmp.removeTx(tx, true, false)
+	// should not reenqueue
+	require.Equal(t, 1, txmp.priorityIndex.NumTxs())
+	time.Sleep(1 * time.Second) // pendingTxs should still be one even after sleeping for a sec
+	require.Equal(t, 1, txmp.pendingTxs.Size())
+}
+
 func TestTxMempool_CheckTxSamePeer(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
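Both sleeps exist because the reenqueue path runs `CheckTx` on a fresh goroutine. For the positive assertion, a polling variant could replace the fixed one-second wait if it ever proves flaky; a sketch using testify's `require.Eventually` (the trailing negative assertion, that the pending count stays put, still needs a plain wait):

```go
// Drop-in replacement for the first time.Sleep in TestTxMempool_EVMEviction:
// poll until the asynchronous reenqueue settles instead of waiting a fixed second.
require.Eventually(t, func() bool {
	return txmp.priorityIndex.NumTxs() == 1
}, 5*time.Second, 50*time.Millisecond)
```

diff --git a/internal/mempool/priority_queue.go b/internal/mempool/priority_queue.go
index abcbfe6a7..e9a357d63 100644
--- a/internal/mempool/priority_queue.go
+++ b/internal/mempool/priority_queue.go
@@ -60,8 +60,11 @@ func (pq *TxPriorityQueue) GetEvictableTxs(priority, txSize, totalSize, cap int6
 	pq.mtx.RLock()
 	defer pq.mtx.RUnlock()

-	txs := make([]*WrappedTx, len(pq.txs))
-	copy(txs, pq.txs)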
+	txs := []*WrappedTx{}
+	txs = append(txs, pq.txs...)
+	for _, queue := range pq.evmQueue {
+		txs = append(txs, queue[1:]...)
+	}

 	sort.Slice(txs, func(i, j int) bool {
 		return txs[i].priority < txs[j].priority
@@ -111,7 +114,7 @@ func (pq *TxPriorityQueue) NumTxs() int {
 	return len(pq.txs) + pq.numQueuedUnsafe()
 }

-func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) {
+func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) (removedIdx int) {
 	if queue, ok := pq.evmQueue[tx.evmAddress]; ok {
 		for i, t := range queue {
 			if t.tx.Key() == tx.tx.Key() {
@@ -119,10 +122,11 @@ func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) (removedIdx in
 				if len(pq.evmQueue[tx.evmAddress]) == 0 {
 					delete(pq.evmQueue, tx.evmAddress)
 				}
-				break
+				return i
 			}
 		}
 	}
+	return -1
 }

 func (pq *TxPriorityQueue) findTxIndexUnsafe(tx *WrappedTx) (int, bool) {
@@ -135,21 +139,27 @@
 }

 // RemoveTx removes a specific transaction from the priority queue.
-func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx) {
+func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx, shouldReenqueue bool) (toBeReenqueued []*WrappedTx) {
 	pq.mtx.Lock()
 	defer pq.mtx.Unlock()

+	var removedIdx int
+
 	if idx, ok := pq.findTxIndexUnsafe(tx); ok {
 		heap.Remove(pq, idx)
 		if tx.isEVM {
-			pq.removeQueuedEvmTxUnsafe(tx)
-			if len(pq.evmQueue[tx.evmAddress]) > 0 {
+			removedIdx = pq.removeQueuedEvmTxUnsafe(tx)
+			if !shouldReenqueue && len(pq.evmQueue[tx.evmAddress]) > 0 {
 				heap.Push(pq, pq.evmQueue[tx.evmAddress][0])
 			}
 		}
 	} else if tx.isEVM {
-		pq.removeQueuedEvmTxUnsafe(tx)
+		removedIdx = pq.removeQueuedEvmTxUnsafe(tx)
+	}
+	if tx.isEVM && shouldReenqueue && len(pq.evmQueue[tx.evmAddress]) > 0 && removedIdx >= 0 {
+		toBeReenqueued = pq.evmQueue[tx.evmAddress][removedIdx:]
 	}
+	return
 }

 func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) {
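Only the head of each sender's nonce-ordered queue lives in the heap (`pq.txs`); the followers wait in `pq.evmQueue`. The rewritten `GetEvictableTxs` therefore has to scan both, skipping each queue's head to avoid double-counting. A toy model of the candidate collection (illustrative data, not the real types):

```go
package main

import "fmt"

func main() {
	// Heads of each sender's queue are the only entries in the heap.
	heapTxs := []string{"alice/nonce0", "bob/nonce0"}
	evmQueue := map[string][]string{
		"alice": {"alice/nonce0", "alice/nonce1", "alice/nonce2"},
		"bob":   {"bob/nonce0"},
	}

	// Mirror of the new GetEvictableTxs collection: heap entries plus every
	// queued follower (queue[1:]), since the heads are already in heapTxs.
	candidates := append([]string{}, heapTxs...)
	for _, q := range evmQueue {
		candidates = append(candidates, q[1:]...)
	}
	fmt.Println(len(candidates)) // 4: two heads + alice's two queued followers
}
```

diff --git a/internal/mempool/priority_queue_test.go b/internal/mempool/priority_queue_test.go
index eb340b170..f680dc2fc 100644
--- a/internal/mempool/priority_queue_test.go
+++ b/internal/mempool/priority_queue_test.go
@@ -331,7 +331,7 @@ func TestTxPriorityQueue_RemoveTxEvm(t *testing.T) {
 	pq.PushTx(tx1)
 	pq.PushTx(tx2)

-	pq.RemoveTx(tx1)
+	pq.RemoveTx(tx1, false)

 	result := pq.PopTx()
 	require.Equal(t, tx2, result)
@@ -360,14 +360,14 @@ func TestTxPriorityQueue_RemoveTx(t *testing.T) {
 	max := values[len(values)-1]

 	wtx := pq.txs[pq.NumTxs()/2]
-	pq.RemoveTx(wtx)
+	pq.RemoveTx(wtx, false)
 	require.Equal(t, numTxs-1, pq.NumTxs())
 	require.Equal(t, int64(max), pq.PopTx().priority)
 	require.Equal(t, numTxs-2, pq.NumTxs())

 	require.NotPanics(t, func() {
-		pq.RemoveTx(&WrappedTx{heapIndex: numTxs})
-		pq.RemoveTx(&WrappedTx{heapIndex: numTxs + 1})
+		pq.RemoveTx(&WrappedTx{heapIndex: numTxs}, false)
+		pq.RemoveTx(&WrappedTx{heapIndex: numTxs + 1}, false)
 	})
 	require.Equal(t, numTxs-2, pq.NumTxs())
 }
diff --git a/internal/mempool/tx.go b/internal/mempool/tx.go
index 13ddb0a12..79c91e039 100644
--- a/internal/mempool/tx.go
+++ b/internal/mempool/tx.go
@@ -1,11 +1,13 @@
 package mempool

 import (
+	"errors"
 	"sort"
 	"sync"
 	"time"

 	abci "github.com/tendermint/tendermint/abci/types"
+	"github.com/tendermint/tendermint/config"
 	"github.com/tendermint/tendermint/internal/libs/clist"
 	"github.com/tendermint/tendermint/types"
 )
@@ -308,8 +310,10 @@ func (wtl *WrappedTxList) Remove(wtx *WrappedTx) {
 }

 type PendingTxs struct {
-	mtx *sync.RWMutex
-	txs []TxWithResponse
+	mtx       *sync.RWMutex
+	txs       []TxWithResponse
+	config    *config.MempoolConfig
+	sizeBytes uint64
 }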
 type TxWithResponse struct {
@@ -318,10 +322,12 @@ type TxWithResponse struct {
 	txInfo          TxInfo
 }

-func NewPendingTxs() *PendingTxs {
+func NewPendingTxs(conf *config.MempoolConfig) *PendingTxs {
 	return &PendingTxs{
-		mtx: &sync.RWMutex{},
-		txs: []TxWithResponse{},
+		mtx:       &sync.RWMutex{},
+		txs:       []TxWithResponse{},
+		config:    conf,
+		sizeBytes: 0,
 	}
 }

 func (p *PendingTxs) EvaluatePendingTransactions() (
@@ -359,6 +365,7 @@ func (p *PendingTxs) popTxsAtIndices(indices []int) {
 		if idx >= len(p.txs) {
 			panic("indices popped from pending tx store out of range")
 		}
+		p.sizeBytes -= uint64(p.txs[idx].tx.Size())
 		newTxs = append(newTxs, p.txs[start:idx]...)
 		start = idx + 1
 	}
@@ -366,14 +373,21 @@ func (p *PendingTxs) popTxsAtIndices(indices []int) {
 	p.txs = newTxs
 }

-func (p *PendingTxs) Insert(tx *WrappedTx, resCheckTx *abci.ResponseCheckTxV2, txInfo TxInfo) {
+func (p *PendingTxs) Insert(tx *WrappedTx, resCheckTx *abci.ResponseCheckTxV2, txInfo TxInfo) error {
 	p.mtx.Lock()
 	defer p.mtx.Unlock()
+
+	if len(p.txs) >= p.config.PendingSize || uint64(tx.Size())+p.sizeBytes > uint64(p.config.MaxPendingTxsBytes) {
+		return errors.New("mempool pending set is full")
+	}
+
 	p.txs = append(p.txs, TxWithResponse{
 		tx:              tx,
 		checkTxResponse: resCheckTx,
 		txInfo:          txInfo,
 	})
+	p.sizeBytes += uint64(tx.Size())
+	return nil
 }

 func (p *PendingTxs) Peek(max int) []TxWithResponse {
@@ -392,7 +406,7 @@ func (p *PendingTxs) Size() int {
 	return len(p.txs)
 }

-func (p *PendingTxs) PurgeExpired(ttlNumBlock int64, blockHeight int64, ttlDuration time.Duration, now time.Time, cb func(wtx *WrappedTx)) {
+func (p *PendingTxs) PurgeExpired(blockHeight int64, now time.Time, cb func(wtx *WrappedTx)) {
 	p.mtx.Lock()
 	defer p.mtx.Unlock()
@@ -401,15 +415,16 @@ func (p *PendingTxs) PurgeExpired(blockHeight int64, now time.Time, cb func(wtx
 	}

 	// txs retains the ordering of insertion
-	if ttlNumBlock > 0 {
+	if p.config.PendingTTLNumBlocks > 0 {
 		idxFirstNotExpiredTx := len(p.txs)
 		for i, ptx := range p.txs {
 			// once found, we can break because these are ordered
-			if (blockHeight - ptx.tx.height) <= ttlNumBlock {
+			if (blockHeight - ptx.tx.height) <= p.config.PendingTTLNumBlocks {
 				idxFirstNotExpiredTx = i
 				break
 			} else {
 				cb(ptx.tx)
+				p.sizeBytes -= uint64(ptx.tx.Size())
 			}
 		}
 		p.txs = p.txs[idxFirstNotExpiredTx:]
@@ -419,15 +434,16 @@ func (p *PendingTxs) PurgeExpired(blockHeight int64, now time.Time, cb func(wtx
 		return
 	}

-	if ttlDuration > 0 {
+	if p.config.PendingTTLDuration > 0 {
 		idxFirstNotExpiredTx := len(p.txs)
 		for i, ptx := range p.txs {
 			// once found, we can break because these are ordered
-			if now.Sub(ptx.tx.timestamp) <= ttlDuration {
+			if now.Sub(ptx.tx.timestamp) <= p.config.PendingTTLDuration {
 				idxFirstNotExpiredTx = i
 				break
 			} else {
 				cb(ptx.tx)
+				p.sizeBytes -= uint64(ptx.tx.Size())
 			}
 		}
 		p.txs = p.txs[idxFirstNotExpiredTx:]
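A property worth pinning down in a test: every successful `Insert` adds `tx.Size()` to `sizeBytes`, and every pop or purge subtracts it, so draining the set must return the counter to zero. A sketch of such a test (not part of this diff; it assumes `config.TestMempoolConfig()` admits at least one small pending transaction and that `WrappedTx.Size` reports the raw tx length):

```go
func TestPendingTxsSizeBytesAccounting(t *testing.T) {
	p := NewPendingTxs(config.TestMempoolConfig())

	// Insert adds the tx size to the internal byte counter...
	require.NoError(t, p.Insert(&WrappedTx{tx: []byte("abc")}, nil, TxInfo{}))
	require.Equal(t, uint64(3), p.sizeBytes)

	// ...and popping the tx subtracts it again, leaving an empty set at zero.
	p.popTxsAtIndices([]int{0})
	require.Equal(t, uint64(0), p.sizeBytes)
	require.Zero(t, p.Size())
}
```

diff --git a/internal/mempool/tx_test.go b/internal/mempool/tx_test.go
index 77a49c276..834beeda1 100644
--- a/internal/mempool/tx_test.go
+++ b/internal/mempool/tx_test.go
@@ -9,6 +9,7 @@ import (
 	"github.com/stretchr/testify/require"

+	"github.com/tendermint/tendermint/config"
 	"github.com/tendermint/tendermint/types"
 )
@@ -231,7 +232,7 @@ func TestWrappedTxList_Remove(t *testing.T) {
 }

 func TestPendingTxsPopTxsGood(t *testing.T) {
-	pendingTxs := NewPendingTxs()
+	pendingTxs := NewPendingTxs(config.TestMempoolConfig())
 	for _, test := range []struct {
 		origLen    int
 		popIndices []int
@@ -281,7 +282,9 @@ func TestPendingTxsPopTxsGood(t *testing.T) {
 	} {
 		pendingTxs.txs = []TxWithResponse{}
 		for i := 0; i < test.origLen; i++ {
-			pendingTxs.txs = append(pendingTxs.txs, TxWithResponse{txInfo: TxInfo{SenderID: uint16(i)}})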
+			pendingTxs.txs = append(pendingTxs.txs, TxWithResponse{
+				tx:     &WrappedTx{tx: []byte{}},
+				txInfo: TxInfo{SenderID: uint16(i)}})
 		}
 		pendingTxs.popTxsAtIndices(test.popIndices)
 		require.Equal(t, len(test.expected), len(pendingTxs.txs))
@@ -292,7 +295,7 @@ func TestPendingTxsPopTxsGood(t *testing.T) {
 }

 func TestPendingTxsPopTxsBad(t *testing.T) {
-	pendingTxs := NewPendingTxs()
+	pendingTxs := NewPendingTxs(config.TestMempoolConfig())
 	// out of range
 	require.Panics(t, func() { pendingTxs.popTxsAtIndices([]int{0}) })
 	// out of order