Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: request missing on chain events on error #2298

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/gold-geckos-remain.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@farcaster/core": patch
"@farcaster/hubble": patch
---

feat: request missing on chain events on related submit message errors
48 changes: 48 additions & 0 deletions apps/hubble/src/eth/l2EventsProvider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,52 @@ describe("process events", () => {
},
TEST_TIMEOUT_LONG,
);

test(
"retry by fid",
async () => {
const rentSim = await simulateContract(publicClient, {
address: storageRegistryAddress,
abi: StorageRegistry.abi,
functionName: "credit",
account: accounts[0].address,
args: [BigInt(1), BigInt(1)],
});

// Set up the storage rent event
const rentHash = await writeContract(walletClientWithAccount, rentSim.request);
const rentTrx = await waitForTransactionReceipt(publicClient, { hash: rentHash });
await sleep(1000); // allow time for the rent event to be polled for
await mine(testClient, { blocks: L2EventsProvider.numConfirmations });
await waitForBlock(Number(rentTrx.blockNumber) + L2EventsProvider.numConfirmations);

const events1 = await onChainEventStore.getOnChainEvents(OnChainEventType.EVENT_TYPE_STORAGE_RENT, 1);
expect(events1.length).toEqual(1);
expect(events1[0]?.fid).toEqual(1);
expect(events1[0]?.storageRentEventBody?.units).toEqual(1);

const clearAndRetryForFid = async () => {
// Clear on chain events and show that they get re-ingested when retrying by fid
await OnChainEventStore.clearEvents(db);
const events = await onChainEventStore.getOnChainEvents(OnChainEventType.EVENT_TYPE_STORAGE_RENT, 1);

expect(events.length).toEqual(0);

await l2EventsProvider.retryEventsForFid(1);
await sleep(1000); // allow time for the rent event to be polled for
return await onChainEventStore.getOnChainEvents(OnChainEventType.EVENT_TYPE_STORAGE_RENT, 1);
};

const events2 = await clearAndRetryForFid();

expect(events2.length).toEqual(1);
expect(events2[0]?.fid).toEqual(1);
expect(events2[0]?.storageRentEventBody?.units).toEqual(1);

// After retrying once, we don't retry again
const events3 = await clearAndRetryForFid();
expect(events3.length).toEqual(0);
},
TEST_TIMEOUT_LONG,
);
});
198 changes: 165 additions & 33 deletions apps/hubble/src/eth/l2EventsProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@
}

const RENT_EXPIRY_IN_SECONDS = 365 * 24 * 60 * 60; // One year
const FID_RETRY_DEDUP_LOOKBACK_MS = 60 * 60 * 1000; // One hour

type EventSpecificArgs = {
eventName: string;
fid: number;
};

