diff --git a/yarn-project/archiver/package.json b/yarn-project/archiver/package.json index 4ebbd12dfdb..3dba1489b60 100644 --- a/yarn-project/archiver/package.json +++ b/yarn-project/archiver/package.json @@ -31,7 +31,8 @@ "^(\\.{1,2}/.*)\\.[cm]?js$": "$1" }, "testRegex": "./src/.*\\.test\\.(js|mjs|ts)$", - "rootDir": "./src" + "rootDir": "./src", + "workerThreads": true }, "dependencies": { "@aztec/circuit-types": "workspace:^", diff --git a/yarn-project/archiver/src/archiver/archiver.ts b/yarn-project/archiver/src/archiver/archiver.ts index 55bc7f09580..26eaaa71eeb 100644 --- a/yarn-project/archiver/src/archiver/archiver.ts +++ b/yarn-project/archiver/src/archiver/archiver.ts @@ -129,18 +129,28 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource * @param blockUntilSynced - If true, blocks until the archiver has fully synced. */ private async sync(blockUntilSynced: boolean) { + /** + * We keep track of three "pointers" to L1 blocks: + * 1. the last L1 block that published an L2 block + * 2. the last L1 block that added L1 to L2 messages + * 3. the last L1 block that cancelled L1 to L2 messages + * + * We do this to deal with L1 data providers that are eventually consistent (e.g. Infura). + * We guard against seeing block X with no data at one point, and later, the provider processes the block and it has data. + * The archiver will stay back, until there's data on L1 that will move the pointers forward. + * + * This code does not handle reorgs. + */ + const lastL1Blocks = await this.store.getL1BlockNumber(); const currentL1BlockNumber = await this.publicClient.getBlockNumber(); - // this makes the archiver more resilient to eventually-consistent eth providers like Infura - // it _will_ process the same L1 blocks over and over again until the L2 chain advances - // one thing to handle now is that we will process the same L1 to L2 messages over and over again - // so the store needs to account for that. - const lastProcessedL1BlockNumber = await this.store.getL1BlockNumber(); - if (currentL1BlockNumber <= lastProcessedL1BlockNumber) { - // reducing logs, otherwise this gets triggered on every loop (1s) - if (currentL1BlockNumber !== this.lastLoggedL1BlockNumber) { - this.log(`No new blocks to process, current block number: ${currentL1BlockNumber}`); - this.lastLoggedL1BlockNumber = currentL1BlockNumber; - } + + if ( + currentL1BlockNumber <= lastL1Blocks.addedBlock && + currentL1BlockNumber <= lastL1Blocks.addedMessages && + currentL1BlockNumber <= lastL1Blocks.cancelledMessages + ) { + // chain hasn't moved forward + // or it's been rolled back return; } @@ -152,61 +162,61 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource * to ensure that data is read exactly once. * * The first is the problem of eventually consistent ETH service providers like Infura. - * We currently read from the last L1 block that we saw emit an L2 block. This could mean - * that the archiver ends up looking at the same L1 block multiple times (e.g. if we last saw - * an L2 block emitted at L1 block 10, we'd constantly ask for L1 blocks from 11 onwards until - * we see another L2 block). For this to work message and block processing need to be idempotent. - * We should re-visit this before mainnet launch. + * Each L1 read operation will query data from the last L1 block that it saw emit its kind of data. + * (so pending L1 to L2 messages will read from the last L1 block that emitted a message and so on) + * This will mean the archiver will lag behind L1 and will only advance when there's L2-relevant activity on the chain. * * The second is that in between the various calls to L1, the block number can move meaning some * of the following calls will return data for blocks that were not present during earlier calls. - * It's possible that we actually received messages in block currentBlockNumber + 1 meaning the next time - * we do this sync we get the same message again. Additionally, the call to get cancelled L1 to L2 messages - * could read from a block not present when retrieving pending messages. If a message was added and cancelled - * in the same eth block then we could try and cancel a non-existent pending message. - * * To combat this for the time being we simply ensure that all data retrieval methods only retrieve * data up to the currentBlockNumber captured at the top of this function. We might want to improve on this * in future but for the time being it should give us the guarantees that we need - * */ - // ********** Events that are processed in between blocks ********** + // ********** Events that are processed per L1 block ********** // Process l1ToL2Messages, these are consumed as time passes, not each block const retrievedPendingL1ToL2Messages = await retrieveNewPendingL1ToL2Messages( this.publicClient, this.inboxAddress, blockUntilSynced, - lastProcessedL1BlockNumber + 1n, // + 1 to prevent re-including messages from the last processed block + lastL1Blocks.addedMessages + 1n, currentL1BlockNumber, ); const retrievedCancelledL1ToL2Messages = await retrieveNewCancelledL1ToL2Messages( this.publicClient, this.inboxAddress, blockUntilSynced, - lastProcessedL1BlockNumber + 1n, + lastL1Blocks.cancelledMessages + 1n, currentL1BlockNumber, ); - // TODO (#717): optimize this - there could be messages in confirmed that are also in pending. - // Or messages in pending that are also cancelled in the same block. No need to modify storage for them. + // group pending messages and cancelled messages by their L1 block number + const messagesByBlock = new Map(); + for (const [message, blockNumber] of retrievedPendingL1ToL2Messages.retrievedData) { + const messages = messagesByBlock.get(blockNumber) || [[], []]; + messages[0].push(message); + messagesByBlock.set(blockNumber, messages); + } - if (retrievedPendingL1ToL2Messages.retrievedData.length) { - // Store l1 to l2 messages - this.log(`Adding ${retrievedPendingL1ToL2Messages.retrievedData.length} pending l1 to l2 messages to store`); - await this.store.addPendingL1ToL2Messages(retrievedPendingL1ToL2Messages.retrievedData); + for (const [messageKey, blockNumber] of retrievedCancelledL1ToL2Messages.retrievedData) { + const messages = messagesByBlock.get(blockNumber) || [[], []]; + messages[1].push(messageKey); + messagesByBlock.set(blockNumber, messages); } - if (retrievedCancelledL1ToL2Messages.retrievedData.length) { - // remove cancelled messages from the pending message store: + // process messages from each L1 block in sequence + const l1BlocksWithMessages = Array.from(messagesByBlock.keys()).sort((a, b) => (a < b ? -1 : a === b ? 0 : 1)); + for (const l1Block of l1BlocksWithMessages) { + const [newMessages, cancelledMessages] = messagesByBlock.get(l1Block)!; this.log( - `Removing ${retrievedCancelledL1ToL2Messages.retrievedData.length} pending l1 to l2 messages from store where messages were cancelled`, + `Adding ${newMessages.length} new messages and ${cancelledMessages.length} cancelled messages in L1 block ${l1Block}`, ); - await this.store.cancelPendingL1ToL2Messages(retrievedCancelledL1ToL2Messages.retrievedData); + await this.store.addPendingL1ToL2Messages(newMessages, l1Block); + await this.store.cancelPendingL1ToL2Messages(cancelledMessages, l1Block); } - // ********** Events that are processed per block ********** + // ********** Events that are processed per L2 block ********** // Read all data from chain and then write to our stores at the end const nextExpectedL2BlockNum = BigInt((await this.store.getBlockNumber()) + 1); @@ -214,7 +224,7 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource this.publicClient, this.rollupAddress, blockUntilSynced, - lastProcessedL1BlockNumber + 1n, + lastL1Blocks.addedBlock + 1n, currentL1BlockNumber, nextExpectedL2BlockNum, ); @@ -224,7 +234,7 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource } else { this.log( `Retrieved ${retrievedBlocks.retrievedData.length} new L2 blocks between L1 blocks ${ - lastProcessedL1BlockNumber + 1n + lastL1Blocks.addedBlock + 1n } and ${currentL1BlockNumber}.`, ); } @@ -238,7 +248,7 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource this.publicClient, this.contractDeploymentEmitterAddress, blockUntilSynced, - lastProcessedL1BlockNumber + 1n, + lastL1Blocks.addedBlock + 1n, currentL1BlockNumber, blockHashMapping, ); @@ -264,9 +274,10 @@ export class Archiver implements L2BlockSource, L2LogsSource, ContractDataSource // from retrieved L2Blocks, confirm L1 to L2 messages that have been published // from each l2block fetch all messageKeys in a flattened array: - const messageKeysToRemove = retrievedBlocks.retrievedData.map(l2block => l2block.newL1ToL2Messages).flat(); this.log(`Confirming l1 to l2 messages in store`); - await this.store.confirmL1ToL2Messages(messageKeysToRemove); + for (const block of retrievedBlocks.retrievedData) { + await this.store.confirmL1ToL2Messages(block.newL1ToL2Messages); + } // store retrieved L2 blocks after removing new logs information. // remove logs to serve "lightweight" block information. Logs can be fetched separately if needed. diff --git a/yarn-project/archiver/src/archiver/archiver_store.ts b/yarn-project/archiver/src/archiver/archiver_store.ts index 44fe30cf9e5..0d4c1f13c26 100644 --- a/yarn-project/archiver/src/archiver/archiver_store.ts +++ b/yarn-project/archiver/src/archiver/archiver_store.ts @@ -1,5 +1,4 @@ import { - CancelledL1ToL2Message, ContractData, ExtendedContractData, GetUnencryptedLogsResponse, @@ -9,12 +8,23 @@ import { L2Tx, LogFilter, LogType, - PendingL1ToL2Message, TxHash, } from '@aztec/circuit-types'; import { Fr } from '@aztec/circuits.js'; import { AztecAddress } from '@aztec/foundation/aztec-address'; +/** + * Represents the latest L1 block processed by the archiver for various objects in L2. + */ +export type ArchiverL1SynchPoint = { + /** The last L1 block that added a new L2 block. */ + addedBlock: bigint; + /** The last L1 block that added pending messages */ + addedMessages: bigint; + /** The last L1 block that cancelled messages */ + cancelledMessages: bigint; +}; + /** * Interface describing a data store to be used by the archiver to store all its relevant data * (blocks, encrypted logs, aztec contract data extended contract data). @@ -58,16 +68,18 @@ export interface ArchiverDataStore { /** * Append new pending L1 to L2 messages to the store. * @param messages - The L1 to L2 messages to be added to the store. + * @param l1BlockNumber - The block number of the L1 block that added the messages. * @returns True if the operation is successful. */ - addPendingL1ToL2Messages(messages: PendingL1ToL2Message[]): Promise; + addPendingL1ToL2Messages(messages: L1ToL2Message[], l1BlockNumber: bigint): Promise; /** * Remove pending L1 to L2 messages from the store (if they were cancelled). * @param message - The message keys to be removed from the store. + * @param l1BlockNumber - The block number of the L1 block that cancelled the messages. * @returns True if the operation is successful. */ - cancelPendingL1ToL2Messages(message: CancelledL1ToL2Message[]): Promise; + cancelPendingL1ToL2Messages(message: Fr[], l1BlockNumber: bigint): Promise; /** * Messages that have been published in an L2 block are confirmed. @@ -152,7 +164,7 @@ export interface ArchiverDataStore { getBlockNumber(): Promise; /** - * Gets the number of the latest L1 block processed. + * Gets the last L1 block number processed by the archiver */ - getL1BlockNumber(): Promise; + getL1BlockNumber(): Promise; } diff --git a/yarn-project/archiver/src/archiver/archiver_store_test_suite.ts b/yarn-project/archiver/src/archiver/archiver_store_test_suite.ts index a044d6afcbb..788c0894d4e 100644 --- a/yarn-project/archiver/src/archiver/archiver_store_test_suite.ts +++ b/yarn-project/archiver/src/archiver/archiver_store_test_suite.ts @@ -1,5 +1,4 @@ import { - CancelledL1ToL2Message, ExtendedContractData, INITIAL_L2_BLOCK_NUM, L1ToL2Message, @@ -7,7 +6,6 @@ import { L2BlockContext, LogId, LogType, - PendingL1ToL2Message, TxHash, UnencryptedL2Log, } from '@aztec/circuit-types'; @@ -88,12 +86,39 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch describe('getL1BlockNumber', () => { it('returns 0n if no blocks have been added', async () => { - await expect(store.getL1BlockNumber()).resolves.toEqual(0n); + await expect(store.getL1BlockNumber()).resolves.toEqual({ + addedBlock: 0n, + addedMessages: 0n, + cancelledMessages: 0n, + }); }); it('returns the L1 block number in which the most recent L2 block was published', async () => { await store.addBlocks(blocks); - await expect(store.getL1BlockNumber()).resolves.toEqual(blocks.at(-1)!.getL1BlockNumber()); + await expect(store.getL1BlockNumber()).resolves.toEqual({ + addedBlock: blocks.at(-1)!.getL1BlockNumber(), + addedMessages: 0n, + cancelledMessages: 0n, + }); + }); + + it('returns the L1 block number that most recently added pending messages', async () => { + await store.addPendingL1ToL2Messages([L1ToL2Message.random(Fr.random())], 1n); + await expect(store.getL1BlockNumber()).resolves.toEqual({ + addedBlock: 0n, + addedMessages: 1n, + cancelledMessages: 0n, + }); + }); + it('returns the L1 block number that most recently cancelled pending messages', async () => { + const message = L1ToL2Message.random(Fr.random()); + await store.addPendingL1ToL2Messages([message], 1n); + await store.cancelPendingL1ToL2Messages([message.entryKey!], 2n); + await expect(store.getL1BlockNumber()).resolves.toEqual({ + addedBlock: 0n, + addedMessages: 1n, + cancelledMessages: 2n, + }); }); }); @@ -151,66 +176,49 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch describe('addPendingL1ToL2Messages', () => { it('stores pending L1 to L2 messages', async () => { - await expect( - store.addPendingL1ToL2Messages([new PendingL1ToL2Message(L1ToL2Message.random(Fr.random()), 1n, 0)]), - ).resolves.toEqual(true); + await expect(store.addPendingL1ToL2Messages([L1ToL2Message.random(Fr.random())], 1n)).resolves.toEqual(true); }); it('allows duplicate pending messages in different positions in the same block', async () => { const message = L1ToL2Message.random(Fr.random()); - await expect( - store.addPendingL1ToL2Messages([ - new PendingL1ToL2Message(message, 1n, 0), - new PendingL1ToL2Message(message, 1n, 1), - ]), - ).resolves.toEqual(true); + await expect(store.addPendingL1ToL2Messages([message, message], 1n)).resolves.toEqual(true); await expect(store.getPendingL1ToL2MessageKeys(2)).resolves.toEqual([message.entryKey!, message.entryKey!]); }); it('allows duplicate pending messages in different blocks', async () => { const message = L1ToL2Message.random(Fr.random()); - await expect( - store.addPendingL1ToL2Messages([ - new PendingL1ToL2Message(message, 1n, 0), - new PendingL1ToL2Message(message, 2n, 0), - ]), - ).resolves.toEqual(true); + await expect(store.addPendingL1ToL2Messages([message], 1n)).resolves.toEqual(true); + await expect(store.addPendingL1ToL2Messages([message], 2n)).resolves.toEqual(true); await expect(store.getPendingL1ToL2MessageKeys(2)).resolves.toEqual([message.entryKey!, message.entryKey!]); }); it('is idempotent', async () => { const message = L1ToL2Message.random(Fr.random()); - await expect( - store.addPendingL1ToL2Messages([ - new PendingL1ToL2Message(message, 1n, 0), - new PendingL1ToL2Message(message, 1n, 0), - ]), - ).resolves.toEqual(true); + await expect(store.addPendingL1ToL2Messages([message], 1n)).resolves.toEqual(true); + await expect(store.addPendingL1ToL2Messages([message], 1n)).resolves.toEqual(false); await expect(store.getPendingL1ToL2MessageKeys(2)).resolves.toEqual([message.entryKey!]); }); }); describe('getPendingL1ToL2Messages', () => { it('returns previously stored pending L1 to L2 messages', async () => { - const messageCtx = new PendingL1ToL2Message(L1ToL2Message.random(Fr.random()), 1n, 0); - await store.addPendingL1ToL2Messages([messageCtx]); - await expect(store.getPendingL1ToL2MessageKeys(1)).resolves.toEqual([messageCtx.message.entryKey!]); + const message = L1ToL2Message.random(Fr.random()); + await store.addPendingL1ToL2Messages([message], 1n); + await expect(store.getPendingL1ToL2MessageKeys(1)).resolves.toEqual([message.entryKey!]); }); it('returns messages ordered by fee', async () => { - const messageCtxs = Array.from({ length: 3 }).map( - (_, i) => new PendingL1ToL2Message(L1ToL2Message.random(Fr.random()), 1n, i), - ); + const messages = Array.from({ length: 3 }, () => L1ToL2Message.random(Fr.random())); // add a duplicate message - messageCtxs.push(new PendingL1ToL2Message(messageCtxs[0].message, 1n, 3)); + messages.push(messages[0]); - await store.addPendingL1ToL2Messages(messageCtxs); + await store.addPendingL1ToL2Messages(messages, 1n); - messageCtxs.sort((a, b) => b.message.fee - a.message.fee); - await expect(store.getPendingL1ToL2MessageKeys(messageCtxs.length)).resolves.toEqual( - messageCtxs.map(({ message }) => message.entryKey!), + messages.sort((a, b) => b.fee - a.fee); + await expect(store.getPendingL1ToL2MessageKeys(messages.length)).resolves.toEqual( + messages.map(message => message.entryKey!), ); }); @@ -221,33 +229,29 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch describe('confirmL1ToL2Messages', () => { it('updates a message from pending to confirmed', async () => { - const messageCtx = new PendingL1ToL2Message(L1ToL2Message.random(Fr.random()), 1n, 0); - await store.addPendingL1ToL2Messages([messageCtx]); - await expect(store.confirmL1ToL2Messages([messageCtx.message.entryKey!])).resolves.toEqual(true); + const message = L1ToL2Message.random(Fr.random()); + await store.addPendingL1ToL2Messages([message], 1n); + await expect(store.confirmL1ToL2Messages([message.entryKey!])).resolves.toEqual(true); }); it('once confirmed, a message is no longer pending', async () => { - const pendingMessage = new PendingL1ToL2Message(L1ToL2Message.random(Fr.random()), 1n, 0); - await store.addPendingL1ToL2Messages([pendingMessage]); - await store.confirmL1ToL2Messages([pendingMessage.message.entryKey!]); + const message = L1ToL2Message.random(Fr.random()); + await store.addPendingL1ToL2Messages([message], 1n); + await store.confirmL1ToL2Messages([message.entryKey!]); await expect(store.getPendingL1ToL2MessageKeys(1)).resolves.toEqual([]); }); it('once confirmed a message can also be pending if added again', async () => { const message = L1ToL2Message.random(Fr.random()); - await store.addPendingL1ToL2Messages([new PendingL1ToL2Message(message, 1n, 0)]); + await store.addPendingL1ToL2Messages([message], 1n); await store.confirmL1ToL2Messages([message.entryKey!]); - await store.addPendingL1ToL2Messages([new PendingL1ToL2Message(message, 2n, 0)]); + await store.addPendingL1ToL2Messages([message], 2n); await expect(store.getPendingL1ToL2MessageKeys(2)).resolves.toEqual([message.entryKey!]); }); it('once confirmed a message can remain pending if more of it were pending', async () => { const message = L1ToL2Message.random(Fr.random()); - await store.addPendingL1ToL2Messages([ - new PendingL1ToL2Message(message, 1n, 0), - new PendingL1ToL2Message(message, 1n, 1), - ]); - + await store.addPendingL1ToL2Messages([message, message], 1n); await store.confirmL1ToL2Messages([message.entryKey!]); await expect(store.getPendingL1ToL2MessageKeys(1)).resolves.toEqual([message.entryKey!]); }); @@ -256,80 +260,61 @@ export function describeArchiverDataStore(testName: string, getStore: () => Arch describe('cancelL1ToL2Messages', () => { it('cancels a pending message', async () => { const message = L1ToL2Message.random(Fr.random()); - await store.addPendingL1ToL2Messages([new PendingL1ToL2Message(message, 1n, 0)]); - await store.cancelPendingL1ToL2Messages([new CancelledL1ToL2Message(message.entryKey!, 1n, 0)]); + await store.addPendingL1ToL2Messages([message], 1n); + await store.cancelPendingL1ToL2Messages([message.entryKey!], 1n); await expect(store.getPendingL1ToL2MessageKeys(1)).resolves.toEqual([]); }); it('cancels only one of the pending messages if duplicates exist', async () => { const message = L1ToL2Message.random(Fr.random()); - await store.addPendingL1ToL2Messages([ - new PendingL1ToL2Message(message, 1n, 0), - new PendingL1ToL2Message(message, 1n, 1), - ]); - await store.cancelPendingL1ToL2Messages([new CancelledL1ToL2Message(message.entryKey!, 2n, 0)]); + await store.addPendingL1ToL2Messages([message, message], 1n); + await store.cancelPendingL1ToL2Messages([message.entryKey!], 1n); await expect(store.getPendingL1ToL2MessageKeys(2)).resolves.toEqual([message.entryKey]); }); it('once canceled a message can also be pending if added again', async () => { const message = L1ToL2Message.random(Fr.random()); - await store.addPendingL1ToL2Messages([new PendingL1ToL2Message(message, 1n, 0)]); + await store.addPendingL1ToL2Messages([message], 1n); - await store.cancelPendingL1ToL2Messages([new CancelledL1ToL2Message(message.entryKey!, 1n, 0)]); + await store.cancelPendingL1ToL2Messages([message.entryKey!], 1n); await expect(store.getPendingL1ToL2MessageKeys(1)).resolves.toEqual([]); - await store.addPendingL1ToL2Messages([new PendingL1ToL2Message(message, 2n, 0)]); + await store.addPendingL1ToL2Messages([message], 2n); await expect(store.getPendingL1ToL2MessageKeys(1)).resolves.toEqual([message.entryKey!]); }); it('allows adding and cancelling in the same block', async () => { const message = L1ToL2Message.random(Fr.random()); - await store.addPendingL1ToL2Messages([new PendingL1ToL2Message(message, 1n, 0)]); - await store.cancelPendingL1ToL2Messages([new CancelledL1ToL2Message(message.entryKey!, 1n, 0)]); + await store.addPendingL1ToL2Messages([message], 1n); + await store.cancelPendingL1ToL2Messages([message.entryKey!], 1n); await expect(store.getPendingL1ToL2MessageKeys(1)).resolves.toEqual([]); }); it('allows duplicates cancellations in different positions in the same block', async () => { const message = L1ToL2Message.random(Fr.random()); - await store.addPendingL1ToL2Messages([ - new PendingL1ToL2Message(message, 1n, 0), - new PendingL1ToL2Message(message, 1n, 1), - ]); + await store.addPendingL1ToL2Messages([message, message], 1n); - await store.cancelPendingL1ToL2Messages([ - new CancelledL1ToL2Message(message.entryKey!, 2n, 0), - new CancelledL1ToL2Message(message.entryKey!, 2n, 1), - ]); + await store.cancelPendingL1ToL2Messages([message.entryKey!, message.entryKey!], 1n); await expect(store.getPendingL1ToL2MessageKeys(2)).resolves.toEqual([]); }); it('allows duplicates cancellations in different blocks', async () => { const message = L1ToL2Message.random(Fr.random()); - await store.addPendingL1ToL2Messages([ - new PendingL1ToL2Message(message, 1n, 0), - new PendingL1ToL2Message(message, 1n, 1), - ]); + await store.addPendingL1ToL2Messages([message, message], 1n); - await store.cancelPendingL1ToL2Messages([ - new CancelledL1ToL2Message(message.entryKey!, 2n, 0), - new CancelledL1ToL2Message(message.entryKey!, 3n, 0), - ]); + await store.cancelPendingL1ToL2Messages([message.entryKey!], 2n); + await store.cancelPendingL1ToL2Messages([message.entryKey!], 3n); await expect(store.getPendingL1ToL2MessageKeys(2)).resolves.toEqual([]); }); it('is idempotent', async () => { const message = L1ToL2Message.random(Fr.random()); - await store.addPendingL1ToL2Messages([ - new PendingL1ToL2Message(message, 1n, 0), - new PendingL1ToL2Message(message, 1n, 1), - ]); + await store.addPendingL1ToL2Messages([message, message], 1n); - await store.cancelPendingL1ToL2Messages([ - new CancelledL1ToL2Message(message.entryKey!, 2n, 0), - new CancelledL1ToL2Message(message.entryKey!, 2n, 0), - ]); + await store.cancelPendingL1ToL2Messages([message.entryKey!], 2n); + await store.cancelPendingL1ToL2Messages([message.entryKey!], 2n); await expect(store.getPendingL1ToL2MessageKeys(2)).resolves.toEqual([message.entryKey!]); }); diff --git a/yarn-project/archiver/src/archiver/data_retrieval.ts b/yarn-project/archiver/src/archiver/data_retrieval.ts index 81ec341e160..b0673fd9cc8 100644 --- a/yarn-project/archiver/src/archiver/data_retrieval.ts +++ b/yarn-project/archiver/src/archiver/data_retrieval.ts @@ -1,4 +1,5 @@ -import { CancelledL1ToL2Message, ExtendedContractData, L2Block, PendingL1ToL2Message } from '@aztec/circuit-types'; +import { ExtendedContractData, L1ToL2Message, L2Block } from '@aztec/circuit-types'; +import { Fr } from '@aztec/circuits.js'; import { EthAddress } from '@aztec/foundation/eth-address'; import { PublicClient } from 'viem'; @@ -123,8 +124,8 @@ export async function retrieveNewPendingL1ToL2Messages( blockUntilSynced: boolean, searchStartBlock: bigint, searchEndBlock: bigint, -): Promise> { - const retrievedNewL1ToL2Messages: PendingL1ToL2Message[] = []; +): Promise> { + const retrievedNewL1ToL2Messages: [L1ToL2Message, bigint][] = []; do { if (searchStartBlock > searchEndBlock) { break; @@ -161,8 +162,8 @@ export async function retrieveNewCancelledL1ToL2Messages( blockUntilSynced: boolean, searchStartBlock: bigint, searchEndBlock: bigint, -): Promise> { - const retrievedNewCancelledL1ToL2Messages: CancelledL1ToL2Message[] = []; +): Promise> { + const retrievedNewCancelledL1ToL2Messages: [Fr, bigint][] = []; do { if (searchStartBlock > searchEndBlock) { break; diff --git a/yarn-project/archiver/src/archiver/eth_log_handlers.ts b/yarn-project/archiver/src/archiver/eth_log_handlers.ts index ebdd62a3e98..52133e86d17 100644 --- a/yarn-project/archiver/src/archiver/eth_log_handlers.ts +++ b/yarn-project/archiver/src/archiver/eth_log_handlers.ts @@ -1,5 +1,4 @@ import { - CancelledL1ToL2Message, ContractData, EncodedContractFunction, ExtendedContractData, @@ -7,7 +6,6 @@ import { L1ToL2Message, L2Actor, L2Block, - PendingL1ToL2Message, } from '@aztec/circuit-types'; import { AztecAddress } from '@aztec/foundation/aztec-address'; import { EthAddress } from '@aztec/foundation/eth-address'; @@ -24,26 +22,23 @@ import { Hex, Log, PublicClient, decodeFunctionData, getAbiItem, getAddress, hex */ export function processPendingL1ToL2MessageAddedLogs( logs: Log[], -): PendingL1ToL2Message[] { - const l1ToL2Messages: PendingL1ToL2Message[] = []; - for (const [index, log] of logs.entries()) { +): [L1ToL2Message, bigint][] { + const l1ToL2Messages: [L1ToL2Message, bigint][] = []; + for (const log of logs) { const { sender, senderChainId, recipient, recipientVersion, content, secretHash, deadline, fee, entryKey } = log.args; - l1ToL2Messages.push( - new PendingL1ToL2Message( - new L1ToL2Message( - new L1Actor(EthAddress.fromString(sender), Number(senderChainId)), - new L2Actor(AztecAddress.fromString(recipient), Number(recipientVersion)), - Fr.fromString(content), - Fr.fromString(secretHash), - deadline, - Number(fee), - Fr.fromString(entryKey), - ), - log.blockNumber!, - index, + l1ToL2Messages.push([ + new L1ToL2Message( + new L1Actor(EthAddress.fromString(sender), Number(senderChainId)), + new L2Actor(AztecAddress.fromString(recipient), Number(recipientVersion)), + Fr.fromString(content), + Fr.fromString(secretHash), + deadline, + Number(fee), + Fr.fromString(entryKey), ), - ); + log.blockNumber!, + ]); } return l1ToL2Messages; } @@ -55,10 +50,10 @@ export function processPendingL1ToL2MessageAddedLogs( */ export function processCancelledL1ToL2MessagesLogs( logs: Log[], -): CancelledL1ToL2Message[] { - const cancelledL1ToL2Messages: CancelledL1ToL2Message[] = []; - for (const [index, log] of logs.entries()) { - cancelledL1ToL2Messages.push(new CancelledL1ToL2Message(Fr.fromString(log.args.entryKey), log.blockNumber!, index)); +): [Fr, bigint][] { + const cancelledL1ToL2Messages: [Fr, bigint][] = []; + for (const log of logs) { + cancelledL1ToL2Messages.push([Fr.fromString(log.args.entryKey), log.blockNumber!]); } return cancelledL1ToL2Messages; } diff --git a/yarn-project/archiver/src/archiver/lmdb_archiver_store.test.ts b/yarn-project/archiver/src/archiver/lmdb_archiver_store.test.ts index 63014846b50..7a1ceacb4f1 100644 --- a/yarn-project/archiver/src/archiver/lmdb_archiver_store.test.ts +++ b/yarn-project/archiver/src/archiver/lmdb_archiver_store.test.ts @@ -1,33 +1,13 @@ -import { mkdtemp, rm } from 'fs/promises'; -import { RootDatabase, open } from 'lmdb'; -import { tmpdir } from 'os'; -import { join } from 'path'; +import { open } from 'lmdb'; import { describeArchiverDataStore } from './archiver_store_test_suite.js'; import { LMDBArchiverStore } from './lmdb_archiver_store.js'; describe('LMDB Memory Store', () => { let archiverStore: LMDBArchiverStore; - let tmpDbLocation: string; - let tmpDb: RootDatabase; - - beforeAll(async () => { - tmpDbLocation = await mkdtemp(join(tmpdir(), 'archiver-store-test-')); - tmpDb = open(tmpDbLocation, {}); - }); - - afterAll(async () => { - await tmpDb.close(); - await rm(tmpDbLocation, { recursive: true }); - }); beforeEach(() => { - archiverStore = new LMDBArchiverStore(tmpDb); - }); - - afterEach(async () => { - await archiverStore?.close(); - await tmpDb.clearAsync(); + archiverStore = new LMDBArchiverStore(open({} as any)); }); describeArchiverDataStore('LMDBArchiverStore', () => archiverStore); diff --git a/yarn-project/archiver/src/archiver/lmdb_archiver_store.ts b/yarn-project/archiver/src/archiver/lmdb_archiver_store.ts index e4f66c8fca1..6343c7dd0d9 100644 --- a/yarn-project/archiver/src/archiver/lmdb_archiver_store.ts +++ b/yarn-project/archiver/src/archiver/lmdb_archiver_store.ts @@ -1,5 +1,4 @@ import { - CancelledL1ToL2Message, ContractData, ExtendedContractData, ExtendedUnencryptedL2Log, @@ -12,18 +11,16 @@ import { LogFilter, LogId, LogType, - PendingL1ToL2Message, TxHash, UnencryptedL2Log, } from '@aztec/circuit-types'; import { Fr } from '@aztec/circuits.js'; import { AztecAddress } from '@aztec/foundation/aztec-address'; -import { toBigIntBE, toBufferBE } from '@aztec/foundation/bigint-buffer'; import { createDebugLogger } from '@aztec/foundation/log'; import { Database, RangeOptions, RootDatabase } from 'lmdb'; -import { ArchiverDataStore } from './archiver_store.js'; +import { ArchiverDataStore, ArchiverL1SynchPoint } from './archiver_store.js'; /* eslint-disable */ type L1ToL2MessageAndCount = { @@ -32,26 +29,19 @@ type L1ToL2MessageAndCount = { confirmedCount: number; }; -type L1ToL2MessageBlockKey = `${string}:${'newMessage' | 'cancelledMessage'}:${number}`; - -function l1ToL2MessageBlockKey( - l1BlockNumber: bigint, - key: 'newMessage' | 'cancelledMessage', - indexInBlock: number, -): L1ToL2MessageBlockKey { - return `${toBufferBE(l1BlockNumber, 32).toString('hex')}:${key}:${indexInBlock}`; -} - type BlockIndexValue = [blockNumber: number, index: number]; type BlockContext = { block?: Uint8Array; blockHash?: Uint8Array; - l1BlockNumber?: Uint8Array; + l1BlockNumber?: bigint; encryptedLogs?: Uint8Array; unencryptedLogs?: Uint8Array; extendedContractData?: Array; }; + +const L1_BLOCK_ADDED_PENDING_MESSAGE = 'l1BlockAddedPendingMessage'; +const L1_BLOCK_CANCELLED_MESSAGE = 'l1BlockCancelledMessage'; /* eslint-enable */ /** @@ -68,7 +58,7 @@ export class LMDBArchiverStore implements ArchiverDataStore { /** L1 to L2 messages */ l1ToL2Messages: Database; /** Which blocks emitted which messages */ - l1ToL2MessagesByBlock: Database; + l1ToL2MessagesByBlock: Database; /** Pending L1 to L2 messages sorted by their fee, in buckets (dupSort=true) */ pendingMessagesByFee: Database; }; @@ -97,7 +87,7 @@ export class LMDBArchiverStore implements ArchiverDataStore { }), l1ToL2MessagesByBlock: db.openDB('l1_to_l2_message_nonces', { keyEncoding: 'ordered-binary', - encoding: 'binary', + encoding: 'msgpack', }), pendingMessagesByFee: db.openDB('pending_messages_by_fee', { keyEncoding: 'ordered-binary', @@ -125,7 +115,7 @@ export class LMDBArchiverStore implements ArchiverDataStore { for (const block of blocks) { const blockCtx = this.#tables.blocks.get(block.number) ?? {}; blockCtx.block = block.toBuffer(); - blockCtx.l1BlockNumber = toBufferBE(block.getL1BlockNumber(), 32); + blockCtx.l1BlockNumber = block.getL1BlockNumber(); blockCtx.blockHash = block.getBlockHash(); // no need to await, all writes are enqueued in the transaction @@ -225,76 +215,61 @@ export class LMDBArchiverStore implements ArchiverDataStore { /** * Append new pending L1 to L2 messages to the store. * @param messages - The L1 to L2 messages to be added to the store. + * @param l1BlockNumber - The L1 block number for which to add the messages. * @returns True if the operation is successful. */ - addPendingL1ToL2Messages(messages: PendingL1ToL2Message[]): Promise { + addPendingL1ToL2Messages(messages: L1ToL2Message[], l1BlockNumber: bigint): Promise { return this.#tables.l1ToL2Messages.transaction(() => { - for (const { message, blockNumber, indexInBlock } of messages) { + if ((this.#tables.l1ToL2MessagesByBlock.get(L1_BLOCK_ADDED_PENDING_MESSAGE) ?? 0n) >= l1BlockNumber) { + return false; + } + // ensure we don't add the same messages twice + void this.#tables.l1ToL2MessagesByBlock.put(L1_BLOCK_ADDED_PENDING_MESSAGE, l1BlockNumber); + + for (const message of messages) { const messageKey = message.entryKey?.toBuffer(); if (!messageKey) { throw new Error('Message does not have an entry key'); } - const dupeKey = l1ToL2MessageBlockKey(blockNumber, 'newMessage', indexInBlock); - const messageInBlock = this.#tables.l1ToL2MessagesByBlock.get(dupeKey); - - if (messageInBlock?.equals(messageKey)) { - continue; - } else { - if (messageInBlock) { - this.#log( - `Previously add pending message ${messageInBlock.toString( - 'hex', - )} at ${dupeKey.toString()}, now got ${messageKey.toString('hex')}`, - ); - } - - void this.#tables.l1ToL2MessagesByBlock.put(dupeKey, messageKey); - } - - let messageWithCount = this.#tables.l1ToL2Messages.get(messageKey); - if (!messageWithCount) { - messageWithCount = { + let messageCtx = this.#tables.l1ToL2Messages.get(messageKey); + if (!messageCtx) { + messageCtx = { message: message.toBuffer(), pendingCount: 0, confirmedCount: 0, }; - void this.#tables.l1ToL2Messages.put(messageKey, messageWithCount); + void this.#tables.l1ToL2Messages.put(messageKey, messageCtx); } this.#updateMessageCountInTx(messageKey, message, 1, 0); } + return true; }); } /** * Remove pending L1 to L2 messages from the store (if they were cancelled). - * @param messages - The message keys to be removed from the store. + * @param cancelledMessages - The message keys to be removed from the store. + * @param l1BlockNumber - The L1 block number for which to remove the messages. * @returns True if the operation is successful. */ - cancelPendingL1ToL2Messages(messages: CancelledL1ToL2Message[]): Promise { + cancelPendingL1ToL2Messages(cancelledMessages: Fr[], l1BlockNumber: bigint): Promise { return this.#tables.l1ToL2Messages.transaction(() => { - for (const { blockNumber, indexInBlock, entryKey } of messages) { - const messageKey = entryKey.toBuffer(); - const dupeKey = l1ToL2MessageBlockKey(blockNumber, 'cancelledMessage', indexInBlock); - const messageInBlock = this.#tables.l1ToL2MessagesByBlock.get(dupeKey); - if (messageInBlock?.equals(messageKey)) { + if ((this.#tables.l1ToL2MessagesByBlock.get(L1_BLOCK_CANCELLED_MESSAGE) ?? 0n) >= l1BlockNumber) { + return false; + } + void this.#tables.l1ToL2MessagesByBlock.put(L1_BLOCK_CANCELLED_MESSAGE, l1BlockNumber); + + for (const messageKey of cancelledMessages) { + const message = this.#getL1ToL2Message(messageKey.toBuffer()); + if (!message) { continue; - } else { - if (messageInBlock) { - this.#log( - `Previously add pending message ${messageInBlock.toString( - 'hex', - )} at ${dupeKey.toString()}, now got ${messageKey.toString('hex')}`, - ); - } - void this.#tables.l1ToL2MessagesByBlock.put(dupeKey, messageKey); } - - const message = this.#getL1ToL2Message(messageKey); - this.#updateMessageCountInTx(messageKey, message, -1, 0); + this.#updateMessageCountInTx(messageKey.toBuffer(), message, -1, 0); } + return true; }); } @@ -600,19 +575,21 @@ export class LMDBArchiverStore implements ArchiverDataStore { return Promise.resolve(typeof lastBlockNumber === 'number' ? lastBlockNumber : INITIAL_L2_BLOCK_NUM - 1); } - getL1BlockNumber(): Promise { + /** + * Gets the last L1 block number processed by the archiver + */ + getL1BlockNumber(): Promise { // inverse range with no start/end will return the last value - const [lastBlock] = this.#tables.blocks.getRange({ reverse: true, limit: 1 }).asArray; - if (!lastBlock) { - return Promise.resolve(0n); - } else { - const blockCtx = lastBlock.value; - if (!blockCtx.l1BlockNumber) { - return Promise.reject(new Error('L1 block number not found')); - } else { - return Promise.resolve(toBigIntBE(asBuffer(blockCtx.l1BlockNumber))); - } - } + const [lastL2Block] = this.#tables.blocks.getRange({ reverse: true, limit: 1 }).asArray; + const addedBlock = lastL2Block?.value?.l1BlockNumber ?? 0n; + const addedMessages = this.#tables.l1ToL2MessagesByBlock.get(L1_BLOCK_ADDED_PENDING_MESSAGE) ?? 0n; + const cancelledMessages = this.#tables.l1ToL2MessagesByBlock.get(L1_BLOCK_CANCELLED_MESSAGE) ?? 0n; + + return Promise.resolve({ + addedBlock, + addedMessages, + cancelledMessages, + }); } #getBlock(blockNumber: number, withLogs = false): L2Block | undefined { diff --git a/yarn-project/archiver/src/archiver/memory_archiver_store/l1_to_l2_message_store.test.ts b/yarn-project/archiver/src/archiver/memory_archiver_store/l1_to_l2_message_store.test.ts index 1c910e2d6b5..3a60f300409 100644 --- a/yarn-project/archiver/src/archiver/memory_archiver_store/l1_to_l2_message_store.test.ts +++ b/yarn-project/archiver/src/archiver/memory_archiver_store/l1_to_l2_message_store.test.ts @@ -16,21 +16,15 @@ describe('l1_to_l2_message_store', () => { }); it('addMessage adds a message', () => { - store.addMessage(entryKey, msg, 1n, 0); + store.addMessage(entryKey, msg); expect(store.getMessage(entryKey)).toEqual(msg); }); it('addMessage increments the count if the message is already in the store', () => { - store.addMessage(entryKey, msg, 1n, 0); - store.addMessage(entryKey, msg, 1n, 1); + store.addMessage(entryKey, msg); + store.addMessage(entryKey, msg); expect(store.getMessageAndCount(entryKey)).toEqual({ message: msg, count: 2 }); }); - - it('addMessage does not increment the count if the message is already in the store at the same position', () => { - store.addMessage(entryKey, msg, 1n, 0); - store.addMessage(entryKey, msg, 1n, 0); - expect(store.getMessageAndCount(entryKey)).toEqual({ message: msg, count: 1 }); - }); }); describe('pending_l1_to_l2_message_store', () => { @@ -46,22 +40,22 @@ describe('pending_l1_to_l2_message_store', () => { }); it('removeMessage removes the message if the count is 1', () => { - store.addMessage(entryKey, msg, 1n, 0); - store.removeMessage(entryKey, 2n, 0); + store.addMessage(entryKey, msg); + store.removeMessage(entryKey); expect(store.getMessage(entryKey)).toBeUndefined(); }); it("handles case when removing a message that doesn't exist", () => { - expect(() => store.removeMessage(new Fr(0), 1n, 0)).not.toThrow(); + expect(() => store.removeMessage(new Fr(0))).not.toThrow(); const one = new Fr(1); - expect(() => store.removeMessage(one, 1n, 0)).toThrow(`Message with key ${one.value} not found in store`); + expect(() => store.removeMessage(one)).toThrow(`Message with key ${one.value} not found in store`); }); it('removeMessage decrements the count if the message is already in the store', () => { - store.addMessage(entryKey, msg, 1n, 0); - store.addMessage(entryKey, msg, 1n, 1); - store.addMessage(entryKey, msg, 1n, 2); - store.removeMessage(entryKey, 2n, 0); + store.addMessage(entryKey, msg); + store.addMessage(entryKey, msg); + store.addMessage(entryKey, msg); + store.removeMessage(entryKey); expect(store.getMessageAndCount(entryKey)).toEqual({ message: msg, count: 2 }); }); @@ -70,21 +64,21 @@ describe('pending_l1_to_l2_message_store', () => { }); it('getMessageKeys returns an empty array if limit is 0', () => { - store.addMessage(entryKey, msg, 1n, 0); + store.addMessage(entryKey, msg); expect(store.getMessageKeys(0)).toEqual([]); }); it('get messages for a non-empty store when limit > number of messages in store', () => { const entryKeys = [1, 2, 3, 4, 5].map(x => new Fr(x)); - entryKeys.forEach((entryKey, i) => { - store.addMessage(entryKey, L1ToL2Message.random(), 1n, i); + entryKeys.forEach(entryKey => { + store.addMessage(entryKey, L1ToL2Message.random()); }); expect(store.getMessageKeys(10).length).toEqual(5); }); it('get messages returns messages sorted by fees and also includes multiple of the same message', () => { const entryKeys = [1, 2, 3, 3, 3, 4].map(x => new Fr(x)); - entryKeys.forEach((entryKey, i) => { + entryKeys.forEach(entryKey => { // set msg.fee to entryKey to test the sort. const msg = new L1ToL2Message( L1Actor.random(), @@ -95,7 +89,7 @@ describe('pending_l1_to_l2_message_store', () => { Number(entryKey.value), entryKey, ); - store.addMessage(entryKey, msg, 1n, i); + store.addMessage(entryKey, msg); }); const expectedMessageFees = [4n, 3n, 3n, 3n]; // the top 4. const receivedMessageFees = store.getMessageKeys(4).map(key => key.value); diff --git a/yarn-project/archiver/src/archiver/memory_archiver_store/l1_to_l2_message_store.ts b/yarn-project/archiver/src/archiver/memory_archiver_store/l1_to_l2_message_store.ts index e5a7ced4bfc..ae6c6988957 100644 --- a/yarn-project/archiver/src/archiver/memory_archiver_store/l1_to_l2_message_store.ts +++ b/yarn-project/archiver/src/archiver/memory_archiver_store/l1_to_l2_message_store.ts @@ -11,21 +11,11 @@ export class L1ToL2MessageStore { * messages (and the number of times the message has been seen). */ protected store: Map = new Map(); - private messagesByBlock = new Set(); constructor() {} - addMessage(messageKey: Fr, message: L1ToL2Message, l1BlocKNumber: bigint, messageIndex: number) { - if (this.messagesByBlock.has(`${l1BlocKNumber}-${messageIndex}`)) { - return; - } - this.messagesByBlock.add(`${l1BlocKNumber}-${messageIndex}`); - - this.addMessageUnsafe(messageKey, message); - } - - addMessageUnsafe(messageKey: Fr, message: L1ToL2Message) { - const messageKeyBigInt = messageKey.value; + addMessage(messageKey: Fr, message: L1ToL2Message) { + const messageKeyBigInt = messageKey.toBigInt(); const msgAndCount = this.store.get(messageKeyBigInt); if (msgAndCount) { msgAndCount.count++; @@ -48,7 +38,6 @@ export class L1ToL2MessageStore { * for removing messages or fetching multiple messages. */ export class PendingL1ToL2MessageStore extends L1ToL2MessageStore { - private cancelledMessagesByBlock = new Set(); getMessageKeys(limit: number): Fr[] { if (limit < 1) { return []; @@ -68,20 +57,12 @@ export class PendingL1ToL2MessageStore extends L1ToL2MessageStore { return messages; } - removeMessage(messageKey: Fr, l1BlockNumber: bigint, messageIndex: number) { + removeMessage(messageKey: Fr) { // ignore 0 - messageKey is a hash, so a 0 can probabilistically never occur. It is best to skip it. if (messageKey.equals(Fr.ZERO)) { return; } - if (this.cancelledMessagesByBlock.has(`${l1BlockNumber}-${messageIndex}`)) { - return; - } - this.cancelledMessagesByBlock.add(`${l1BlockNumber}-${messageIndex}`); - this.removeMessageUnsafe(messageKey); - } - - removeMessageUnsafe(messageKey: Fr) { const messageKeyBigInt = messageKey.value; const msgAndCount = this.store.get(messageKeyBigInt); if (!msgAndCount) { diff --git a/yarn-project/archiver/src/archiver/memory_archiver_store/memory_archiver_store.ts b/yarn-project/archiver/src/archiver/memory_archiver_store/memory_archiver_store.ts index 1c2c2fd5369..62824ae1fe3 100644 --- a/yarn-project/archiver/src/archiver/memory_archiver_store/memory_archiver_store.ts +++ b/yarn-project/archiver/src/archiver/memory_archiver_store/memory_archiver_store.ts @@ -1,5 +1,4 @@ import { - CancelledL1ToL2Message, ContractData, ExtendedContractData, ExtendedUnencryptedL2Log, @@ -13,7 +12,6 @@ import { LogFilter, LogId, LogType, - PendingL1ToL2Message, TxHash, UnencryptedL2Log, } from '@aztec/circuit-types'; @@ -70,6 +68,9 @@ export class MemoryArchiverStore implements ArchiverDataStore { */ private pendingL1ToL2Messages: PendingL1ToL2MessageStore = new PendingL1ToL2MessageStore(); + private lastL1BlockAddedMessages: bigint = 0n; + private lastL1BlockCancelledMessages: bigint = 0n; + constructor( /** The max number of logs that can be obtained in 1 "getUnencryptedLogs" call. */ public readonly maxLogs: number, @@ -108,11 +109,17 @@ export class MemoryArchiverStore implements ArchiverDataStore { /** * Append new pending L1 to L2 messages to the store. * @param messages - The L1 to L2 messages to be added to the store. + * @param l1BlockNumber - The L1 block number for which to add the messages. * @returns True if the operation is successful (always in this implementation). */ - public addPendingL1ToL2Messages(messages: PendingL1ToL2Message[]): Promise { - for (const { message, blockNumber, indexInBlock } of messages) { - this.pendingL1ToL2Messages.addMessage(message.entryKey!, message, blockNumber, indexInBlock); + public addPendingL1ToL2Messages(messages: L1ToL2Message[], l1BlockNumber: bigint): Promise { + if (l1BlockNumber <= this.lastL1BlockAddedMessages) { + return Promise.resolve(false); + } + + this.lastL1BlockAddedMessages = l1BlockNumber; + for (const message of messages) { + this.pendingL1ToL2Messages.addMessage(message.entryKey!, message); } return Promise.resolve(true); } @@ -120,11 +127,17 @@ export class MemoryArchiverStore implements ArchiverDataStore { /** * Remove pending L1 to L2 messages from the store (if they were cancelled). * @param messages - The message keys to be removed from the store. + * @param l1BlockNumber - The L1 block number for which to remove the messages. * @returns True if the operation is successful (always in this implementation). */ - public cancelPendingL1ToL2Messages(messages: CancelledL1ToL2Message[]): Promise { - messages.forEach(({ entryKey, blockNumber, indexInBlock }) => { - this.pendingL1ToL2Messages.removeMessage(entryKey, blockNumber, indexInBlock); + public cancelPendingL1ToL2Messages(messages: Fr[], l1BlockNumber: bigint): Promise { + if (l1BlockNumber <= this.lastL1BlockCancelledMessages) { + return Promise.resolve(false); + } + + this.lastL1BlockCancelledMessages = l1BlockNumber; + messages.forEach(entryKey => { + this.pendingL1ToL2Messages.removeMessage(entryKey); }); return Promise.resolve(true); } @@ -137,8 +150,8 @@ export class MemoryArchiverStore implements ArchiverDataStore { */ public confirmL1ToL2Messages(messageKeys: Fr[]): Promise { messageKeys.forEach(messageKey => { - this.confirmedL1ToL2Messages.addMessageUnsafe(messageKey, this.pendingL1ToL2Messages.getMessage(messageKey)!); - this.pendingL1ToL2Messages.removeMessageUnsafe(messageKey); + this.confirmedL1ToL2Messages.addMessage(messageKey, this.pendingL1ToL2Messages.getMessage(messageKey)!); + this.pendingL1ToL2Messages.removeMessage(messageKey); }); return Promise.resolve(true); } @@ -390,10 +403,15 @@ export class MemoryArchiverStore implements ArchiverDataStore { return Promise.resolve(this.l2BlockContexts[this.l2BlockContexts.length - 1].block.number); } - public getL1BlockNumber(): Promise { - if (this.l2BlockContexts.length === 0) { - return Promise.resolve(0n); - } - return Promise.resolve(this.l2BlockContexts[this.l2BlockContexts.length - 1].block.getL1BlockNumber()); + public getL1BlockNumber() { + const addedBlock = this.l2BlockContexts[this.l2BlockContexts.length - 1]?.block?.getL1BlockNumber() ?? 0n; + const addedMessages = this.lastL1BlockAddedMessages; + const cancelledMessages = this.lastL1BlockCancelledMessages; + + return Promise.resolve({ + addedBlock, + addedMessages, + cancelledMessages, + }); } } diff --git a/yarn-project/circuit-types/src/l1_to_l2_message.ts b/yarn-project/circuit-types/src/l1_to_l2_message.ts index e6697384c6b..a072e6c270b 100644 --- a/yarn-project/circuit-types/src/l1_to_l2_message.ts +++ b/yarn-project/circuit-types/src/l1_to_l2_message.ts @@ -62,34 +62,6 @@ export class L1ToL2MessageAndIndex { } } -/** - * An L1 to L2 message emitted in a particular L1 block. - */ -export class PendingL1ToL2Message { - constructor( - /** the message */ - public readonly message: L1ToL2Message, - /** the L1 block this message was emitted in */ - public readonly blockNumber: bigint, - /** at which index in the L1 block this message was emitted */ - public readonly indexInBlock: number, - ) {} -} - -/** - * An L1 to L2 message that was cancelled. - */ -export class CancelledL1ToL2Message { - constructor( - /** the message */ - public readonly entryKey: Fr, - /** the L1 block this message was emitted in */ - public readonly blockNumber: bigint, - /** at which index in the L1 block this message was emitted */ - public readonly indexInBlock: number, - ) {} -} - /** * The format of an L1 to L2 Message. */