diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index a00883b05..9bed03a8a 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -399,42 +399,28 @@ func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { totalSize int64 ) - // wTxs contains a list of *WrappedTx retrieved from the priority queue that - // need to be re-enqueued prior to returning. - wTxs := make([]*WrappedTx, 0, txmp.priorityIndex.NumTxs()) - defer func() { - for _, wtx := range wTxs { - txmp.priorityIndex.PushTx(wtx) - } - }() - - txs := make([]types.Tx, 0, txmp.priorityIndex.NumTxs()) + var txs []types.Tx if uint64(txmp.Size()) < txmp.config.TxNotifyThreshold { // do not reap anything if threshold is not met return txs } - for txmp.priorityIndex.NumTxs() > 0 { - wtx := txmp.priorityIndex.PopTx() - txs = append(txs, wtx.tx) - wTxs = append(wTxs, wtx) + txmp.priorityIndex.ForEachTx(func(wtx *WrappedTx) bool { size := types.ComputeProtoSizeForTxs([]types.Tx{wtx.tx}) - // Ensure we have capacity for the transaction with respect to the - // transaction size. if maxBytes > -1 && totalSize+size > maxBytes { - return txs[:len(txs)-1] + return false } - totalSize += size - - // ensure we have capacity for the transaction with respect to total gas gas := totalGas + wtx.gasWanted if maxGas > -1 && gas > maxGas { - return txs[:len(txs)-1] + return false } totalGas = gas - } + + txs = append(txs, wtx.tx) + return true + }) return txs } diff --git a/internal/mempool/priority_queue.go b/internal/mempool/priority_queue.go index 6906a9717..6dbbfe9b2 100644 --- a/internal/mempool/priority_queue.go +++ b/internal/mempool/priority_queue.go @@ -13,14 +13,11 @@ var _ heap.Interface = (*TxPriorityQueue)(nil) // TxPriorityQueue defines a thread-safe priority queue for valid transactions. type TxPriorityQueue struct { mtx sync.RWMutex - txs []*WrappedTx - evmQueue map[string][]*WrappedTx + txs []*WrappedTx // priority heap + evmQueue map[string][]*WrappedTx // sorted by nonce } -func insertToEVMQueue(queue []*WrappedTx, tx *WrappedTx) []*WrappedTx { - // Using BinarySearch to find the appropriate index to insert tx - i := binarySearch(queue, tx) - +func insertToEVMQueue(queue []*WrappedTx, tx *WrappedTx, i int) []*WrappedTx { // Make room for new value and add it queue = append(queue, nil) copy(queue[i+1:], queue[i:]) @@ -33,7 +30,7 @@ func binarySearch(queue []*WrappedTx, tx *WrappedTx) int { low, high := 0, len(queue) for low < high { mid := low + (high-low)/2 - if queue[mid].evmNonce < tx.evmNonce { + if queue[mid].evmNonce <= tx.evmNonce { low = mid + 1 } else { high = mid @@ -117,12 +114,15 @@ func (pq *TxPriorityQueue) NumTxs() int { func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) { if queue, ok := pq.evmQueue[tx.evmAddress]; ok { for i, t := range queue { - if t.evmNonce == tx.evmNonce { + if t.tx.Key() == tx.tx.Key() { pq.evmQueue[tx.evmAddress] = append(queue[:i], queue[i+1:]...) if len(pq.evmQueue[tx.evmAddress]) == 0 { delete(pq.evmQueue, tx.evmAddress) } else { - heap.Push(pq, pq.evmQueue[tx.evmAddress][0]) + // only if removing the first item, then push next onto queue + if i == 0 { + heap.Push(pq, pq.evmQueue[tx.evmAddress][0]) + } } break } @@ -174,9 +174,67 @@ func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) { heap.Push(pq, tx) } - pq.evmQueue[tx.evmAddress] = insertToEVMQueue(queue, tx) + pq.evmQueue[tx.evmAddress] = insertToEVMQueue(queue, tx, binarySearch(queue, tx)) + } +// These are available if we need to test the invariant checks +// these can be used to troubleshoot invariant violations +//func (pq *TxPriorityQueue) checkInvariants(msg string) { +// +// uniqHashes := make(map[string]bool) +// for _, tx := range pq.txs { +// if _, ok := uniqHashes[fmt.Sprintf("%x", tx.tx.Key())]; ok { +// pq.print() +// panic(fmt.Sprintf("INVARIANT (%s): duplicate hash=%x in heap", msg, tx.tx.Key())) +// } +// uniqHashes[fmt.Sprintf("%x", tx.tx.Key())] = true +// if tx.isEVM { +// if queue, ok := pq.evmQueue[tx.evmAddress]; ok { +// if queue[0].tx.Key() != tx.tx.Key() { +// pq.print() +// panic(fmt.Sprintf("INVARIANT (%s): tx in heap but not at front of evmQueue hash=%x", msg, tx.tx.Key())) +// } +// } else { +// pq.print() +// panic(fmt.Sprintf("INVARIANT (%s): tx in heap but not in evmQueue hash=%x", msg, tx.tx.Key())) +// } +// } +// } +// +// // each item in all queues should be unique nonce +// for _, queue := range pq.evmQueue { +// hashes := make(map[string]bool) +// for idx, tx := range queue { +// if idx == 0 { +// _, ok := pq.findTxIndexUnsafe(tx) +// if !ok { +// pq.print() +// panic(fmt.Sprintf("INVARIANT (%s): did not find tx[0] hash=%x nonce=%d in heap", msg, tx.tx.Key(), tx.evmNonce)) +// } +// } +// if _, ok := hashes[fmt.Sprintf("%x", tx.tx.Key())]; ok { +// pq.print() +// panic(fmt.Sprintf("INVARIANT (%s): duplicate hash=%x in queue nonce=%d", msg, tx.tx.Key(), tx.evmNonce)) +// } +// hashes[fmt.Sprintf("%x", tx.tx.Key())] = true +// } +// } +//} + +// for debugging situations where invariant violations occur +//func (pq *TxPriorityQueue) print() { +// for _, tx := range pq.txs { +// fmt.Printf("DEBUG PRINT: heap: nonce=%d, hash=%x\n", tx.evmNonce, tx.tx.Key()) +// } +// +// for _, queue := range pq.evmQueue { +// for idx, tx := range queue { +// fmt.Printf("DEBUG PRINT: evmQueue[%d]: nonce=%d, hash=%x\n", idx, tx.evmNonce, tx.tx.Key()) +// } +// } +//} + // PushTx adds a valid transaction to the priority queue. It is thread safe. func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) { pq.mtx.Lock() @@ -185,6 +243,9 @@ func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) { } func (pq *TxPriorityQueue) popTxUnsafe() *WrappedTx { + if len(pq.txs) == 0 { + return nil + } x := heap.Pop(pq) if x == nil { return nil @@ -209,6 +270,31 @@ func (pq *TxPriorityQueue) PopTx() *WrappedTx { } // dequeue up to `max` transactions and reenqueue while locked +func (pq *TxPriorityQueue) ForEachTx(handler func(wtx *WrappedTx) bool) { + pq.mtx.Lock() + defer pq.mtx.Unlock() + + numTxs := len(pq.txs) + pq.numQueuedUnsafe() + + txs := make([]*WrappedTx, 0, numTxs) + + defer func() { + for _, tx := range txs { + pq.pushTxUnsafe(tx) + } + }() + + for i := 0; i < numTxs; i++ { + popped := pq.popTxUnsafe() + txs = append(txs, popped) + if !handler(popped) { + return + } + } +} + +// dequeue up to `max` transactions and reenqueue while locked +// TODO: use ForEachTx instead func (pq *TxPriorityQueue) PeekTxs(max int) []*WrappedTx { pq.mtx.Lock() defer pq.mtx.Unlock() diff --git a/internal/mempool/priority_queue_test.go b/internal/mempool/priority_queue_test.go index c3bc90853..c1e17d278 100644 --- a/internal/mempool/priority_queue_test.go +++ b/internal/mempool/priority_queue_test.go @@ -49,6 +49,29 @@ func TestTxPriorityQueue_ReapHalf(t *testing.T) { func TestTxPriorityQueue_PriorityAndNonceOrdering(t *testing.T) { testCases := []TxTestCase{ + { + name: "PriorityWithEVMAndNonEVMDuplicateNonce", + inputTxs: []*WrappedTx{ + {sender: "1", isEVM: true, evmAddress: "0xabc", evmNonce: 1, priority: 10}, + {sender: "3", isEVM: true, evmAddress: "0xabc", evmNonce: 3, priority: 9}, + {sender: "2", isEVM: true, evmAddress: "0xabc", evmNonce: 1, priority: 7}, + }, + expectedOutput: []int64{1, 2, 3}, + }, + { + name: "PriorityWithEVMAndNonEVMDuplicateNonce", + inputTxs: []*WrappedTx{ + {sender: "1", isEVM: true, evmAddress: "0xabc", evmNonce: 1, priority: 10}, + {sender: "2", isEVM: false, priority: 9}, + {sender: "4", isEVM: true, evmAddress: "0xabc", evmNonce: 0, priority: 9}, // Same EVM address as first, lower nonce + {sender: "5", isEVM: true, evmAddress: "0xdef", evmNonce: 1, priority: 7}, + {sender: "5", isEVM: true, evmAddress: "0xdef", evmNonce: 1, priority: 7}, + {sender: "3", isEVM: true, evmAddress: "0xdef", evmNonce: 0, priority: 8}, + {sender: "6", isEVM: false, priority: 6}, + {sender: "7", isEVM: true, evmAddress: "0xghi", evmNonce: 2, priority: 5}, + }, + expectedOutput: []int64{2, 4, 1, 3, 5, 5, 6, 7}, + }, { name: "PriorityWithEVMAndNonEVM", inputTxs: []*WrappedTx{ @@ -107,6 +130,7 @@ func TestTxPriorityQueue_PriorityAndNonceOrdering(t *testing.T) { // Add input transactions to the queue and set timestamp to order inserted for i, tx := range tc.inputTxs { tx.timestamp = now.Add(time.Duration(i) * time.Second) + tx.tx = []byte(fmt.Sprintf("%d", time.Now().UnixNano())) pq.PushTx(tx) } @@ -126,9 +150,9 @@ func TestTxPriorityQueue_SameAddressDifferentNonces(t *testing.T) { address := "0x123" // Insert transactions with the same address but different nonces and priorities - pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 2, priority: 10}) - pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 1, priority: 5}) - pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 3, priority: 15}) + pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 2, priority: 10, tx: []byte("tx1")}) + pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 1, priority: 5, tx: []byte("tx2")}) + pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 3, priority: 15, tx: []byte("tx3")}) // Pop transactions and verify they are in the correct order of nonce tx1 := pq.PopTx() @@ -154,6 +178,7 @@ func TestTxPriorityQueue(t *testing.T) { pq.PushTx(&WrappedTx{ priority: int64(i), timestamp: time.Now(), + tx: []byte(fmt.Sprintf("%d", i)), }) wg.Done() @@ -278,12 +303,14 @@ func TestTxPriorityQueue_RemoveTxEvm(t *testing.T) { isEVM: true, evmAddress: "0xabc", evmNonce: 1, + tx: []byte("tx1"), } tx2 := &WrappedTx{ priority: 1, isEVM: true, evmAddress: "0xabc", evmNonce: 2, + tx: []byte("tx2"), } pq.PushTx(tx1) @@ -306,6 +333,7 @@ func TestTxPriorityQueue_RemoveTx(t *testing.T) { x := rng.Intn(100000) pq.PushTx(&WrappedTx{ priority: int64(x), + tx: []byte(fmt.Sprintf("%d", i)), }) values[i] = x diff --git a/internal/rpc/core/mempool.go b/internal/rpc/core/mempool.go index 1e8628641..eaad97b4f 100644 --- a/internal/rpc/core/mempool.go +++ b/internal/rpc/core/mempool.go @@ -157,6 +157,9 @@ func (env *Environment) UnconfirmedTxs(ctx context.Context, req *coretypes.Reque skipCount := validateSkipCount(page, perPage) txs := env.Mempool.ReapMaxTxs(skipCount + tmmath.MinInt(perPage, totalCount-skipCount)) + if skipCount > len(txs) { + skipCount = len(txs) + } result := txs[skipCount:] return &coretypes.ResultUnconfirmedTxs{ diff --git a/types/tx.go b/types/tx.go index 0a37d1e9d..5b7c2377e 100644 --- a/types/tx.go +++ b/types/tx.go @@ -184,7 +184,7 @@ func (t TxRecordSet) Validate(maxSizeBytes int64, otxs Txs) error { for i, cur := range allCopy { // allCopy is sorted, so any duplicated data will be adjacent. if i+1 < len(allCopy) && bytes.Equal(cur, allCopy[i+1]) { - return fmt.Errorf("found duplicate transaction with hash: %x", cur.Hash()) + return fmt.Errorf("found duplicate transaction with hash: %x", cur.Key()) } }