/**
* Class that follows the Optimism chain to handle on-chain events from the Storage Registry contract.
Expand All @@ -87,6 +93,7 @@

private _onChainEventsByBlock: Map<number, Array<OnChainEvent>>;
private _retryDedupMap: Map<number, boolean>;
private _fidRetryDedupMap: Map<number, boolean>;
private _blockTimestampsCache: Map<string, number>;

private _lastBlockNumber: number;
Expand Down Expand Up @@ -160,6 +167,12 @@
// numConfirmations blocks have been mined.
this._onChainEventsByBlock = new Map();
this._retryDedupMap = new Map();
this._fidRetryDedupMap = new Map();
// Clear the map periodically to avoid memory bloat.
setInterval(() => {
this._fidRetryDedupMap.clear();

Check warning on line 173 in apps/hubble/src/eth/l2EventsProvider.ts

View check run for this annotation

Codecov / codecov/patch

apps/hubble/src/eth/l2EventsProvider.ts#L173

Added line #L173 was not covered by tests
}, FID_RETRY_DEDUP_LOOKBACK_MS);

this._blockTimestampsCache = new Map();

this.setAddresses(storageRegistryAddress, keyRegistryV2Address, idRegistryV2Address);
Expand Down Expand Up @@ -300,6 +313,7 @@
// Handling: use try-catch + log since errors are expected and not important to surface
try {
if (event.eventName === "Rent") {
statsd().increment("l2events.events_processed", { kind: "storage:rent" });
// Fix when viem fixes https://github.com/wagmi-dev/viem/issues/938
const rentEvent = event as Log<
bigint,
Expand Down Expand Up @@ -366,6 +380,7 @@
// Handling: use try-catch + log since errors are expected and not important to surface
try {
if (event.eventName === "Add") {
statsd().increment("l2events.events_processed", { kind: "key-registry:add" });

Check warning on line 383 in apps/hubble/src/eth/l2EventsProvider.ts

View check run for this annotation

Codecov / codecov/patch

apps/hubble/src/eth/l2EventsProvider.ts#L383

Added line #L383 was not covered by tests
const addEvent = event as Log<
bigint,
number,
Expand Down Expand Up @@ -393,6 +408,7 @@
signerEventBody,
);
} else if (event.eventName === "Remove") {
statsd().increment("l2events.events_processed", { kind: "key-registry:remove" });

Check warning on line 411 in apps/hubble/src/eth/l2EventsProvider.ts

View check run for this annotation

Codecov / codecov/patch

apps/hubble/src/eth/l2EventsProvider.ts#L411

Added line #L411 was not covered by tests
const removeEvent = event as Log<
bigint,
number,
Expand All @@ -417,6 +433,7 @@
signerEventBody,
);
} else if (event.eventName === "AdminReset") {
statsd().increment("l2events.events_processed", { kind: "key-registry:admin-reset" });

Check warning on line 436 in apps/hubble/src/eth/l2EventsProvider.ts

View check run for this annotation

Codecov / codecov/patch

apps/hubble/src/eth/l2EventsProvider.ts#L436

Added line #L436 was not covered by tests
const resetEvent = event as Log<
bigint,
number,
Expand All @@ -441,6 +458,7 @@
signerEventBody,
);
} else if (event.eventName === "Migrated") {
statsd().increment("l2events.events_processed", { kind: "key-registry:migrated" });

Check warning on line 461 in apps/hubble/src/eth/l2EventsProvider.ts

View check run for this annotation

Codecov / codecov/patch

apps/hubble/src/eth/l2EventsProvider.ts#L461

Added line #L461 was not covered by tests
const migratedEvent = event as Log<
bigint,
number,
Expand Down Expand Up @@ -493,6 +511,7 @@
// Handling: use try-catch + log since errors are expected and not important to surface
try {
if (event.eventName === "Register") {
statsd().increment("l2events.events_processed", { kind: "id-registry:register" });

Check warning on line 514 in apps/hubble/src/eth/l2EventsProvider.ts

View check run for this annotation

Codecov / codecov/patch

apps/hubble/src/eth/l2EventsProvider.ts#L514

Added line #L514 was not covered by tests
const registerEvent = event as Log<
bigint,
number,
Expand Down Expand Up @@ -520,6 +539,7 @@
idRegisterEventBody,
);
} else if (event.eventName === "Transfer") {
statsd().increment("l2events.events_processed", { kind: "id-registry:transfer" });

Check warning on line 542 in apps/hubble/src/eth/l2EventsProvider.ts

View check run for this annotation

Codecov / codecov/patch

apps/hubble/src/eth/l2EventsProvider.ts#L542

Added line #L542 was not covered by tests
const transferEvent = event as Log<
bigint,
number,
Expand Down Expand Up @@ -547,6 +567,7 @@
idRegisterEventBody,
);
} else if (event.eventName === "ChangeRecoveryAddress") {
statsd().increment("l2events.events_processed", { kind: "id-registry:change-recovery-address" });

Check warning on line 570 in apps/hubble/src/eth/l2EventsProvider.ts

View check run for this annotation

Codecov / codecov/patch

apps/hubble/src/eth/l2EventsProvider.ts#L570

Added line #L570 was not covered by tests
const transferEvent = event as Log<
bigint,
number,
Expand Down Expand Up @@ -661,6 +682,54 @@
}
}

private getEventSpecificArgs(fid: number) {
return {
StorageRegistry: [{ eventName: "Rent", fid }],
IdRegistry: [
{ eventName: "Register", fid },
{ eventName: "Transfer", fid },
],
KeyRegistry: [
{ eventName: "Add", fid },
{ eventName: "Remove", fid },
],
};
}

public async retryEventsForFid(fid: number) {
aditiharini marked this conversation as resolved.
Show resolved Hide resolved
if (this._fidRetryDedupMap.has(fid)) {
return;
}

this._fidRetryDedupMap.set(fid, true);
aditiharini marked this conversation as resolved.
Show resolved Hide resolved

log.info(
{ fid, startBlock: this._firstBlock, stopBlock: this._lastBlockNumber, chunkSize: this._chunkSize },
`Attempting to retryEventsForFid ${fid}`,
);
statsd().increment("l2events.retriesByFid.attempts", 1, { fid: fid.toString() });

// Let's not remove. No need to retry same fid many times. If we allow retrying multiple times, we need to rate limit. The map will get cleared periodically to prevent memory bloat.
try {
// The viem API requires an event kind if you want to provide filters by indexed arguments-- this is why we're making multiple calls to syncHistoricalEvents and prioritizing the most common event types.
const eventSpecificArgs = this.getEventSpecificArgs(fid);

// The batch sizes are artificially large-- this batch size is internally enforced and we essentially want to eliminate batches given that we don't expct a lot of results from these queries.
await this.syncHistoricalEvents(
this._firstBlock,
this._lastBlockNumber,
this._lastBlockNumber,
eventSpecificArgs,
);

log.info({ fid }, `Finished retryEventsForFid ${fid}`);
statsd().increment("l2events.retriesByFid.successes", 1, { fid: fid.toString() });
} catch (e) {
log.error(e, `Error in retryEventsForFid ${fid}`);
statsd().increment("l2events.retriesByFid.errors", 1, { fid: fid.toString() });

Check warning on line 729 in apps/hubble/src/eth/l2EventsProvider.ts

View check run for this annotation

Codecov / codecov/patch

apps/hubble/src/eth/l2EventsProvider.ts#L728-L729

Added lines #L728 - L729 were not covered by tests
}
}

private setAddresses(
storageRegistryAddress: `0x${string}`,
keyRegistryV2Address: `0x${string}`,
Expand Down Expand Up @@ -737,11 +806,89 @@
);
}

getCommonFilterArgs(fromBlock?: number, toBlock?: number, eventSpecificArgs?: EventSpecificArgs) {
let fromBlockBigInt;
if (fromBlock !== undefined) {
fromBlockBigInt = BigInt(fromBlock);
}

let toBlockBigInt;
if (toBlock !== undefined) {
toBlockBigInt = BigInt(toBlock);
}

return {
fromBlock: fromBlockBigInt,
toBlock: toBlockBigInt,
eventName: eventSpecificArgs?.eventName,
strict: true,
};
}

getEventSpecificFid(eventSpecificArgs?: EventSpecificArgs) {
if (eventSpecificArgs !== undefined) {
return BigInt(eventSpecificArgs.fid);
}

return undefined;
}

async getStorageEvents(fromBlock?: number, toBlock?: number, eventSpecificArgs?: EventSpecificArgs) {
const realParams = {
...this.getCommonFilterArgs(fromBlock, toBlock, eventSpecificArgs),
address: this.storageRegistryAddress,
abi: StorageRegistry.abi,
args: { fid: this.getEventSpecificFid(eventSpecificArgs) },
};

const storageLogsPromise = this.getContractEvents(realParams);
await this.processStorageEvents(
(await storageLogsPromise) as WatchContractEventOnLogsParameter<typeof StorageRegistry.abi>,
);
}

async getIdRegistryEvents(fromBlock?: number, toBlock?: number, eventSpecificArgs?: EventSpecificArgs) {
const idV2LogsPromise = this.getContractEvents({
...this.getCommonFilterArgs(fromBlock, toBlock, eventSpecificArgs),
address: this.idRegistryV2Address,
abi: IdRegistry.abi,
args: {
id: this.getEventSpecificFid(eventSpecificArgs) /* The fid parameter is named "id" in the IdRegistry events */,
},
});

