refactor: getLogsByTags request batching in syncTaggedLogs #10716

Merged
merged 11 commits on Dec 16, 2024
3 changes: 2 additions & 1 deletion yarn-project/circuit-types/src/interfaces/aztec-node.ts
@@ -315,7 +315,8 @@ export interface AztecNode
* Gets all logs that match any of the received tags (i.e. logs with their first field equal to a tag).
* @param tags - The tags to filter the logs by.
* @returns For each received tag, an array of matching logs and metadata (e.g. tx hash) is returned. An empty
array implies no logs match that tag.
* array implies no logs match that tag. There can be multiple logs for 1 tag because tag reuse can happen
* --> e.g. when sending a note from multiple unsynced devices.
*/
getLogsByTags(tags: Fr[]): Promise<TxScopedL2Log[][]>;

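The `getLogsByTags` contract documented above can be sketched as follows. This is a hypothetical stand-in, not the real node: strings replace `Fr` tags and a plain object replaces the node's log store. The point is the shape of the return value: one inner array per requested tag, positionally aligned with the input.

```typescript
// Hypothetical sketch of the getLogsByTags result shape. `TxScopedLogSketch`
// and `lookupLogsByTags` are illustrative names, not part of the real API.
type TxScopedLogSketch = { tag: string; txHash: string };

// One inner array per requested tag, in the same order as the input; an empty
// inner array means no logs match that tag, and tag reuse yields multiple entries.
function lookupLogsByTags(
  store: Record<string, TxScopedLogSketch[]>,
  tags: string[],
): TxScopedLogSketch[][] {
  return tags.map(tag => store[tag] ?? []);
}

const store = {
  tagA: [
    { tag: 'tagA', txHash: '0x01' },
    { tag: 'tagA', txHash: '0x02' }, // same tag reused across two txs
  ],
};
const result = lookupLogsByTags(store, ['tagA', 'tagB']);
```

The positional alignment is what lets the caller pair each inner array back up with the secret-and-index that produced the tag.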
@@ -3,7 +3,11 @@ import { poseidon2Hash } from '@aztec/foundation/crypto';
import { Fr } from '@aztec/foundation/fields';

export class IndexedTaggingSecret {
constructor(public appTaggingSecret: Fr, public index: number) {}
constructor(public appTaggingSecret: Fr, public index: number) {
if (index < 0) {
throw new Error('IndexedTaggingSecret index out of bounds');
}
}

toFields(): Fr[] {
return [this.appTaggingSecret, new Fr(this.index)];
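The new constructor guard can be exercised in isolation. Below is a minimal sketch with a string standing in for the `Fr` app tagging secret; the class name is illustrative.

```typescript
// Minimal sketch of the constructor guard added in the diff above.
// A string stands in for the Fr app tagging secret.
class IndexedTaggingSecretSketch {
  constructor(public appTaggingSecret: string, public index: number) {
    // Reject negative indexes up front, as the diff does.
    if (index < 0) {
      throw new Error('IndexedTaggingSecret index out of bounds');
    }
  }
}
```

With this guard, a negative index fails fast at construction time instead of silently producing a nonsensical tag later.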
201 changes: 111 additions & 90 deletions yarn-project/pxe/src/simulator_oracle/index.ts
@@ -38,7 +38,7 @@ import { type IncomingNoteDao } from '../database/incoming_note_dao.js';
import { type PxeDatabase } from '../database/index.js';
import { produceNoteDaos } from '../note_decryption_utils/produce_note_daos.js';
import { getAcirSimulator } from '../simulator/index.js';
import { getInitialIndexes, getLeftMostIndexedTaggingSecrets, getRightMostIndexes } from './tagging_utils.js';
import { getIndexedTaggingSecretsForTheWindow, getInitialIndexesMap } from './tagging_utils.js';

/**
* A data oracle that provides information needed for simulating a transaction.
@@ -424,116 +424,137 @@ export class SimulatorOracle implements DBOracle {
// Half the size of the window we slide over the tagging secret indexes.
const WINDOW_HALF_SIZE = 10;

// Ideally this algorithm would be implemented in noir, exposing its building blocks as oracles.
// However it is impossible at the moment due to the language not supporting nested slices.
// This nesting is necessary because for a given set of tags we don't
// know how many logs we will get back. Furthermore, these logs are of undetermined
// length, since we don't really know the note they correspond to until we decrypt them.

const recipients = scopes ? scopes : await this.keyStore.getAccounts();
// A map of never-before-seen logs going from recipient address to logs
const newLogsMap = new Map<string, TxScopedL2Log[]>();
// A map from recipient address to logs. Note that the logs might have been processed before
// due to us having a sliding window that "looks back" for logs as well. (We look back as there is no guarantee
// that logs will be received ordered by a given tag index and that the tags won't be reused.)
const logsMap = new Map<string, TxScopedL2Log[]>();
const contractName = await this.contractDataOracle.getDebugContractName(contractAddress);
for (const recipient of recipients) {
const logs: TxScopedL2Log[] = [];
// Ideally this algorithm would be implemented in noir, exposing its building blocks as oracles.
// However it is impossible at the moment due to the language not supporting nested slices.
// This nesting is necessary because for a given set of tags we don't
// know how many logs we will get back. Furthermore, these logs are of undetermined
// length, since we don't really know the note they correspond to until we decrypt them.

// 1. Get all the secrets for the recipient and sender pairs (#9365)
const indexedTaggingSecrets = await this.#getIndexedTaggingSecretsForContacts(contractAddress, recipient);

// 1.1 Set up a sliding window with an offset. Chances are the sender might have messed up
// and inadvertently incremented their index without us getting any logs (for example, in case
// of a revert). If we stopped looking for logs the first time we don't receive any logs for a tag,
// we might never receive anything from that sender again.
// Also there's a possibility that we have advanced our index, but the sender has reused it,
// so we might have missed some logs. For these reasons, we have to look both back and ahead of
// the stored index.

// App tagging secrets along with an index in a window to check in the current iteration. Called current because
// this value will be updated as we iterate through the window.
let currentSecrets = getLeftMostIndexedTaggingSecrets(indexedTaggingSecrets, WINDOW_HALF_SIZE);
// Right-most indexes in a window to check stored in a key-value map where key is the app tagging secret
// and value is the index to check (the right-most index in the window).
const rightMostIndexesMap = getRightMostIndexes(indexedTaggingSecrets, WINDOW_HALF_SIZE);
const logsForRecipient: TxScopedL2Log[] = [];

// Get all the secrets for the recipient and sender pairs (#9365)
const secrets = await this.#getIndexedTaggingSecretsForContacts(contractAddress, recipient);

// We fetch logs for a window of indexes in a range:
// <latest_log_index - WINDOW_HALF_SIZE, latest_log_index + WINDOW_HALF_SIZE>.
//
// We use this window approach because it could happen that a sender might have messed up and inadvertently
// incremented their index without us getting any logs (for example, in case of a revert). If we stopped looking
// for logs the first time we don't receive any logs for a tag, we might never receive anything from that sender again.
// Also there's a possibility that we have advanced our index, but the sender has reused it, so we might have missed
// some logs. For these reasons, we have to look both back and ahead of the stored index.
let secretsAndWindows = secrets.map(secret => {
return {
appTaggingSecret: secret.appTaggingSecret,
leftMostIndex: Math.max(0, secret.index - WINDOW_HALF_SIZE),
rightMostIndex: secret.index + WINDOW_HALF_SIZE,
};
});

// As we iterate we store the largest index we have seen for a given secret to later on store it in the db.
const newLargestIndexMapToStore: { [k: string]: number } = {};

// The initial/unmodified indexes of the secrets stored in a key-value map where key is the app tagging secret.
const initialIndexesMap = getInitialIndexes(indexedTaggingSecrets);
// A map of indexes to increment for secrets for which we have found logs with an index higher than the one
// stored.
const indexesToIncrementMap: { [k: string]: number } = {};

while (currentSecrets.length > 0) {
// 2. Compute tags using the secrets, recipient and index. Obtain logs for each tag (#9380)
const currentTags = currentSecrets.map(secret =>
// We compute the siloed tags since we need the tags as they appear in the log.
const initialIndexesMap = getInitialIndexesMap(secrets);

while (secretsAndWindows.length > 0) {
const secretsForTheWholeWindow = getIndexedTaggingSecretsForTheWindow(secretsAndWindows);
const tagsForTheWholeWindow = secretsForTheWholeWindow.map(secret =>
secret.computeSiloedTag(recipient, contractAddress),
);

// We store the new largest indexes we find in the iteration in the following map to later on construct
// a new set of secrets and windows to fetch logs for.
const newLargestIndexMapForIteration: { [k: string]: number } = {};

// Fetch the logs for the tags and iterate over them
const logsByTags = await this.aztecNode.getLogsByTags(currentTags);
const secretsWithNewIndex: IndexedTaggingSecret[] = [];
const logsByTags = await this.aztecNode.getLogsByTags(tagsForTheWholeWindow);

logsByTags.forEach((logsByTag, logIndex) => {
const { appTaggingSecret: currentSecret, index: currentIndex } = currentSecrets[logIndex];
const currentSecretAsStr = currentSecret.toString();
this.log.debug(`Syncing logs for recipient ${recipient} at contract ${contractName}(${contractAddress})`, {
recipient,
secret: currentSecret,
index: currentIndex,
contractName,
contractAddress,
});
// 3.1. Append logs to the list and increment the index for the tags that have logs (#9380)
if (logsByTag.length > 0) {
const newIndex = currentIndex + 1;
this.log.debug(
`Found ${logsByTag.length} logs as recipient ${recipient}. Incrementing index to ${newIndex} at contract ${contractName}(${contractAddress})`,
{
recipient,
secret: currentSecret,
newIndex,
contractName,
contractAddress,
},
);
logs.push(...logsByTag);

if (currentIndex >= initialIndexesMap[currentSecretAsStr]) {
// 3.2. We found an index higher than the stored/initial one so we update it in the db later on (#9380)
indexesToIncrementMap[currentSecretAsStr] = newIndex;
// 3.3. We found an index higher than the initial one so we slide the window.
rightMostIndexesMap[currentSecretAsStr] = currentIndex + WINDOW_HALF_SIZE;
// The logs for the given tag exist so we store them for later processing
logsForRecipient.push(...logsByTag);

// We retrieve the indexed tagging secret corresponding to the log as we need it to evaluate whether
// a new largest index has been found.
const secretCorrespondingToLog = secretsForTheWholeWindow[logIndex];
const initialIndex = initialIndexesMap[secretCorrespondingToLog.appTaggingSecret.toString()];

this.log.debug(`Found ${logsByTag.length} logs as recipient ${recipient}`, {
recipient,
secret: secretCorrespondingToLog.appTaggingSecret,
contractName,
contractAddress,
});

if (
secretCorrespondingToLog.index >= initialIndex &&
(newLargestIndexMapForIteration[secretCorrespondingToLog.appTaggingSecret.toString()] === undefined ||
secretCorrespondingToLog.index >=
newLargestIndexMapForIteration[secretCorrespondingToLog.appTaggingSecret.toString()])
) {
// We have found a new largest index so we store it for later processing (storing it in the db + fetching
// the difference of the window sets of current and the next iteration)
newLargestIndexMapForIteration[secretCorrespondingToLog.appTaggingSecret.toString()] =
secretCorrespondingToLog.index + 1;

this.log.debug(
`Incrementing index to ${
secretCorrespondingToLog.index + 1
} at contract ${contractName}(${contractAddress})`,
);
}
}
// 3.4 Keep increasing the index (inside the window) temporarily for the tags that have no logs
// There's a chance the sender missed some and we want to catch up
if (currentIndex < rightMostIndexesMap[currentSecretAsStr]) {
const newTaggingSecret = new IndexedTaggingSecret(currentSecret, currentIndex + 1);
secretsWithNewIndex.push(newTaggingSecret);
}
});

// We store the new indexes for the secrets that have logs with an index higher than the one stored.
await this.db.setTaggingSecretsIndexesAsRecipient(
Object.keys(indexesToIncrementMap).map(
secret => new IndexedTaggingSecret(Fr.fromHexString(secret), indexesToIncrementMap[secret]),
),
);
// Now, based on the new largest indexes we found, we construct a new set of secrets and windows to fetch
// logs for. Note that it's very unlikely that a new log from the current window would appear between the
// iterations so we fetch the logs only for the difference of the window sets.
const newSecretsAndWindows = [];
for (const [appTaggingSecret, newIndex] of Object.entries(newLargestIndexMapForIteration)) {
const secret = secrets.find(secret => secret.appTaggingSecret.toString() === appTaggingSecret);
if (secret) {
newSecretsAndWindows.push({
appTaggingSecret: secret.appTaggingSecret,
// We set the left most index to the new index to avoid fetching the same logs again
leftMostIndex: newIndex,
rightMostIndex: newIndex + WINDOW_HALF_SIZE,
});

// We store the new largest index in the map to later store it in the db.
newLargestIndexMapToStore[appTaggingSecret] = newIndex;
} else {
throw new Error(
`Secret not found for appTaggingSecret ${appTaggingSecret}. This is a bug as it should never happen!`,
);
}
}

// We've processed all the current secret-index pairs so we proceed to the next iteration.
currentSecrets = secretsWithNewIndex;
// Now we set the new secrets and windows and proceed to the next iteration.
secretsAndWindows = newSecretsAndWindows;
}

newLogsMap.set(
// We filter the logs by block number and store them in the map.
logsMap.set(
recipient.toString(),
// Remove logs with a block number higher than the max block number
// Duplicates are likely to happen due to the sliding window, so we also filter them out
logs.filter(
(log, index, self) =>
// The following condition is true if the log has small enough block number and is unique
// --> the right side of the && is true if the index of the current log is the first occurrence
// of the log in the array --> that way we ensure uniqueness.
log.blockNumber <= maxBlockNumber && index === self.findIndex(otherLog => otherLog.equals(log)),
logsForRecipient.filter(log => log.blockNumber <= maxBlockNumber),
);

// At this point we have processed all the logs for the recipient so we store the new largest indexes in the db.
await this.db.setTaggingSecretsIndexesAsRecipient(
Object.entries(newLargestIndexMapToStore).map(
([appTaggingSecret, index]) => new IndexedTaggingSecret(Fr.fromHexString(appTaggingSecret), index),
),
);
}
return newLogsMap;
return logsMap;
}
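The window-sliding loop in the diff above can be condensed into a self-contained sketch. Everything here is illustrative: tags are plain strings, the "node" is a callback returning a log count, and `syncIndexes`, `expandWindows`, and `Window` are hypothetical names, not PXE API. The sketch keeps the core mechanics: start a window of `±WINDOW_HALF_SIZE` around each stored index, slide a secret's window forward whenever a used index at or past the stored one is found, and stop once an iteration finds nothing new.

```typescript
// Simplified sketch of the sliding-window index sync; names are illustrative.
const WINDOW_HALF_SIZE = 10;

type Window = { secret: string; leftMostIndex: number; rightMostIndex: number };

// Expand each window into one concrete (secret, index) pair per covered index,
// mirroring how one tag is computed per index in the window.
function expandWindows(windows: Window[]): { secret: string; index: number }[] {
  return windows.flatMap(w =>
    Array.from({ length: w.rightMostIndex - w.leftMostIndex + 1 }, (_, i) => ({
      secret: w.secret,
      index: w.leftMostIndex + i,
    })),
  );
}

function syncIndexes(
  initialIndexes: Record<string, number>,
  logCountAt: (secret: string, index: number) => number,
): Record<string, number> {
  // Initial windows: <stored - WINDOW_HALF_SIZE, stored + WINDOW_HALF_SIZE>.
  let windows: Window[] = Object.entries(initialIndexes).map(([secret, index]) => ({
    secret,
    leftMostIndex: Math.max(0, index - WINDOW_HALF_SIZE),
    rightMostIndex: index + WINDOW_HALF_SIZE,
  }));
  const largest: Record<string, number> = {};

  while (windows.length > 0) {
    const newLargest: Record<string, number> = {};
    for (const { secret, index } of expandWindows(windows)) {
      // A used index at or past the stored one yields a new largest index.
      if (logCountAt(secret, index) > 0 && index >= initialIndexes[secret]) {
        if (newLargest[secret] === undefined || index + 1 > newLargest[secret]) {
          newLargest[secret] = index + 1;
        }
      }
    }
    // Slide windows forward from the new largest indexes only; secrets with no
    // new logs drop out, so the loop terminates once nothing new is found.
    windows = Object.entries(newLargest).map(([secret, newIndex]) => ({
      secret,
      leftMostIndex: newIndex, // skip indexes already fetched
      rightMostIndex: newIndex + WINDOW_HALF_SIZE,
    }));
    Object.assign(largest, newLargest);
  }
  return largest;
}
```

For a secret stored at index 5 whose sender has used indexes up to 7, the first pass scans 0..15, finds logs at 5..7, and records 8 as the new largest; the second pass scans 8..18, finds nothing, and the loop exits with index 8 to persist.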

/**