Skip to content

Commit

Permalink
feat: request missing on chain events on error (#2298)
Browse files Browse the repository at this point in the history
## Why is this change needed?

Our hubs regularly fail to get into sync because some of them are
missing onchain events. This feature will have us request missing
onchain events if there are errors related to missing on chain events
when messages are submitted.

Deployed to hoyt for testing:
[Logs](https://app.datadoghq.com/logs?query=%22Attempting%20to%20retryEventsForFid%22%20OR%20%22Finished%20retryEventsForFid%22%20OR%20%22cacheOnChainEvent%22%20&agg_m=count&agg_m_source=base&agg_q=%40fid&agg_q_source=base&agg_t=count&cols=host%2Cservice&fromUser=true&messageDisplay=inline&refresh_mode=paused&storage=hot&stream_sort=time%2Cdesc&top_n=30&top_o=top&viz=stream&x_missing=true&from_ts=1726433975166&to_ts=1726433977504&live=false)
## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [x] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [x] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [x] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [ ] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.

<!-- start pr-codex -->

---

## PR-Codex overview
The focus of this PR is to enhance error handling and retry mechanisms
for on-chain events in the Hubble application.

### Detailed summary
- Added feature to request missing on-chain events on related submit
message errors
- Improved error code handling for validation failures and unknown
signers
- Implemented retry mechanism for on-chain events by fid
- Enhanced error messages and error handling logic

> The following files were skipped due to too many changes:
`apps/hubble/src/storage/engine/index.ts`,
`apps/hubble/src/eth/l2EventsProvider.ts`

> ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
  • Loading branch information
aditiharini authored Sep 16, 2024
1 parent 398b661 commit f084daa
Show file tree
Hide file tree
Showing 10 changed files with 294 additions and 76 deletions.
6 changes: 6 additions & 0 deletions .changeset/gold-geckos-remain.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@farcaster/core": patch
"@farcaster/hubble": patch
---

feat: request missing on chain events on related submit message errors
48 changes: 48 additions & 0 deletions apps/hubble/src/eth/l2EventsProvider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,52 @@ describe("process events", () => {
},
TEST_TIMEOUT_LONG,
);

test(
"retry by fid",
async () => {
const rentSim = await simulateContract(publicClient, {
address: storageRegistryAddress,
abi: StorageRegistry.abi,
functionName: "credit",
account: accounts[0].address,
args: [BigInt(1), BigInt(1)],
});

// Set up the storage rent event
const rentHash = await writeContract(walletClientWithAccount, rentSim.request);
const rentTrx = await waitForTransactionReceipt(publicClient, { hash: rentHash });
await sleep(1000); // allow time for the rent event to be polled for
await mine(testClient, { blocks: L2EventsProvider.numConfirmations });
await waitForBlock(Number(rentTrx.blockNumber) + L2EventsProvider.numConfirmations);

const events1 = await onChainEventStore.getOnChainEvents(OnChainEventType.EVENT_TYPE_STORAGE_RENT, 1);
expect(events1.length).toEqual(1);
expect(events1[0]?.fid).toEqual(1);
expect(events1[0]?.storageRentEventBody?.units).toEqual(1);

const clearAndRetryForFid = async () => {
// Clear on chain events and show that they get re-ingested when retrying by fid
await OnChainEventStore.clearEvents(db);
const events = await onChainEventStore.getOnChainEvents(OnChainEventType.EVENT_TYPE_STORAGE_RENT, 1);

expect(events.length).toEqual(0);

await l2EventsProvider.retryEventsForFid(1);
await sleep(1000); // allow time for the rent event to be polled for
return await onChainEventStore.getOnChainEvents(OnChainEventType.EVENT_TYPE_STORAGE_RENT, 1);
};

const events2 = await clearAndRetryForFid();

expect(events2.length).toEqual(1);
expect(events2[0]?.fid).toEqual(1);
expect(events2[0]?.storageRentEventBody?.units).toEqual(1);

// After retrying once, we don't retry again
const events3 = await clearAndRetryForFid();
expect(events3.length).toEqual(0);
},
TEST_TIMEOUT_LONG,
);
});
198 changes: 165 additions & 33 deletions apps/hubble/src/eth/l2EventsProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ export class OptimismConstants {
}

