Skip to content

Commit

Permalink
cleanups and bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
aditiharini committed Sep 16, 2024
1 parent 692cfbc commit 1508b0b
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 98 deletions.
1 change: 0 additions & 1 deletion apps/hubble/src/eth/l2EventsProvider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,6 @@ describe("process events", () => {
TEST_TIMEOUT_LONG,
);

// TODO(aditi): It's pretty high overhead to set up tests with the id registry and key registry -- need to deploy contracts (need bytecode). Testing via the storage contract tests most of the meaningful logic.
test(
"retry by fid",
async () => {
Expand Down
110 changes: 70 additions & 40 deletions apps/hubble/src/eth/l2EventsProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ export class OptimismConstants {
}

const RENT_EXPIRY_IN_SECONDS = 365 * 24 * 60 * 60; // One year
const FID_RETRY_DEDUP_LOOKBACK_MS = 60 * 60 * 1000; // One hour

type EventSpecificArgs = {
eventKind: "Storage" | "KeyRegistry" | "IdRegistry";
eventName: string;
fid: number;
};
Expand Down Expand Up @@ -168,6 +168,11 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
this._onChainEventsByBlock = new Map();
this._retryDedupMap = new Map();
this._fidRetryDedupMap = new Map();
// Clear the map periodically to avoid memory bloat.
setInterval(() => {
this._fidRetryDedupMap.clear();
}, FID_RETRY_DEDUP_LOOKBACK_MS);

this._blockTimestampsCache = new Map();

this.setAddresses(storageRegistryAddress, keyRegistryV2Address, idRegistryV2Address);
Expand Down Expand Up @@ -669,36 +674,51 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
}
}

private getEventSpecificArgs(fid: number) {
return {
StorageRegistry: [{ eventName: "Rent", fid }],
IdRegistry: [
{ eventName: "Register", fid },
{ eventName: "Transfer", fid },
],
KeyRegistry: [
{ eventName: "Add", fid },
{ eventName: "Remove", fid },
],
};
}

public async retryEventsForFid(fid: number) {
if (this._fidRetryDedupMap.has(fid)) {
return;
}

// Let's never remove. No need to retry same fid many times. If we allow retrying multiple times, we need to rate limit.
this._fidRetryDedupMap.set(fid, true);

log.info(
{ fid, startBlock: this._firstBlock, stopBlock: this._lastBlockNumber, chunkSize: this._chunkSize },
`Attempting to retryEventsForFid ${fid}`,
);
statsd().increment("l2events.retriesByFid.attempts", 1, { fid: fid.toString() });

// Let's not remove. No need to retry same fid many times. If we allow retrying multiple times, we need to rate limit. The map will get cleared periodically to prevent memory bloat.
try {
// The viem API requires an event kind if you want to provide filters by indexed arguments
// TODO(aditi): Do we want batching?
await this.syncHistoricalEvents(0, this._lastBlockNumber, this._lastBlockNumber, {
eventKind: "Storage",
eventName: "Rent",
fid,
});
// TODO(aditi): Do we want to do more event types
await this.syncHistoricalEvents(0, this._lastBlockNumber, this._lastBlockNumber, {
eventKind: "IdRegistry",
eventName: "Register",
fid,
});
// TODO(aditi): Do we want to do more event types
await this.syncHistoricalEvents(0, this._lastBlockNumber, this._lastBlockNumber, {
eventKind: "KeyRegistry",
eventName: "Add",
fid,
});
// The viem API requires an event kind if you want to provide filters by indexed arguments-- this is why we're making multiple calls to syncHistoricalEvents and prioritizing the most common event types.
const eventSpecificArgs = this.getEventSpecificArgs(fid);

// The batch sizes are artificially large-- this batch size is internally enforced and we essentially want to eliminate batches given that we don't expct a lot of results from these queries.
await this.syncHistoricalEvents(
this._firstBlock,
this._lastBlockNumber,
this._lastBlockNumber,
eventSpecificArgs,
);

log.info({ fid }, `Finished retryEventsForFid ${fid}`);
statsd().increment("l2events.retriesByFid.successes", 1, { fid: fid.toString() });
} catch (e) {
log.error(e, `Error retrying events for fid ${fid}`);
log.error(e, `Error in retryEventsForFid ${fid}`);
statsd().increment("l2events.retriesByFid.errors", 1, { fid: fid.toString() });
}
}

Expand Down Expand Up @@ -789,30 +809,30 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
toBlockBigInt = BigInt(toBlock);
}

let fidBigInt;
if (eventSpecificArgs !== undefined) {
fidBigInt = BigInt(eventSpecificArgs.fid);
}

return {
fromBlock: fromBlockBigInt,
toBlock: toBlockBigInt,
eventName: eventSpecificArgs?.eventName,
args: {
fid: fidBigInt,
},
strict: true,
};
}

