From f084daa184e9b4cb96eb7cd11f3ea90fa93dcc6d Mon Sep 17 00:00:00 2001 From: Aditi Srinivasan Date: Mon, 16 Sep 2024 16:31:41 -0400 Subject: [PATCH] feat: request missing on chain events on error (#2298) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Why is this change needed? Our hubs regularly fail to get into sync because some of them are missing onchain events. This feature will have us request missing onchain events if there are errors related to missing on chain events when messages are submitted. Deployed to hoyt for testing: [Logs](https://app.datadoghq.com/logs?query=%22Attempting%20to%20retryEventsForFid%22%20OR%20%22Finished%20retryEventsForFid%22%20OR%20%22cacheOnChainEvent%22%20&agg_m=count&agg_m_source=base&agg_q=%40fid&agg_q_source=base&agg_t=count&cols=host%2Cservice&fromUser=true&messageDisplay=inline&refresh_mode=paused&storage=hot&stream_sort=time%2Cdesc&top_n=30&top_o=top&viz=stream&x_missing=true&from_ts=1726433975166&to_ts=1726433977504&live=false) ## Merge Checklist _Choose all relevant options below by adding an `x` now or at any time before submitting for review_ - [x] PR title adheres to the [conventional commits](https://www.conventionalcommits.org/en/v1.0.0/) standard - [x] PR has a [changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets) - [x] PR has been tagged with a change label(s) (i.e. documentation, feature, bugfix, or chore) - [ ] PR includes [documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs) if necessary. --- ## PR-Codex overview The focus of this PR is to enhance error handling and retry mechanisms for on-chain events in the Hubble application. ### Detailed summary - Added feature to request missing on-chain events on related submit message errors - Improved error code handling for validation failures and unknown signers - Implemented retry mechanism for on-chain events by fid - Enhanced error messages and error handling logic > The following files were skipped due to too many changes: `apps/hubble/src/storage/engine/index.ts`, `apps/hubble/src/eth/l2EventsProvider.ts` > ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your question}` --- .changeset/gold-geckos-remain.md | 6 + apps/hubble/src/eth/l2EventsProvider.test.ts | 48 +++++ apps/hubble/src/eth/l2EventsProvider.ts | 198 +++++++++++++++--- apps/hubble/src/hubble.ts | 1 + apps/hubble/src/rpc/test/httpServer.test.ts | 2 +- .../src/rpc/test/messageService.test.ts | 6 +- apps/hubble/src/storage/engine/index.test.ts | 26 ++- apps/hubble/src/storage/engine/index.ts | 79 ++++--- .../src/storage/stores/onChainEventStore.ts | 1 + packages/core/src/errors.ts | 3 + 10 files changed, 294 insertions(+), 76 deletions(-) create mode 100644 .changeset/gold-geckos-remain.md diff --git a/.changeset/gold-geckos-remain.md b/.changeset/gold-geckos-remain.md new file mode 100644 index 0000000000..324f9fcb02 --- /dev/null +++ b/.changeset/gold-geckos-remain.md @@ -0,0 +1,6 @@ +--- +"@farcaster/core": patch +"@farcaster/hubble": patch +--- + +feat: request missing on chain events on related submit message errors diff --git a/apps/hubble/src/eth/l2EventsProvider.test.ts b/apps/hubble/src/eth/l2EventsProvider.test.ts index 5515a4d2e5..12994bff45 100644 --- a/apps/hubble/src/eth/l2EventsProvider.test.ts +++ b/apps/hubble/src/eth/l2EventsProvider.test.ts @@ -199,4 +199,52 @@ describe("process events", () => { }, TEST_TIMEOUT_LONG, ); + + test( + "retry by fid", + async () => { + const rentSim = await simulateContract(publicClient, { + address: storageRegistryAddress, + abi: StorageRegistry.abi, + functionName: "credit", + account: accounts[0].address, + args: [BigInt(1), BigInt(1)], + }); + + // Set up the storage rent event + const rentHash = await writeContract(walletClientWithAccount, rentSim.request); + const rentTrx = await waitForTransactionReceipt(publicClient, { hash: rentHash }); + await sleep(1000); // allow time for the rent event to be polled for + await mine(testClient, { blocks: L2EventsProvider.numConfirmations }); + await waitForBlock(Number(rentTrx.blockNumber) + L2EventsProvider.numConfirmations); + + const events1 = await onChainEventStore.getOnChainEvents(OnChainEventType.EVENT_TYPE_STORAGE_RENT, 1); + expect(events1.length).toEqual(1); + expect(events1[0]?.fid).toEqual(1); + expect(events1[0]?.storageRentEventBody?.units).toEqual(1); + + const clearAndRetryForFid = async () => { + // Clear on chain events and show that they get re-ingested when retrying by fid + await OnChainEventStore.clearEvents(db); + const events = await onChainEventStore.getOnChainEvents(OnChainEventType.EVENT_TYPE_STORAGE_RENT, 1); + + expect(events.length).toEqual(0); + + await l2EventsProvider.retryEventsForFid(1); + await sleep(1000); // allow time for the rent event to be polled for + return await onChainEventStore.getOnChainEvents(OnChainEventType.EVENT_TYPE_STORAGE_RENT, 1); + }; + + const events2 = await clearAndRetryForFid(); + + expect(events2.length).toEqual(1); + expect(events2[0]?.fid).toEqual(1); + expect(events2[0]?.storageRentEventBody?.units).toEqual(1); + + // After retrying once, we don't retry again + const events3 = await clearAndRetryForFid(); + expect(events3.length).toEqual(0); + }, + TEST_TIMEOUT_LONG, + ); }); diff --git a/apps/hubble/src/eth/l2EventsProvider.ts b/apps/hubble/src/eth/l2EventsProvider.ts index 68600ad70f..5927123674 100644 --- a/apps/hubble/src/eth/l2EventsProvider.ts +++ b/apps/hubble/src/eth/l2EventsProvider.ts @@ -70,6 +70,12 @@ export class OptimismConstants { } const RENT_EXPIRY_IN_SECONDS = 365 * 24 * 60 * 60; // One year +const FID_RETRY_DEDUP_LOOKBACK_MS = 60 * 60 * 1000; // One hour + +type EventSpecificArgs = { + eventName: string; + fid: number; +}; /** * Class that follows the Optimism chain to handle on-chain events from the Storage Registry contract. @@ -87,6 +93,7 @@ export class L2EventsProvider>; private _retryDedupMap: Map; + private _fidRetryDedupMap: Map; private _blockTimestampsCache: Map; private _lastBlockNumber: number; @@ -160,6 +167,12 @@ export class L2EventsProvider { + this._fidRetryDedupMap.clear(); + }, FID_RETRY_DEDUP_LOOKBACK_MS); + this._blockTimestampsCache = new Map(); this.setAddresses(storageRegistryAddress, keyRegistryV2Address, idRegistryV2Address); @@ -300,6 +313,7 @@ export class L2EventsProvider, + ); + } + + async getIdRegistryEvents(fromBlock?: number, toBlock?: number, eventSpecificArgs?: EventSpecificArgs) { + const idV2LogsPromise = this.getContractEvents({ + ...this.getCommonFilterArgs(fromBlock, toBlock, eventSpecificArgs), + address: this.idRegistryV2Address, + abi: IdRegistry.abi, + args: { + id: this.getEventSpecificFid(eventSpecificArgs) /* The fid parameter is named "id" in the IdRegistry events */, + }, + }); + + await this.processIdRegistryV2Events( + (await idV2LogsPromise) as WatchContractEventOnLogsParameter, + ); + } + + async getKeyRegistryEvents(fromBlock?: number, toBlock?: number, eventSpecificArgs?: EventSpecificArgs) { + const keyV2LogsPromise = this.getContractEvents({ + ...this.getCommonFilterArgs(fromBlock, toBlock, eventSpecificArgs), + address: this.keyRegistryV2Address, + abi: KeyRegistry.abi, + args: { fid: this.getEventSpecificFid(eventSpecificArgs) }, + }); + + await this.processKeyRegistryEventsV2( + (await keyV2LogsPromise) as WatchContractEventOnLogsParameter, + ); + } + /** * Sync old Storage events that may have happened before hub was started. We'll put them all * in the sync queue to be processed later, to make sure we don't process any unconfirmed events. */ - private async syncHistoricalEvents(fromBlock: number, toBlock: number, batchSize: number) { + private async syncHistoricalEvents( + fromBlock: number, + toBlock: number, + batchSize: number, // This batch size is enforced by us internally, not by the RPC provider + byEventKind?: { + StorageRegistry: EventSpecificArgs[]; + IdRegistry: EventSpecificArgs[]; + KeyRegistry: EventSpecificArgs[]; + }, + ) { if (!this.idRegistryV2Address || !this.keyRegistryV2Address || !this.storageRegistryAddress) { return; } @@ -789,41 +936,26 @@ export class L2EventsProvider, - ); - await this.processIdRegistryV2Events( - (await idV2LogsPromise) as WatchContractEventOnLogsParameter, - ); - await this.processKeyRegistryEventsV2( - (await keyV2LogsPromise) as WatchContractEventOnLogsParameter, - ); + for (const keyRegistryEventSpecific of byEventKind.KeyRegistry) { + await this.getKeyRegistryEvents(nextFromBlock, nextToBlock, keyRegistryEventSpecific); + } + } else { + statsd().increment("l2events.blocks", Math.min(toBlock, nextToBlock - nextFromBlock)); + await this.getStorageEvents(nextFromBlock, nextToBlock); + await this.getIdRegistryEvents(nextFromBlock, nextToBlock); + await this.getKeyRegistryEvents(nextFromBlock, nextToBlock); + } // Write out all the cached blocks first await this.writeCachedBlocks(toBlock); diff --git a/apps/hubble/src/hubble.ts b/apps/hubble/src/hubble.ts index c4fbf4fc2e..3eaf68e352 100644 --- a/apps/hubble/src/hubble.ts +++ b/apps/hubble/src/hubble.ts @@ -471,6 +471,7 @@ export class Hub implements HubInterface { mainnetClient, opClient as PublicClient, this.fNameRegistryEventsProvider, + this.l2RegistryProvider, ); const profileSync = options.profileSync ?? false; diff --git a/apps/hubble/src/rpc/test/httpServer.test.ts b/apps/hubble/src/rpc/test/httpServer.test.ts index 09bc6591af..91e859b838 100644 --- a/apps/hubble/src/rpc/test/httpServer.test.ts +++ b/apps/hubble/src/rpc/test/httpServer.test.ts @@ -273,7 +273,7 @@ describe("httpServer", () => { const response = (e as any).response; expect(response.status).toBe(400); - expect(response.data.errCode).toEqual("bad_request.validation_failure"); + expect(response.data.errCode).toEqual("bad_request.unknown_fid"); expect(response.data.details).toMatch("unknown fid"); } expect(errored).toBeTruthy(); diff --git a/apps/hubble/src/rpc/test/messageService.test.ts b/apps/hubble/src/rpc/test/messageService.test.ts index 0e4240cd67..b32e69566c 100644 --- a/apps/hubble/src/rpc/test/messageService.test.ts +++ b/apps/hubble/src/rpc/test/messageService.test.ts @@ -105,7 +105,7 @@ describe("submitMessage", () => { test("fails without signer", async () => { const result = await client.submitMessage(castAdd); const err = result._unsafeUnwrapErr(); - expect(err.errCode).toEqual("bad_request.validation_failure"); + expect(err.errCode).toEqual("bad_request.unknown_fid"); expect(err.message).toMatch("unknown fid"); }); }); @@ -134,12 +134,12 @@ describe("validateMessage", () => { test("fails without signer", async () => { const castResult = await client.submitMessage(castAdd); const castErr = castResult._unsafeUnwrapErr(); - expect(castErr.errCode).toEqual("bad_request.validation_failure"); + expect(castErr.errCode).toEqual("bad_request.unknown_fid"); expect(castErr.message).toMatch("unknown fid"); const frameResult = await client.submitMessage(frameAction); const frameErr = frameResult._unsafeUnwrapErr(); - expect(frameErr.errCode).toEqual("bad_request.validation_failure"); + expect(frameErr.errCode).toEqual("bad_request.unknown_fid"); expect(frameErr.message).toMatch("unknown fid"); }); }); diff --git a/apps/hubble/src/storage/engine/index.test.ts b/apps/hubble/src/storage/engine/index.test.ts index cd652757dc..831fe75c0e 100644 --- a/apps/hubble/src/storage/engine/index.test.ts +++ b/apps/hubble/src/storage/engine/index.test.ts @@ -67,7 +67,13 @@ const fNameProvider = new FNameRegistryEventsProvider( } as any, false, ); -engine = new Engine(db, network, undefined, publicClient, undefined, fNameProvider); + +// biome-ignore lint/suspicious/noExplicitAny: mock used only in tests +const l2EventsProvider = jest.fn() as any; +l2EventsProvider.retryEventsForFid = jest.fn(); +const retryEventsForFidMock = l2EventsProvider.retryEventsForFid; + +engine = new Engine(db, network, undefined, publicClient, undefined, fNameProvider, l2EventsProvider); const fid = Factories.Fid.build(); const fname = Factories.Fname.build(); @@ -656,7 +662,7 @@ describe("mergeMessage", () => { await engine.mergeOnChainEvent(Factories.SignerOnChainEvent.build({ fid })); const result = await engine.mergeMessage(reactionAdd); - expect(result).toMatchObject(err({ errCode: "bad_request.validation_failure" })); + expect(result).toMatchObject(err({ errCode: "bad_request.unknown_signer" })); expect(result._unsafeUnwrapErr().message).toMatch("invalid signer"); }); @@ -666,7 +672,7 @@ describe("mergeMessage", () => { await engine.mergeOnChainEvent(Factories.SignerOnChainEvent.build({ fid })); const result = await engine.mergeMessage(linkAdd); - expect(result).toMatchObject(err({ errCode: "bad_request.validation_failure" })); + expect(result).toMatchObject(err({ errCode: "bad_request.unknown_signer" })); expect(result._unsafeUnwrapErr().message).toMatch("invalid signer"); }); }); @@ -677,7 +683,7 @@ describe("mergeMessage", () => { afterEach(async () => { const result = await engine.mergeMessage(message); const err = result._unsafeUnwrapErr(); - expect(err.errCode).toEqual("bad_request.validation_failure"); + expect(err.errCode).toEqual("bad_request.unknown_fid"); expect(err.message).toMatch("unknown fid"); }); @@ -1098,7 +1104,8 @@ describe("mergeMessages", () => { expect(results.size).toBe(3); expect(results.get(0)).toBeInstanceOf(Ok); - expect(results.get(1)).toMatchObject(err({ errCode: "bad_request.validation_failure" })); + expect(results.get(1)).toMatchObject(err({ errCode: "bad_request.unknown_fid" })); + expect(retryEventsForFidMock).toHaveBeenLastCalledWith(0); expect(results.get(2)).toBeInstanceOf(Ok); const fid2 = Factories.Fid.build(); @@ -1123,14 +1130,16 @@ describe("mergeMessages", () => { expect(results.size).toBe(2); expect(results.get(0)).toBeInstanceOf(Ok); - expect(results.get(1)).toMatchObject(err({ errCode: "bad_request.validation_failure" })); + expect(results.get(1)).toMatchObject(err({ errCode: "bad_request.unknown_fid" })); + expect(retryEventsForFidMock).toHaveBeenLastCalledWith(fid2); // Add custody address, but adding without signer is invalid await engine.mergeOnChainEvent(custodyEvent2); results = await engine.mergeMessages([castAdd2, linkAdd]); expect(results.size).toBe(2); - expect(results.get(0)).toMatchObject(err({ errCode: "bad_request.validation_failure" })); + expect(results.get(0)).toMatchObject(err({ errCode: "bad_request.unknown_signer" })); + expect(retryEventsForFidMock).toHaveBeenLastCalledWith(fid2); expect(results.get(1)).toBeInstanceOf(Ok); // Add signer address, but adding without storage is invalid @@ -1139,7 +1148,8 @@ describe("mergeMessages", () => { expect(results.size).toBe(2); expect(results.get(0)).toBeInstanceOf(Ok); - expect(results.get(1)).toMatchObject(err({ errCode: "bad_request.prunable" })); + expect(results.get(1)).toMatchObject(err({ errCode: "bad_request.no_storage" })); + expect(retryEventsForFidMock).toHaveBeenLastCalledWith(fid2); // Add the storage event, and now it should merge await engine.mergeOnChainEvent(storageEvent2); diff --git a/apps/hubble/src/storage/engine/index.ts b/apps/hubble/src/storage/engine/index.ts index a6b524e604..8b5effd7da 100644 --- a/apps/hubble/src/storage/engine/index.ts +++ b/apps/hubble/src/storage/engine/index.ts @@ -77,6 +77,7 @@ import { RateLimiterAbstract, RateLimiterMemory } from "rate-limiter-flexible"; import { TypedEmitter } from "tiny-typed-emitter"; import { FNameRegistryEventsProvider } from "../../eth/fnameRegistryEventsProvider.js"; import { statsd } from "../../utils/statsd.js"; +import { L2EventsProvider } from "eth/l2EventsProvider.js"; export const NUM_VALIDATION_WORKERS = 2; @@ -126,6 +127,7 @@ class Engine extends TypedEmitter { private _publicClient: PublicClient | undefined; private _l2PublicClient: PublicClient | undefined; private _fNameRegistryEventsProvider: FNameRegistryEventsProvider | undefined; + private _l2EventsProvider: L2EventsProvider | undefined; private _linkStore: LinkStore; private _reactionStore: ReactionStore; @@ -157,6 +159,7 @@ class Engine extends TypedEmitter { publicClient?: PublicClient, l2PublicClient?: PublicClient, fNameRegistryEventsProvider?: FNameRegistryEventsProvider, + l2EventsProvider?: L2EventsProvider, ) { super(); this._db = db; @@ -164,6 +167,7 @@ class Engine extends TypedEmitter { this._publicClient = publicClient; this._l2PublicClient = l2PublicClient; this._fNameRegistryEventsProvider = fNameRegistryEventsProvider; + this._l2EventsProvider = l2EventsProvider; this.eventHandler = eventHandler ?? new StoreEventHandler(db); @@ -279,6 +283,33 @@ class Engine extends TypedEmitter { } } + async computeMergeResult(message: Message, i: number) { + const fid = message.data?.fid ?? 0; + const validatedMessage = await this.validateMessage(message); + if (validatedMessage.isErr()) { + return err(validatedMessage.error); + } + + const storageSlot = await this.eventHandler.getCurrentStorageSlotForFid(fid); + if (storageSlot.isErr()) { + return err(storageSlot.error); + } + + const totalUnits = storageSlot.value.legacy_units + storageSlot.value.units; + if (totalUnits === 0) { + return err(new HubError("bad_request.no_storage", "no storage")); + } + + const limiter = getRateLimiterForTotalMessages(totalUnits * this._totalPruneSize); + const isRateLimited = await isRateLimitedByKey(`${fid}`, limiter); + if (isRateLimited) { + log.warn({ fid }, "rate limit exceeded for FID"); + return err(new HubError("unavailable", `rate limit exceeded for FID ${fid}`)); + } + + return ok({ i, fid, limiter, message }); + } + async mergeMessages(messages: Message[]): Promise>> { const mergeResults: Map> = new Map(); const validatedMessages: IndexedMessage[] = []; @@ -286,38 +317,24 @@ class Engine extends TypedEmitter { // Validate all messages first await Promise.all( messages.map(async (message, i) => { - const validatedMessage = await this.validateMessage(message); - if (validatedMessage.isErr()) { - mergeResults.set(i, err(validatedMessage.error)); - return; - } - // Extract the FID that this message was signed by - const fid = message.data?.fid ?? 0; - const storageSlot = await this.eventHandler.getCurrentStorageSlotForFid(fid); - - if (storageSlot.isErr()) { - mergeResults.set(i, err(storageSlot.error)); - return; - } - - const totalUnits = storageSlot.value.legacy_units + storageSlot.value.units; - - if (totalUnits === 0) { - mergeResults.set(i, err(new HubError("bad_request.prunable", "no storage"))); - return; - } - // We rate limit the number of messages that can be merged per FID - const limiter = getRateLimiterForTotalMessages(totalUnits * this._totalPruneSize); - const isRateLimited = await isRateLimitedByKey(`${fid}`, limiter); - if (isRateLimited) { - log.warn({ fid }, "rate limit exceeded for FID"); - mergeResults.set(i, err(new HubError("unavailable", `rate limit exceeded for FID ${fid}`))); - return; + const result = await this.computeMergeResult(message, i); + if (result.isErr()) { + mergeResults.set(i, result); + // Try to request on chain event if it's missing + if ( + result.error.errCode === "bad_request.no_storage" || + "bad_request.unknown_signer" || + "bad_request.missing_fid" + ) { + const fid = message.data?.fid ?? 0; + // Don't await because we don't want to block hubs from processing new messages. + this._l2EventsProvider?.retryEventsForFid(fid); + } + } else { + validatedMessages.push(result.value); } - - validatedMessages.push({ i, fid, limiter, message }); }), ); @@ -1217,7 +1234,7 @@ class Engine extends TypedEmitter { } if (!custodyAddress) { - return err(new HubError("bad_request.validation_failure", `unknown fid: ${message.data.fid}`)); + return err(new HubError("bad_request.unknown_fid", `unknown fid: ${message.data.fid}`)); } // 4. Check that the signer is valid @@ -1231,7 +1248,7 @@ class Engine extends TypedEmitter { return hex.andThen((signerHex) => { return err( new HubError( - "bad_request.validation_failure", + "bad_request.unknown_signer", `invalid signer: signer ${signerHex} not found for fid ${message.data?.fid}`, ), ); diff --git a/apps/hubble/src/storage/stores/onChainEventStore.ts b/apps/hubble/src/storage/stores/onChainEventStore.ts index 42a801de05..aaeb7deeb6 100644 --- a/apps/hubble/src/storage/stores/onChainEventStore.ts +++ b/apps/hubble/src/storage/stores/onChainEventStore.ts @@ -185,6 +185,7 @@ class OnChainEventStore { if (result.isErr()) { throw result.error; } + return result.value; } diff --git a/packages/core/src/errors.ts b/packages/core/src/errors.ts index bfca347f3b..8a05e64dd3 100644 --- a/packages/core/src/errors.ts +++ b/packages/core/src/errors.ts @@ -69,9 +69,12 @@ export type HubErrorCode = | "bad_request.parse_failure" | "bad_request.invalid_param" | "bad_request.validation_failure" + | "bad_request.unknown_signer" | "bad_request.duplicate" | "bad_request.conflict" | "bad_request.prunable" + | "bad_request.no_storage" + | "bad_request.unknown_fid" /* The requested resource could not be found */ | "not_found" /* The request could not be completed because the operation is not executable */