const RENT_EXPIRY_IN_SECONDS = 365 * 24 * 60 * 60; // One year
const FID_RETRY_DEDUP_LOOKBACK_MS = 60 * 60 * 1000; // One hour

type EventSpecificArgs = {
eventName: string;
fid: number;
};

/**
* Class that follows the Optimism chain to handle on-chain events from the Storage Registry contract.
Expand All @@ -87,6 +93,7 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra

private _onChainEventsByBlock: Map<number, Array<OnChainEvent>>;
private _retryDedupMap: Map<number, boolean>;
private _fidRetryDedupMap: Map<number, boolean>;
private _blockTimestampsCache: Map<string, number>;

private _lastBlockNumber: number;
Expand Down Expand Up @@ -160,6 +167,12 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
// numConfirmations blocks have been mined.
this._onChainEventsByBlock = new Map();
this._retryDedupMap = new Map();
this._fidRetryDedupMap = new Map();
// Clear the map periodically to avoid memory bloat.
setInterval(() => {
this._fidRetryDedupMap.clear();
}, FID_RETRY_DEDUP_LOOKBACK_MS);

this._blockTimestampsCache = new Map();

this.setAddresses(storageRegistryAddress, keyRegistryV2Address, idRegistryV2Address);
Expand Down Expand Up @@ -300,6 +313,7 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
// Handling: use try-catch + log since errors are expected and not important to surface
try {
if (event.eventName === "Rent") {
statsd().increment("l2events.events_processed", { kind: "storage:rent" });
// Fix when viem fixes https://github.com/wagmi-dev/viem/issues/938
const rentEvent = event as Log<
bigint,
Expand Down Expand Up @@ -366,6 +380,7 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
// Handling: use try-catch + log since errors are expected and not important to surface
try {
if (event.eventName === "Add") {
statsd().increment("l2events.events_processed", { kind: "key-registry:add" });
const addEvent = event as Log<
bigint,
number,
Expand Down Expand Up @@ -393,6 +408,7 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
signerEventBody,
);
} else if (event.eventName === "Remove") {
statsd().increment("l2events.events_processed", { kind: "key-registry:remove" });
const removeEvent = event as Log<
bigint,
number,
Expand All @@ -417,6 +433,7 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
signerEventBody,
);
} else if (event.eventName === "AdminReset") {
statsd().increment("l2events.events_processed", { kind: "key-registry:admin-reset" });
const resetEvent = event as Log<
bigint,
number,
Expand All @@ -441,6 +458,7 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
signerEventBody,
);
} else if (event.eventName === "Migrated") {
statsd().increment("l2events.events_processed", { kind: "key-registry:migrated" });
const migratedEvent = event as Log<
bigint,
number,
Expand Down Expand Up @@ -493,6 +511,7 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
// Handling: use try-catch + log since errors are expected and not important to surface
try {
if (event.eventName === "Register") {
statsd().increment("l2events.events_processed", { kind: "id-registry:register" });
const registerEvent = event as Log<
bigint,
number,
Expand Down Expand Up @@ -520,6 +539,7 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
idRegisterEventBody,
);
} else if (event.eventName === "Transfer") {
statsd().increment("l2events.events_processed", { kind: "id-registry:transfer" });
const transferEvent = event as Log<
bigint,
number,
Expand Down Expand Up @@ -547,6 +567,7 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
idRegisterEventBody,
);
} else if (event.eventName === "ChangeRecoveryAddress") {
statsd().increment("l2events.events_processed", { kind: "id-registry:change-recovery-address" });
const transferEvent = event as Log<
bigint,
number,
Expand Down Expand Up @@ -661,6 +682,54 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
}
}

private getEventSpecificArgs(fid: number) {
return {
StorageRegistry: [{ eventName: "Rent", fid }],
IdRegistry: [
{ eventName: "Register", fid },
{ eventName: "Transfer", fid },
],
KeyRegistry: [
{ eventName: "Add", fid },
{ eventName: "Remove", fid },
],
};
}

public async retryEventsForFid(fid: number) {
if (this._fidRetryDedupMap.has(fid)) {
return;
}

this._fidRetryDedupMap.set(fid, true);

log.info(
{ fid, startBlock: this._firstBlock, stopBlock: this._lastBlockNumber, chunkSize: this._chunkSize },
`Attempting to retryEventsForFid ${fid}`,
);
statsd().increment("l2events.retriesByFid.attempts", 1, { fid: fid.toString() });

// Let's not remove. No need to retry same fid many times. If we allow retrying multiple times, we need to rate limit. The map will get cleared periodically to prevent memory bloat.
try {
// The viem API requires an event kind if you want to provide filters by indexed arguments-- this is why we're making multiple calls to syncHistoricalEvents and prioritizing the most common event types.
const eventSpecificArgs = this.getEventSpecificArgs(fid);

// The batch sizes are artificially large-- this batch size is internally enforced and we essentially want to eliminate batches given that we don't expct a lot of results from these queries.
await this.syncHistoricalEvents(
this._firstBlock,
this._lastBlockNumber,
this._lastBlockNumber,
eventSpecificArgs,
);

log.info({ fid }, `Finished retryEventsForFid ${fid}`);
statsd().increment("l2events.retriesByFid.successes", 1, { fid: fid.toString() });
} catch (e) {
log.error(e, `Error in retryEventsForFid ${fid}`);
statsd().increment("l2events.retriesByFid.errors", 1, { fid: fid.toString() });
}
}

private setAddresses(
storageRegistryAddress: `0x${string}`,
keyRegistryV2Address: `0x${string}`,
Expand Down Expand Up @@ -737,11 +806,89 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
);
}

getCommonFilterArgs(fromBlock?: number, toBlock?: number, eventSpecificArgs?: EventSpecificArgs) {
let fromBlockBigInt;
if (fromBlock !== undefined) {
fromBlockBigInt = BigInt(fromBlock);
}

let toBlockBigInt;
if (toBlock !== undefined) {
toBlockBigInt = BigInt(toBlock);
}

return {
fromBlock: fromBlockBigInt,
toBlock: toBlockBigInt,
eventName: eventSpecificArgs?.eventName,
strict: true,
};
}

getEventSpecificFid(eventSpecificArgs?: EventSpecificArgs) {
if (eventSpecificArgs !== undefined) {
return BigInt(eventSpecificArgs.fid);
}

return undefined;
}

async getStorageEvents(fromBlock?: number, toBlock?: number, eventSpecificArgs?: EventSpecificArgs) {
const realParams = {
...this.getCommonFilterArgs(fromBlock, toBlock, eventSpecificArgs),
address: this.storageRegistryAddress,
abi: StorageRegistry.abi,
args: { fid: this.getEventSpecificFid(eventSpecificArgs) },
};

const storageLogsPromise = this.getContractEvents(realParams);
await this.processStorageEvents(
(await storageLogsPromise) as WatchContractEventOnLogsParameter<typeof StorageRegistry.abi>,
);
}

async getIdRegistryEvents(fromBlock?: number, toBlock?: number, eventSpecificArgs?: EventSpecificArgs) {
const idV2LogsPromise = this.getContractEvents({
...this.getCommonFilterArgs(fromBlock, toBlock, eventSpecificArgs),
address: this.idRegistryV2Address,
abi: IdRegistry.abi,
args: {
id: this.getEventSpecificFid(eventSpecificArgs) /* The fid parameter is named "id" in the IdRegistry events */,
},
});