// TODO(aditi): Make sure we don't make multiple requests for a single fid
// TODO(aditi): Add a timeout
getEventSpecificFid(eventSpecificArgs?: EventSpecificArgs) {
if (eventSpecificArgs !== undefined) {
return BigInt(eventSpecificArgs.fid);
}

return undefined;
}

async getStorageEvents(fromBlock?: number, toBlock?: number, eventSpecificArgs?: EventSpecificArgs) {
const realParams = {
...this.getCommonFilterArgs(fromBlock, toBlock, eventSpecificArgs),
address: this.storageRegistryAddress,
abi: StorageRegistry.abi,
args: { fid: this.getEventSpecificFid(eventSpecificArgs) },
};

const storageLogsPromise = this.getContractEvents(realParams);
await this.processStorageEvents(
(await storageLogsPromise) as WatchContractEventOnLogsParameter<typeof StorageRegistry.abi>,
Expand All @@ -824,6 +844,9 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
...this.getCommonFilterArgs(fromBlock, toBlock, eventSpecificArgs),
address: this.idRegistryV2Address,
abi: IdRegistry.abi,
args: {
id: this.getEventSpecificFid(eventSpecificArgs) /* The fid parameter is named "id" in the IdRegistry events */,
},
});

await this.processIdRegistryV2Events(
Expand All @@ -836,6 +859,7 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
...this.getCommonFilterArgs(fromBlock, toBlock, eventSpecificArgs),
address: this.keyRegistryV2Address,
abi: KeyRegistry.abi,
args: { fid: this.getEventSpecificFid(eventSpecificArgs) },
});

