From af2be878b49aceb668480e5a291aed7dea5319ba Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Fri, 19 Jan 2024 10:20:34 +0000 Subject: [PATCH] refactor: archiver store (#3966) This PR refactors the archiver store to use `@aztec/kv-store` instead of lmdb directly. It is stacked on top of #4028. The diff looks massive but all it does is split the archiver store into individual pieces to better separate them conceptually (blocks, logs, contracts, messages) and replace direct uses of lmdb with the data structures from kv-store. --- yarn-project/archiver/package.json | 1 + yarn-project/archiver/src/archiver/index.ts | 2 +- .../archiver/kv_archiver_store/block_store.ts | 170 +++++ .../kv_archiver_store/contract_store.ts | 93 +++ .../kv_archiver_store.test.ts | 15 + .../kv_archiver_store/kv_archiver_store.ts | 241 ++++++ .../archiver/kv_archiver_store/log_store.ts | 174 +++++ .../kv_archiver_store/message_store.ts | 170 +++++ .../src/archiver/lmdb_archiver_store.test.ts | 14 - .../src/archiver/lmdb_archiver_store.ts | 708 ------------------ yarn-project/archiver/tsconfig.json | 3 + .../aztec-node/src/aztec-node/server.ts | 6 +- yarn-project/end-to-end/package.json | 2 +- .../src/integration_archiver_l1_to_l2.test.ts | 6 +- yarn-project/end-to-end/tsconfig.json | 3 + yarn-project/kv-store/src/index.ts | 2 + .../kv-store/src/interfaces/common.ts | 18 + .../kv-store/src/interfaces/counter.ts | 43 ++ yarn-project/kv-store/src/interfaces/map.ts | 28 +- yarn-project/kv-store/src/interfaces/store.ts | 8 + .../kv-store/src/lmdb/counter.test.ts | 122 +++ yarn-project/kv-store/src/lmdb/counter.ts | 57 ++ yarn-project/kv-store/src/lmdb/map.test.ts | 45 +- yarn-project/kv-store/src/lmdb/map.ts | 97 ++- yarn-project/kv-store/src/lmdb/store.ts | 6 + yarn-project/yarn.lock | 3 +- 26 files changed, 1261 insertions(+), 776 deletions(-) create mode 100644 yarn-project/archiver/src/archiver/kv_archiver_store/block_store.ts create mode 100644 yarn-project/archiver/src/archiver/kv_archiver_store/contract_store.ts create mode 100644 yarn-project/archiver/src/archiver/kv_archiver_store/kv_archiver_store.test.ts create mode 100644 yarn-project/archiver/src/archiver/kv_archiver_store/kv_archiver_store.ts create mode 100644 yarn-project/archiver/src/archiver/kv_archiver_store/log_store.ts create mode 100644 yarn-project/archiver/src/archiver/kv_archiver_store/message_store.ts delete mode 100644 yarn-project/archiver/src/archiver/lmdb_archiver_store.test.ts delete mode 100644 yarn-project/archiver/src/archiver/lmdb_archiver_store.ts create mode 100644 yarn-project/kv-store/src/interfaces/common.ts create mode 100644 yarn-project/kv-store/src/interfaces/counter.ts create mode 100644 yarn-project/kv-store/src/lmdb/counter.test.ts create mode 100644 yarn-project/kv-store/src/lmdb/counter.ts diff --git a/yarn-project/archiver/package.json b/yarn-project/archiver/package.json index 66a7c825d9b..b837e1d9b38 100644 --- a/yarn-project/archiver/package.json +++ b/yarn-project/archiver/package.json @@ -39,6 +39,7 @@ "@aztec/circuits.js": "workspace:^", "@aztec/ethereum": "workspace:^", "@aztec/foundation": "workspace:^", + "@aztec/kv-store": "workspace:^", "@aztec/l1-artifacts": "workspace:^", "@types/lodash.omit": "^4.5.7", "debug": "^4.3.4", diff --git a/yarn-project/archiver/src/archiver/index.ts b/yarn-project/archiver/src/archiver/index.ts index 0ef2b0025c8..a7294537624 100644 --- a/yarn-project/archiver/src/archiver/index.ts +++ b/yarn-project/archiver/src/archiver/index.ts @@ -1,5 +1,5 @@ export * from './archiver.js'; export * from './config.js'; export { MemoryArchiverStore } from './memory_archiver_store/memory_archiver_store.js'; -export { LMDBArchiverStore } from './lmdb_archiver_store.js'; export { ArchiverDataStore } from './archiver_store.js'; +export { KVArchiverDataStore } from './kv_archiver_store/kv_archiver_store.js'; diff --git a/yarn-project/archiver/src/archiver/kv_archiver_store/block_store.ts b/yarn-project/archiver/src/archiver/kv_archiver_store/block_store.ts new file mode 100644 index 00000000000..025221fe73f --- /dev/null +++ b/yarn-project/archiver/src/archiver/kv_archiver_store/block_store.ts @@ -0,0 +1,170 @@ +import { INITIAL_L2_BLOCK_NUM, L2Block, L2Tx, TxHash } from '@aztec/circuit-types'; +import { AztecAddress } from '@aztec/circuits.js'; +import { createDebugLogger } from '@aztec/foundation/log'; +import { AztecKVStore, AztecMap, Range } from '@aztec/kv-store'; + +/* eslint-disable */ +type BlockIndexValue = [blockNumber: number, index: number]; + +type BlockContext = { + blockNumber: number; + l1BlockNumber: bigint; + block: Buffer; + blockHash: Buffer; +}; +/* eslint-enable */ + +/** + * LMDB implementation of the ArchiverDataStore interface. + */ +export class BlockStore { + /** Map block number to block data */ + #blocks: AztecMap; + + /** Index mapping transaction hash (as a string) to its location in a block */ + #txIndex: AztecMap; + + /** Index mapping a contract's address (as a string) to its location in a block */ + #contractIndex: AztecMap; + + #log = createDebugLogger('aztec:archiver:block_store'); + + constructor(private db: AztecKVStore) { + this.#blocks = db.createMap('archiver_blocks'); + + this.#txIndex = db.createMap('archiver_tx_index'); + this.#contractIndex = db.createMap('archiver_contract_index'); + } + + /** + * Append new blocks to the store's list. + * @param blocks - The L2 blocks to be added to the store. + * @returns True if the operation is successful. + */ + addBlocks(blocks: L2Block[]): Promise { + return this.db.transaction(() => { + for (const block of blocks) { + void this.#blocks.set(block.number, { + blockNumber: block.number, + block: block.toBuffer(), + l1BlockNumber: block.getL1BlockNumber(), + blockHash: block.getBlockHash(), + }); + + for (const [i, tx] of block.getTxs().entries()) { + if (tx.txHash.isZero()) { + continue; + } + void this.#txIndex.set(tx.txHash.toString(), [block.number, i]); + } + + for (const [i, contractData] of block.newContractData.entries()) { + if (contractData.contractAddress.isZero()) { + continue; + } + + void this.#contractIndex.set(contractData.contractAddress.toString(), [block.number, i]); + } + } + + return true; + }); + } + + /** + * Gets up to `limit` amount of L2 blocks starting from `from`. + * @param start - Number of the first block to return (inclusive). + * @param limit - The number of blocks to return. + * @returns The requested L2 blocks, without logs attached + */ + *getBlocks(start: number, limit: number): IterableIterator { + for (const blockCtx of this.#blocks.values(this.#computeBlockRange(start, limit))) { + yield L2Block.fromBuffer(blockCtx.block, blockCtx.blockHash); + } + } + + /** + * Gets an L2 block. + * @param blockNumber - The number of the block to return. + * @returns The requested L2 block, without logs attached + */ + getBlock(blockNumber: number): L2Block | undefined { + const blockCtx = this.#blocks.get(blockNumber); + if (!blockCtx || !blockCtx.block) { + return undefined; + } + + const block = L2Block.fromBuffer(blockCtx.block, blockCtx.blockHash); + + return block; + } + + /** + * Gets an l2 tx. + * @param txHash - The txHash of the l2 tx. + * @returns The requested L2 tx. + */ + getL2Tx(txHash: TxHash): L2Tx | undefined { + const [blockNumber, txIndex] = this.getL2TxLocation(txHash) ?? []; + if (typeof blockNumber !== 'number' || typeof txIndex !== 'number') { + return undefined; + } + + const block = this.getBlock(blockNumber); + return block?.getTx(txIndex); + } + + /** + * Looks up which block included the requested L2 tx. + * @param txHash - The txHash of the l2 tx. + * @returns The block number and index of the tx. + */ + getL2TxLocation(txHash: TxHash): [blockNumber: number, txIndex: number] | undefined { + return this.#txIndex.get(txHash.toString()); + } + + /** + * Looks up which block deployed a particular contract. + * @param contractAddress - The address of the contract to look up. + * @returns The block number and index of the contract. + */ + getContractLocation(contractAddress: AztecAddress): [blockNumber: number, index: number] | undefined { + return this.#contractIndex.get(contractAddress.toString()); + } + + /** + * Gets the number of the latest L2 block processed. + * @returns The number of the latest L2 block processed. + */ + getBlockNumber(): number { + const [lastBlockNumber] = this.#blocks.keys({ reverse: true, limit: 1 }); + return typeof lastBlockNumber === 'number' ? lastBlockNumber : INITIAL_L2_BLOCK_NUM - 1; + } + + /** + * Gets the most recent L1 block processed. + * @returns The L1 block that published the latest L2 block + */ + getL1BlockNumber(): bigint { + const [lastBlock] = this.#blocks.values({ reverse: true, limit: 1 }); + if (!lastBlock) { + return 0n; + } else { + return lastBlock.l1BlockNumber; + } + } + + #computeBlockRange(start: number, limit: number): Required, 'start' | 'end'>> { + if (limit < 1) { + throw new Error(`Invalid limit: ${limit}`); + } + + if (start < INITIAL_L2_BLOCK_NUM) { + this.#log(`Clamping start block ${start} to ${INITIAL_L2_BLOCK_NUM}`); + start = INITIAL_L2_BLOCK_NUM; + } + + const end = start + limit; + return { start, end }; + } +} diff --git a/yarn-project/archiver/src/archiver/kv_archiver_store/contract_store.ts b/yarn-project/archiver/src/archiver/kv_archiver_store/contract_store.ts new file mode 100644 index 00000000000..055b25af20d --- /dev/null +++ b/yarn-project/archiver/src/archiver/kv_archiver_store/contract_store.ts @@ -0,0 +1,93 @@ +import { ContractData, ExtendedContractData } from '@aztec/circuit-types'; +import { AztecAddress } from '@aztec/foundation/aztec-address'; +import { createDebugLogger } from '@aztec/foundation/log'; +import { AztecKVStore, AztecMap } from '@aztec/kv-store'; + +import { BlockStore } from './block_store.js'; + +/** + * LMDB implementation of the ArchiverDataStore interface. + */ +export class ContractStore { + #blockStore: BlockStore; + #extendedContractData: AztecMap; + #log = createDebugLogger('aztec:archiver:contract_store'); + + constructor(private db: AztecKVStore, blockStore: BlockStore) { + this.#extendedContractData = db.createMap('archiver_extended_contract_data'); + this.#blockStore = blockStore; + } + + /** + * Add new extended contract data from an L2 block to the store's list. + * @param data - List of contracts' data to be added. + * @param blockNum - Number of the L2 block the contract data was deployed in. + * @returns True if the operation is successful. + */ + addExtendedContractData(data: ExtendedContractData[], blockNum: number): Promise { + return this.#extendedContractData.swap(blockNum, (existingData = []) => { + existingData.push(...data.map(d => d.toBuffer())); + return existingData; + }); + } + + /** + * Get the extended contract data for this contract. + * @param contractAddress - The contract data address. + * @returns The extended contract data or undefined if not found. + */ + getExtendedContractData(contractAddress: AztecAddress): ExtendedContractData | undefined { + const [blockNumber, _] = this.#blockStore.getContractLocation(contractAddress) ?? []; + + if (typeof blockNumber !== 'number') { + return undefined; + } + + for (const contract of this.#extendedContractData.get(blockNumber) ?? []) { + const extendedContractData = ExtendedContractData.fromBuffer(contract); + if (extendedContractData.contractData.contractAddress.equals(contractAddress)) { + return extendedContractData; + } + } + + return undefined; + } + + /** + * Lookup all extended contract data in an L2 block. + * @param blockNumber - The block number to get all contract data from. + * @returns All extended contract data in the block (if found). + */ + getExtendedContractDataInBlock(blockNumber: number): Array { + return (this.#extendedContractData.get(blockNumber) ?? []).map(contract => + ExtendedContractData.fromBuffer(contract), + ); + } + + /** + * Get basic info for an L2 contract. + * Contains contract address & the ethereum portal address. + * @param contractAddress - The contract data address. + * @returns ContractData with the portal address (if we didn't throw an error). + */ + getContractData(contractAddress: AztecAddress): ContractData | undefined { + const [blockNumber, index] = this.#blockStore.getContractLocation(contractAddress) ?? []; + if (typeof blockNumber !== 'number' || typeof index !== 'number') { + return undefined; + } + + const block = this.#blockStore.getBlock(blockNumber); + return block?.newContractData[index]; + } + + /** + * Get basic info for an all L2 contracts deployed in a block. + * Contains contract address & the ethereum portal address. + * @param blockNumber - Number of the L2 block where contracts were deployed. + * @returns ContractData with the portal address (if we didn't throw an error). + */ + getContractDataInBlock(blockNumber: number): ContractData[] { + const block = this.#blockStore.getBlock(blockNumber); + return block?.newContractData ?? []; + } +} diff --git a/yarn-project/archiver/src/archiver/kv_archiver_store/kv_archiver_store.test.ts b/yarn-project/archiver/src/archiver/kv_archiver_store/kv_archiver_store.test.ts new file mode 100644 index 00000000000..2903ea6fe9c --- /dev/null +++ b/yarn-project/archiver/src/archiver/kv_archiver_store/kv_archiver_store.test.ts @@ -0,0 +1,15 @@ +import { EthAddress } from '@aztec/circuits.js'; +import { AztecLmdbStore } from '@aztec/kv-store'; + +import { describeArchiverDataStore } from '../archiver_store_test_suite.js'; +import { KVArchiverDataStore } from './kv_archiver_store.js'; + +describe('KVArchiverDataStore', () => { + let archiverStore: KVArchiverDataStore; + + beforeEach(async () => { + archiverStore = new KVArchiverDataStore(await AztecLmdbStore.create(EthAddress.random())); + }); + + describeArchiverDataStore('ArchiverStore', () => archiverStore); +}); diff --git a/yarn-project/archiver/src/archiver/kv_archiver_store/kv_archiver_store.ts b/yarn-project/archiver/src/archiver/kv_archiver_store/kv_archiver_store.ts new file mode 100644 index 00000000000..74be29d9d58 --- /dev/null +++ b/yarn-project/archiver/src/archiver/kv_archiver_store/kv_archiver_store.ts @@ -0,0 +1,241 @@ +import { + ContractData, + ExtendedContractData, + GetUnencryptedLogsResponse, + L1ToL2Message, + L2Block, + L2BlockL2Logs, + L2Tx, + LogFilter, + LogType, + TxHash, +} from '@aztec/circuit-types'; +import { Fr } from '@aztec/circuits.js'; +import { AztecAddress } from '@aztec/foundation/aztec-address'; +import { createDebugLogger } from '@aztec/foundation/log'; +import { AztecKVStore } from '@aztec/kv-store'; + +import { ArchiverDataStore, ArchiverL1SynchPoint } from '../archiver_store.js'; +import { BlockStore } from './block_store.js'; +import { ContractStore } from './contract_store.js'; +import { LogStore } from './log_store.js'; +import { MessageStore } from './message_store.js'; + +/** + * LMDB implementation of the ArchiverDataStore interface. + */ +export class KVArchiverDataStore implements ArchiverDataStore { + #blockStore: BlockStore; + #logStore: LogStore; + #contractStore: ContractStore; + #messageStore: MessageStore; + + #log = createDebugLogger('aztec:archiver:lmdb'); + + constructor(db: AztecKVStore, logsMaxPageSize: number = 1000) { + this.#blockStore = new BlockStore(db); + this.#logStore = new LogStore(db, this.#blockStore, logsMaxPageSize); + this.#contractStore = new ContractStore(db, this.#blockStore); + this.#messageStore = new MessageStore(db); + } + + /** + * Append new blocks to the store's list. + * @param blocks - The L2 blocks to be added to the store. + * @returns True if the operation is successful. + */ + addBlocks(blocks: L2Block[]): Promise { + return this.#blockStore.addBlocks(blocks); + } + + /** + * Gets up to `limit` amount of L2 blocks starting from `from`. + * The blocks returned do not contain any logs. + * + * @param start - Number of the first block to return (inclusive). + * @param limit - The number of blocks to return. + * @returns The requested L2 blocks, without any logs attached + */ + getBlocks(start: number, limit: number): Promise { + try { + return Promise.resolve(Array.from(this.#blockStore.getBlocks(start, limit))); + } catch (err) { + // this function is sync so if any errors are thrown we need to make sure they're passed on as rejected Promises + return Promise.reject(err); + } + } + + /** + * Gets an l2 tx. + * @param txHash - The txHash of the l2 tx. + * @returns The requested L2 tx. + */ + getL2Tx(txHash: TxHash): Promise { + return Promise.resolve(this.#blockStore.getL2Tx(txHash)); + } + + /** + * Append new logs to the store's list. + * @param encryptedLogs - The logs to be added to the store. + * @param unencryptedLogs - The type of the logs to be added to the store. + * @param blockNumber - The block for which to add the logs. + * @returns True if the operation is successful. + */ + addLogs( + encryptedLogs: L2BlockL2Logs | undefined, + unencryptedLogs: L2BlockL2Logs | undefined, + blockNumber: number, + ): Promise { + return this.#logStore.addLogs(encryptedLogs, unencryptedLogs, blockNumber); + } + + /** + * Append new pending L1 to L2 messages to the store. + * @param messages - The L1 to L2 messages to be added to the store. + * @param l1BlockNumber - The L1 block number for which to add the messages. + * @returns True if the operation is successful. + */ + addPendingL1ToL2Messages(messages: L1ToL2Message[], l1BlockNumber: bigint): Promise { + return Promise.resolve(this.#messageStore.addPendingMessages(messages, l1BlockNumber)); + } + + /** + * Remove pending L1 to L2 messages from the store (if they were cancelled). + * @param messages - The message keys to be removed from the store. + * @param l1BlockNumber - The L1 block number for which to remove the messages. + * @returns True if the operation is successful. + */ + cancelPendingL1ToL2Messages(messages: Fr[], l1BlockNumber: bigint): Promise { + return Promise.resolve(this.#messageStore.cancelPendingMessages(messages, l1BlockNumber)); + } + + /** + * Messages that have been published in an L2 block are confirmed. + * Add them to the confirmed store, also remove them from the pending store. + * @param entryKeys - The message keys to be removed from the store. + * @param blockNumber - The block for which to add the messages. + * @returns True if the operation is successful. + */ + confirmL1ToL2Messages(entryKeys: Fr[]): Promise { + return this.#messageStore.confirmPendingMessages(entryKeys); + } + + /** + * Gets up to `limit` amount of pending L1 to L2 messages, sorted by fee + * @param limit - The number of messages to return (by default NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP). + * @returns The requested L1 to L2 message keys. + */ + getPendingL1ToL2MessageKeys(limit: number): Promise { + return Promise.resolve(this.#messageStore.getPendingMessageKeysByFee(limit)); + } + + /** + * Gets the confirmed L1 to L2 message corresponding to the given message key. + * @param messageKey - The message key to look up. + * @returns The requested L1 to L2 message or throws if not found. + */ + getConfirmedL1ToL2Message(messageKey: Fr): Promise { + try { + return Promise.resolve(this.#messageStore.getConfirmedMessage(messageKey)); + } catch (err) { + return Promise.reject(err); + } + } + + /** + * Gets up to `limit` amount of logs starting from `from`. + * @param start - Number of the L2 block to which corresponds the first logs to be returned. + * @param limit - The number of logs to return. + * @param logType - Specifies whether to return encrypted or unencrypted logs. + * @returns The requested logs. + */ + getLogs(start: number, limit: number, logType: LogType): Promise { + try { + return Promise.resolve(Array.from(this.#logStore.getLogs(start, limit, logType))); + } catch (err) { + return Promise.reject(err); + } + } + + /** + * Gets unencrypted logs based on the provided filter. + * @param filter - The filter to apply to the logs. + * @returns The requested logs. + */ + getUnencryptedLogs(filter: LogFilter): Promise { + try { + return Promise.resolve(this.#logStore.getUnencryptedLogs(filter)); + } catch (err) { + return Promise.reject(err); + } + } + + /** + * Add new extended contract data from an L2 block to the store's list. + * @param data - List of contracts' data to be added. + * @param blockNum - Number of the L2 block the contract data was deployed in. + * @returns True if the operation is successful. + */ + addExtendedContractData(data: ExtendedContractData[], blockNum: number): Promise { + return this.#contractStore.addExtendedContractData(data, blockNum); + } + + /** + * Get the extended contract data for this contract. + * @param contractAddress - The contract data address. + * @returns The extended contract data or undefined if not found. + */ + getExtendedContractData(contractAddress: AztecAddress): Promise { + return Promise.resolve(this.#contractStore.getExtendedContractData(contractAddress)); + } + + /** + * Lookup all extended contract data in an L2 block. + * @param blockNumber - The block number to get all contract data from. + * @returns All extended contract data in the block (if found). + */ + getExtendedContractDataInBlock(blockNumber: number): Promise { + return Promise.resolve(Array.from(this.#contractStore.getExtendedContractDataInBlock(blockNumber))); + } + + /** + * Get basic info for an L2 contract. + * Contains contract address & the ethereum portal address. + * @param contractAddress - The contract data address. + * @returns ContractData with the portal address (if we didn't throw an error). + */ + getContractData(contractAddress: AztecAddress): Promise { + return Promise.resolve(this.#contractStore.getContractData(contractAddress)); + } + + /** + * Get basic info for an all L2 contracts deployed in a block. + * Contains contract address & the ethereum portal address. + * @param blockNumber - Number of the L2 block where contracts were deployed. + * @returns ContractData with the portal address (if we didn't throw an error). + */ + getContractDataInBlock(blockNumber: number): Promise { + return Promise.resolve(Array.from(this.#contractStore.getContractDataInBlock(blockNumber))); + } + + /** + * Gets the number of the latest L2 block processed. + * @returns The number of the latest L2 block processed. + */ + getBlockNumber(): Promise { + return Promise.resolve(this.#blockStore.getBlockNumber()); + } + + /** + * Gets the last L1 block number processed by the archiver + */ + getL1BlockNumber(): Promise { + const addedBlock = this.#blockStore.getL1BlockNumber(); + const { addedMessages, cancelledMessages } = this.#messageStore.getL1BlockNumber(); + return Promise.resolve({ + addedBlock, + addedMessages, + cancelledMessages, + }); + } +} diff --git a/yarn-project/archiver/src/archiver/kv_archiver_store/log_store.ts b/yarn-project/archiver/src/archiver/kv_archiver_store/log_store.ts new file mode 100644 index 00000000000..afab800fb48 --- /dev/null +++ b/yarn-project/archiver/src/archiver/kv_archiver_store/log_store.ts @@ -0,0 +1,174 @@ +import { + ExtendedUnencryptedL2Log, + GetUnencryptedLogsResponse, + INITIAL_L2_BLOCK_NUM, + L2BlockL2Logs, + LogFilter, + LogId, + LogType, + UnencryptedL2Log, +} from '@aztec/circuit-types'; +import { createDebugLogger } from '@aztec/foundation/log'; +import { AztecKVStore, AztecMap } from '@aztec/kv-store'; + +import { BlockStore } from './block_store.js'; + +/** + * A store for logs + */ +export class LogStore { + #encryptedLogs: AztecMap; + #unencryptedLogs: AztecMap; + #logsMaxPageSize: number; + #log = createDebugLogger('aztec:archiver:log_store'); + + constructor(private db: AztecKVStore, private blockStore: BlockStore, logsMaxPageSize: number = 1000) { + this.#encryptedLogs = db.createMap('archiver_encrypted_logs'); + this.#unencryptedLogs = db.createMap('archiver_unencrypted_logs'); + + this.#logsMaxPageSize = logsMaxPageSize; + } + + /** + * Append new logs to the store's list. + * @param encryptedLogs - The logs to be added to the store. + * @param unencryptedLogs - The type of the logs to be added to the store. + * @param blockNumber - The block for which to add the logs. + * @returns True if the operation is successful. + */ + addLogs( + encryptedLogs: L2BlockL2Logs | undefined, + unencryptedLogs: L2BlockL2Logs | undefined, + blockNumber: number, + ): Promise { + return this.db.transaction(() => { + if (encryptedLogs) { + void this.#encryptedLogs.set(blockNumber, encryptedLogs.toBuffer()); + } + + if (unencryptedLogs) { + void this.#unencryptedLogs.set(blockNumber, unencryptedLogs.toBuffer()); + } + + return true; + }); + } + + /** + * Gets up to `limit` amount of logs starting from `from`. + * @param start - Number of the L2 block to which corresponds the first logs to be returned. + * @param limit - The number of logs to return. + * @param logType - Specifies whether to return encrypted or unencrypted logs. + * @returns The requested logs. + */ + *getLogs(start: number, limit: number, logType: LogType): IterableIterator { + const logMap = logType === LogType.ENCRYPTED ? this.#encryptedLogs : this.#unencryptedLogs; + for (const buffer of logMap.values({ start, limit })) { + yield L2BlockL2Logs.fromBuffer(buffer); + } + } + + /** + * Gets unencrypted logs based on the provided filter. + * @param filter - The filter to apply to the logs. + * @returns The requested logs. + */ + getUnencryptedLogs(filter: LogFilter): GetUnencryptedLogsResponse { + if (filter.afterLog) { + return this.#filterUnencryptedLogsBetweenBlocks(filter); + } else if (filter.txHash) { + return this.#filterUnencryptedLogsOfTx(filter); + } else { + return this.#filterUnencryptedLogsBetweenBlocks(filter); + } + } + + #filterUnencryptedLogsOfTx(filter: LogFilter): GetUnencryptedLogsResponse { + if (!filter.txHash) { + throw new Error('Missing txHash'); + } + + const [blockNumber, txIndex] = this.blockStore.getL2TxLocation(filter.txHash) ?? []; + if (typeof blockNumber !== 'number' || typeof txIndex !== 'number') { + return { logs: [], maxLogsHit: false }; + } + + const unencryptedLogsInBlock = this.#getBlockLogs(blockNumber, LogType.UNENCRYPTED); + const txLogs = unencryptedLogsInBlock.txLogs[txIndex].unrollLogs().map(log => UnencryptedL2Log.fromBuffer(log)); + + const logs: ExtendedUnencryptedL2Log[] = []; + const maxLogsHit = this.#accumulateLogs(logs, blockNumber, txIndex, txLogs, filter); + + return { logs, maxLogsHit }; + } + + #filterUnencryptedLogsBetweenBlocks(filter: LogFilter): GetUnencryptedLogsResponse { + const start = + filter.afterLog?.blockNumber ?? Math.max(filter.fromBlock ?? INITIAL_L2_BLOCK_NUM, INITIAL_L2_BLOCK_NUM); + const end = filter.toBlock; + + if (typeof end === 'number' && end < start) { + return { + logs: [], + maxLogsHit: true, + }; + } + + const logs: ExtendedUnencryptedL2Log[] = []; + + let maxLogsHit = false; + loopOverBlocks: for (const [blockNumber, logBuffer] of this.#unencryptedLogs.entries({ start, end })) { + const unencryptedLogsInBlock = L2BlockL2Logs.fromBuffer(logBuffer); + for (let txIndex = filter.afterLog?.txIndex ?? 0; txIndex < unencryptedLogsInBlock.txLogs.length; txIndex++) { + const txLogs = unencryptedLogsInBlock.txLogs[txIndex].unrollLogs().map(log => UnencryptedL2Log.fromBuffer(log)); + maxLogsHit = this.#accumulateLogs(logs, blockNumber, txIndex, txLogs, filter); + if (maxLogsHit) { + this.#log(`Max logs hit at block ${blockNumber}`); + break loopOverBlocks; + } + } + } + + return { logs, maxLogsHit }; + } + + #accumulateLogs( + results: ExtendedUnencryptedL2Log[], + blockNumber: number, + txIndex: number, + txLogs: UnencryptedL2Log[], + filter: LogFilter, + ): boolean { + let maxLogsHit = false; + let logIndex = typeof filter.afterLog?.logIndex === 'number' ? filter.afterLog.logIndex + 1 : 0; + for (; logIndex < txLogs.length; logIndex++) { + const log = txLogs[logIndex]; + if (filter.contractAddress && !log.contractAddress.equals(filter.contractAddress)) { + continue; + } + + if (filter.selector && !log.selector.equals(filter.selector)) { + continue; + } + + results.push(new ExtendedUnencryptedL2Log(new LogId(blockNumber, txIndex, logIndex), log)); + if (results.length >= this.#logsMaxPageSize) { + maxLogsHit = true; + break; + } + } + + return maxLogsHit; + } + + #getBlockLogs(blockNumber: number, logType: LogType): L2BlockL2Logs { + const logMap = logType === LogType.ENCRYPTED ? this.#encryptedLogs : this.#unencryptedLogs; + const buffer = logMap.get(blockNumber); + + if (!buffer) { + return new L2BlockL2Logs([]); + } + + return L2BlockL2Logs.fromBuffer(buffer); + } +} diff --git a/yarn-project/archiver/src/archiver/kv_archiver_store/message_store.ts b/yarn-project/archiver/src/archiver/kv_archiver_store/message_store.ts new file mode 100644 index 00000000000..81a83e21560 --- /dev/null +++ b/yarn-project/archiver/src/archiver/kv_archiver_store/message_store.ts @@ -0,0 +1,170 @@ +import { L1ToL2Message } from '@aztec/circuit-types'; +import { Fr } from '@aztec/circuits.js'; +import { createDebugLogger } from '@aztec/foundation/log'; +import { AztecCounter, AztecKVStore, AztecMap, AztecSingleton } from '@aztec/kv-store'; + +/** + * A message stored in the database + */ +type Message = { + /** The L1ToL2Message */ + message: Buffer; + /** The message's fee */ + fee: number; + /** Has it _ever_ been confirmed? */ + confirmed: boolean; +}; + +/** + * LMDB implementation of the ArchiverDataStore interface. + */ +export class MessageStore { + #messages: AztecMap; + #pendingMessagesByFee: AztecCounter<[number, string]>; + #lastL1BlockAddingMessages: AztecSingleton; + #lastL1BlockCancellingMessages: AztecSingleton; + + #log = createDebugLogger('aztec:archiver:message_store'); + + constructor(private db: AztecKVStore) { + this.#messages = db.createMap('archiver_l1_to_l2_messages'); + this.#pendingMessagesByFee = db.createCounter('archiver_messages_by_fee'); + this.#lastL1BlockAddingMessages = db.createSingleton('archiver_last_l1_block_adding_messages'); + this.#lastL1BlockCancellingMessages = db.createSingleton('archiver_last_l1_block_cancelling_messages'); + } + + /** + * Gets the last L1 block number that emitted new messages and the block that cancelled messages. + * @returns The last L1 block number processed + */ + getL1BlockNumber() { + return { + addedMessages: this.#lastL1BlockAddingMessages.get() ?? 0n, + cancelledMessages: this.#lastL1BlockCancellingMessages.get() ?? 0n, + }; + } + + /** + * Append new pending L1 to L2 messages to the store. + * @param messages - The L1 to L2 messages to be added to the store. + * @param l1BlockNumber - The L1 block number for which to add the messages. + * @returns True if the operation is successful. + */ + addPendingMessages(messages: L1ToL2Message[], l1BlockNumber: bigint): Promise { + return this.db.transaction(() => { + const lastL1BlockNumber = this.#lastL1BlockAddingMessages.get() ?? 0n; + if (lastL1BlockNumber >= l1BlockNumber) { + return false; + } + + void this.#lastL1BlockAddingMessages.set(l1BlockNumber); + + for (const message of messages) { + const messageKey = message.entryKey?.toString(); + if (!messageKey) { + throw new Error('Message does not have an entry key'); + } + + void this.#messages.setIfNotExists(messageKey, { + message: message.toBuffer(), + fee: message.fee, + confirmed: false, + }); + + void this.#pendingMessagesByFee.update([message.fee, messageKey], 1); + } + + return true; + }); + } + + /** + * Remove pending L1 to L2 messages from the store (if they were cancelled). + * @param messageKeys - The message keys to be removed from the store. + * @param l1BlockNumber - The L1 block number for which to remove the messages. + * @returns True if the operation is successful. + */ + cancelPendingMessages(messageKeys: Fr[], l1BlockNumber: bigint): Promise { + return this.db.transaction(() => { + const lastL1BlockNumber = this.#lastL1BlockCancellingMessages.get() ?? 0n; + if (lastL1BlockNumber >= l1BlockNumber) { + return false; + } + + void this.#lastL1BlockCancellingMessages.set(l1BlockNumber); + + for (const messageKey of messageKeys) { + const messageCtx = this.#messages.get(messageKey.toString()); + if (!messageCtx) { + throw new Error(`Message ${messageKey.toString()} not found`); + } + + void this.#pendingMessagesByFee.update([messageCtx.fee, messageKey.toString()], -1); + } + + return true; + }); + } + + /** + * Messages that have been published in an L2 block are confirmed. + * Add them to the confirmed store, also remove them from the pending store. + * @param messageKeys - The message keys to be removed from the store. + * @returns True if the operation is successful. + */ + confirmPendingMessages(messageKeys: Fr[]): Promise { + return this.db.transaction(() => { + for (const messageKey of messageKeys) { + const messageCtx = this.#messages.get(messageKey.toString()); + if (!messageCtx) { + throw new Error(`Message ${messageKey.toString()} not found`); + } + messageCtx.confirmed = true; + + void this.#messages.set(messageKey.toString(), messageCtx); + void this.#pendingMessagesByFee.update([messageCtx.fee, messageKey.toString()], -1); + } + + return true; + }); + } + + /** + * Gets the confirmed L1 to L2 message corresponding to the given message key. + * @param messageKey - The message key to look up. + * @returns The requested L1 to L2 message or throws if not found. + */ + getConfirmedMessage(messageKey: Fr): L1ToL2Message { + const messageCtx = this.#messages.get(messageKey.toString()); + if (!messageCtx) { + throw new Error(`Message ${messageKey.toString()} not found`); + } + + if (!messageCtx.confirmed) { + throw new Error(`Message ${messageKey.toString()} not confirmed`); + } + + return L1ToL2Message.fromBuffer(messageCtx.message); + } + + /** + * Gets up to `limit` amount of pending L1 to L2 messages, sorted by fee + * @param limit - The number of messages to return (by default NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP). + * @returns The requested L1 to L2 message keys. + */ + getPendingMessageKeysByFee(limit: number): Fr[] { + const messageKeys: Fr[] = []; + + for (const [[_, messageKey], count] of this.#pendingMessagesByFee.entries({ + reverse: true, + })) { + // put `count` copies of this message in the result list + messageKeys.push(...Array(count).fill(Fr.fromString(messageKey))); + if (messageKeys.length >= limit) { + break; + } + } + + return messageKeys; + } +} diff --git a/yarn-project/archiver/src/archiver/lmdb_archiver_store.test.ts b/yarn-project/archiver/src/archiver/lmdb_archiver_store.test.ts deleted file mode 100644 index 7a1ceacb4f1..00000000000 --- a/yarn-project/archiver/src/archiver/lmdb_archiver_store.test.ts +++ /dev/null @@ -1,14 +0,0 @@ -import { open } from 'lmdb'; - -import { describeArchiverDataStore } from './archiver_store_test_suite.js'; -import { LMDBArchiverStore } from './lmdb_archiver_store.js'; - -describe('LMDB Memory Store', () => { - let archiverStore: LMDBArchiverStore; - - beforeEach(() => { - archiverStore = new LMDBArchiverStore(open({} as any)); - }); - - describeArchiverDataStore('LMDBArchiverStore', () => archiverStore); -}); diff --git a/yarn-project/archiver/src/archiver/lmdb_archiver_store.ts b/yarn-project/archiver/src/archiver/lmdb_archiver_store.ts deleted file mode 100644 index 6343c7dd0d9..00000000000 --- a/yarn-project/archiver/src/archiver/lmdb_archiver_store.ts +++ /dev/null @@ -1,708 +0,0 @@ -import { - ContractData, - ExtendedContractData, - ExtendedUnencryptedL2Log, - GetUnencryptedLogsResponse, - INITIAL_L2_BLOCK_NUM, - L1ToL2Message, - L2Block, - L2BlockL2Logs, - L2Tx, - LogFilter, - LogId, - LogType, - TxHash, - UnencryptedL2Log, -} from '@aztec/circuit-types'; -import { Fr } from '@aztec/circuits.js'; -import { AztecAddress } from '@aztec/foundation/aztec-address'; -import { createDebugLogger } from '@aztec/foundation/log'; - -import { Database, RangeOptions, RootDatabase } from 'lmdb'; - -import { ArchiverDataStore, ArchiverL1SynchPoint } from './archiver_store.js'; - -/* eslint-disable */ -type L1ToL2MessageAndCount = { - message: Buffer; - pendingCount: number; - confirmedCount: number; -}; - -type BlockIndexValue = [blockNumber: number, index: number]; - -type BlockContext = { - block?: Uint8Array; - blockHash?: Uint8Array; - l1BlockNumber?: bigint; - encryptedLogs?: Uint8Array; - unencryptedLogs?: Uint8Array; - extendedContractData?: Array; -}; - -const L1_BLOCK_ADDED_PENDING_MESSAGE = 'l1BlockAddedPendingMessage'; -const L1_BLOCK_CANCELLED_MESSAGE = 'l1BlockCancelledMessage'; -/* eslint-enable */ - -/** - * LMDB implementation of the ArchiverDataStore interface. - */ -export class LMDBArchiverStore implements ArchiverDataStore { - #tables: { - /** Where block information will be stored */ - blocks: Database; - /** Transactions index */ - txIndex: Database; - /** Contracts index */ - contractIndex: Database; - /** L1 to L2 messages */ - l1ToL2Messages: Database; - /** Which blocks emitted which messages */ - l1ToL2MessagesByBlock: Database; - /** Pending L1 to L2 messages sorted by their fee, in buckets (dupSort=true) */ - pendingMessagesByFee: Database; - }; - - #logsMaxPageSize: number; - - #log = createDebugLogger('aztec:archiver:lmdb'); - - constructor(db: RootDatabase, logsMaxPageSize: number = 1000) { - this.#tables = { - blocks: db.openDB('blocks', { - keyEncoding: 'uint32', - encoding: 'msgpack', - }), - txIndex: db.openDB('tx_index', { - keyEncoding: 'binary', - encoding: 'msgpack', - }), - contractIndex: db.openDB('contract_index', { - keyEncoding: 'binary', - encoding: 'msgpack', - }), - l1ToL2Messages: db.openDB('l1_to_l2_messages', { - keyEncoding: 'binary', - encoding: 'msgpack', - }), - l1ToL2MessagesByBlock: db.openDB('l1_to_l2_message_nonces', { - keyEncoding: 'ordered-binary', - encoding: 'msgpack', - }), - pendingMessagesByFee: db.openDB('pending_messages_by_fee', { - keyEncoding: 'ordered-binary', - encoding: 'binary', - dupSort: true, - }), - }; - - this.#logsMaxPageSize = logsMaxPageSize; - } - - public async close() { - await Promise.all(Object.values(this.#tables).map(table => table.close())); - } - - /** - * Append new blocks to the store's list. - * @param blocks - The L2 blocks to be added to the store. - * @returns True if the operation is successful. - */ - addBlocks(blocks: L2Block[]): Promise { - // LMDB transactions are shared across databases, so we can use a single transaction for all the writes - // https://github.com/kriszyp/lmdb-js/blob/67505a979ab63187953355a88747a7ad703d50b6/README.md#dbopendbdatabase-stringnamestring - return this.#tables.blocks.transaction(() => { - for (const block of blocks) { - const blockCtx = this.#tables.blocks.get(block.number) ?? {}; - blockCtx.block = block.toBuffer(); - blockCtx.l1BlockNumber = block.getL1BlockNumber(); - blockCtx.blockHash = block.getBlockHash(); - - // no need to await, all writes are enqueued in the transaction - // awaiting would interrupt the execution flow of this callback and "leak" the transaction to some other part - // of the system and any writes would then be part of our transaction here - void this.#tables.blocks.put(block.number, blockCtx); - - for (const [i, tx] of block.getTxs().entries()) { - if (tx.txHash.isZero()) { - continue; - } - void this.#tables.txIndex.put(tx.txHash.buffer, [block.number, i]); - } - - for (const [i, contractData] of block.newContractData.entries()) { - if (contractData.contractAddress.isZero()) { - continue; - } - - void this.#tables.contractIndex.put(contractData.contractAddress.toBuffer(), [block.number, i]); - } - } - - return true; - }); - } - - /** - * Gets up to `limit` amount of L2 blocks starting from `from`. - * @param start - Number of the first block to return (inclusive). - * @param limit - The number of blocks to return. - * @returns The requested L2 blocks. - */ - getBlocks(start: number, limit: number): Promise { - try { - const blocks = this.#tables.blocks - .getRange(this.#computeBlockRange(start, limit)) - .filter(({ value }) => value.block) - .map(({ value }) => { - const block = L2Block.fromBuffer( - asBuffer(value.block!), - value.blockHash ? asBuffer(value.blockHash) : undefined, - ); - return block; - }).asArray; - - return Promise.resolve(blocks); - } catch (err) { - // this function is sync so if any errors are thrown we need to make sure they're passed on as rejected Promises - return Promise.reject(err); - } - } - - /** - * Gets an l2 tx. - * @param txHash - The txHash of the l2 tx. - * @returns The requested L2 tx. - */ - getL2Tx(txHash: TxHash): Promise { - const [blockNumber, txIndex] = this.#tables.txIndex.get(txHash.buffer) ?? []; - if (typeof blockNumber !== 'number' || typeof txIndex !== 'number') { - return Promise.resolve(undefined); - } - - const block = this.#getBlock(blockNumber); - return Promise.resolve(block?.getTx(txIndex)); - } - - /** - * Append new logs to the store's list. - * @param encryptedLogs - The logs to be added to the store. - * @param unencryptedLogs - The type of the logs to be added to the store. - * @param blockNumber - The block for which to add the logs. - * @returns True if the operation is successful. - */ - addLogs( - encryptedLogs: L2BlockL2Logs | undefined, - unencryptedLogs: L2BlockL2Logs | undefined, - blockNumber: number, - ): Promise { - return this.#tables.blocks.transaction(() => { - const blockCtx = this.#tables.blocks.get(blockNumber) ?? {}; - - if (encryptedLogs) { - blockCtx.encryptedLogs = encryptedLogs.toBuffer(); - } - - if (unencryptedLogs) { - blockCtx.unencryptedLogs = unencryptedLogs.toBuffer(); - } - - void this.#tables.blocks.put(blockNumber, blockCtx); - return true; - }); - } - - /** - * Append new pending L1 to L2 messages to the store. - * @param messages - The L1 to L2 messages to be added to the store. - * @param l1BlockNumber - The L1 block number for which to add the messages. - * @returns True if the operation is successful. - */ - addPendingL1ToL2Messages(messages: L1ToL2Message[], l1BlockNumber: bigint): Promise { - return this.#tables.l1ToL2Messages.transaction(() => { - if ((this.#tables.l1ToL2MessagesByBlock.get(L1_BLOCK_ADDED_PENDING_MESSAGE) ?? 0n) >= l1BlockNumber) { - return false; - } - // ensure we don't add the same messages twice - void this.#tables.l1ToL2MessagesByBlock.put(L1_BLOCK_ADDED_PENDING_MESSAGE, l1BlockNumber); - - for (const message of messages) { - const messageKey = message.entryKey?.toBuffer(); - if (!messageKey) { - throw new Error('Message does not have an entry key'); - } - - let messageCtx = this.#tables.l1ToL2Messages.get(messageKey); - if (!messageCtx) { - messageCtx = { - message: message.toBuffer(), - pendingCount: 0, - confirmedCount: 0, - }; - void this.#tables.l1ToL2Messages.put(messageKey, messageCtx); - } - - this.#updateMessageCountInTx(messageKey, message, 1, 0); - } - - return true; - }); - } - - /** - * Remove pending L1 to L2 messages from the store (if they were cancelled). - * @param cancelledMessages - The message keys to be removed from the store. - * @param l1BlockNumber - The L1 block number for which to remove the messages. - * @returns True if the operation is successful. - */ - cancelPendingL1ToL2Messages(cancelledMessages: Fr[], l1BlockNumber: bigint): Promise { - return this.#tables.l1ToL2Messages.transaction(() => { - if ((this.#tables.l1ToL2MessagesByBlock.get(L1_BLOCK_CANCELLED_MESSAGE) ?? 0n) >= l1BlockNumber) { - return false; - } - void this.#tables.l1ToL2MessagesByBlock.put(L1_BLOCK_CANCELLED_MESSAGE, l1BlockNumber); - - for (const messageKey of cancelledMessages) { - const message = this.#getL1ToL2Message(messageKey.toBuffer()); - if (!message) { - continue; - } - this.#updateMessageCountInTx(messageKey.toBuffer(), message, -1, 0); - } - - return true; - }); - } - - /** - * Messages that have been published in an L2 block are confirmed. - * Add them to the confirmed store, also remove them from the pending store. - * @param entryKeys - The message keys to be removed from the store. - * @returns True if the operation is successful. - */ - confirmL1ToL2Messages(entryKeys: Fr[]): Promise { - return this.#tables.l1ToL2Messages.transaction(() => { - for (const entryKey of entryKeys) { - const messageKey = entryKey.toBuffer(); - const message = this.#getL1ToL2Message(messageKey); - this.#updateMessageCountInTx(messageKey, message, -1, 1); - } - return true; - }); - } - - /** - * Gets up to `limit` amount of pending L1 to L2 messages, sorted by fee - * @param limit - The number of messages to return (by default NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP). - * @returns The requested L1 to L2 message keys. - */ - getPendingL1ToL2MessageKeys(limit: number): Promise { - // start a read transaction in order to have a consistent view of the data - // this is all sync code, but better to be safe in case it changes in the future - // or we end up having multiple processes touching the same db - const transaction = this.#tables.pendingMessagesByFee.useReadTransaction(); - - try { - // get all the keys, in reverse order - const fees = this.#tables.pendingMessagesByFee.getKeys({ reverse: true, transaction }); - const messages: Fr[] = []; - - loopOverFees: for (const fee of fees) { - const pendingMessages = this.#tables.pendingMessagesByFee.getValues(fee, { transaction }); - this.#log(`Found pending messages for ${fee}`); - - for (const messageKey of pendingMessages) { - const messageWithCount = this.#tables.l1ToL2Messages.get(messageKey, { transaction }); - if (!messageWithCount || messageWithCount.pendingCount === 0) { - this.#log( - `Message ${messageKey.toString( - 'hex', - )} has no pending count but it got picked up by getPEndingL1ToL2MessageKeys`, - ); - continue; - } - const toAdd = Array(messageWithCount.pendingCount).fill(Fr.fromBuffer(messageKey)); - this.#log(`Adding ${toAdd.length} copies of ${messageKey.toString('hex')} for ${fee}`); - messages.push(...toAdd); - - if (messages.length >= limit) { - break loopOverFees; - } - } - } - - return Promise.resolve(messages); - } catch (err) { - return Promise.reject(err); - } finally { - transaction.done(); - } - } - - /** - * Gets the confirmed L1 to L2 message corresponding to the given message key. - * @param messageKey - The message key to look up. - * @returns The requested L1 to L2 message or throws if not found. - */ - getConfirmedL1ToL2Message(messageKey: Fr): Promise { - const value = this.#tables.l1ToL2Messages.get(messageKey.toBuffer()); - if (!value) { - return Promise.reject(new Error(`Message with key ${messageKey} not found`)); - } - - if (value.confirmedCount === 0) { - return Promise.reject(new Error(`Message with key ${messageKey} not confirmed`)); - } - - return Promise.resolve(L1ToL2Message.fromBuffer(value.message)); - } - - /** - * Gets up to `limit` amount of logs starting from `from`. - * @param start - Number of the L2 block to which corresponds the first logs to be returned. - * @param limit - The number of logs to return. - * @param logType - Specifies whether to return encrypted or unencrypted logs. - * @returns The requested logs. - */ - getLogs(start: number, limit: number, logType: LogType): Promise { - try { - const blockCtxKey = logType === LogType.ENCRYPTED ? 'encryptedLogs' : 'unencryptedLogs'; - const logs = this.#tables.blocks - .getRange(this.#computeBlockRange(start, limit)) - .map(({ value: { [blockCtxKey]: logs } }) => - logs ? L2BlockL2Logs.fromBuffer(asBuffer(logs)) : new L2BlockL2Logs([]), - ).asArray; - - return Promise.resolve(logs); - } catch (err) { - return Promise.reject(err); - } - } - - /** - * Gets unencrypted logs based on the provided filter. - * @param filter - The filter to apply to the logs. - * @returns The requested logs. - */ - getUnencryptedLogs(filter: LogFilter): Promise { - try { - if (filter.afterLog) { - return Promise.resolve(this.#filterUnencryptedLogsBetweenBlocks(filter)); - } else if (filter.txHash) { - return Promise.resolve(this.#filterUnencryptedLogsOfTx(filter)); - } else { - return Promise.resolve(this.#filterUnencryptedLogsBetweenBlocks(filter)); - } - } catch (err) { - return Promise.reject(err); - } - } - - #filterUnencryptedLogsOfTx(filter: LogFilter): GetUnencryptedLogsResponse { - if (!filter.txHash) { - throw new Error('Missing txHash'); - } - - const [blockNumber, txIndex] = this.#tables.txIndex.get(filter.txHash.buffer) ?? []; - if (typeof blockNumber !== 'number' || typeof txIndex !== 'number') { - return { logs: [], maxLogsHit: false }; - } - - const unencryptedLogsInBlock = this.#getBlockLogs(blockNumber, LogType.UNENCRYPTED); - const txLogs = unencryptedLogsInBlock.txLogs[txIndex].unrollLogs().map(log => UnencryptedL2Log.fromBuffer(log)); - - const logs: ExtendedUnencryptedL2Log[] = []; - const maxLogsHit = this.#accumulateLogs(logs, blockNumber, txIndex, txLogs, filter); - - return { logs, maxLogsHit }; - } - - #filterUnencryptedLogsBetweenBlocks(filter: LogFilter): GetUnencryptedLogsResponse { - const start = - filter.afterLog?.blockNumber ?? Math.max(filter.fromBlock ?? INITIAL_L2_BLOCK_NUM, INITIAL_L2_BLOCK_NUM); - const end = filter.toBlock; - - if (typeof end === 'number' && end < start) { - return { - logs: [], - maxLogsHit: true, - }; - } - - const logs: ExtendedUnencryptedL2Log[] = []; - - const blockNumbers = this.#tables.blocks.getKeys({ start, end, snapshot: false }); - let maxLogsHit = false; - - loopOverBlocks: for (const blockNumber of blockNumbers) { - const unencryptedLogsInBlock = this.#getBlockLogs(blockNumber, LogType.UNENCRYPTED); - for (let txIndex = filter.afterLog?.txIndex ?? 0; txIndex < unencryptedLogsInBlock.txLogs.length; txIndex++) { - const txLogs = unencryptedLogsInBlock.txLogs[txIndex].unrollLogs().map(log => UnencryptedL2Log.fromBuffer(log)); - maxLogsHit = this.#accumulateLogs(logs, blockNumber, txIndex, txLogs, filter); - if (maxLogsHit) { - break loopOverBlocks; - } - } - } - - return { logs, maxLogsHit }; - } - - #accumulateLogs( - results: ExtendedUnencryptedL2Log[], - blockNumber: number, - txIndex: number, - txLogs: UnencryptedL2Log[], - filter: LogFilter, - ): boolean { - let maxLogsHit = false; - let logIndex = typeof filter.afterLog?.logIndex === 'number' ? filter.afterLog.logIndex + 1 : 0; - for (; logIndex < txLogs.length; logIndex++) { - const log = txLogs[logIndex]; - if (filter.contractAddress && !log.contractAddress.equals(filter.contractAddress)) { - continue; - } - - if (filter.selector && !log.selector.equals(filter.selector)) { - continue; - } - - results.push(new ExtendedUnencryptedL2Log(new LogId(blockNumber, txIndex, logIndex), log)); - if (results.length >= this.#logsMaxPageSize) { - maxLogsHit = true; - break; - } - } - - return maxLogsHit; - } - - /** - * Add new extended contract data from an L2 block to the store's list. - * @param data - List of contracts' data to be added. - * @param blockNum - Number of the L2 block the contract data was deployed in. - * @returns True if the operation is successful. - */ - addExtendedContractData(data: ExtendedContractData[], blockNum: number): Promise { - return this.#tables.blocks.transaction(() => { - const blockCtx = this.#tables.blocks.get(blockNum) ?? {}; - if (!blockCtx.extendedContractData) { - blockCtx.extendedContractData = []; - } - this.#log(`Adding ${data.length} extended contract data to block ${blockNum}`); - blockCtx.extendedContractData.push(...data.map(data => data.toBuffer())); - void this.#tables.blocks.put(blockNum, blockCtx); - - return true; - }); - } - - /** - * Get the extended contract data for this contract. - * @param contractAddress - The contract data address. - * @returns The extended contract data or undefined if not found. - */ - getExtendedContractData(contractAddress: AztecAddress): Promise { - const [blockNumber, _] = this.#tables.contractIndex.get(contractAddress.toBuffer()) ?? []; - - if (typeof blockNumber !== 'number') { - return Promise.resolve(undefined); - } - - const blockCtx = this.#tables.blocks.get(blockNumber); - if (!blockCtx) { - return Promise.resolve(undefined); - } - - for (const data of blockCtx.extendedContractData ?? []) { - const extendedContractData = ExtendedContractData.fromBuffer(asBuffer(data)); - if (extendedContractData.contractData.contractAddress.equals(contractAddress)) { - return Promise.resolve(extendedContractData); - } - } - - return Promise.resolve(undefined); - } - - /** - * Lookup all extended contract data in an L2 block. - * @param blockNum - The block number to get all contract data from. - * @returns All extended contract data in the block (if found). - */ - getExtendedContractDataInBlock(blockNum: number): Promise { - const blockCtx = this.#tables.blocks.get(blockNum); - if (!blockCtx || !blockCtx.extendedContractData) { - return Promise.resolve([]); - } - - return Promise.resolve(blockCtx.extendedContractData.map(data => ExtendedContractData.fromBuffer(asBuffer(data)))); - } - - /** - * Get basic info for an L2 contract. - * Contains contract address & the ethereum portal address. - * @param contractAddress - The contract data address. - * @returns ContractData with the portal address (if we didn't throw an error). - */ - getContractData(contractAddress: AztecAddress): Promise { - const [blockNumber, index] = this.#tables.contractIndex.get(contractAddress.toBuffer()) ?? []; - if (typeof blockNumber !== 'number' || typeof index !== 'number') { - return Promise.resolve(undefined); - } - - const block = this.#getBlock(blockNumber); - return Promise.resolve(block?.newContractData[index]); - } - - /** - * Get basic info for an all L2 contracts deployed in a block. - * Contains contract address & the ethereum portal address. - * @param blockNumber - Number of the L2 block where contracts were deployed. - * @returns ContractData with the portal address (if we didn't throw an error). - */ - getContractDataInBlock(blockNumber: number): Promise { - const block = this.#getBlock(blockNumber); - return Promise.resolve(block?.newContractData ?? []); - } - - /** - * Gets the number of the latest L2 block processed. - * @returns The number of the latest L2 block processed. - */ - getBlockNumber(): Promise { - // inverse range with no start/end will return the last key - const [lastBlockNumber] = this.#tables.blocks.getKeys({ reverse: true, limit: 1 }).asArray; - return Promise.resolve(typeof lastBlockNumber === 'number' ? lastBlockNumber : INITIAL_L2_BLOCK_NUM - 1); - } - - /** - * Gets the last L1 block number processed by the archiver - */ - getL1BlockNumber(): Promise { - // inverse range with no start/end will return the last value - const [lastL2Block] = this.#tables.blocks.getRange({ reverse: true, limit: 1 }).asArray; - const addedBlock = lastL2Block?.value?.l1BlockNumber ?? 0n; - const addedMessages = this.#tables.l1ToL2MessagesByBlock.get(L1_BLOCK_ADDED_PENDING_MESSAGE) ?? 0n; - const cancelledMessages = this.#tables.l1ToL2MessagesByBlock.get(L1_BLOCK_CANCELLED_MESSAGE) ?? 0n; - - return Promise.resolve({ - addedBlock, - addedMessages, - cancelledMessages, - }); - } - - #getBlock(blockNumber: number, withLogs = false): L2Block | undefined { - const blockCtx = this.#tables.blocks.get(blockNumber); - if (!blockCtx || !blockCtx.block) { - return undefined; - } - - const block = L2Block.fromBuffer( - asBuffer(blockCtx.block), - blockCtx.blockHash ? asBuffer(blockCtx.blockHash) : undefined, - ); - - if (withLogs) { - if (blockCtx.encryptedLogs) { - block.attachLogs(L2BlockL2Logs.fromBuffer(asBuffer(blockCtx.encryptedLogs)), LogType.ENCRYPTED); - } - - if (blockCtx.unencryptedLogs) { - block.attachLogs(L2BlockL2Logs.fromBuffer(asBuffer(blockCtx.unencryptedLogs)), LogType.UNENCRYPTED); - } - } - - return block; - } - - #getBlockLogs(blockNumber: number, logType: LogType): L2BlockL2Logs { - const blockCtx = this.#tables.blocks.get(blockNumber); - const logs = blockCtx?.[logType === LogType.ENCRYPTED ? 'encryptedLogs' : 'unencryptedLogs']; - - if (!logs) { - return new L2BlockL2Logs([]); - } - - return L2BlockL2Logs.fromBuffer(asBuffer(logs)); - } - - #computeBlockRange(start: number, limit: number): Required> { - if (limit < 1) { - throw new Error(`Invalid limit: ${limit}`); - } - - if (start < INITIAL_L2_BLOCK_NUM) { - this.#log(`Clamping start block ${start} to ${INITIAL_L2_BLOCK_NUM}`); - start = INITIAL_L2_BLOCK_NUM; - } - - const end = start + limit; - return { start, end }; - } - - #getL1ToL2Message(entryKey: Buffer): L1ToL2Message { - const value = this.#tables.l1ToL2Messages.get(entryKey); - if (!value) { - throw new Error('Unknown message: ' + entryKey.toString()); - } - - return L1ToL2Message.fromBuffer(value.message); - } - - /** - * Atomically updates the pending and confirmed count for a message. - * If both counts are 0 after adding their respective deltas, the message is removed from the store. - * - * Only call this method from inside a _transaction_! - * - * @param messageKey - The message key to update. - * @param message - The message to update. - * @param deltaPendingCount - The amount to add to the pending count. - * @param deltaConfirmedCount - The amount to add to the confirmed count. - */ - #updateMessageCountInTx( - messageKey: Buffer, - message: L1ToL2Message, - deltaPendingCount: number, - deltaConfirmedCount: number, - ): void { - const entry = this.#tables.l1ToL2Messages.getEntry(messageKey); - if (!entry) { - return; - } - - const { value } = entry; - - value.pendingCount = Math.max(0, value.pendingCount + deltaPendingCount); - value.confirmedCount = Math.max(0, value.confirmedCount + deltaConfirmedCount); - - this.#log( - `Updating count of ${messageKey.toString('hex')} to ${value.pendingCount} pending and ${ - value.confirmedCount - } confirmed}`, - ); - - if (value.pendingCount === 0) { - this.#log(`Removing message ${messageKey.toString('hex')} from pending messages group with fee ${message.fee}`); - void this.#tables.pendingMessagesByFee.remove(message.fee, messageKey); - } else if (value.pendingCount > 0) { - this.#log(`Adding message ${messageKey.toString('hex')} to pending message group with fee ${message.fee}`); - void this.#tables.pendingMessagesByFee.put(message.fee, messageKey); - } - - if (value.pendingCount === 0 && value.confirmedCount === 0) { - void this.#tables.l1ToL2Messages.remove(messageKey); - } else { - void this.#tables.l1ToL2Messages.put(messageKey, value); - } - } -} - -/** - * Creates a Buffer viewing the same memory location as the passed array. - * @param arr - A Uint8Array - */ -function asBuffer(arr: Uint8Array | Buffer): Buffer { - return Buffer.isBuffer(arr) ? arr : Buffer.from(arr.buffer, arr.byteOffset, arr.length / arr.BYTES_PER_ELEMENT); -} diff --git a/yarn-project/archiver/tsconfig.json b/yarn-project/archiver/tsconfig.json index 902e0c05490..6a0c794ba6d 100644 --- a/yarn-project/archiver/tsconfig.json +++ b/yarn-project/archiver/tsconfig.json @@ -18,6 +18,9 @@ { "path": "../foundation" }, + { + "path": "../kv-store" + }, { "path": "../l1-artifacts" } diff --git a/yarn-project/aztec-node/src/aztec-node/server.ts b/yarn-project/aztec-node/src/aztec-node/server.ts index 1d24a3e17b4..0c65a9c488d 100644 --- a/yarn-project/aztec-node/src/aztec-node/server.ts +++ b/yarn-project/aztec-node/src/aztec-node/server.ts @@ -1,4 +1,4 @@ -import { Archiver, LMDBArchiverStore } from '@aztec/archiver'; +import { Archiver, KVArchiverDataStore } from '@aztec/archiver'; import { AztecNode, ContractData, @@ -107,10 +107,10 @@ export class AztecNodeService implements AztecNode { const log = createDebugLogger('aztec:node'); const store = await AztecLmdbStore.create(config.l1Contracts.rollupAddress, config.dataDirectory); - const [nodeDb, worldStateDb] = await openDb(config, log); + const [_, worldStateDb] = await openDb(config, log); // first create and sync the archiver - const archiverStore = new LMDBArchiverStore(nodeDb, config.maxLogs); + const archiverStore = new KVArchiverDataStore(store, config.maxLogs); const archiver = await Archiver.createAndSync(config, archiverStore, true); // we identify the P2P transaction protocol by using the rollup contract address. diff --git a/yarn-project/end-to-end/package.json b/yarn-project/end-to-end/package.json index a75d100da31..88ccca29173 100644 --- a/yarn-project/end-to-end/package.json +++ b/yarn-project/end-to-end/package.json @@ -32,6 +32,7 @@ "@aztec/cli": "workspace:^", "@aztec/ethereum": "workspace:^", "@aztec/foundation": "workspace:^", + "@aztec/kv-store": "workspace:^", "@aztec/l1-artifacts": "workspace:^", "@aztec/merkle-tree": "workspace:^", "@aztec/noir-contracts": "workspace:^", @@ -57,7 +58,6 @@ "koa": "^2.14.2", "koa-static": "^5.0.0", "levelup": "^5.1.1", - "lmdb": "^2.9.1", "lodash.compact": "^3.0.1", "lodash.every": "^4.6.0", "memdown": "^6.1.1", diff --git a/yarn-project/end-to-end/src/integration_archiver_l1_to_l2.test.ts b/yarn-project/end-to-end/src/integration_archiver_l1_to_l2.test.ts index b546356c630..eb57e00eb21 100644 --- a/yarn-project/end-to-end/src/integration_archiver_l1_to_l2.test.ts +++ b/yarn-project/end-to-end/src/integration_archiver_l1_to_l2.test.ts @@ -1,4 +1,4 @@ -import { Archiver, LMDBArchiverStore } from '@aztec/archiver'; +import { Archiver, KVArchiverDataStore } from '@aztec/archiver'; import { AztecNodeConfig } from '@aztec/aztec-node'; import { AztecAddress, @@ -10,9 +10,9 @@ import { Wallet, computeMessageSecretHash, } from '@aztec/aztec.js'; +import { AztecLmdbStore } from '@aztec/kv-store'; import { TokenContract } from '@aztec/noir-contracts/Token'; -import { open } from 'lmdb'; import { Chain, HttpTransport, PublicClient } from 'viem'; import { delay, deployAndInitializeTokenAndBridgeContracts, setNextBlockTimestamp, setup } from './fixtures/utils.js'; @@ -43,7 +43,7 @@ describe('archiver integration with l1 to l2 messages', () => { config.archiverPollingIntervalMS = 100; archiver = await Archiver.createAndSync( { ...config, l1Contracts: deployL1ContractsValues.l1ContractAddresses }, - new LMDBArchiverStore(open({} as any)), + new KVArchiverDataStore(await AztecLmdbStore.create(deployL1ContractsValues.l1ContractAddresses.rollupAddress)), ); const walletClient = deployL1ContractsValues.walletClient; diff --git a/yarn-project/end-to-end/tsconfig.json b/yarn-project/end-to-end/tsconfig.json index 2cd953c345d..e467ab65bf4 100644 --- a/yarn-project/end-to-end/tsconfig.json +++ b/yarn-project/end-to-end/tsconfig.json @@ -33,6 +33,9 @@ { "path": "../foundation" }, + { + "path": "../kv-store" + }, { "path": "../l1-artifacts" }, diff --git a/yarn-project/kv-store/src/index.ts b/yarn-project/kv-store/src/index.ts index 2a71333f9e6..b35a4fb3d53 100644 --- a/yarn-project/kv-store/src/index.ts +++ b/yarn-project/kv-store/src/index.ts @@ -1,5 +1,7 @@ export * from './interfaces/array.js'; export * from './interfaces/map.js'; +export * from './interfaces/counter.js'; export * from './interfaces/singleton.js'; export * from './interfaces/store.js'; export * from './lmdb/store.js'; +export { Range } from './interfaces/common.js'; diff --git a/yarn-project/kv-store/src/interfaces/common.ts b/yarn-project/kv-store/src/interfaces/common.ts new file mode 100644 index 00000000000..c4e0effa8c8 --- /dev/null +++ b/yarn-project/kv-store/src/interfaces/common.ts @@ -0,0 +1,18 @@ +/** + * The key type for use with the kv-store + */ +export type Key = string | number | Array; + +/** + * A range of keys to iterate over. + */ +export type Range = { + /** The key of the first item to include */ + start?: K; + /** The key of the last item to include */ + end?: K; + /** Whether to iterate in reverse */ + reverse?: boolean; + /** The maximum number of items to iterate over */ + limit?: number; +}; diff --git a/yarn-project/kv-store/src/interfaces/counter.ts b/yarn-project/kv-store/src/interfaces/counter.ts new file mode 100644 index 00000000000..0f68626e691 --- /dev/null +++ b/yarn-project/kv-store/src/interfaces/counter.ts @@ -0,0 +1,43 @@ +import { Key, Range } from './common.js'; + +/** + * A map that counts how many times it sees a key. Once 0 is reached, that key is removed from the map. + * Iterating over the map will only return keys that have a count over 0. + * + * Keys are stored in sorted order + */ +export interface AztecCounter { + /** + * Resets the count of the given key to the given value. + * @param key - The key to reset + * @param value - The value to reset the key to + */ + set(key: K, value: number): Promise; + + /** + * Updates the count of the given key by the given delta. This can be used to increment or decrement the count. + * Once a key's count reaches 0, it is removed from the map. + * + * @param key - The key to update + * @param delta - The amount to modify the key by + */ + update(key: K, delta: number): Promise; + + /** + * Gets the current count. + * @param key - The key to get the count of + */ + get(key: K): number; + + /** + * Returns keys in the map in sorted order. Only returns keys that have been seen at least once. + * @param range - The range of keys to iterate over + */ + keys(range: Range): IterableIterator; + + /** + * Returns keys and their counts in the map sorted by the key. Only returns keys that have been seen at least once. + * @param range - The range of keys to iterate over + */ + entries(range: Range): IterableIterator<[K, number]>; +} diff --git a/yarn-project/kv-store/src/interfaces/map.ts b/yarn-project/kv-store/src/interfaces/map.ts index 8de773837b7..0916146a4ab 100644 --- a/yarn-project/kv-store/src/interfaces/map.ts +++ b/yarn-project/kv-store/src/interfaces/map.ts @@ -1,7 +1,9 @@ +import { Key, Range } from './common.js'; + /** * A map backed by a persistent store. */ -export interface AztecMap { +export interface AztecMap { /** * Gets the value at the given key. * @param key - The key to get the value from @@ -22,6 +24,13 @@ export interface AztecMap { */ set(key: K, val: V): Promise; + /** + * Atomically swap the value at the given key + * @param key - The key to swap the value at + * @param fn - The function to swap the value with + */ + swap(key: K, fn: (val: V | undefined) => V): Promise; + /** * Sets the value at the given key if it does not already exist. * @param key - The key to set the value at @@ -36,25 +45,28 @@ export interface AztecMap { delete(key: K): Promise; /** - * Iterates over the map's key-value entries + * Iterates over the map's key-value entries in the key's natural order + * @param range - The range of keys to iterate over */ - entries(): IterableIterator<[K, V]>; + entries(range?: Range): IterableIterator<[K, V]>; /** - * Iterates over the map's values + * Iterates over the map's values in the key's natural order + * @param range - The range of keys to iterate over */ - values(): IterableIterator; + values(range?: Range): IterableIterator; /** - * Iterates over the map's keys + * Iterates over the map's keys in the key's natural order + * @param range - The range of keys to iterate over */ - keys(): IterableIterator; + keys(range?: Range): IterableIterator; } /** * A map backed by a persistent store that can have multiple values for a single key. */ -export interface AztecMultiMap extends AztecMap { +export interface AztecMultiMap extends AztecMap { /** * Gets all the values at the given key. * @param key - The key to get the values from diff --git a/yarn-project/kv-store/src/interfaces/store.ts b/yarn-project/kv-store/src/interfaces/store.ts index d7ccfa3cd29..73a2901387c 100644 --- a/yarn-project/kv-store/src/interfaces/store.ts +++ b/yarn-project/kv-store/src/interfaces/store.ts @@ -1,4 +1,6 @@ import { AztecArray } from './array.js'; +import { Key } from './common.js'; +import { AztecCounter } from './counter.js'; import { AztecMap, AztecMultiMap } from './map.js'; import { AztecSingleton } from './singleton.js'; @@ -32,6 +34,12 @@ export interface AztecKVStore { */ createSingleton(name: string): AztecSingleton; + /** + * Creates a new count map. + * @param name - name of the counter + */ + createCounter(name: string): AztecCounter; + /** * Starts a transaction. All calls to read/write data while in a transaction are queued and executed atomically. * @param callback - The callback to execute in a transaction diff --git a/yarn-project/kv-store/src/lmdb/counter.test.ts b/yarn-project/kv-store/src/lmdb/counter.test.ts new file mode 100644 index 00000000000..fdd1204b15c --- /dev/null +++ b/yarn-project/kv-store/src/lmdb/counter.test.ts @@ -0,0 +1,122 @@ +import { randomBytes } from 'crypto'; +import { Database, open } from 'lmdb'; + +import { LmdbAztecCounter } from './counter.js'; + +describe('LmdbAztecCounter', () => { + let db: Database; + + beforeEach(() => { + db = open({} as any); + }); + + describe.each([ + ['floating point number', () => Math.random()], + ['integers', () => (Math.random() * 1000) | 0], + ['strings', () => randomBytes(8).toString('hex')], + ['strings', () => [Math.random(), randomBytes(8).toString('hex')]], + ])('counts occurrences of %s values', (_, genKey) => { + let counter: LmdbAztecCounter>; + beforeEach(() => { + counter = new LmdbAztecCounter(db, 'test'); + }); + + it('returns 0 for unknown keys', () => { + expect(counter.get(genKey())).toEqual(0); + }); + + it('increments values', async () => { + const key = genKey(); + await counter.update(key, 1); + + expect(counter.get(key)).toEqual(1); + }); + + it('decrements values', async () => { + const key = genKey(); + await counter.update(key, 1); + await counter.update(key, -1); + + expect(counter.get(key)).toEqual(0); + }); + + it('throws when decrementing below zero', async () => { + const key = genKey(); + await counter.update(key, 1); + + await expect(counter.update(key, -2)).rejects.toThrow(); + }); + + it('increments values by a delta', async () => { + const key = genKey(); + await counter.update(key, 1); + await counter.update(key, 2); + + expect(counter.get(key)).toEqual(3); + }); + + it('resets the counter', async () => { + const key = genKey(); + await counter.update(key, 1); + await counter.update(key, 2); + await counter.set(key, 0); + + expect(counter.get(key)).toEqual(0); + }); + + it('iterates over entries', async () => { + const key = genKey(); + await counter.update(key, 1); + await counter.update(key, 2); + + expect([...counter.entries()]).toEqual([[key, 3]]); + }); + }); + + it.each([ + [ + [ + ['c', 2342], + ['a', 8], + ['b', 1], + ], + [ + ['a', 8], + ['b', 1], + ['c', 2342], + ], + ], + [ + [ + [10, 2], + [18, 1], + [1, 2], + ], + [ + [1, 2], + [10, 2], + [18, 1], + ], + ], + [ + [ + [[10, 'a'], 1], + [[10, 'c'], 2], + [[11, 'b'], 1], + [[9, 'f'], 1], + [[10, 'b'], 1], + ], + [ + [[9, 'f'], 1], + [[10, 'a'], 1], + [[10, 'b'], 1], + [[10, 'c'], 2], + [[11, 'b'], 1], + ], + ], + ])('iterates in key order', async (insertOrder, expectedOrder) => { + const counter = new LmdbAztecCounter(db, 'test'); + await Promise.all(insertOrder.map(([key, value]) => counter.update(key, value as number))); + expect([...counter.entries()]).toEqual(expectedOrder); + }); +}); diff --git a/yarn-project/kv-store/src/lmdb/counter.ts b/yarn-project/kv-store/src/lmdb/counter.ts new file mode 100644 index 00000000000..74886e89dbf --- /dev/null +++ b/yarn-project/kv-store/src/lmdb/counter.ts @@ -0,0 +1,57 @@ +import { Key as BaseKey, Database } from 'lmdb'; + +import { Key, Range } from '../interfaces/common.js'; +import { AztecCounter } from '../interfaces/counter.js'; +import { LmdbAztecMap } from './map.js'; + +/** + * A counter implementation backed by LMDB + */ +export class LmdbAztecCounter implements AztecCounter { + #db: Database; + #name: string; + #map: LmdbAztecMap; + + constructor(db: Database, name: string) { + this.#db = db; + this.#name = name; + this.#map = new LmdbAztecMap(db, name); + } + + set(key: K, value: number): Promise { + return this.#map.set(key, value); + } + + update(key: K, delta = 1): Promise { + return this.#db.childTransaction(() => { + const current = this.#map.get(key) ?? 0; + const next = current + delta; + + if (next < 0) { + throw new Error(`Cannot update ${key} in counter ${this.#name} below zero`); + } + + if (next === 0) { + void this.#map.delete(key); + } else { + // store the key inside the entry because LMDB might return an internal representation + // of the key when iterating over the database + void this.#map.set(key, next); + } + + return true; + }); + } + + get(key: K): number { + return this.#map.get(key) ?? 0; + } + + entries(range: Range = {}): IterableIterator<[K, number]> { + return this.#map.entries(range); + } + + keys(range: Range = {}): IterableIterator { + return this.#map.keys(range); + } +} diff --git a/yarn-project/kv-store/src/lmdb/map.test.ts b/yarn-project/kv-store/src/lmdb/map.test.ts index 5319e0a26c3..007b4c4eb8f 100644 --- a/yarn-project/kv-store/src/lmdb/map.test.ts +++ b/yarn-project/kv-store/src/lmdb/map.test.ts @@ -41,26 +41,24 @@ describe('LmdbAztecMap', () => { await map.set('foo', 'bar'); await map.set('baz', 'qux'); - expect([...map.entries()]).toEqual( - expect.arrayContaining([ - ['foo', 'bar'], - ['baz', 'qux'], - ]), - ); + expect([...map.entries()]).toEqual([ + ['baz', 'qux'], + ['foo', 'bar'], + ]); }); it('should be able to iterate over values', async () => { await map.set('foo', 'bar'); - await map.set('baz', 'qux'); + await map.set('baz', 'quux'); - expect([...map.values()]).toEqual(expect.arrayContaining(['bar', 'qux'])); + expect([...map.values()]).toEqual(['quux', 'bar']); }); it('should be able to iterate over keys', async () => { await map.set('foo', 'bar'); await map.set('baz', 'qux'); - expect([...map.keys()]).toEqual(expect.arrayContaining(['foo', 'baz'])); + expect([...map.keys()]).toEqual(['baz', 'foo']); }); it('should be able to get multiple values for a single key', async () => { @@ -69,4 +67,33 @@ describe('LmdbAztecMap', () => { expect([...map.getValues('foo')]).toEqual(['bar', 'baz']); }); + + it('supports tuple keys', async () => { + const map = new LmdbAztecMap<[number, string], string>(db, 'test'); + + await map.set([5, 'bar'], 'val'); + await map.set([0, 'foo'], 'val'); + + expect([...map.keys()]).toEqual([ + [0, 'foo'], + [5, 'bar'], + ]); + + expect(map.get([5, 'bar'])).toEqual('val'); + }); + + it('supports range queries', async () => { + await map.set('a', 'a'); + await map.set('b', 'b'); + await map.set('c', 'c'); + await map.set('d', 'd'); + + expect([...map.keys({ start: 'b', end: 'c' })]).toEqual(['b']); + expect([...map.keys({ start: 'b' })]).toEqual(['b', 'c', 'd']); + expect([...map.keys({ end: 'c' })]).toEqual(['a', 'b']); + expect([...map.keys({ start: 'b', end: 'c', reverse: true })]).toEqual(['c']); + expect([...map.keys({ start: 'b', limit: 1 })]).toEqual(['b']); + expect([...map.keys({ start: 'b', reverse: true })]).toEqual(['d', 'c']); + expect([...map.keys({ end: 'b', reverse: true })]).toEqual(['b', 'a']); + }); }); diff --git a/yarn-project/kv-store/src/lmdb/map.ts b/yarn-project/kv-store/src/lmdb/map.ts index b883b809738..6e5fa67ef1e 100644 --- a/yarn-project/kv-store/src/lmdb/map.ts +++ b/yarn-project/kv-store/src/lmdb/map.ts @@ -1,20 +1,30 @@ -import { Database, Key } from 'lmdb'; +import { Database, RangeOptions } from 'lmdb'; +import { Key, Range } from '../interfaces/common.js'; import { AztecMultiMap } from '../interfaces/map.js'; /** The slot where a key-value entry would be stored */ -type MapKeyValueSlot = ['map', string, 'slot', K]; +type MapValueSlot = ['map', string, 'slot', K]; /** * A map backed by LMDB. */ -export class LmdbAztecMap implements AztecMultiMap { - protected db: Database>; +export class LmdbAztecMap implements AztecMultiMap { + protected db: Database<[K, V], MapValueSlot>; protected name: string; - constructor(rootDb: Database, mapName: string) { + #startSentinel: MapValueSlot; + #endSentinel: MapValueSlot; + + constructor(rootDb: Database, mapName: string) { this.name = mapName; - this.db = rootDb as Database>; + this.db = rootDb as Database<[K, V], MapValueSlot>; + + // sentinels are used to define the start and end of the map + // with LMDB's key encoding, no _primitive value_ can be "less than" an empty buffer or greater than Byte 255 + // these will be used later to answer range queries + this.#startSentinel = ['map', this.name, 'slot', Buffer.from([])]; + this.#endSentinel = ['map', this.name, 'slot', Buffer.from([255])]; } close(): Promise { @@ -22,13 +32,13 @@ export class LmdbAztecMap implements AztecMultiMap } get(key: K): V | undefined { - return this.db.get(this.#slot(key)) as V | undefined; + return this.db.get(this.#slot(key))?.[1]; } *getValues(key: K): IterableIterator { const values = this.db.getValues(this.#slot(key)); for (const value of values) { - yield value; + yield value?.[1]; } } @@ -37,13 +47,23 @@ export class LmdbAztecMap implements AztecMultiMap } set(key: K, val: V): Promise { - return this.db.put(this.#slot(key), val); + return this.db.put(this.#slot(key), [key, val]); + } + + swap(key: K, fn: (val: V | undefined) => V): Promise { + return this.db.childTransaction(() => { + const slot = this.#slot(key); + const entry = this.db.get(slot); + void this.db.put(slot, [key, fn(entry?.[1])]); + + return true; + }); } setIfNotExists(key: K, val: V): Promise { const slot = this.#slot(key); return this.db.ifNoExists(slot, () => { - void this.db.put(slot, val); + void this.db.put(slot, [key, val]); }); } @@ -52,37 +72,58 @@ export class LmdbAztecMap implements AztecMultiMap } async deleteValue(key: K, val: V): Promise { - await this.db.remove(this.#slot(key), val); + await this.db.remove(this.#slot(key), [key, val]); } - *entries(): IterableIterator<[K, V]> { - const iterator = this.db.getRange({ - start: ['map', this.name, 'slot'], - }); - - for (const { key, value } of iterator) { - if (key[0] !== 'map' || key[1] !== this.name) { - break; - } - - const originalKey = key[3]; - yield [originalKey, value]; + *entries(range: Range = {}): IterableIterator<[K, V]> { + const { reverse = false, limit } = range; + // LMDB has a quirk where it expects start > end when reverse=true + // in that case, we need to swap the start and end sentinels + const start = reverse + ? range.end + ? this.#slot(range.end) + : this.#endSentinel + : range.start + ? this.#slot(range.start) + : this.#startSentinel; + + const end = reverse + ? range.start + ? this.#slot(range.start) + : this.#startSentinel + : range.end + ? this.#slot(range.end) + : this.#endSentinel; + + const lmdbRange: RangeOptions = { + start, + end, + reverse, + limit, + }; + + const iterator = this.db.getRange(lmdbRange); + + for (const { + value: [key, value], + } of iterator) { + yield [key, value]; } } - *values(): IterableIterator { - for (const [_, value] of this.entries()) { + *values(range: Range = {}): IterableIterator { + for (const [_, value] of this.entries(range)) { yield value; } } - *keys(): IterableIterator { - for (const [key, _] of this.entries()) { + *keys(range: Range = {}): IterableIterator { + for (const [key, _] of this.entries(range)) { yield key; } } - #slot(key: K): MapKeyValueSlot { + #slot(key: K): MapValueSlot { return ['map', this.name, 'slot', key]; } } diff --git a/yarn-project/kv-store/src/lmdb/store.ts b/yarn-project/kv-store/src/lmdb/store.ts index 9ede111a875..8c3cebd0737 100644 --- a/yarn-project/kv-store/src/lmdb/store.ts +++ b/yarn-project/kv-store/src/lmdb/store.ts @@ -4,10 +4,12 @@ import { Logger, createDebugLogger } from '@aztec/foundation/log'; import { Database, Key, RootDatabase, open } from 'lmdb'; import { AztecArray } from '../interfaces/array.js'; +import { AztecCounter } from '../interfaces/counter.js'; import { AztecMap, AztecMultiMap } from '../interfaces/map.js'; import { AztecSingleton } from '../interfaces/singleton.js'; import { AztecKVStore } from '../interfaces/store.js'; import { LmdbAztecArray } from './array.js'; +import { LmdbAztecCounter } from './counter.js'; import { LmdbAztecMap } from './map.js'; import { LmdbAztecSingleton } from './singleton.js'; @@ -88,6 +90,10 @@ export class AztecLmdbStore implements AztecKVStore { return new LmdbAztecMap(this.#multiMapData, name); } + createCounter>(name: string): AztecCounter { + return new LmdbAztecCounter(this.#data, name); + } + /** * Creates a new AztecArray in the store. * @param name - Name of the array diff --git a/yarn-project/yarn.lock b/yarn-project/yarn.lock index d2945982290..5e7bbb29474 100644 --- a/yarn-project/yarn.lock +++ b/yarn-project/yarn.lock @@ -115,6 +115,7 @@ __metadata: "@aztec/circuits.js": "workspace:^" "@aztec/ethereum": "workspace:^" "@aztec/foundation": "workspace:^" + "@aztec/kv-store": "workspace:^" "@aztec/l1-artifacts": "workspace:^" "@jest/globals": ^29.5.0 "@types/debug": ^4.1.7 @@ -399,6 +400,7 @@ __metadata: "@aztec/cli": "workspace:^" "@aztec/ethereum": "workspace:^" "@aztec/foundation": "workspace:^" + "@aztec/kv-store": "workspace:^" "@aztec/l1-artifacts": "workspace:^" "@aztec/merkle-tree": "workspace:^" "@aztec/noir-contracts": "workspace:^" @@ -425,7 +427,6 @@ __metadata: koa: ^2.14.2 koa-static: ^5.0.0 levelup: ^5.1.1 - lmdb: ^2.9.1 lodash.compact: ^3.0.1 lodash.every: ^4.6.0 memdown: ^6.1.1