From 8eb1c998b38a717729c3b42be793cf318683d781 Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Wed, 30 Oct 2024 11:12:42 +0000 Subject: [PATCH] fix: only move back to pending txs that were actually reorged --- .../p2p/src/client/p2p_client.test.ts | 12 +++++++++--- yarn-project/p2p/src/client/p2p_client.ts | 19 +++++++++++++++---- .../src/mem_pools/tx_pool/aztec_kv_tx_pool.ts | 15 +++++++++------ .../src/mem_pools/tx_pool/memory_tx_pool.ts | 12 ++++++------ .../p2p/src/mem_pools/tx_pool/tx_pool.ts | 4 ++-- .../mem_pools/tx_pool/tx_pool_test_suite.ts | 13 ++++++++----- 6 files changed, 49 insertions(+), 26 deletions(-) diff --git a/yarn-project/p2p/src/client/p2p_client.test.ts b/yarn-project/p2p/src/client/p2p_client.test.ts index 421838e220d..0cf8baeddf1 100644 --- a/yarn-project/p2p/src/client/p2p_client.test.ts +++ b/yarn-project/p2p/src/client/p2p_client.test.ts @@ -302,17 +302,23 @@ describe('In-Memory P2P Client', () => { blockSource.setProvenBlockNumber(0); await client.start(); - // add two txs to the pool. One build against block 90, one against block 95 + // add three txs to the pool built against different blocks // then prune the chain back to block 90 // only one tx should be deleted + const goodButOldTx = mockTx(); + goodButOldTx.data.constants.globalVariables.blockNumber = new Fr(89); + const goodTx = mockTx(); goodTx.data.constants.globalVariables.blockNumber = new Fr(90); const badTx = mockTx(); badTx.data.constants.globalVariables.blockNumber = new Fr(95); - txPool.getAllTxs.mockReturnValue([goodTx, badTx]); - txPool.getMinedTxHashes.mockReturnValue([goodTx.getTxHash()]); + txPool.getAllTxs.mockReturnValue([goodButOldTx, goodTx, badTx]); + txPool.getMinedTxHashes.mockReturnValue([ + [goodButOldTx.getTxHash(), 90], + [goodTx.getTxHash(), 91], + ]); blockSource.removeBlocks(10); await sleep(150); diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index ef7b6abca30..3ce86f36d1d 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -448,7 +448,7 @@ export class P2PClient extends WithTracer implements P2P { } else if (filter === 'mined') { return this.txPool .getMinedTxHashes() - .map(txHash => this.txPool.getTxByHash(txHash)) + .map(([txHash]) => this.txPool.getTxByHash(txHash)) .filter((tx): tx is Tx => !!tx); } else if (filter === 'pending') { return this.txPool @@ -567,7 +567,7 @@ export class P2PClient extends WithTracer implements P2P { private async markTxsAsMinedFromBlocks(blocks: L2Block[]): Promise { for (const block of blocks) { const txHashes = block.body.txEffects.map(txEffect => txEffect.txHash); - await this.txPool.markAsMined(txHashes); + await this.txPool.markAsMined(txHashes, block.number); } } @@ -655,9 +655,20 @@ export class P2PClient extends WithTracer implements P2P { // delete invalid txs (both pending and mined) await this.txPool.deleteTxs(txsToDelete); + // everything left in the mined set was built against a block on the proven chain so its still valid - // move back to pending set - await this.txPool.markMinedAsPending(this.txPool.getMinedTxHashes()); + // move back to pending the txs that were reorged out of the chain + // NOTE: we can't move _all_ txs back to pending because the tx pool could keep hold of mined txs for longer + // (see this.keepProvenTxsFor) + const txsToMoveToPending: TxHash[] = []; + for (const [txHash, blockNumber] of this.txPool.getMinedTxHashes()) { + if (blockNumber > latestBlock) { + txsToMoveToPending.push(txHash); + } + } + + this.log.info(`Moving ${txsToMoveToPending.length} mined txs back to pending`); + await this.txPool.markMinedAsPending(txsToMoveToPending); await this.synchedLatestBlockNumber.set(latestBlock); // no need to update block hashes, as they will be updated as new blocks are added diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts index 4600aa9ee42..04d931c4240 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts @@ -19,7 +19,7 @@ export class AztecKVTxPool implements TxPool { /** Index for pending txs. */ #pendingTxs: AztecSet; /** Index for mined txs. */ - #minedTxs: AztecSet; + #minedTxs: AztecMap; #log: Logger; @@ -32,7 +32,7 @@ export class AztecKVTxPool implements TxPool { */ constructor(store: AztecKVStore, telemetry: TelemetryClient, log = createDebugLogger('aztec:tx_pool')) { this.#txs = store.openMap('txs'); - this.#minedTxs = store.openSet('minedTxs'); + this.#minedTxs = store.openMap('minedTxs'); this.#pendingTxs = store.openSet('pendingTxs'); this.#store = store; @@ -40,12 +40,12 @@ export class AztecKVTxPool implements TxPool { this.#metrics = new PoolInstrumentation(telemetry, 'AztecKVTxPool'); } - public markAsMined(txHashes: TxHash[]): Promise { + public markAsMined(txHashes: TxHash[], blockNumber: number): Promise { return this.#store.transaction(() => { let deleted = 0; for (const hash of txHashes) { const key = hash.toString(); - void this.#minedTxs.add(key); + void this.#minedTxs.set(key, blockNumber); if (this.#pendingTxs.has(key)) { deleted++; void this.#pendingTxs.delete(key); @@ -86,8 +86,11 @@ export class AztecKVTxPool implements TxPool { return Array.from(this.#pendingTxs.entries()).map(x => TxHash.fromString(x)); } - public getMinedTxHashes(): TxHash[] { - return Array.from(this.#minedTxs.entries()).map(x => TxHash.fromString(x)); + public getMinedTxHashes(): [TxHash, number][] { + return Array.from(this.#minedTxs.entries()).map(([txHash, blockNumber]) => [ + TxHash.fromString(txHash), + blockNumber, + ]); } public getTxStatus(txHash: TxHash): 'pending' | 'mined' | undefined { diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts b/yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts index 80efff265d6..f7d6b59fea4 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts @@ -14,7 +14,7 @@ export class InMemoryTxPool implements TxPool { * Our tx pool, stored as a Map in-memory, with K: tx hash and V: the transaction. */ private txs: Map; - private minedTxs: Set; + private minedTxs: Map; private pendingTxs: Set; private metrics: PoolInstrumentation; @@ -25,15 +25,15 @@ export class InMemoryTxPool implements TxPool { */ constructor(telemetry: TelemetryClient, private log = createDebugLogger('aztec:tx_pool')) { this.txs = new Map(); - this.minedTxs = new Set(); + this.minedTxs = new Map(); this.pendingTxs = new Set(); this.metrics = new PoolInstrumentation(telemetry, 'InMemoryTxPool'); } - public markAsMined(txHashes: TxHash[]): Promise { + public markAsMined(txHashes: TxHash[], blockNumber: number): Promise { const keys = txHashes.map(x => x.toBigInt()); for (const key of keys) { - this.minedTxs.add(key); + this.minedTxs.set(key, blockNumber); this.pendingTxs.delete(key); } this.metrics.recordRemovedObjects(txHashes.length, 'pending'); @@ -71,8 +71,8 @@ export class InMemoryTxPool implements TxPool { return Array.from(this.pendingTxs).map(x => TxHash.fromBigInt(x)); } - public getMinedTxHashes(): TxHash[] { - return Array.from(this.minedTxs).map(x => TxHash.fromBigInt(x)); + public getMinedTxHashes(): [TxHash, number][] { + return Array.from(this.minedTxs.entries()).map(([txHash, blockNumber]) => [TxHash.fromBigInt(txHash), blockNumber]); } public getTxStatus(txHash: TxHash): 'pending' | 'mined' | undefined { diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts b/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts index 3ce6ed670e2..01511951f8a 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts @@ -21,7 +21,7 @@ export interface TxPool { * Marks the set of txs as mined, as opposed to pending. * @param txHashes - Hashes of the txs to flag as mined. */ - markAsMined(txHashes: TxHash[]): Promise; + markAsMined(txHashes: TxHash[], blockNumber: number): Promise; /** * Moves mined txs back to the pending set in the case of a reorg. @@ -58,7 +58,7 @@ export interface TxPool { * Gets the hashes of mined transactions currently in the tx pool. * @returns An array of mined transaction hashes found in the tx pool. */ - getMinedTxHashes(): TxHash[]; + getMinedTxHashes(): [tx: TxHash, blockNumber: number][]; /** * Returns whether the given tx hash is flagged as pending or mined. diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool_test_suite.ts b/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool_test_suite.ts index 1911fb0ec1a..35af12fbd68 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool_test_suite.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool_test_suite.ts @@ -38,11 +38,11 @@ export function describeTxPool(getTxPool: () => TxPool) { const tx2 = mockTx(2); await pool.addTxs([tx1, tx2]); - await pool.markAsMined([tx1.getTxHash()]); + await pool.markAsMined([tx1.getTxHash()], 1); expect(pool.getTxByHash(tx1.getTxHash())).toEqual(tx1); expect(pool.getTxStatus(tx1.getTxHash())).toEqual('mined'); - expect(pool.getMinedTxHashes()).toEqual([tx1.getTxHash()]); + expect(pool.getMinedTxHashes()).toEqual([[tx1.getTxHash(), 1]]); expect(pool.getPendingTxHashes()).toEqual([tx2.getTxHash()]); }); @@ -51,7 +51,7 @@ export function describeTxPool(getTxPool: () => TxPool) { const tx2 = mockTx(2); await pool.addTxs([tx1, tx2]); - await pool.markAsMined([tx1.getTxHash()]); + await pool.markAsMined([tx1.getTxHash()], 1); await pool.markMinedAsPending([tx1.getTxHash()]); expect(pool.getMinedTxHashes()).toEqual([]); @@ -66,8 +66,11 @@ export function describeTxPool(getTxPool: () => TxPool) { const someTxHashThatThisPeerDidNotSee = mockTx(2).getTxHash(); await pool.addTxs([tx1]); // this peer knows that tx2 was mined, but it does not have the tx object - await pool.markAsMined([tx1.getTxHash(), someTxHashThatThisPeerDidNotSee]); - expect(pool.getMinedTxHashes()).toEqual([tx1.getTxHash(), someTxHashThatThisPeerDidNotSee]); + await pool.markAsMined([tx1.getTxHash(), someTxHashThatThisPeerDidNotSee], 1); + expect(pool.getMinedTxHashes()).toEqual([ + [tx1.getTxHash(), 1], + [someTxHashThatThisPeerDidNotSee, 1], + ]); // reorg: both txs should now become available again await pool.markMinedAsPending([tx1.getTxHash(), someTxHashThatThisPeerDidNotSee]);