await this.processKeyRegistryEventsV2(
Expand All @@ -850,8 +874,12 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
private async syncHistoricalEvents(
fromBlock: number,
toBlock: number,
batchSize: number,
byEventKind?: EventSpecificArgs,
batchSize: number, // This batch size is enforced by us internally, not by the RPC provider
byEventKind?: {
StorageRegistry: EventSpecificArgs[];
IdRegistry: EventSpecificArgs[];
KeyRegistry: EventSpecificArgs[];
},
) {
if (!this.idRegistryV2Address || !this.keyRegistryV2Address || !this.storageRegistryAddress) {
return;
Expand Down Expand Up @@ -903,14 +931,16 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
statsd().increment("l2events.blocks", Math.min(toBlock, nextToBlock - nextFromBlock));

if (byEventKind) {
if (byEventKind.eventKind === "Storage") {
await this.getStorageEvents(nextFromBlock, nextToBlock, byEventKind);
for (const storageEventSpecific of byEventKind.StorageRegistry) {
await this.getStorageEvents(nextFromBlock, nextToBlock, storageEventSpecific);
}
if (byEventKind.eventKind === "IdRegistry") {
await this.getIdRegistryEvents(nextFromBlock, nextToBlock, byEventKind);

for (const idRegistryEventSpecific of byEventKind.IdRegistry) {
await this.getIdRegistryEvents(nextFromBlock, nextToBlock, idRegistryEventSpecific);
}
if (byEventKind.eventKind === "KeyRegistry") {
await this.getKeyRegistryEvents(nextFromBlock, nextToBlock, byEventKind);

for (const keyRegistryEventSpecific of byEventKind.KeyRegistry) {
await this.getKeyRegistryEvents(nextFromBlock, nextToBlock, keyRegistryEventSpecific);
}
} else {
await this.getStorageEvents(nextFromBlock, nextToBlock);
Expand Down
1 change: 1 addition & 0 deletions apps/hubble/src/hubble.ts
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ export class Hub implements HubInterface {
mainnetClient,
opClient as PublicClient,
this.fNameRegistryEventsProvider,
this.l2RegistryProvider,
);

const profileSync = options.profileSync ?? false;
Expand Down
2 changes: 1 addition & 1 deletion apps/hubble/src/rpc/test/httpServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ describe("httpServer", () => {
const response = (e as any).response;

expect(response.status).toBe(400);
expect(response.data.errCode).toEqual("bad_request.validation_failure");
expect(response.data.errCode).toEqual("bad_request.unknown_fid");
expect(response.data.details).toMatch("unknown fid");
}
expect(errored).toBeTruthy();
Expand Down
6 changes: 3 additions & 3 deletions apps/hubble/src/rpc/test/messageService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ describe("submitMessage", () => {
test("fails without signer", async () => {
const result = await client.submitMessage(castAdd);
const err = result._unsafeUnwrapErr();
expect(err.errCode).toEqual("bad_request.validation_failure");
expect(err.errCode).toEqual("bad_request.unknown_fid");
expect(err.message).toMatch("unknown fid");
});
});
Expand Down Expand Up @@ -134,12 +134,12 @@ describe("validateMessage", () => {
test("fails without signer", async () => {
const castResult = await client.submitMessage(castAdd);
const castErr = castResult._unsafeUnwrapErr();
expect(castErr.errCode).toEqual("bad_request.validation_failure");
expect(castErr.errCode).toEqual("bad_request.unknown_fid");
expect(castErr.message).toMatch("unknown fid");

const frameResult = await client.submitMessage(frameAction);
const frameErr = frameResult._unsafeUnwrapErr();
expect(frameErr.errCode).toEqual("bad_request.validation_failure");
expect(frameErr.errCode).toEqual("bad_request.unknown_fid");
expect(frameErr.message).toMatch("unknown fid");
});
});
26 changes: 18 additions & 8 deletions apps/hubble/src/storage/engine/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,13 @@ const fNameProvider = new FNameRegistryEventsProvider(
} as any,
false,
);
engine = new Engine(db, network, undefined, publicClient, undefined, fNameProvider);

// biome-ignore lint/suspicious/noExplicitAny: mock used only in tests
const l2EventsProvider = jest.fn() as any;
l2EventsProvider.retryEventsForFid = jest.fn();
const retryEventsForFidMock = l2EventsProvider.retryEventsForFid;

engine = new Engine(db, network, undefined, publicClient, undefined, fNameProvider, l2EventsProvider);

const fid = Factories.Fid.build();
const fname = Factories.Fname.build();
Expand Down Expand Up @@ -656,7 +662,7 @@ describe("mergeMessage", () => {
await engine.mergeOnChainEvent(Factories.SignerOnChainEvent.build({ fid }));

const result = await engine.mergeMessage(reactionAdd);
expect(result).toMatchObject(err({ errCode: "bad_request.validation_failure" }));
expect(result).toMatchObject(err({ errCode: "bad_request.unknown_signer" }));
expect(result._unsafeUnwrapErr().message).toMatch("invalid signer");
});

Expand All @@ -666,7 +672,7 @@ describe("mergeMessage", () => {
await engine.mergeOnChainEvent(Factories.SignerOnChainEvent.build({ fid }));

const result = await engine.mergeMessage(linkAdd);
expect(result).toMatchObject(err({ errCode: "bad_request.validation_failure" }));
expect(result).toMatchObject(err({ errCode: "bad_request.unknown_signer" }));
expect(result._unsafeUnwrapErr().message).toMatch("invalid signer");
});
});
Expand All @@ -677,7 +683,7 @@ describe("mergeMessage", () => {
afterEach(async () => {
const result = await engine.mergeMessage(message);
const err = result._unsafeUnwrapErr();
expect(err.errCode).toEqual("bad_request.validation_failure");
expect(err.errCode).toEqual("bad_request.unknown_fid");
expect(err.message).toMatch("unknown fid");
});

Expand Down Expand Up @@ -1098,7 +1104,8 @@ describe("mergeMessages", () => {
expect(results.size).toBe(3);

expect(results.get(0)).toBeInstanceOf(Ok);
expect(results.get(1)).toMatchObject(err({ errCode: "bad_request.validation_failure" }));
expect(results.get(1)).toMatchObject(err({ errCode: "bad_request.unknown_fid" }));
expect(retryEventsForFidMock).toHaveBeenLastCalledWith(0);
expect(results.get(2)).toBeInstanceOf(Ok);

const fid2 = Factories.Fid.build();
Expand All @@ -1123,14 +1130,16 @@ describe("mergeMessages", () => {
expect(results.size).toBe(2);

expect(results.get(0)).toBeInstanceOf(Ok);
expect(results.get(1)).toMatchObject(err({ errCode: "bad_request.validation_failure" }));
expect(results.get(1)).toMatchObject(err({ errCode: "bad_request.unknown_fid" }));
expect(retryEventsForFidMock).toHaveBeenLastCalledWith(fid2);

// Add custody address, but adding without signer is invalid
await engine.mergeOnChainEvent(custodyEvent2);
results = await engine.mergeMessages([castAdd2, linkAdd]);
expect(results.size).toBe(2);

expect(results.get(0)).toMatchObject(err({ errCode: "bad_request.validation_failure" }));
expect(results.get(0)).toMatchObject(err({ errCode: "bad_request.unknown_signer" }));
expect(retryEventsForFidMock).toHaveBeenLastCalledWith(fid2);
expect(results.get(1)).toBeInstanceOf(Ok);

// Add signer address, but adding without storage is invalid
Expand All @@ -1139,7 +1148,8 @@ describe("mergeMessages", () => {
expect(results.size).toBe(2);

expect(results.get(0)).toBeInstanceOf(Ok);
expect(results.get(1)).toMatchObject(err({ errCode: "bad_request.prunable" }));
expect(results.get(1)).toMatchObject(err({ errCode: "bad_request.no_storage" }));
expect(retryEventsForFidMock).toHaveBeenLastCalledWith(fid2);

// Add the storage event, and now it should merge
await engine.mergeOnChainEvent(storageEvent2);
Expand Down
Loading

0 comments on commit 1508b0b

Please sign in to comment.