Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: note processor cleanup #6870

Merged
merged 1 commit into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 7 additions & 28 deletions yarn-project/pxe/src/note_processor/note_processor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,7 @@ describe('Note Processor', () => {
const request = new MockNoteRequest(TaggedNote.random(app), 4, 0, 2, ownerIvpkM, KeyValidationRequest.random());

const blocks = mockBlocks([request]);

// TODO(#6830): pass in only the blocks
const encryptedLogs = blocks.flatMap(block => block.body.noteEncryptedLogs);
await noteProcessor.process(blocks, encryptedLogs);
await noteProcessor.process(blocks);

expect(addNotesSpy).toHaveBeenCalledTimes(1);
expect(addNotesSpy).toHaveBeenCalledWith(
Expand All @@ -206,10 +203,7 @@ describe('Note Processor', () => {
const request = new MockNoteRequest(TaggedNote.random(app), 4, 0, 2, Point.random(), ownerOvKeys);

const blocks = mockBlocks([request]);

// TODO(#6830): pass in only the blocks
const encryptedLogs = blocks.flatMap(block => block.body.noteEncryptedLogs);
await noteProcessor.process(blocks, encryptedLogs);
await noteProcessor.process(blocks);

expect(addNotesSpy).toHaveBeenCalledTimes(1);
// For outgoing notes, the resulting DAO does not contain index.
Expand All @@ -226,10 +220,7 @@ describe('Note Processor', () => {
];

const blocks = mockBlocks(requests);

// TODO(#6830): pass in only the blocks
const encryptedLogs = blocks.flatMap(block => block.body.noteEncryptedLogs);
await noteProcessor.process(blocks, encryptedLogs);
await noteProcessor.process(blocks);

expect(addNotesSpy).toHaveBeenCalledTimes(1);
expect(addNotesSpy).toHaveBeenCalledWith(
Expand Down Expand Up @@ -263,10 +254,7 @@ describe('Note Processor', () => {
new MockNoteRequest(TaggedNote.random(), 2, 1, 1, Point.random(), KeyValidationRequest.random()),
new MockNoteRequest(TaggedNote.random(), 2, 3, 0, Point.random(), KeyValidationRequest.random()),
]);

// TODO(#6830): pass in only the blocks
const encryptedLogs = blocks.flatMap(block => block.body.noteEncryptedLogs);
await noteProcessor.process(blocks, encryptedLogs);
await noteProcessor.process(blocks);

expect(addNotesSpy).toHaveBeenCalledTimes(0);
});
Expand All @@ -284,10 +272,7 @@ describe('Note Processor', () => {
];

const blocks = mockBlocks(requests);

// TODO(#6830): pass in only the blocks
const encryptedLogs = blocks.flatMap(block => block.body.noteEncryptedLogs);
await noteProcessor.process(blocks, encryptedLogs);
await noteProcessor.process(blocks);

// First we check incoming
{
Expand Down Expand Up @@ -325,10 +310,7 @@ describe('Note Processor', () => {
const request = new MockNoteRequest(TaggedNote.random(), 6, 0, 2, ownerIvpkM, ownerOvKeys);

const blocks = mockBlocks([request]);

// TODO(#6830): pass in only the blocks
const encryptedLogs = blocks.flatMap(block => block.body.noteEncryptedLogs);
await noteProcessor.process(blocks, encryptedLogs);
await noteProcessor.process(blocks);

expect(noteProcessor.status.syncedToBlock).toEqual(blocks.at(-1)?.number);
});
Expand All @@ -337,10 +319,7 @@ describe('Note Processor', () => {
const request = new MockNoteRequest(TaggedNote.random(), 6, 0, 2, Point.random(), KeyValidationRequest.random());

const blocks = mockBlocks([request]);

// TODO(#6830): pass in only the blocks
const encryptedLogs = blocks.flatMap(block => block.body.noteEncryptedLogs);
await noteProcessor.process(blocks, encryptedLogs);
await noteProcessor.process(blocks);

const newNoteProcessor = await NoteProcessor.create(
account.address,
Expand Down
30 changes: 8 additions & 22 deletions yarn-project/pxe/src/note_processor/note_processor.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
import {
type AztecNode,
type EncryptedNoteL2BlockL2Logs,
L1NotePayload,
type L2Block,
TaggedNote,
} from '@aztec/circuit-types';
import { type AztecNode, L1NotePayload, type L2Block, TaggedNote } from '@aztec/circuit-types';
import { type NoteProcessorStats } from '@aztec/circuit-types/stats';
import {
type AztecAddress,
Expand Down Expand Up @@ -111,18 +105,11 @@ export class NoteProcessor {
/**
* Extracts new user-relevant notes from the information contained in the provided L2 blocks and encrypted logs.
*
* @throws If the number of blocks and encrypted logs do not match.
* @param l2Blocks - L2 blocks to be processed.
* @param encryptedL2BlockLogs - Encrypted logs associated with the L2 blocks.
* @param blocks - L2 blocks to be processed.
* @returns A promise that resolves once the processing is completed.
*/
public async process(l2Blocks: L2Block[], encryptedL2BlockLogs: EncryptedNoteL2BlockL2Logs[]): Promise<void> {
if (l2Blocks.length !== encryptedL2BlockLogs.length) {
throw new Error(
`Number of blocks and EncryptedLogs is not equal. Received ${l2Blocks.length} blocks, ${encryptedL2BlockLogs.length} encrypted logs.`,
);
}
if (l2Blocks.length === 0) {
public async process(blocks: L2Block[]): Promise<void> {
if (blocks.length === 0) {
return;
}

Expand All @@ -136,10 +123,9 @@ export class NoteProcessor {
const ovskM = await this.keyStore.getMasterSecretKey(this.ovpkM);

// Iterate over both blocks and encrypted logs.
for (let blockIndex = 0; blockIndex < encryptedL2BlockLogs.length; ++blockIndex) {
for (const block of blocks) {
this.stats.blocks++;
const { txLogs } = encryptedL2BlockLogs[blockIndex];
const block = l2Blocks[blockIndex];
const { txLogs } = block.body.noteEncryptedLogs;
const dataStartIndexForBlock =
block.header.state.partial.noteHashTree.nextAvailableLeafIndex -
block.body.numberOfTxsIncludingPadded * MAX_NEW_NOTE_HASHES_PER_TX;
Expand Down Expand Up @@ -210,7 +196,7 @@ export class NoteProcessor {
}

blocksAndNotes.push({
block: l2Blocks[blockIndex],
block,
incomingNotes,
outgoingNotes,
});
Expand All @@ -219,7 +205,7 @@ export class NoteProcessor {
await this.processBlocksAndNotes(blocksAndNotes);
await this.processDeferredNotes(deferredNoteDaosIncoming);

const syncedToBlock = l2Blocks[l2Blocks.length - 1].number;
const syncedToBlock = blocks[blocks.length - 1].number;
await this.db.setSynchedBlockNumberForPublicKey(this.ivpkM, syncedToBlock);

this.log.debug(`Synched block ${syncedToBlock}`);
Expand Down
19 changes: 4 additions & 15 deletions yarn-project/pxe/src/synchronizer/synchronizer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { type AztecNode, type L2Block, L2BlockL2Logs, MerkleTreeId, type TxHash } from '@aztec/circuit-types';
import { type AztecNode, type L2Block, MerkleTreeId, type TxHash } from '@aztec/circuit-types';
import { type NoteProcessorCaughtUpStats } from '@aztec/circuit-types/stats';
import { type AztecAddress, type Fr, INITIAL_L2_BLOCK_NUM, type PublicKey } from '@aztec/circuits.js';
import { type SerialQueue } from '@aztec/foundation/fifo';
Expand Down Expand Up @@ -99,19 +99,13 @@ export class Synchronizer {
return false;
}

const noteEncryptedLogs = blocks.flatMap(block => block.body.noteEncryptedLogs);

// Update latest tree roots from the most recent block
const latestBlock = blocks[blocks.length - 1];
await this.setHeaderFromBlock(latestBlock);

const logCount = L2BlockL2Logs.getTotalLogCount(noteEncryptedLogs);
this.log.debug(
`Forwarding ${logCount} encrypted logs and blocks to ${this.noteProcessors.length} note processors`,
);
this.log.debug(`Forwarding ${blocks.length} blocks to ${this.noteProcessors.length} note processors`);
for (const noteProcessor of this.noteProcessors) {
// TODO(#6830): pass in only the blocks
await noteProcessor.process(blocks, noteEncryptedLogs);
await noteProcessor.process(blocks);
}
return true;
} catch (err) {
Expand Down Expand Up @@ -177,11 +171,6 @@ export class Synchronizer {
throw new Error('No blocks in processor catch up mode');
}

const noteEncryptedLogs = blocks.flatMap(block => block.body.noteEncryptedLogs);

const logCount = L2BlockL2Logs.getTotalLogCount(noteEncryptedLogs);
this.log.debug(`Forwarding ${logCount} encrypted logs and blocks to note processors in catch up mode`);

for (const noteProcessor of catchUpGroup) {
// find the index of the first block that the note processor is not yet synced to
const index = blocks.findIndex(block => block.number > noteProcessor.status.syncedToBlock);
Expand All @@ -197,7 +186,7 @@ export class Synchronizer {
blocks.length - index
} blocks`,
);
await noteProcessor.process(blocks.slice(index), noteEncryptedLogs.slice(index));
await noteProcessor.process(blocks.slice(index));

if (noteProcessor.status.syncedToBlock === toBlockNumber) {
// Note processor caught up, move it to `noteProcessors` from `noteProcessorsToCatchUp`.
Expand Down
Loading