Skip to content

Commit

Permalink
chore: catch up note processors could be synced more efficiently (#3933)
Browse files Browse the repository at this point in the history
In this PR, when catching up note processors, instead of processing them
sequentially, we sort them to know the oldest block we need, then start
at that point and fetch `limit` blocks.

When we have the blocks, we provide them to all processors that need it.
  • Loading branch information
just-mitch authored Jan 11, 2024
1 parent bdeb10c commit df54f33
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 62 deletions.
7 changes: 5 additions & 2 deletions yarn-project/foundation/src/fifo/memory_fifo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,18 @@ export class MemoryFifo<T> {
/**
* Put an item onto back of the queue.
* @param item - The item to enqueue.
* @returns A boolean indicating whether the item was successfully added to the queue.
*/
public put(item: T) {
public put(item: T): boolean {
if (this.flushing) {
this.log.warn('Discarding item because queue is flushing');
return;
return false;
} else if (this.waiting.length) {
this.waiting.shift()!(item);
return true;
} else {
this.items.push(item);
return true;
}
}

Expand Down
7 changes: 5 additions & 2 deletions yarn-project/foundation/src/fifo/serial_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,21 @@ export class SerialQueue {
* Enqueues fn for execution on the serial queue.
* Returns the result of the function after execution.
* @param fn - The function to enqueue.
* @returns A resolution promise.
* @returns A resolution promise. Rejects if the function does, or if the function could not be enqueued.
*/
public put<T>(fn: () => Promise<T>): Promise<T> {
return new Promise((resolve, reject) => {
this.queue.put(async () => {
const accepted = this.queue.put(async () => {
try {
const res = await fn();
resolve(res);
} catch (e) {
reject(e);
}
});
if (!accepted) {
reject(new Error('Could not enqueue function'));
}
});
}

Expand Down
9 changes: 0 additions & 9 deletions yarn-project/pxe/src/pxe_service/pxe_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ export class PXEService implements PXE {
// serialize synchronizer and calls to simulateTx.
// ensures that state is not changed while simulating
private jobQueue = new SerialQueue();
private running = false;

constructor(
private keyStore: KeyStore,
Expand Down Expand Up @@ -105,7 +104,6 @@ export class PXEService implements PXE {
await this.restoreNoteProcessors();
const info = await this.getNodeInfo();
this.log.info(`Started PXE connected to chain ${info.chainId} version ${info.protocolVersion}`);
this.running = true;
}

private async restoreNoteProcessors() {
Expand Down Expand Up @@ -355,9 +353,6 @@ export class PXEService implements PXE {
if (txRequest.functionData.isInternal === undefined) {
throw new Error(`Unspecified internal are not allowed`);
}
if (!this.running) {
throw new Error('PXE Service is not running');
}

// all simulations must be serialized w.r.t. the synchronizer
return await this.jobQueue.put(async () => {
Expand Down Expand Up @@ -398,10 +393,6 @@ export class PXEService implements PXE {
to: AztecAddress,
_from?: AztecAddress,
): Promise<DecodedReturn> {
if (!this.running) {
throw new Error('PXE Service is not running');
}

// all simulations must be serialized w.r.t. the synchronizer
return await this.jobQueue.put(async () => {
// TODO - Should check if `from` has the permission to call the view function.
Expand Down
71 changes: 50 additions & 21 deletions yarn-project/pxe/src/synchronizer/synchronizer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,47 +91,76 @@ describe('Synchronizer', () => {
});

it('note processor successfully catches up', async () => {
const block = L2Block.random(1, 4);
const blocks = [L2Block.random(1, 4), L2Block.random(2, 4)];

aztecNode.getBlocks
// called by synchronizer.work
.mockResolvedValueOnce([L2Block.fromFields(omit(blocks[0], 'newEncryptedLogs', 'newUnencryptedLogs'))])
.mockResolvedValueOnce([L2Block.fromFields(omit(blocks[1], 'newEncryptedLogs', 'newUnencryptedLogs'))])
// called by synchronizer.workNoteProcessorCatchUp
.mockResolvedValueOnce([L2Block.fromFields(omit(blocks[0], 'newEncryptedLogs', 'newUnencryptedLogs'))])
.mockResolvedValueOnce([L2Block.fromFields(omit(blocks[1], 'newEncryptedLogs', 'newUnencryptedLogs'))]);

// getBlocks is called by both synchronizer.work and synchronizer.workNoteProcessorCatchUp
aztecNode.getBlocks.mockResolvedValue([L2Block.fromFields(omit(block, 'newEncryptedLogs', 'newUnencryptedLogs'))]);
aztecNode.getLogs
.mockResolvedValueOnce([block.newEncryptedLogs!]) // called by synchronizer.work
.mockResolvedValueOnce([block.newUnencryptedLogs!]) // called by synchronizer.work
.mockResolvedValueOnce([block.newEncryptedLogs!]); // called by synchronizer.workNoteProcessorCatchUp
// called by synchronizer.work
.mockResolvedValueOnce([blocks[0].newEncryptedLogs!])
.mockResolvedValueOnce([blocks[0].newUnencryptedLogs!])
.mockResolvedValueOnce([blocks[1].newEncryptedLogs!])
.mockResolvedValueOnce([blocks[1].newUnencryptedLogs!])
// called by synchronizer.workNoteProcessorCatchUp
.mockResolvedValueOnce([blocks[0].newEncryptedLogs!])
.mockResolvedValueOnce([blocks[1].newEncryptedLogs!]);

// Sync the synchronizer so that note processor has something to catch up to
await synchronizer.work();
aztecNode.getBlockNumber.mockResolvedValue(INITIAL_L2_BLOCK_NUM + 1);

// Used in synchronizer.isAccountStateSynchronized
aztecNode.getBlockNumber.mockResolvedValueOnce(1);
// Sync the synchronizer so that note processor has something to catch up to
// There are two blocks, and we have a limit of 1 block per work call
await synchronizer.work(1);
expect(await synchronizer.isGlobalStateSynchronized()).toBe(false);
await synchronizer.work(1);
expect(await synchronizer.isGlobalStateSynchronized()).toBe(true);

// Manually adding account to database so that we can call synchronizer.isAccountStateSynchronized
const keyStore = new TestKeyStore(new Grumpkin(), await AztecLmdbStore.create(EthAddress.random()));
const privateKey = GrumpkinScalar.random();
await keyStore.addAccount(privateKey);
const completeAddress = CompleteAddress.fromPrivateKeyAndPartialAddress(privateKey, Fr.random());
await database.addCompleteAddress(completeAddress);
const addAddress = async (startingBlockNum: number) => {
const privateKey = GrumpkinScalar.random();
await keyStore.addAccount(privateKey);
const completeAddress = CompleteAddress.fromPrivateKeyAndPartialAddress(privateKey, Fr.random());
await database.addCompleteAddress(completeAddress);
synchronizer.addAccount(completeAddress.publicKey, keyStore, startingBlockNum);
return completeAddress;
};

const [completeAddressA, completeAddressB, completeAddressC] = await Promise.all([
addAddress(INITIAL_L2_BLOCK_NUM),
addAddress(INITIAL_L2_BLOCK_NUM),
addAddress(INITIAL_L2_BLOCK_NUM + 1),
]);

await synchronizer.workNoteProcessorCatchUp();

// Add the account which will add the note processor to the synchronizer
synchronizer.addAccount(completeAddress.publicKey, keyStore, INITIAL_L2_BLOCK_NUM);
expect(await synchronizer.isAccountStateSynchronized(completeAddressA.address)).toBe(false);
expect(await synchronizer.isAccountStateSynchronized(completeAddressB.address)).toBe(false);
expect(await synchronizer.isAccountStateSynchronized(completeAddressC.address)).toBe(false);

await synchronizer.workNoteProcessorCatchUp();

expect(await synchronizer.isAccountStateSynchronized(completeAddress.address)).toBe(true);
expect(await synchronizer.isAccountStateSynchronized(completeAddressA.address)).toBe(true);
expect(await synchronizer.isAccountStateSynchronized(completeAddressB.address)).toBe(true);
expect(await synchronizer.isAccountStateSynchronized(completeAddressC.address)).toBe(true);
});
});

class TestSynchronizer extends Synchronizer {
public work() {
return super.work();
public work(limit = 1) {
return super.work(limit);
}

public initialSync(): Promise<void> {
return super.initialSync();
}

public workNoteProcessorCatchUp(): Promise<boolean> {
return super.workNoteProcessorCatchUp();
public workNoteProcessorCatchUp(limit = 1): Promise<boolean> {
return super.workNoteProcessorCatchUp(limit);
}
}
87 changes: 63 additions & 24 deletions yarn-project/pxe/src/synchronizer/synchronizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,27 +159,45 @@ export class Synchronizer {
}

/**
* Catch up a note processor that is lagging behind the main sync,
* Catch up note processors that are lagging behind the main sync.
* e.g. because we just added a new account.
*
* @param limit - the maximum number of encrypted, unencrypted logs and blocks to fetch in each iteration.
* @returns true if there could be more work, false if we're caught up or there was an error.
* @returns true if there could be more work, false if there was an error which allows a retry with delay.
*/
protected async workNoteProcessorCatchUp(limit = 1): Promise<boolean> {
const noteProcessor = this.noteProcessorsToCatchUp[0];
const toBlockNumber = this.getSynchedBlockNumber();

if (noteProcessor.status.syncedToBlock >= toBlockNumber) {
// Note processor already synched, nothing to do
this.noteProcessorsToCatchUp.shift();
this.noteProcessors.push(noteProcessor);
// could be more work if there are more note processors to catch up
// filter out note processors that are already caught up
// and sort them by the block number they are lagging behind in ascending order
this.noteProcessorsToCatchUp = this.noteProcessorsToCatchUp.filter(noteProcessor => {
if (noteProcessor.status.syncedToBlock >= toBlockNumber) {
// Note processor is ahead of main sync, nothing to do
this.noteProcessors.push(noteProcessor);
return false;
}
return true;
});

if (!this.noteProcessorsToCatchUp.length) {
// No note processors to catch up, nothing to do here,
// but we return true to continue with the normal flow.
return true;
}

const from = noteProcessor.status.syncedToBlock + 1;
// create a copy so that:
// 1. we can modify the original array while iterating over it
// 2. we don't need to serialize insertions into the array
const catchUpGroup = this.noteProcessorsToCatchUp
.slice()
// sort by the block number they are lagging behind
.sort((a, b) => a.status.syncedToBlock - b.status.syncedToBlock);

// grab the note processor that is lagging behind the most
const from = catchUpGroup[0].status.syncedToBlock + 1;
// Ensuring that the note processor does not sync further than the main sync.
limit = Math.min(limit, toBlockNumber - from + 1);
// this.log(`Catching up ${catchUpGroup.length} note processors by up to ${limit} blocks starting at block ${from}`);

if (limit < 1) {
throw new Error(`Unexpected limit ${limit} for note processor catch up`);
Expand Down Expand Up @@ -209,22 +227,43 @@ export class Synchronizer {
const blockContexts = blocks.map(block => new L2BlockContext(block));

const logCount = L2BlockL2Logs.getTotalLogCount(encryptedLogs);
this.log(`Forwarding ${logCount} encrypted logs and blocks to note processor in catch up mode`);
await noteProcessor.process(blockContexts, encryptedLogs);

if (noteProcessor.status.syncedToBlock === toBlockNumber) {
// Note processor caught up, move it to `noteProcessors` from `noteProcessorsToCatchUp`.
this.log(`Note processor for ${noteProcessor.publicKey.toString()} has caught up`, {
eventName: 'note-processor-caught-up',
publicKey: noteProcessor.publicKey.toString(),
duration: noteProcessor.timer.ms(),
dbSize: this.db.estimateSize(),
...noteProcessor.stats,
} satisfies NoteProcessorCaughtUpStats);
this.noteProcessorsToCatchUp.shift();
this.noteProcessors.push(noteProcessor);
this.log(`Forwarding ${logCount} encrypted logs and blocks to note processors in catch up mode`);

for (const noteProcessor of catchUpGroup) {
// find the index of the first block that the note processor is not yet synced to
const index = blockContexts.findIndex(block => block.block.number > noteProcessor.status.syncedToBlock);
if (index === -1) {
// Due to the limit, we might not have fetched a new enough block for the note processor.
// And since the group is sorted, we break as soon as we find a note processor
// that needs blocks newer than the newest block we fetched.
break;
}

this.log.debug(
`Catching up note processor ${noteProcessor.publicKey.toString()} by processing ${
blockContexts.length - index
} blocks`,
);
await noteProcessor.process(blockContexts.slice(index), encryptedLogs.slice(index));

if (noteProcessor.status.syncedToBlock === toBlockNumber) {
// Note processor caught up, move it to `noteProcessors` from `noteProcessorsToCatchUp`.
this.log(`Note processor for ${noteProcessor.publicKey.toString()} has caught up`, {
eventName: 'note-processor-caught-up',
publicKey: noteProcessor.publicKey.toString(),
duration: noteProcessor.timer.ms(),
dbSize: this.db.estimateSize(),
...noteProcessor.stats,
} satisfies NoteProcessorCaughtUpStats);

this.noteProcessorsToCatchUp = this.noteProcessorsToCatchUp.filter(
np => !np.publicKey.equals(noteProcessor.publicKey),
);
this.noteProcessors.push(noteProcessor);
}
}
return true;

return true; // could be more work, immediately continue syncing
} catch (err) {
this.log.error(`Error in synchronizer workNoteProcessorCatchUp`, err);
return false;
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/types/src/l2_block_source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { EthAddress } from '@aztec/circuits.js';

import { L2Block } from './l2_block.js';
import { L2Tx } from './l2_tx.js';
import { TxHash } from './tx/index.js';
import { TxHash } from './tx/tx_hash.js';

/**
* Interface of classes allowing for the retrieval of L2 blocks.
Expand Down
4 changes: 2 additions & 2 deletions yarn-project/types/src/notes/extended_note.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { AztecAddress, Fr } from '@aztec/circuits.js';
import { BufferReader } from '@aztec/foundation/serialize';

import { Note } from '../logs/index.js';
import { TxHash } from '../tx/index.js';
import { Note } from '../logs/l1_note_payload/note.js';
import { TxHash } from '../tx/tx_hash.js';

/**
* A note with contextual data.
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/types/src/tx/tx_receipt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { AztecAddress } from '@aztec/foundation/aztec-address';
import { Fr } from '@aztec/foundation/fields';

import { ContractData } from '../contract_data.js';
import { ExtendedNote } from '../notes/index.js';
import { ExtendedNote } from '../notes/extended_note.js';
import { PublicDataWrite } from '../public_data_write.js';
import { TxHash } from './tx_hash.js';

Expand Down

0 comments on commit df54f33

Please sign in to comment.