Skip to content

Commit

Permalink
feat: txpool persistence (#3672)
Browse files Browse the repository at this point in the history
This PR adds a new implementation of a `TxPool` based on the kv-store
introduced in #3628. It also amends the p2p-client's bootstrap flow.

The p2p-client saves the block number its on to the database and
restores it the next time it is started. The initial sync happens as
before (it syncs to the the tip of the chain, but instead of syncing
from 0, it syncs from the last known block number) and after that, it
re-publishes any transactions in its TxPool that haven't been processed
already.

Fix #3365
  • Loading branch information
alexghr authored Jan 2, 2024
1 parent 2db2e2a commit 4dd076c
Show file tree
Hide file tree
Showing 21 changed files with 308 additions and 73 deletions.
1 change: 1 addition & 0 deletions yarn-project/aztec-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"@aztec/circuits.js": "workspace:^",
"@aztec/ethereum": "workspace:^",
"@aztec/foundation": "workspace:^",
"@aztec/kv-store": "workspace:^",
"@aztec/l1-artifacts": "workspace:^",
"@aztec/merkle-tree": "workspace:^",
"@aztec/p2p": "workspace:^",
Expand Down
6 changes: 4 additions & 2 deletions yarn-project/aztec-node/src/aztec-node/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import { computeGlobalsHash, computePublicDataTreeLeafSlot } from '@aztec/circui
import { L1ContractAddresses, createEthereumChain } from '@aztec/ethereum';
import { AztecAddress } from '@aztec/foundation/aztec-address';
import { createDebugLogger } from '@aztec/foundation/log';
import { InMemoryTxPool, P2P, createP2PClient } from '@aztec/p2p';
import { AztecLmdbStore } from '@aztec/kv-store';
import { AztecKVTxPool, P2P, createP2PClient } from '@aztec/p2p';
import {
GlobalVariableBuilder,
PublicProcessorFactory,
Expand Down Expand Up @@ -105,6 +106,7 @@ export class AztecNodeService implements AztecNode {
}

const log = createDebugLogger('aztec:node');
const store = await AztecLmdbStore.create(config.l1Contracts.rollupAddress, config.dataDirectory);
const [nodeDb, worldStateDb] = await openDb(config, log);

// first create and sync the archiver
Expand All @@ -116,7 +118,7 @@ export class AztecNodeService implements AztecNode {
config.transactionProtocol = `/aztec/tx/${config.l1Contracts.rollupAddress.toString()}`;

// create the tx pool and the p2p client, which will need the l2 block source
const p2pClient = await createP2PClient(config, new InMemoryTxPool(), archiver);
const p2pClient = await createP2PClient(store, config, new AztecKVTxPool(store), archiver);

// now create the merkle trees and the world state synchronizer
const merkleTrees = await MerkleTrees.new(worldStateDb);
Expand Down
3 changes: 3 additions & 0 deletions yarn-project/aztec-node/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
},
{
"path": "../world-state"
},
{
"path": "../kv-store"
}
],
"include": ["src"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ describe('benchmarks/process_history', () => {
// Send enough txs to move the chain to the next block number checkpoint
const txCount = (chainLength - lastBlock) * BLOCK_SIZE;
const sentTxs = await sendTxs(txCount, context, contract);
await sentTxs[sentTxs.length - 1].wait({ timeout: 5 * 60_000 });
await Promise.all(sentTxs.map(tx => tx.wait({ timeout: 5 * 60_000 })));
await sleep(100);

// Create a new node and measure how much time it takes it to sync
Expand Down
28 changes: 21 additions & 7 deletions yarn-project/end-to-end/src/e2e_block_building.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import {
DebugLogger,
Fr,
PXE,
SentTx,
TxReceipt,
TxStatus,
Wallet,
isContractDeployed,
Expand Down Expand Up @@ -72,7 +74,7 @@ describe('e2e_block_building', () => {
expect(areDeployed).toEqual(times(TX_COUNT, () => true));
}, 60_000);

it('can call public function from different tx in same block', async () => {
it.skip('can call public function from different tx in same block', async () => {
// Ensure both txs will land on the same block
await aztecNode.setConfig({ minTxsPerBlock: 2 });

Expand Down Expand Up @@ -125,8 +127,7 @@ describe('e2e_block_building', () => {
await call.simulate();
}
const [tx1, tx2] = calls.map(call => call.send());
await tx1.wait();
await expect(tx2.wait()).rejects.toThrowError(/dropped/);
await expectXorTx(tx1, tx2);
}, 30_000);

it('drops tx with public nullifier already emitted on the same block', async () => {
Expand All @@ -136,8 +137,7 @@ describe('e2e_block_building', () => {
await call.simulate();
}
const [tx1, tx2] = calls.map(call => call.send());
await tx1.wait();
await expect(tx2.wait()).rejects.toThrowError(/dropped/);
await expectXorTx(tx1, tx2);
}, 30_000);

it('drops tx with two equal nullifiers', async () => {
Expand All @@ -160,8 +160,22 @@ describe('e2e_block_building', () => {
await call.simulate();
}
const [tx1, tx2] = calls.map(call => call.send());
await tx1.wait();
await expect(tx2.wait()).rejects.toThrowError(/dropped/);
await expectXorTx(tx1, tx2);
});
});
});

/**
* Checks that only one of the two provided transactions succeeds.
* @param tx1 - A transaction.
* @param tx2 - Another transaction.
*/
async function expectXorTx(tx1: SentTx, tx2: SentTx) {
const receipts = await Promise.allSettled([tx1.wait(), tx2.wait()]);
const succeeded = receipts.find((r): r is PromiseSettledResult<TxReceipt> => r.status === 'fulfilled');
const failed = receipts.find((r): r is PromiseRejectedResult => r.status === 'rejected');

expect(succeeded).toBeDefined();
expect(failed).toBeDefined();
expect((failed?.reason as Error).message).toMatch(/dropped/);
}
1 change: 1 addition & 0 deletions yarn-project/p2p/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"dependencies": {
"@aztec/circuits.js": "workspace:^",
"@aztec/foundation": "workspace:^",
"@aztec/kv-store": "workspace:^",
"@aztec/types": "workspace:^",
"@chainsafe/libp2p-noise": "^13.0.0",
"@chainsafe/libp2p-yamux": "^5.0.0",
Expand Down
10 changes: 8 additions & 2 deletions yarn-project/p2p/src/client/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { AztecKVStore } from '@aztec/kv-store';
import { L2BlockSource } from '@aztec/types';

import { P2PClient } from '../client/p2p_client.js';
Expand All @@ -8,7 +9,12 @@ import { TxPool } from '../tx_pool/index.js';

export * from './p2p_client.js';

export const createP2PClient = async (config: P2PConfig, txPool: TxPool, l2BlockSource: L2BlockSource) => {
export const createP2PClient = async (
store: AztecKVStore,
config: P2PConfig,
txPool: TxPool,
l2BlockSource: L2BlockSource,
) => {
const p2pService = config.p2pEnabled ? await LibP2PService.new(config, txPool) : new DummyP2PService();
return new P2PClient(l2BlockSource, txPool, p2pService);
return new P2PClient(store, l2BlockSource, txPool, p2pService);
};
31 changes: 27 additions & 4 deletions yarn-project/p2p/src/client/p2p_client.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { EthAddress } from '@aztec/circuits.js';
import { AztecKVStore, AztecLmdbStore } from '@aztec/kv-store';
import { L2BlockSource, mockTx } from '@aztec/types';

import { expect, jest } from '@jest/globals';
Expand All @@ -18,8 +20,10 @@ describe('In-Memory P2P Client', () => {
let txPool: Mockify<TxPool>;
let blockSource: L2BlockSource;
let p2pService: Mockify<P2PService>;
let kvStore: AztecKVStore;
let client: P2PClient;

beforeEach(() => {
beforeEach(async () => {
txPool = {
addTxs: jest.fn(),
getTxByHash: jest.fn().mockReturnValue(undefined),
Expand All @@ -37,10 +41,12 @@ describe('In-Memory P2P Client', () => {
};

blockSource = new MockBlockSource();

kvStore = await AztecLmdbStore.create(EthAddress.random());
client = new P2PClient(kvStore, blockSource, txPool, p2pService);
});

it('can start & stop', async () => {
const client = new P2PClient(blockSource, txPool, p2pService);
expect(await client.isReady()).toEqual(false);

await client.start();
Expand All @@ -51,7 +57,6 @@ describe('In-Memory P2P Client', () => {
});

it('adds txs to pool', async () => {
const client = new P2PClient(blockSource, txPool, p2pService);
await client.start();
const tx1 = mockTx();
const tx2 = mockTx();
Expand All @@ -63,7 +68,6 @@ describe('In-Memory P2P Client', () => {
});

it('rejects txs after being stopped', async () => {
const client = new P2PClient(blockSource, txPool, p2pService);
await client.start();
const tx1 = mockTx();
const tx2 = mockTx();
Expand All @@ -76,4 +80,23 @@ describe('In-Memory P2P Client', () => {
await expect(client.sendTx(tx3)).rejects.toThrow();
expect(txPool.addTxs).toHaveBeenCalledTimes(2);
});

it('republishes previously stored txs on start', async () => {
const tx1 = mockTx();
const tx2 = mockTx();
txPool.getAllTxs.mockReturnValue([tx1, tx2]);

await client.start();
expect(p2pService.propagateTx).toHaveBeenCalledTimes(2);
expect(p2pService.propagateTx).toHaveBeenCalledWith(tx1);
expect(p2pService.propagateTx).toHaveBeenCalledWith(tx2);
});

it('restores the previous block number it was at', async () => {
await client.start();
await client.stop();

const client2 = new P2PClient(kvStore, blockSource, txPool, p2pService);
expect(client2.getSyncedBlockNum()).toEqual(client.getSyncedBlockNum());
});
});
57 changes: 41 additions & 16 deletions yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
import { createDebugLogger } from '@aztec/foundation/log';
import { L2Block, L2BlockContext, L2BlockDownloader, L2BlockSource, Tx, TxHash } from '@aztec/types';
import { AztecKVStore, AztecSingleton } from '@aztec/kv-store';
import {
INITIAL_L2_BLOCK_NUM,
L2Block,
L2BlockContext,
L2BlockDownloader,
L2BlockSource,
Tx,
TxHash,
} from '@aztec/types';

import { getP2PConfigEnvVars } from '../config.js';
import { P2PService } from '../service/service.js';
Expand Down Expand Up @@ -102,31 +111,30 @@ export class P2PClient implements P2P {
*/
private runningPromise!: Promise<void>;

/**
* Store the ID of the latest block the client has synced to.
*/
private currentL2BlockNum = 0;

private currentState = P2PClientState.IDLE;
private syncPromise = Promise.resolve();
private latestBlockNumberAtStart = -1;
private syncResolve?: () => void = undefined;
private synchedBlockNumber: AztecSingleton<number>;

/**
* In-memory P2P client constructor.
* @param store - The client's instance of the KV store.
* @param l2BlockSource - P2P client's source for fetching existing blocks.
* @param txPool - The client's instance of a transaction pool. Defaults to in-memory implementation.
* @param p2pService - The concrete instance of p2p networking to use.
* @param log - A logger.
*/
constructor(
store: AztecKVStore,
private l2BlockSource: L2BlockSource,
private txPool: TxPool,
private p2pService: P2PService,
private log = createDebugLogger('aztec:p2p'),
) {
const { p2pBlockCheckIntervalMS: checkInterval, l2QueueSize } = getP2PConfigEnvVars();
this.blockDownloader = new L2BlockDownloader(l2BlockSource, l2QueueSize, checkInterval);
this.synchedBlockNumber = store.createSingleton('p2p_pool_last_l2_block');
}

/**
Expand All @@ -144,7 +152,7 @@ export class P2PClient implements P2P {
// get the current latest block number
this.latestBlockNumberAtStart = await this.l2BlockSource.getBlockNumber();

const blockToDownloadFrom = this.currentL2BlockNum + 1;
const blockToDownloadFrom = this.getSyncedBlockNum() + 1;

// if there are blocks to be retrieved, go to a synching state
if (blockToDownloadFrom <= this.latestBlockNumberAtStart) {
Expand All @@ -161,6 +169,9 @@ export class P2PClient implements P2P {
this.log(`Next block ${blockToDownloadFrom} already beyond latest block at ${this.latestBlockNumberAtStart}`);
}

// publish any txs in TxPool after its doing initial sync
this.syncPromise = this.syncPromise.then(() => this.publishStoredTxs());

// start looking for further blocks
const blockProcess = async () => {
while (!this.stopping) {
Expand All @@ -171,6 +182,7 @@ export class P2PClient implements P2P {
this.runningPromise = blockProcess();
this.blockDownloader.start(blockToDownloadFrom);
this.log(`Started block downloader from block ${blockToDownloadFrom}`);

return this.syncPromise;
}

Expand Down Expand Up @@ -229,7 +241,7 @@ export class P2PClient implements P2P {
if (!ready) {
throw new Error('P2P client not ready');
}
this.txPool.deleteTxs(txHashes);
await this.txPool.deleteTxs(txHashes);
}

/**
Expand All @@ -245,7 +257,7 @@ export class P2PClient implements P2P {
* @returns Block number of latest L2 Block we've synced with.
*/
public getSyncedBlockNum() {
return this.currentL2BlockNum;
return this.synchedBlockNumber.get() ?? INITIAL_L2_BLOCK_NUM - 1;
}

/**
Expand All @@ -255,7 +267,7 @@ export class P2PClient implements P2P {
public getStatus(): Promise<P2PSyncState> {
return Promise.resolve({
state: this.currentState,
syncedToL2Block: this.currentL2BlockNum,
syncedToL2Block: this.getSyncedBlockNum(),
} as P2PSyncState);
}

Expand All @@ -264,14 +276,13 @@ export class P2PClient implements P2P {
* @param blocks - A list of existing blocks with txs that the P2P client needs to ensure the tx pool is reconciled with.
* @returns Empty promise.
*/
private reconcileTxPool(blocks: L2Block[]): Promise<void> {
private async reconcileTxPool(blocks: L2Block[]): Promise<void> {
for (let i = 0; i < blocks.length; i++) {
const blockContext = new L2BlockContext(blocks[i]);
const txHashes = blockContext.getTxHashes();
this.txPool.deleteTxs(txHashes);
await this.txPool.deleteTxs(txHashes);
this.p2pService.settledTxs(txHashes);
}
return Promise.resolve();
}

/**
Expand All @@ -284,9 +295,11 @@ export class P2PClient implements P2P {
return Promise.resolve();
}
await this.reconcileTxPool(blocks);
this.currentL2BlockNum = blocks[blocks.length - 1].number;
this.log(`Synched to block ${this.currentL2BlockNum}`);
if (this.currentState === P2PClientState.SYNCHING && this.currentL2BlockNum >= this.latestBlockNumberAtStart) {
const lastBlockNum = blocks[blocks.length - 1].number;
await this.synchedBlockNumber.set(lastBlockNum);
this.log(`Synched to block ${lastBlockNum}`);

if (this.currentState === P2PClientState.SYNCHING && lastBlockNum >= this.latestBlockNumberAtStart) {
this.setCurrentState(P2PClientState.RUNNING);
if (this.syncResolve !== undefined) {
this.syncResolve();
Expand All @@ -303,4 +316,16 @@ export class P2PClient implements P2P {
this.currentState = newState;
this.log(`Moved to state ${P2PClientState[this.currentState]}`);
}

private async publishStoredTxs() {
if (!this.isReady()) {
return;
}

const txs = this.txPool.getAllTxs();
if (txs.length > 0) {
this.log(`Publishing ${txs.length} previously stored txs`);
await Promise.all(txs.map(tx => this.p2pService.propagateTx(tx)));
}
}
}
14 changes: 14 additions & 0 deletions yarn-project/p2p/src/tx_pool/aztec_kv_tx_pool.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { EthAddress } from '@aztec/circuits.js';
import { AztecLmdbStore } from '@aztec/kv-store';

import { AztecKVTxPool } from './aztec_kv_tx_pool.js';
import { describeTxPool } from './tx_pool_test_suite.js';

describe('In-Memory TX pool', () => {
let txPool: AztecKVTxPool;
beforeEach(async () => {
txPool = new AztecKVTxPool(await AztecLmdbStore.create(EthAddress.random()));
});

describeTxPool(() => txPool);
});
Loading

0 comments on commit 4dd076c

Please sign in to comment.