Skip to content

Commit

Permalink
enable onchain events ingestion with stop block option
Browse files Browse the repository at this point in the history
  • Loading branch information
aditiharini committed Dec 23, 2024
1 parent 0d4a7ab commit 8d362af
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 61 deletions.
2 changes: 2 additions & 0 deletions apps/hubble/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ app
"The block number to begin syncing events from L2 Farcaster contracts",
parseNumber,
)
.option("--l2-stop-block <number>", "The block number to stop syncing L2 events at", parseNumber)
.option(
"--l2-chunk-size <number>",
"The number of events to fetch from L2 Farcaster contracts at a time",
Expand Down Expand Up @@ -519,6 +520,7 @@ app
l2KeyRegistryAddress: cliOptions.l2KeyRegistryAddress ?? hubConfig.l2KeyRegistryAddress,
l2StorageRegistryAddress: cliOptions.l2StorageRegistryAddress ?? hubConfig.l2StorageRegistryAddress,
l2FirstBlock: cliOptions.l2FirstBlock ?? hubConfig.l2FirstBlock,
l2StopBlock: cliOptions.l2StopBlock,
l2ChunkSize: cliOptions.l2ChunkSize ?? hubConfig.l2ChunkSize,
l2ChainId: cliOptions.l2ChainId ?? hubConfig.l2ChainId,
l2ResyncEvents: cliOptions.l2ResyncEvents ?? hubConfig.l2ResyncEvents ?? false,
Expand Down
50 changes: 30 additions & 20 deletions apps/hubble/src/eth/l2EventsProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
private _publicClient: PublicClient<transport, chain>;

private _firstBlock: number;
private _stopBlock?: number;
private _chunkSize: number;
private _chainId: number;
private _rentExpiry: number;
Expand Down Expand Up @@ -151,10 +152,12 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
chainId: number,
resyncEvents: boolean,
expiryOverride?: number,
stopBlock?: number,
) {
this._hub = hub;
this._publicClient = publicClient;
this._firstBlock = firstBlock;
this._stopBlock = stopBlock;
this._chunkSize = chunkSize;
this._chainId = chainId;
this._resyncEvents = resyncEvents;
Expand Down Expand Up @@ -198,6 +201,7 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
chainId: number,
resyncEvents: boolean,
expiryOverride?: number,
stopBlock?: number,
): L2EventsProvider<chain> {
const l2RpcUrls = l2RpcUrl.split(",");
const transports = l2RpcUrls.map((url) =>
Expand Down Expand Up @@ -232,6 +236,7 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
chainId,
resyncEvents,
expiryOverride,
stopBlock,
);

return provider;
Expand All @@ -245,9 +250,9 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
// Connect to L2 RPC

// Start the contract watchers first, so we cache events while we sync historical events
this._watchStorageContractEvents?.start();
this._watchIdRegistryV2ContractEvents?.start();
this._watchKeyRegistryV2ContractEvents?.start();
// this._watchStorageContractEvents?.start();
// this._watchIdRegistryV2ContractEvents?.start();
// this._watchKeyRegistryV2ContractEvents?.start();

const syncHistoryResult = await this.connectAndSyncHistoricalEvents();
if (syncHistoryResult.isErr()) {
Expand Down Expand Up @@ -603,23 +608,6 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra

/** Connect to OP RPC and sync events. Returns the highest block that was synced */
private async connectAndSyncHistoricalEvents(): HubAsyncResult<number> {
const latestBlockResult = await ResultAsync.fromPromise(getBlockNumber(this._publicClient), (err) => err);
if (latestBlockResult.isErr()) {
diagnosticReporter().reportError(latestBlockResult.error as Error);
const msg = "failed to connect to optimism node. Check your eth RPC URL (e.g. --l2-rpc-url)";
log.error({ err: latestBlockResult.error }, msg);
return err(new HubError("unavailable.network_failure", msg));
}
const latestBlock = Number(latestBlockResult.value);

if (!latestBlock) {
const msg = "failed to get the latest block from the RPC provider";
log.error(msg);
return err(new HubError("unavailable.network_failure", msg));
}

log.info({ latestBlock: latestBlock }, "connected to optimism node");

// Find how how much we need to sync
let lastSyncedBlock = this._firstBlock;

Expand All @@ -635,6 +623,28 @@ export class L2EventsProvider<chain extends Chain = Chain, transport extends Tra
}

log.info({ lastSyncedBlock }, "last synced block");
let latestBlock = lastSyncedBlock;
if (this._stopBlock) {
latestBlock = this._stopBlock;
} else {
const latestBlockResult = await ResultAsync.fromPromise(getBlockNumber(this._publicClient), (err) => err);
if (latestBlockResult.isErr()) {
diagnosticReporter().reportError(latestBlockResult.error as Error);
const msg = "failed to connect to optimism node. Check your eth RPC URL (e.g. --l2-rpc-url)";
log.error({ err: latestBlockResult.error }, msg);
return err(new HubError("unavailable.network_failure", msg));
}
latestBlock = Number(latestBlockResult.value);

if (!latestBlock) {
const msg = "failed to get the latest block from the RPC provider";
log.error(msg);
return err(new HubError("unavailable.network_failure", msg));
}

log.info({ latestBlock: latestBlock }, "connected to optimism node");
}

const toBlock = latestBlock;

if (lastSyncedBlock < toBlock) {
Expand Down
5 changes: 4 additions & 1 deletion apps/hubble/src/hubble.ts
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ export interface HubOptions {
/** Block number to begin syncing events from for L2 */
l2FirstBlock?: number;

l2StopBlock?: number;

/** Number of blocks to batch when syncing historical events for L2 */
l2ChunkSize?: number;

Expand Down Expand Up @@ -431,6 +433,7 @@ export class Hub implements HubInterface {
options.l2ChainId ?? OptimismConstants.ChainId,
options.l2ResyncEvents ?? false,
options.l2RentExpiryOverride,
options.l2StopBlock,
);
} else {
log.warn("No L2 RPC URL provided, unable to sync L2 contract events");
Expand Down Expand Up @@ -799,7 +802,7 @@ export class Hub implements HubInterface {
await this.adminServer.start(this.options.adminServerHost ?? "127.0.0.1");
}

// await this.l2RegistryProvider.start();
await this.l2RegistryProvider.start();
// await this.fNameRegistryEventsProvider.start();

const peerId = this.options.peerId
Expand Down
4 changes: 1 addition & 3 deletions packages/shuttle/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@
"scripts": {
"build": "tsup --config tsup.config.ts",
"start": "tsx src/example-app/app.ts",
"migrate:onchain-events": "tsx src/shuttle/migration.ts backfill-onchain-events",
"migrate:messages": "tsx src/shuttle/migration.ts backfill-messages",
"migrate:validate": "tsx src/shuttle/migration.ts compare-message-counts",
"migrate:backfill": "tsx src/shuttle/migration.ts backfill",
"lint": "biome format src/ --write && biome check src/ --apply",
"lint:ci": "biome ci src/",
"test": "NODE_OPTIONS=--experimental-vm-modules jest --detectOpenHandles --forceExit",
Expand Down
47 changes: 10 additions & 37 deletions packages/shuttle/src/shuttle/migration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -347,45 +347,26 @@ function selectFids() {
}

if (import.meta.url.endsWith(url.pathToFileURL(process.argv[1] || "").toString())) {
async function backfillOnChainEvents() {
async function backfill() {
const migration = await Migration.create(
POSTGRES_SCHEMA,
ONCHAIN_EVENTS_HUB_HOST,
SNAPCHAIN_HOST,
HUB_HOST,
HUB_ADMIN_HOST,
);

const fids = selectFids();
await migration.ingestAllOnchainEvents(fids);
log.info("Done migrating onchain events");
await migration.ingestUsernameProofs(fids);
return;
}

async function backfillMessages() {
const migration = await Migration.create(
POSTGRES_SCHEMA,
ONCHAIN_EVENTS_HUB_HOST,
SNAPCHAIN_HOST,
HUB_HOST,
HUB_ADMIN_HOST,
);

const fids = selectFids();
log.info("Done migrating usernme proofs");
// Sleep 30s
await sleep(30_000);
await migration.ingestMessagesFromDb(fids);

return;
}

async function compareMessageCounts() {
const migration = await Migration.create(
POSTGRES_SCHEMA,
ONCHAIN_EVENTS_HUB_HOST,
SNAPCHAIN_HOST,
HUB_HOST,
HUB_ADMIN_HOST,
);

const fids = selectFids();
log.info("Done migrating messages");
// Sleep 2 minutes
await sleep(120_000);
for (const fid of fids) {
const compareResult = await migration.compareMessageCounts(fid);
if (compareResult.isErr()) {
Expand All @@ -401,15 +382,7 @@ if (import.meta.url.endsWith(url.pathToFileURL(process.argv[1] || "").toString()
.description("Synchronizes a Farcaster Hub with a Postgres database")
.version(JSON.parse(readFileSync("./package.json").toString()).version);

program.command("backfill-messages").description("Queue up backfill for the worker").action(backfillMessages);
program
.command("backfill-onchain-events")
.description("Queue up backfill for the worker")
.action(backfillOnChainEvents);
program
.command("compare-message-counts")
.description("Cross-check message counts between hub and snapchain")
.action(compareMessageCounts);
program.command("backfill").description("Queue up backfill for the worker").action(backfill);

program.parse(process.argv);
}

0 comments on commit 8d362af

Please sign in to comment.