Skip to content

Commit

Permalink
fix: only move back to pending txs that were actually reorged
Browse files Browse the repository at this point in the history
  • Loading branch information
alexghr committed Oct 30, 2024
1 parent 6a2547c commit 8eb1c99
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 26 deletions.
12 changes: 9 additions & 3 deletions yarn-project/p2p/src/client/p2p_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -302,17 +302,23 @@ describe('In-Memory P2P Client', () => {
blockSource.setProvenBlockNumber(0);
await client.start();

// add two txs to the pool. One build against block 90, one against block 95
// add three txs to the pool built against different blocks
// then prune the chain back to block 90
// only one tx should be deleted
const goodButOldTx = mockTx();
goodButOldTx.data.constants.globalVariables.blockNumber = new Fr(89);

const goodTx = mockTx();
goodTx.data.constants.globalVariables.blockNumber = new Fr(90);

const badTx = mockTx();
badTx.data.constants.globalVariables.blockNumber = new Fr(95);

txPool.getAllTxs.mockReturnValue([goodTx, badTx]);
txPool.getMinedTxHashes.mockReturnValue([goodTx.getTxHash()]);
txPool.getAllTxs.mockReturnValue([goodButOldTx, goodTx, badTx]);
txPool.getMinedTxHashes.mockReturnValue([
[goodButOldTx.getTxHash(), 90],
[goodTx.getTxHash(), 91],
]);

blockSource.removeBlocks(10);
await sleep(150);
Expand Down
19 changes: 15 additions & 4 deletions yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ export class P2PClient extends WithTracer implements P2P {
} else if (filter === 'mined') {
return this.txPool
.getMinedTxHashes()
.map(txHash => this.txPool.getTxByHash(txHash))
.map(([txHash]) => this.txPool.getTxByHash(txHash))
.filter((tx): tx is Tx => !!tx);
} else if (filter === 'pending') {
return this.txPool
Expand Down Expand Up @@ -567,7 +567,7 @@ export class P2PClient extends WithTracer implements P2P {
private async markTxsAsMinedFromBlocks(blocks: L2Block[]): Promise<void> {
for (const block of blocks) {
const txHashes = block.body.txEffects.map(txEffect => txEffect.txHash);
await this.txPool.markAsMined(txHashes);
await this.txPool.markAsMined(txHashes, block.number);
}
}

Expand Down Expand Up @@ -655,9 +655,20 @@ export class P2PClient extends WithTracer implements P2P {

// delete invalid txs (both pending and mined)
await this.txPool.deleteTxs(txsToDelete);

// everything left in the mined set was built against a block on the proven chain so its still valid
// move back to pending set
await this.txPool.markMinedAsPending(this.txPool.getMinedTxHashes());
// move back to pending the txs that were reorged out of the chain
// NOTE: we can't move _all_ txs back to pending because the tx pool could keep hold of mined txs for longer
// (see this.keepProvenTxsFor)
const txsToMoveToPending: TxHash[] = [];
for (const [txHash, blockNumber] of this.txPool.getMinedTxHashes()) {
if (blockNumber > latestBlock) {
txsToMoveToPending.push(txHash);
}
}

this.log.info(`Moving ${txsToMoveToPending.length} mined txs back to pending`);
await this.txPool.markMinedAsPending(txsToMoveToPending);

await this.synchedLatestBlockNumber.set(latestBlock);
// no need to update block hashes, as they will be updated as new blocks are added
Expand Down
15 changes: 9 additions & 6 deletions yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export class AztecKVTxPool implements TxPool {
/** Index for pending txs. */
#pendingTxs: AztecSet<string>;
/** Index for mined txs. */
#minedTxs: AztecSet<string>;
#minedTxs: AztecMap<string, number>;

#log: Logger;

Expand All @@ -32,20 +32,20 @@ export class AztecKVTxPool implements TxPool {
*/
constructor(store: AztecKVStore, telemetry: TelemetryClient, log = createDebugLogger('aztec:tx_pool')) {
this.#txs = store.openMap('txs');
this.#minedTxs = store.openSet('minedTxs');
this.#minedTxs = store.openMap('minedTxs');
this.#pendingTxs = store.openSet('pendingTxs');

this.#store = store;
this.#log = log;
this.#metrics = new PoolInstrumentation(telemetry, 'AztecKVTxPool');
}

public markAsMined(txHashes: TxHash[]): Promise<void> {
public markAsMined(txHashes: TxHash[], blockNumber: number): Promise<void> {
return this.#store.transaction(() => {
let deleted = 0;
for (const hash of txHashes) {
const key = hash.toString();
void this.#minedTxs.add(key);
void this.#minedTxs.set(key, blockNumber);
if (this.#pendingTxs.has(key)) {
deleted++;
void this.#pendingTxs.delete(key);
Expand Down Expand Up @@ -86,8 +86,11 @@ export class AztecKVTxPool implements TxPool {
return Array.from(this.#pendingTxs.entries()).map(x => TxHash.fromString(x));
}

public getMinedTxHashes(): TxHash[] {
return Array.from(this.#minedTxs.entries()).map(x => TxHash.fromString(x));
public getMinedTxHashes(): [TxHash, number][] {
return Array.from(this.#minedTxs.entries()).map(([txHash, blockNumber]) => [
TxHash.fromString(txHash),
blockNumber,
]);
}

public getTxStatus(txHash: TxHash): 'pending' | 'mined' | undefined {
Expand Down
12 changes: 6 additions & 6 deletions yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export class InMemoryTxPool implements TxPool {
* Our tx pool, stored as a Map in-memory, with K: tx hash and V: the transaction.
*/
private txs: Map<bigint, Tx>;
private minedTxs: Set<bigint>;
private minedTxs: Map<bigint, number>;
private pendingTxs: Set<bigint>;

private metrics: PoolInstrumentation<Tx>;
Expand All @@ -25,15 +25,15 @@ export class InMemoryTxPool implements TxPool {
*/
constructor(telemetry: TelemetryClient, private log = createDebugLogger('aztec:tx_pool')) {
this.txs = new Map<bigint, Tx>();
this.minedTxs = new Set();
this.minedTxs = new Map();
this.pendingTxs = new Set();
this.metrics = new PoolInstrumentation(telemetry, 'InMemoryTxPool');
}

public markAsMined(txHashes: TxHash[]): Promise<void> {
public markAsMined(txHashes: TxHash[], blockNumber: number): Promise<void> {
const keys = txHashes.map(x => x.toBigInt());
for (const key of keys) {
this.minedTxs.add(key);
this.minedTxs.set(key, blockNumber);
this.pendingTxs.delete(key);
}
this.metrics.recordRemovedObjects(txHashes.length, 'pending');
Expand Down Expand Up @@ -71,8 +71,8 @@ export class InMemoryTxPool implements TxPool {
return Array.from(this.pendingTxs).map(x => TxHash.fromBigInt(x));
}

public getMinedTxHashes(): TxHash[] {
return Array.from(this.minedTxs).map(x => TxHash.fromBigInt(x));
public getMinedTxHashes(): [TxHash, number][] {
return Array.from(this.minedTxs.entries()).map(([txHash, blockNumber]) => [TxHash.fromBigInt(txHash), blockNumber]);
}

public getTxStatus(txHash: TxHash): 'pending' | 'mined' | undefined {
Expand Down
4 changes: 2 additions & 2 deletions yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export interface TxPool {
* Marks the set of txs as mined, as opposed to pending.
* @param txHashes - Hashes of the txs to flag as mined.
*/
markAsMined(txHashes: TxHash[]): Promise<void>;
markAsMined(txHashes: TxHash[], blockNumber: number): Promise<void>;

/**
* Moves mined txs back to the pending set in the case of a reorg.
Expand Down Expand Up @@ -58,7 +58,7 @@ export interface TxPool {
* Gets the hashes of mined transactions currently in the tx pool.
* @returns An array of mined transaction hashes found in the tx pool.
*/
getMinedTxHashes(): TxHash[];
getMinedTxHashes(): [tx: TxHash, blockNumber: number][];

/**
* Returns whether the given tx hash is flagged as pending or mined.
Expand Down
13 changes: 8 additions & 5 deletions yarn-project/p2p/src/mem_pools/tx_pool/tx_pool_test_suite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ export function describeTxPool(getTxPool: () => TxPool) {
const tx2 = mockTx(2);

await pool.addTxs([tx1, tx2]);
await pool.markAsMined([tx1.getTxHash()]);
await pool.markAsMined([tx1.getTxHash()], 1);

expect(pool.getTxByHash(tx1.getTxHash())).toEqual(tx1);
expect(pool.getTxStatus(tx1.getTxHash())).toEqual('mined');
expect(pool.getMinedTxHashes()).toEqual([tx1.getTxHash()]);
expect(pool.getMinedTxHashes()).toEqual([[tx1.getTxHash(), 1]]);
expect(pool.getPendingTxHashes()).toEqual([tx2.getTxHash()]);
});

Expand All @@ -51,7 +51,7 @@ export function describeTxPool(getTxPool: () => TxPool) {
const tx2 = mockTx(2);

await pool.addTxs([tx1, tx2]);
await pool.markAsMined([tx1.getTxHash()]);
await pool.markAsMined([tx1.getTxHash()], 1);

await pool.markMinedAsPending([tx1.getTxHash()]);
expect(pool.getMinedTxHashes()).toEqual([]);
Expand All @@ -66,8 +66,11 @@ export function describeTxPool(getTxPool: () => TxPool) {
const someTxHashThatThisPeerDidNotSee = mockTx(2).getTxHash();
await pool.addTxs([tx1]);
// this peer knows that tx2 was mined, but it does not have the tx object
await pool.markAsMined([tx1.getTxHash(), someTxHashThatThisPeerDidNotSee]);
expect(pool.getMinedTxHashes()).toEqual([tx1.getTxHash(), someTxHashThatThisPeerDidNotSee]);
await pool.markAsMined([tx1.getTxHash(), someTxHashThatThisPeerDidNotSee], 1);
expect(pool.getMinedTxHashes()).toEqual([
[tx1.getTxHash(), 1],
[someTxHashThatThisPeerDidNotSee, 1],
]);

// reorg: both txs should now become available again
await pool.markMinedAsPending([tx1.getTxHash(), someTxHashThatThisPeerDidNotSee]);
Expand Down

0 comments on commit 8eb1c99

Please sign in to comment.