Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: catch up note processors could be synced more efficiently #3933

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions yarn-project/foundation/src/fifo/memory_fifo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,18 @@ export class MemoryFifo<T> {
/**
* Put an item onto back of the queue.
* @param item - The item to enqueue.
* @returns A boolean indicating whether the item was successfully added to the queue.
*/
public put(item: T) {
public put(item: T): boolean {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor unrelated cleanup so we don't need to keep running flags to avoid inserting to a flushing queue.

if (this.flushing) {
this.log.warn('Discarding item because queue is flushing');
return;
return false;
} else if (this.waiting.length) {
this.waiting.shift()!(item);
return true;
} else {
this.items.push(item);
return true;
}
}

Expand Down
7 changes: 5 additions & 2 deletions yarn-project/foundation/src/fifo/serial_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,21 @@ export class SerialQueue {
* Enqueues fn for execution on the serial queue.
* Returns the result of the function after execution.
* @param fn - The function to enqueue.
* @returns A resolution promise.
* @returns A resolution promise. Rejects if the function does, or if the function could not be enqueued.
*/
public put<T>(fn: () => Promise<T>): Promise<T> {
return new Promise((resolve, reject) => {
this.queue.put(async () => {
const accepted = this.queue.put(async () => {
try {
const res = await fn();
resolve(res);
} catch (e) {
reject(e);
}
});
if (!accepted) {
reject(new Error('Could not enqueue function'));
}
});
}

Expand Down
9 changes: 0 additions & 9 deletions yarn-project/pxe/src/pxe_service/pxe_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ export class PXEService implements PXE {
// serialize synchronizer and calls to simulateTx.
// ensures that state is not changed while simulating
private jobQueue = new SerialQueue();
private running = false;

constructor(
private keyStore: KeyStore,
Expand Down Expand Up @@ -105,7 +104,6 @@ export class PXEService implements PXE {
await this.restoreNoteProcessors();
const info = await this.getNodeInfo();
this.log.info(`Started PXE connected to chain ${info.chainId} version ${info.protocolVersion}`);
this.running = true;
}

private async restoreNoteProcessors() {
Expand Down Expand Up @@ -355,9 +353,6 @@ export class PXEService implements PXE {
if (txRequest.functionData.isInternal === undefined) {
throw new Error(`Unspecified internal are not allowed`);
}
if (!this.running) {
throw new Error('PXE Service is not running');
}

// all simulations must be serialized w.r.t. the synchronizer
return await this.jobQueue.put(async () => {
Expand Down Expand Up @@ -398,10 +393,6 @@ export class PXEService implements PXE {
to: AztecAddress,
_from?: AztecAddress,
): Promise<DecodedReturn> {
if (!this.running) {
throw new Error('PXE Service is not running');
}

// all simulations must be serialized w.r.t. the synchronizer
return await this.jobQueue.put(async () => {
// TODO - Should check if `from` has the permission to call the view function.
Expand Down
71 changes: 50 additions & 21 deletions yarn-project/pxe/src/synchronizer/synchronizer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,47 +91,76 @@ describe('Synchronizer', () => {
});

it('note processor successfully catches up', async () => {
const block = L2Block.random(1, 4);
const blocks = [L2Block.random(1, 4), L2Block.random(2, 4)];

aztecNode.getBlocks
// called by synchronizer.work
.mockResolvedValueOnce([L2Block.fromFields(omit(blocks[0], 'newEncryptedLogs', 'newUnencryptedLogs'))])
.mockResolvedValueOnce([L2Block.fromFields(omit(blocks[1], 'newEncryptedLogs', 'newUnencryptedLogs'))])
// called by synchronizer.workNoteProcessorCatchUp
.mockResolvedValueOnce([L2Block.fromFields(omit(blocks[0], 'newEncryptedLogs', 'newUnencryptedLogs'))])
.mockResolvedValueOnce([L2Block.fromFields(omit(blocks[1], 'newEncryptedLogs', 'newUnencryptedLogs'))]);

// getBlocks is called by both synchronizer.work and synchronizer.workNoteProcessorCatchUp
aztecNode.getBlocks.mockResolvedValue([L2Block.fromFields(omit(block, 'newEncryptedLogs', 'newUnencryptedLogs'))]);
aztecNode.getLogs
.mockResolvedValueOnce([block.newEncryptedLogs!]) // called by synchronizer.work
.mockResolvedValueOnce([block.newUnencryptedLogs!]) // called by synchronizer.work
.mockResolvedValueOnce([block.newEncryptedLogs!]); // called by synchronizer.workNoteProcessorCatchUp
// called by synchronizer.work
.mockResolvedValueOnce([blocks[0].newEncryptedLogs!])
.mockResolvedValueOnce([blocks[0].newUnencryptedLogs!])
.mockResolvedValueOnce([blocks[1].newEncryptedLogs!])
.mockResolvedValueOnce([blocks[1].newUnencryptedLogs!])
// called by synchronizer.workNoteProcessorCatchUp
.mockResolvedValueOnce([blocks[0].newEncryptedLogs!])
.mockResolvedValueOnce([blocks[1].newEncryptedLogs!]);

// Sync the synchronizer so that note processor has something to catch up to
await synchronizer.work();
aztecNode.getBlockNumber.mockResolvedValue(INITIAL_L2_BLOCK_NUM + 1);

// Used in synchronizer.isAccountStateSynchronized
aztecNode.getBlockNumber.mockResolvedValueOnce(1);
// Sync the synchronizer so that note processor has something to catch up to
// There are two blocks, and we have a limit of 1 block per work call
await synchronizer.work(1);
expect(await synchronizer.isGlobalStateSynchronized()).toBe(false);
await synchronizer.work(1);
expect(await synchronizer.isGlobalStateSynchronized()).toBe(true);

// Manually adding account to database so that we can call synchronizer.isAccountStateSynchronized
const keyStore = new TestKeyStore(new Grumpkin(), await AztecLmdbStore.create(EthAddress.random()));
const privateKey = GrumpkinScalar.random();
await keyStore.addAccount(privateKey);
const completeAddress = CompleteAddress.fromPrivateKeyAndPartialAddress(privateKey, Fr.random());
await database.addCompleteAddress(completeAddress);
const addAddress = async (startingBlockNum: number) => {
const privateKey = GrumpkinScalar.random();
await keyStore.addAccount(privateKey);
const completeAddress = CompleteAddress.fromPrivateKeyAndPartialAddress(privateKey, Fr.random());
await database.addCompleteAddress(completeAddress);
synchronizer.addAccount(completeAddress.publicKey, keyStore, startingBlockNum);
return completeAddress;
};

const [completeAddressA, completeAddressB, completeAddressC] = await Promise.all([
addAddress(INITIAL_L2_BLOCK_NUM),
addAddress(INITIAL_L2_BLOCK_NUM),
addAddress(INITIAL_L2_BLOCK_NUM + 1),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the first block we catch up is not relevant to addressC

]);

await synchronizer.workNoteProcessorCatchUp();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after this, all processors need block 2.


// Add the account which will add the note processor to the synchronizer
synchronizer.addAccount(completeAddress.publicKey, keyStore, INITIAL_L2_BLOCK_NUM);
expect(await synchronizer.isAccountStateSynchronized(completeAddressA.address)).toBe(false);
expect(await synchronizer.isAccountStateSynchronized(completeAddressB.address)).toBe(false);
expect(await synchronizer.isAccountStateSynchronized(completeAddressC.address)).toBe(false);

await synchronizer.workNoteProcessorCatchUp();

expect(await synchronizer.isAccountStateSynchronized(completeAddress.address)).toBe(true);
expect(await synchronizer.isAccountStateSynchronized(completeAddressA.address)).toBe(true);
expect(await synchronizer.isAccountStateSynchronized(completeAddressB.address)).toBe(true);
expect(await synchronizer.isAccountStateSynchronized(completeAddressC.address)).toBe(true);
});
});

class TestSynchronizer extends Synchronizer {
public work() {
return super.work();
public work(limit = 1) {
return super.work(limit);
}

public initialSync(): Promise<void> {
return super.initialSync();
}

public workNoteProcessorCatchUp(): Promise<boolean> {
return super.workNoteProcessorCatchUp();
public workNoteProcessorCatchUp(limit = 1): Promise<boolean> {
return super.workNoteProcessorCatchUp(limit);
}
}
87 changes: 63 additions & 24 deletions yarn-project/pxe/src/synchronizer/synchronizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,27 +159,45 @@ export class Synchronizer {
}

/**
* Catch up a note processor that is lagging behind the main sync,
* Catch up note processors that are lagging behind the main sync.
* e.g. because we just added a new account.
*
* @param limit - the maximum number of encrypted, unencrypted logs and blocks to fetch in each iteration.
* @returns true if there could be more work, false if we're caught up or there was an error.
* @returns true if there could be more work, false if there was an error which allows a retry with delay.
*/
protected async workNoteProcessorCatchUp(limit = 1): Promise<boolean> {
const noteProcessor = this.noteProcessorsToCatchUp[0];
const toBlockNumber = this.getSynchedBlockNumber();

if (noteProcessor.status.syncedToBlock >= toBlockNumber) {
// Note processor already synched, nothing to do
this.noteProcessorsToCatchUp.shift();
this.noteProcessors.push(noteProcessor);
// could be more work if there are more note processors to catch up
// filter out note processors that are already caught up
// and sort them by the block number they are lagging behind in ascending order
this.noteProcessorsToCatchUp = this.noteProcessorsToCatchUp.filter(noteProcessor => {
if (noteProcessor.status.syncedToBlock >= toBlockNumber) {
// Note processor is ahead of main sync, nothing to do
this.noteProcessors.push(noteProcessor);
return false;
}
return true;
});

if (!this.noteProcessorsToCatchUp.length) {
// No note processors to catch up, nothing to do here,
// but we return true to continue with the normal flow.
return true;
}

const from = noteProcessor.status.syncedToBlock + 1;
// create a copy so that:
// 1. we can modify the original array while iterating over it
// 2. we don't need to serialize insertions into the array
const catchUpGroup = this.noteProcessorsToCatchUp
.slice()
// sort by the block number they are lagging behind
.sort((a, b) => a.status.syncedToBlock - b.status.syncedToBlock);

// grab the note processor that is lagging behind the most
const from = catchUpGroup[0].status.syncedToBlock + 1;
// Ensuring that the note processor does not sync further than the main sync.
limit = Math.min(limit, toBlockNumber - from + 1);
// this.log(`Catching up ${catchUpGroup.length} note processors by up to ${limit} blocks starting at block ${from}`);

if (limit < 1) {
throw new Error(`Unexpected limit ${limit} for note processor catch up`);
Expand Down Expand Up @@ -209,22 +227,43 @@ export class Synchronizer {
const blockContexts = blocks.map(block => new L2BlockContext(block));

const logCount = L2BlockL2Logs.getTotalLogCount(encryptedLogs);
this.log(`Forwarding ${logCount} encrypted logs and blocks to note processor in catch up mode`);
await noteProcessor.process(blockContexts, encryptedLogs);

if (noteProcessor.status.syncedToBlock === toBlockNumber) {
// Note processor caught up, move it to `noteProcessors` from `noteProcessorsToCatchUp`.
this.log(`Note processor for ${noteProcessor.publicKey.toString()} has caught up`, {
eventName: 'note-processor-caught-up',
publicKey: noteProcessor.publicKey.toString(),
duration: noteProcessor.timer.ms(),
dbSize: this.db.estimateSize(),
...noteProcessor.stats,
} satisfies NoteProcessorCaughtUpStats);
this.noteProcessorsToCatchUp.shift();
this.noteProcessors.push(noteProcessor);
this.log(`Forwarding ${logCount} encrypted logs and blocks to note processors in catch up mode`);

for (const noteProcessor of catchUpGroup) {
// find the index of the first block that the note processor is not yet synced to
const index = blockContexts.findIndex(block => block.block.number > noteProcessor.status.syncedToBlock);
if (index === -1) {
// Due to the limit, we might not have fetched a new enough block for the note processor.
// And since the group is sorted, we break as soon as we find a note processor
// that needs blocks newer than the newest block we fetched.
break;
}

this.log.debug(
`Catching up note processor ${noteProcessor.publicKey.toString()} by processing ${
blockContexts.length - index
} blocks`,
);
await noteProcessor.process(blockContexts.slice(index), encryptedLogs.slice(index));

if (noteProcessor.status.syncedToBlock === toBlockNumber) {
// Note processor caught up, move it to `noteProcessors` from `noteProcessorsToCatchUp`.
this.log(`Note processor for ${noteProcessor.publicKey.toString()} has caught up`, {
eventName: 'note-processor-caught-up',
publicKey: noteProcessor.publicKey.toString(),
duration: noteProcessor.timer.ms(),
dbSize: this.db.estimateSize(),
...noteProcessor.stats,
} satisfies NoteProcessorCaughtUpStats);

this.noteProcessorsToCatchUp = this.noteProcessorsToCatchUp.filter(
np => !np.publicKey.equals(noteProcessor.publicKey),
);
this.noteProcessors.push(noteProcessor);
}
}
return true;

return true; // could be more work, immediately continue syncing
} catch (err) {
this.log.error(`Error in synchronizer workNoteProcessorCatchUp`, err);
return false;
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/types/src/l2_block_source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { EthAddress } from '@aztec/circuits.js';

import { L2Block } from './l2_block.js';
import { L2Tx } from './l2_tx.js';
import { TxHash } from './tx/index.js';
import { TxHash } from './tx/tx_hash.js';

/**
* Interface of classes allowing for the retrieval of L2 blocks.
Expand Down
4 changes: 2 additions & 2 deletions yarn-project/types/src/notes/extended_note.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { AztecAddress, Fr } from '@aztec/circuits.js';
import { BufferReader } from '@aztec/foundation/serialize';

import { Note } from '../logs/index.js';
import { TxHash } from '../tx/index.js';
import { Note } from '../logs/l1_note_payload/note.js';
import { TxHash } from '../tx/tx_hash.js';

/**
* A note with contextual data.
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/types/src/tx/tx_receipt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { AztecAddress } from '@aztec/foundation/aztec-address';
import { Fr } from '@aztec/foundation/fields';

import { ContractData } from '../contract_data.js';
import { ExtendedNote } from '../notes/index.js';
import { ExtendedNote } from '../notes/extended_note.js';
import { PublicDataWrite } from '../public_data_write.js';
import { TxHash } from './tx_hash.js';

Expand Down
Loading