await this.processIdRegistryV2Events(
(await idV2LogsPromise) as WatchContractEventOnLogsParameter<typeof IdRegistry.abi>,
);
}

async getKeyRegistryEvents(fromBlock?: number, toBlock?: number, eventSpecificArgs?: EventSpecificArgs) {
const keyV2LogsPromise = this.getContractEvents({
...this.getCommonFilterArgs(fromBlock, toBlock, eventSpecificArgs),
address: this.keyRegistryV2Address,
abi: KeyRegistry.abi,
args: { fid: this.getEventSpecificFid(eventSpecificArgs) },
});

await this.processKeyRegistryEventsV2(
(await keyV2LogsPromise) as WatchContractEventOnLogsParameter<typeof KeyRegistry.abi>,
);
}

/**
* Sync old Storage events that may have happened before hub was started. We'll put them all
* in the sync queue to be processed later, to make sure we don't process any unconfirmed events.
*/
private async syncHistoricalEvents(fromBlock: number, toBlock: number, batchSize: number) {
private async syncHistoricalEvents(
fromBlock: number,
toBlock: number,
batchSize: number, // This batch size is enforced by us internally, not by the RPC provider
byEventKind?: {
StorageRegistry: EventSpecificArgs[];
IdRegistry: EventSpecificArgs[];
KeyRegistry: EventSpecificArgs[];
},
) {
if (!this.idRegistryV2Address || !this.keyRegistryV2Address || !this.storageRegistryAddress) {
return;
}
Expand Down Expand Up @@ -789,41 +936,26 @@
`syncing events (${formatPercentage((nextFromBlock - fromBlock) / totalBlocks)})`,
);
progressBar?.update(Math.max(nextFromBlock - fromBlock - 1, 0));
statsd().increment("l2events.blocks", Math.min(toBlock, nextToBlock - nextFromBlock));

const storageLogsPromise = this.getContractEvents({
address: this.storageRegistryAddress,
abi: StorageRegistry.abi,
fromBlock: BigInt(nextFromBlock),
toBlock: BigInt(nextToBlock),
strict: true,
});

const idV2LogsPromise = this.getContractEvents({
address: this.idRegistryV2Address,
abi: IdRegistry.abi,
fromBlock: BigInt(nextFromBlock),
toBlock: BigInt(nextToBlock),
strict: true,
});
if (byEventKind) {
// If there are event-specific filters, we don't count the blocks here. The filters mean we don't necessarily consume all the blocks in the provided range. Instead, look at "l2events.events_processed" for an accurate metric.
for (const storageEventSpecific of byEventKind.StorageRegistry) {
await this.getStorageEvents(nextFromBlock, nextToBlock, storageEventSpecific);
}

const keyV2LogsPromise = this.getContractEvents({
address: this.keyRegistryV2Address,
abi: KeyRegistry.abi,
fromBlock: BigInt(nextFromBlock),
toBlock: BigInt(nextToBlock),
strict: true,
});
for (const idRegistryEventSpecific of byEventKind.IdRegistry) {
await this.getIdRegistryEvents(nextFromBlock, nextToBlock, idRegistryEventSpecific);
}

await this.processStorageEvents(
(await storageLogsPromise) as WatchContractEventOnLogsParameter<typeof StorageRegistry.abi>,
);
await this.processIdRegistryV2Events(
(await idV2LogsPromise) as WatchContractEventOnLogsParameter<typeof IdRegistry.abi>,
);
await this.processKeyRegistryEventsV2(
(await keyV2LogsPromise) as WatchContractEventOnLogsParameter<typeof KeyRegistry.abi>,
);
for (const keyRegistryEventSpecific of byEventKind.KeyRegistry) {
await this.getKeyRegistryEvents(nextFromBlock, nextToBlock, keyRegistryEventSpecific);
}
} else {
statsd().increment("l2events.blocks", Math.min(toBlock, nextToBlock - nextFromBlock));
await this.getStorageEvents(nextFromBlock, nextToBlock);
await this.getIdRegistryEvents(nextFromBlock, nextToBlock);
await this.getKeyRegistryEvents(nextFromBlock, nextToBlock);
}

// Write out all the cached blocks first
await this.writeCachedBlocks(toBlock);
Expand Down
1 change: 1 addition & 0 deletions apps/hubble/src/hubble.ts
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ export class Hub implements HubInterface {
mainnetClient,
opClient as PublicClient,
this.fNameRegistryEventsProvider,
this.l2RegistryProvider,
);

const profileSync = options.profileSync ?? false;
Expand Down
2 changes: 1 addition & 1 deletion apps/hubble/src/rpc/test/httpServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ describe("httpServer", () => {
const response = (e as any).response;

expect(response.status).toBe(400);
expect(response.data.errCode).toEqual("bad_request.validation_failure");
expect(response.data.errCode).toEqual("bad_request.unknown_fid");
expect(response.data.details).toMatch("unknown fid");
}
expect(errored).toBeTruthy();
Expand Down
Loading
Loading