diff --git a/yarn-project/foundation/src/fifo/memory_fifo.ts b/yarn-project/foundation/src/fifo/memory_fifo.ts index 5bb614eaf23..39c6a0b0d10 100644 --- a/yarn-project/foundation/src/fifo/memory_fifo.ts +++ b/yarn-project/foundation/src/fifo/memory_fifo.ts @@ -57,15 +57,18 @@ export class MemoryFifo { /** * Put an item onto back of the queue. * @param item - The item to enqueue. + * @returns A boolean indicating whether the item was successfully added to the queue. */ - public put(item: T) { + public put(item: T): boolean { if (this.flushing) { this.log.warn('Discarding item because queue is flushing'); - return; + return false; } else if (this.waiting.length) { this.waiting.shift()!(item); + return true; } else { this.items.push(item); + return true; } } diff --git a/yarn-project/foundation/src/fifo/serial_queue.ts b/yarn-project/foundation/src/fifo/serial_queue.ts index 560bfca0776..6854cdc6ddc 100644 --- a/yarn-project/foundation/src/fifo/serial_queue.ts +++ b/yarn-project/foundation/src/fifo/serial_queue.ts @@ -54,11 +54,11 @@ export class SerialQueue { * Enqueues fn for execution on the serial queue. * Returns the result of the function after execution. * @param fn - The function to enqueue. - * @returns A resolution promise. + * @returns A resolution promise. Rejects if the function does, or if the function could not be enqueued. */ public put(fn: () => Promise): Promise { return new Promise((resolve, reject) => { - this.queue.put(async () => { + const accepted = this.queue.put(async () => { try { const res = await fn(); resolve(res); @@ -66,6 +66,9 @@ export class SerialQueue { reject(e); } }); + if (!accepted) { + reject(new Error('Could not enqueue function')); + } }); } diff --git a/yarn-project/pxe/src/pxe_service/pxe_service.ts b/yarn-project/pxe/src/pxe_service/pxe_service.ts index c7dd25a5792..ee487cce126 100644 --- a/yarn-project/pxe/src/pxe_service/pxe_service.ts +++ b/yarn-project/pxe/src/pxe_service/pxe_service.ts @@ -76,7 +76,6 @@ export class PXEService implements PXE { // serialize synchronizer and calls to simulateTx. // ensures that state is not changed while simulating private jobQueue = new SerialQueue(); - private running = false; constructor( private keyStore: KeyStore, @@ -105,7 +104,6 @@ export class PXEService implements PXE { await this.restoreNoteProcessors(); const info = await this.getNodeInfo(); this.log.info(`Started PXE connected to chain ${info.chainId} version ${info.protocolVersion}`); - this.running = true; } private async restoreNoteProcessors() { @@ -355,9 +353,6 @@ export class PXEService implements PXE { if (txRequest.functionData.isInternal === undefined) { throw new Error(`Unspecified internal are not allowed`); } - if (!this.running) { - throw new Error('PXE Service is not running'); - } // all simulations must be serialized w.r.t. the synchronizer return await this.jobQueue.put(async () => { @@ -398,10 +393,6 @@ export class PXEService implements PXE { to: AztecAddress, _from?: AztecAddress, ): Promise { - if (!this.running) { - throw new Error('PXE Service is not running'); - } - // all simulations must be serialized w.r.t. the synchronizer return await this.jobQueue.put(async () => { // TODO - Should check if `from` has the permission to call the view function. diff --git a/yarn-project/pxe/src/synchronizer/synchronizer.test.ts b/yarn-project/pxe/src/synchronizer/synchronizer.test.ts index e28d78be85c..045dbeeba24 100644 --- a/yarn-project/pxe/src/synchronizer/synchronizer.test.ts +++ b/yarn-project/pxe/src/synchronizer/synchronizer.test.ts @@ -91,47 +91,76 @@ describe('Synchronizer', () => { }); it('note processor successfully catches up', async () => { - const block = L2Block.random(1, 4); + const blocks = [L2Block.random(1, 4), L2Block.random(2, 4)]; + + aztecNode.getBlocks + // called by synchronizer.work + .mockResolvedValueOnce([L2Block.fromFields(omit(blocks[0], 'newEncryptedLogs', 'newUnencryptedLogs'))]) + .mockResolvedValueOnce([L2Block.fromFields(omit(blocks[1], 'newEncryptedLogs', 'newUnencryptedLogs'))]) + // called by synchronizer.workNoteProcessorCatchUp + .mockResolvedValueOnce([L2Block.fromFields(omit(blocks[0], 'newEncryptedLogs', 'newUnencryptedLogs'))]) + .mockResolvedValueOnce([L2Block.fromFields(omit(blocks[1], 'newEncryptedLogs', 'newUnencryptedLogs'))]); - // getBlocks is called by both synchronizer.work and synchronizer.workNoteProcessorCatchUp - aztecNode.getBlocks.mockResolvedValue([L2Block.fromFields(omit(block, 'newEncryptedLogs', 'newUnencryptedLogs'))]); aztecNode.getLogs - .mockResolvedValueOnce([block.newEncryptedLogs!]) // called by synchronizer.work - .mockResolvedValueOnce([block.newUnencryptedLogs!]) // called by synchronizer.work - .mockResolvedValueOnce([block.newEncryptedLogs!]); // called by synchronizer.workNoteProcessorCatchUp + // called by synchronizer.work + .mockResolvedValueOnce([blocks[0].newEncryptedLogs!]) + .mockResolvedValueOnce([blocks[0].newUnencryptedLogs!]) + .mockResolvedValueOnce([blocks[1].newEncryptedLogs!]) + .mockResolvedValueOnce([blocks[1].newUnencryptedLogs!]) + // called by synchronizer.workNoteProcessorCatchUp + .mockResolvedValueOnce([blocks[0].newEncryptedLogs!]) + .mockResolvedValueOnce([blocks[1].newEncryptedLogs!]); - // Sync the synchronizer so that note processor has something to catch up to - await synchronizer.work(); + aztecNode.getBlockNumber.mockResolvedValue(INITIAL_L2_BLOCK_NUM + 1); - // Used in synchronizer.isAccountStateSynchronized - aztecNode.getBlockNumber.mockResolvedValueOnce(1); + // Sync the synchronizer so that note processor has something to catch up to + // There are two blocks, and we have a limit of 1 block per work call + await synchronizer.work(1); + expect(await synchronizer.isGlobalStateSynchronized()).toBe(false); + await synchronizer.work(1); + expect(await synchronizer.isGlobalStateSynchronized()).toBe(true); // Manually adding account to database so that we can call synchronizer.isAccountStateSynchronized const keyStore = new TestKeyStore(new Grumpkin(), await AztecLmdbStore.create(EthAddress.random())); - const privateKey = GrumpkinScalar.random(); - await keyStore.addAccount(privateKey); - const completeAddress = CompleteAddress.fromPrivateKeyAndPartialAddress(privateKey, Fr.random()); - await database.addCompleteAddress(completeAddress); + const addAddress = async (startingBlockNum: number) => { + const privateKey = GrumpkinScalar.random(); + await keyStore.addAccount(privateKey); + const completeAddress = CompleteAddress.fromPrivateKeyAndPartialAddress(privateKey, Fr.random()); + await database.addCompleteAddress(completeAddress); + synchronizer.addAccount(completeAddress.publicKey, keyStore, startingBlockNum); + return completeAddress; + }; + + const [completeAddressA, completeAddressB, completeAddressC] = await Promise.all([ + addAddress(INITIAL_L2_BLOCK_NUM), + addAddress(INITIAL_L2_BLOCK_NUM), + addAddress(INITIAL_L2_BLOCK_NUM + 1), + ]); + + await synchronizer.workNoteProcessorCatchUp(); - // Add the account which will add the note processor to the synchronizer - synchronizer.addAccount(completeAddress.publicKey, keyStore, INITIAL_L2_BLOCK_NUM); + expect(await synchronizer.isAccountStateSynchronized(completeAddressA.address)).toBe(false); + expect(await synchronizer.isAccountStateSynchronized(completeAddressB.address)).toBe(false); + expect(await synchronizer.isAccountStateSynchronized(completeAddressC.address)).toBe(false); await synchronizer.workNoteProcessorCatchUp(); - expect(await synchronizer.isAccountStateSynchronized(completeAddress.address)).toBe(true); + expect(await synchronizer.isAccountStateSynchronized(completeAddressA.address)).toBe(true); + expect(await synchronizer.isAccountStateSynchronized(completeAddressB.address)).toBe(true); + expect(await synchronizer.isAccountStateSynchronized(completeAddressC.address)).toBe(true); }); }); class TestSynchronizer extends Synchronizer { - public work() { - return super.work(); + public work(limit = 1) { + return super.work(limit); } public initialSync(): Promise { return super.initialSync(); } - public workNoteProcessorCatchUp(): Promise { - return super.workNoteProcessorCatchUp(); + public workNoteProcessorCatchUp(limit = 1): Promise { + return super.workNoteProcessorCatchUp(limit); } } diff --git a/yarn-project/pxe/src/synchronizer/synchronizer.ts b/yarn-project/pxe/src/synchronizer/synchronizer.ts index 6a2e1256738..36d26ed73cb 100644 --- a/yarn-project/pxe/src/synchronizer/synchronizer.ts +++ b/yarn-project/pxe/src/synchronizer/synchronizer.ts @@ -159,27 +159,45 @@ export class Synchronizer { } /** - * Catch up a note processor that is lagging behind the main sync, + * Catch up note processors that are lagging behind the main sync. * e.g. because we just added a new account. * * @param limit - the maximum number of encrypted, unencrypted logs and blocks to fetch in each iteration. - * @returns true if there could be more work, false if we're caught up or there was an error. + * @returns true if there could be more work, false if there was an error which allows a retry with delay. */ protected async workNoteProcessorCatchUp(limit = 1): Promise { - const noteProcessor = this.noteProcessorsToCatchUp[0]; const toBlockNumber = this.getSynchedBlockNumber(); - if (noteProcessor.status.syncedToBlock >= toBlockNumber) { - // Note processor already synched, nothing to do - this.noteProcessorsToCatchUp.shift(); - this.noteProcessors.push(noteProcessor); - // could be more work if there are more note processors to catch up + // filter out note processors that are already caught up + // and sort them by the block number they are lagging behind in ascending order + this.noteProcessorsToCatchUp = this.noteProcessorsToCatchUp.filter(noteProcessor => { + if (noteProcessor.status.syncedToBlock >= toBlockNumber) { + // Note processor is ahead of main sync, nothing to do + this.noteProcessors.push(noteProcessor); + return false; + } + return true; + }); + + if (!this.noteProcessorsToCatchUp.length) { + // No note processors to catch up, nothing to do here, + // but we return true to continue with the normal flow. return true; } - const from = noteProcessor.status.syncedToBlock + 1; + // create a copy so that: + // 1. we can modify the original array while iterating over it + // 2. we don't need to serialize insertions into the array + const catchUpGroup = this.noteProcessorsToCatchUp + .slice() + // sort by the block number they are lagging behind + .sort((a, b) => a.status.syncedToBlock - b.status.syncedToBlock); + + // grab the note processor that is lagging behind the most + const from = catchUpGroup[0].status.syncedToBlock + 1; // Ensuring that the note processor does not sync further than the main sync. limit = Math.min(limit, toBlockNumber - from + 1); + // this.log(`Catching up ${catchUpGroup.length} note processors by up to ${limit} blocks starting at block ${from}`); if (limit < 1) { throw new Error(`Unexpected limit ${limit} for note processor catch up`); @@ -209,22 +227,43 @@ export class Synchronizer { const blockContexts = blocks.map(block => new L2BlockContext(block)); const logCount = L2BlockL2Logs.getTotalLogCount(encryptedLogs); - this.log(`Forwarding ${logCount} encrypted logs and blocks to note processor in catch up mode`); - await noteProcessor.process(blockContexts, encryptedLogs); - - if (noteProcessor.status.syncedToBlock === toBlockNumber) { - // Note processor caught up, move it to `noteProcessors` from `noteProcessorsToCatchUp`. - this.log(`Note processor for ${noteProcessor.publicKey.toString()} has caught up`, { - eventName: 'note-processor-caught-up', - publicKey: noteProcessor.publicKey.toString(), - duration: noteProcessor.timer.ms(), - dbSize: this.db.estimateSize(), - ...noteProcessor.stats, - } satisfies NoteProcessorCaughtUpStats); - this.noteProcessorsToCatchUp.shift(); - this.noteProcessors.push(noteProcessor); + this.log(`Forwarding ${logCount} encrypted logs and blocks to note processors in catch up mode`); + + for (const noteProcessor of catchUpGroup) { + // find the index of the first block that the note processor is not yet synced to + const index = blockContexts.findIndex(block => block.block.number > noteProcessor.status.syncedToBlock); + if (index === -1) { + // Due to the limit, we might not have fetched a new enough block for the note processor. + // And since the group is sorted, we break as soon as we find a note processor + // that needs blocks newer than the newest block we fetched. + break; + } + + this.log.debug( + `Catching up note processor ${noteProcessor.publicKey.toString()} by processing ${ + blockContexts.length - index + } blocks`, + ); + await noteProcessor.process(blockContexts.slice(index), encryptedLogs.slice(index)); + + if (noteProcessor.status.syncedToBlock === toBlockNumber) { + // Note processor caught up, move it to `noteProcessors` from `noteProcessorsToCatchUp`. + this.log(`Note processor for ${noteProcessor.publicKey.toString()} has caught up`, { + eventName: 'note-processor-caught-up', + publicKey: noteProcessor.publicKey.toString(), + duration: noteProcessor.timer.ms(), + dbSize: this.db.estimateSize(), + ...noteProcessor.stats, + } satisfies NoteProcessorCaughtUpStats); + + this.noteProcessorsToCatchUp = this.noteProcessorsToCatchUp.filter( + np => !np.publicKey.equals(noteProcessor.publicKey), + ); + this.noteProcessors.push(noteProcessor); + } } - return true; + + return true; // could be more work, immediately continue syncing } catch (err) { this.log.error(`Error in synchronizer workNoteProcessorCatchUp`, err); return false; diff --git a/yarn-project/types/src/l2_block_source.ts b/yarn-project/types/src/l2_block_source.ts index 00de1abd27c..02e5b6dcda5 100644 --- a/yarn-project/types/src/l2_block_source.ts +++ b/yarn-project/types/src/l2_block_source.ts @@ -2,7 +2,7 @@ import { EthAddress } from '@aztec/circuits.js'; import { L2Block } from './l2_block.js'; import { L2Tx } from './l2_tx.js'; -import { TxHash } from './tx/index.js'; +import { TxHash } from './tx/tx_hash.js'; /** * Interface of classes allowing for the retrieval of L2 blocks. diff --git a/yarn-project/types/src/notes/extended_note.ts b/yarn-project/types/src/notes/extended_note.ts index 41395ea61f0..8a2855bc7ab 100644 --- a/yarn-project/types/src/notes/extended_note.ts +++ b/yarn-project/types/src/notes/extended_note.ts @@ -1,8 +1,8 @@ import { AztecAddress, Fr } from '@aztec/circuits.js'; import { BufferReader } from '@aztec/foundation/serialize'; -import { Note } from '../logs/index.js'; -import { TxHash } from '../tx/index.js'; +import { Note } from '../logs/l1_note_payload/note.js'; +import { TxHash } from '../tx/tx_hash.js'; /** * A note with contextual data. diff --git a/yarn-project/types/src/tx/tx_receipt.ts b/yarn-project/types/src/tx/tx_receipt.ts index 46dc697f96f..00c8334f1d2 100644 --- a/yarn-project/types/src/tx/tx_receipt.ts +++ b/yarn-project/types/src/tx/tx_receipt.ts @@ -2,7 +2,7 @@ import { AztecAddress } from '@aztec/foundation/aztec-address'; import { Fr } from '@aztec/foundation/fields'; import { ContractData } from '../contract_data.js'; -import { ExtendedNote } from '../notes/index.js'; +import { ExtendedNote } from '../notes/extended_note.js'; import { PublicDataWrite } from '../public_data_write.js'; import { TxHash } from './tx_hash.js';