From 58e8e55325ce0639d17eb63e89bdb4c6ddb17bd6 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Thu, 21 Mar 2024 20:08:11 -0400 Subject: [PATCH] Add heapIndex with safety check (#213) * add heapIndex with safety check * cleanup * comment out for perf test * add back perf improvement * fix nil test * Use write-lock in (*TxPriorityQueue).ReapMax funcs (#209) ReapMaxBytesMaxGas and ReapMaxTxs funcs in TxPriorityQueue claim > Transactions returned are not removed from the mempool transaction > store or indexes. However, they use a priority queue to accomplish the claim > Transaction are retrieved in priority order. This is accomplished by popping all items out of the whole heap, and then pushing then back in sequentially. A copy of the heap cannot be obtained otherwise. Both of the mentioned functions use a read-lock (RLock) when doing this. This results in a potential scenario where multiple executions of the ReapMax can be started in parallel, and both would be popping items out of the priority queue. In practice, this can be abused by executing the `unconfirmed_txs` RPC call repeatedly. Based on our observations, running it multiple times per millisecond results in multiple threads picking it up at the same time. Such a scenario can be obtained via the WebSocket interface, and spamming `unconfirmed_txs` calls there. The behavior that happens is a `Panic in WSJSONRPC handler` when a queue item unexpectedly disappears for `mempool.(*TxPriorityQueue).Swap`. (`runtime error: index out of range [0] with length 0`) This can additionally lead to a `CONSENSUS FAILURE!!!` if the race condition occurs for `internal/consensus.(*State).finalizeCommit` when it tries to do `mempool.(*TxPriorityQueue).RemoveTx`, but the ReapMax has already removed all elements from the underlying heap. (`runtime error: index out of range [-1]`) This commit switches the lock type to a write-lock (Lock) to ensure no parallel modifications take place. This commit additionally updates the tests to allow parallel execution of the func calls in testing, as to prevent regressions (in case someone wants to downgrade the locks without considering the implications from the underlying heap usage). --------- Co-authored-by: Valters Jansons --- internal/mempool/priority_queue.go | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/internal/mempool/priority_queue.go b/internal/mempool/priority_queue.go index 0dcc6edd0..6b420bb67 100644 --- a/internal/mempool/priority_queue.go +++ b/internal/mempool/priority_queue.go @@ -178,6 +178,12 @@ func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) (removedIdx in } func (pq *TxPriorityQueue) findTxIndexUnsafe(tx *WrappedTx) (int, bool) { + // safety check for race situation where heapIndex is out of range of txs + if tx.heapIndex >= 0 && tx.heapIndex < len(pq.txs) && pq.txs[tx.heapIndex].tx.Key() == tx.tx.Key() { + return tx.heapIndex, true + } + + // heap index isn't trustable here, so attempt to find it for i, t := range pq.txs { if t.tx.Key() == tx.tx.Key() { return i, true @@ -443,7 +449,9 @@ func (pq *TxPriorityQueue) PeekTxs(max int) []*WrappedTx { // // NOTE: A caller should never call Push. Use PushTx instead. func (pq *TxPriorityQueue) Push(x interface{}) { + n := len(pq.txs) item := x.(*WrappedTx) + item.heapIndex = n pq.txs = append(pq.txs, item) } @@ -454,7 +462,8 @@ func (pq *TxPriorityQueue) Pop() interface{} { old := pq.txs n := len(old) item := old[n-1] - old[n-1] = nil // avoid memory leak + old[n-1] = nil // avoid memory leak + setHeapIndex(item, -1) // for safety pq.txs = old[0 : n-1] return item } @@ -483,4 +492,14 @@ func (pq *TxPriorityQueue) Less(i, j int) bool { // Swap implements the Heap interface. It swaps two transactions in the queue. func (pq *TxPriorityQueue) Swap(i, j int) { pq.txs[i], pq.txs[j] = pq.txs[j], pq.txs[i] + setHeapIndex(pq.txs[i], i) + setHeapIndex(pq.txs[j], j) +} + +func setHeapIndex(tx *WrappedTx, i int) { + // a removed tx can be nil + if tx == nil { + return + } + tx.heapIndex = i }