await this.processIdRegistryV2Events(
(await idV2LogsPromise) as WatchContractEventOnLogsParameter<typeof IdRegistry.abi>,
);
}

async getKeyRegistryEvents(fromBlock?: number, toBlock?: number, eventSpecificArgs?: EventSpecificArgs) {
const keyV2LogsPromise = this.getContractEvents({
...this.getCommonFilterArgs(fromBlock, toBlock, eventSpecificArgs),
address: this.keyRegistryV2Address,
abi: KeyRegistry.abi,
args: { fid: this.getEventSpecificFid(eventSpecificArgs) },
});

await this.processKeyRegistryEventsV2(
(await keyV2LogsPromise) as WatchContractEventOnLogsParameter<typeof KeyRegistry.abi>,
);
}

/**
* Sync old Storage events that may have happened before hub was started. We'll put them all
* in the sync queue to be processed later, to make sure we don't process any unconfirmed events.
*/
private async syncHistoricalEvents(fromBlock: number, toBlock: number, batchSize: number) {
private async syncHistoricalEvents(
fromBlock: number,
toBlock: number,
batchSize: number, // This batch size is enforced by us internally, not by the RPC provider
byEventKind?: {
StorageRegistry: EventSpecificArgs[];
IdRegistry: EventSpecificArgs[];
KeyRegistry: EventSpecificArgs[];
},
) {
if (!this.idRegistryV2Address || !this.keyRegistryV2Address || !this.storageRegistryAddress) {
return;
}
Expand Down Expand Up @@ -789,41 +936,26 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
`syncing events (${formatPercentage((nextFromBlock - fromBlock) / totalBlocks)})`,
);
progressBar?.update(Math.max(nextFromBlock - fromBlock - 1, 0));
statsd().increment("l2events.blocks", Math.min(toBlock, nextToBlock - nextFromBlock));

const storageLogsPromise = this.getContractEvents({
address: this.storageRegistryAddress,
abi: StorageRegistry.abi,
fromBlock: BigInt(nextFromBlock),
toBlock: BigInt(nextToBlock),
strict: true,
});

const idV2LogsPromise = this.getContractEvents({
address: this.idRegistryV2Address,
abi: IdRegistry.abi,
fromBlock: BigInt(nextFromBlock),
toBlock: BigInt(nextToBlock),
strict: true,
});
if (byEventKind) {
// If there are event-specific filters, we don't count the blocks here. The filters mean we don't necessarily consume all the blocks in the provided range. Instead, look at "l2events.events_processed" for an accurate metric.
for (const storageEventSpecific of byEventKind.StorageRegistry) {
await this.getStorageEvents(nextFromBlock, nextToBlock, storageEventSpecific);
}

const keyV2LogsPromise = this.getContractEvents({
address: this.keyRegistryV2Address,
abi: KeyRegistry.abi,
fromBlock: BigInt(nextFromBlock),
toBlock: BigInt(nextToBlock),
strict: true,
});
for (const idRegistryEventSpecific of byEventKind.IdRegistry) {
await this.getIdRegistryEvents(nextFromBlock, nextToBlock, idRegistryEventSpecific);
}

await this.processStorageEvents(
(await storageLogsPromise) as WatchContractEventOnLogsParameter<typeof StorageRegistry.abi>,
);
await this.processIdRegistryV2Events(
(await idV2LogsPromise) as WatchContractEventOnLogsParameter<typeof IdRegistry.abi>,
);
await this.processKeyRegistryEventsV2(
(await keyV2LogsPromise) as WatchContractEventOnLogsParameter<typeof KeyRegistry.abi>,
);
for (const keyRegistryEventSpecific of byEventKind.KeyRegistry) {
await this.getKeyRegistryEvents(nextFromBlock, nextToBlock, keyRegistryEventSpecific);
}
} else {
statsd().increment("l2events.blocks", Math.min(toBlock, nextToBlock - nextFromBlock));
await this.getStorageEvents(nextFromBlock, nextToBlock);
await this.getIdRegistryEvents(nextFromBlock, nextToBlock);
await this.getKeyRegistryEvents(nextFromBlock, nextToBlock);
}

// Write out all the cached blocks first
await this.writeCachedBlocks(toBlock);
Expand Down
1 change: 1 addition & 0 deletions apps/hubble/src/hubble.ts
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ export class Hub implements HubInterface {
mainnetClient,
opClient as PublicClient,
this.fNameRegistryEventsProvider,
this.l2RegistryProvider,
);

const profileSync = options.profileSync ?? false;
Expand Down
2 changes: 1 addition & 1 deletion apps/hubble/src/rpc/test/httpServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ describe("httpServer", () => {
const response = (e as any).response;

expect(response.status).toBe(400);
expect(response.data.errCode).toEqual("bad_request.validation_failure");
expect(response.data.errCode).toEqual("bad_request.unknown_fid");
expect(response.data.details).toMatch("unknown fid");
}
expect(errored).toBeTruthy();
Expand Down
Loading

0 comments on commit f084daa

Please sign in to comment.