Skip to content

Commit

Permalink
fix: Fix initial Eth events fetching and logging (#1257)
Browse files Browse the repository at this point in the history
* fix: Improve initial eth events fetch

* changeset
  • Loading branch information
adityapk00 authored Aug 14, 2023
1 parent 951793b commit 8d61f5f
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 112 deletions.
5 changes: 5 additions & 0 deletions .changeset/thin-countries-tickle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

fix: Improve initial eth events fetching
195 changes: 83 additions & 112 deletions apps/hubble/src/eth/ethEventsProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { HubInterface } from "../hubble.js";
import { logger } from "../utils/logger.js";
import { WatchContractEvent } from "./watchContractEvent.js";
import { WatchBlockNumber } from "./watchBlockNumber.js";
import { formatPercentage } from "../profile/profile.js";

const log = logger.child({
component: "EthEventsProvider",
Expand Down Expand Up @@ -268,7 +269,7 @@ export class EthEventsProvider {
let lastSyncedBlock = this._firstBlock;

const hubState = await this._hub.getHubState();
if (hubState.isOk()) {
if (hubState.isOk() && hubState.value.lastEthBlock) {
lastSyncedBlock = hubState.value.lastEthBlock;
}

Expand All @@ -279,19 +280,52 @@ export class EthEventsProvider {

log.info({ lastSyncedBlock }, "last synced block");
const toBlock = latestBlock;
const fromBlock = lastSyncedBlock;

/*
* Querying Blocks in Batches
*
* 1. Calculate the difference between the first and last sync blocks (e.g. 769,531 blocks)
* 2. Divide by batch size and round up to get runs, and iterate with for-loop
* 3. Compute the fromBlock in each run as firstBlock + (loopIndex * batchSize)
* 4. Compute the toBlock in each run as fromBlock + batchSize
* 5. In every run after the first one, increment fromBlock by 1 to avoid duplication
* To sync blocks from 7,648,795 to 8,418,326, the diff is 769,531
* The run size for a 10k batch would be = 76.9531 ~= 77 runs
* For the 1st run, fromBlock = 7,648,795 + (0 * 10,000) = 7,648,795
* For the 1st run, toBlock = 7,648,795 + 10,000 = 7,658,795
* For the 2nd run, fromBlock = 7,648,795 + (1 * 10,000) + 1 = 7,658,796
* For the 2nd run, toBlock = 7,658,796 + 10,000 = 7,668,796
*/

if (lastSyncedBlock < toBlock) {
log.info({ fromBlock: lastSyncedBlock, toBlock }, "syncing events from missed blocks");
const totalBlocks = toBlock - fromBlock;
const numOfRuns = Math.ceil(totalBlocks / this._chunkSize);

for (let i = 0; i < numOfRuns; i++) {
let nextFromBlock = fromBlock + i * this._chunkSize;
const nextToBlock = nextFromBlock + this._chunkSize;

if (i > 0) {
// If this isn't our first loop, we need to up the fromBlock by 1, or else we will be re-caching an already cached block.
nextFromBlock += 1;
}

// Sync old Id events
await this.syncHistoricalIdEvents(IdRegistryEventType.REGISTER, lastSyncedBlock, toBlock, this._chunkSize);
await this.syncHistoricalIdEvents(IdRegistryEventType.TRANSFER, lastSyncedBlock, toBlock, this._chunkSize);
log.info(
{ fromBlock: nextFromBlock, toBlock: nextToBlock },
`syncing events (${formatPercentage((nextFromBlock - fromBlock) / totalBlocks)}) `,
);

// Sync old Id events
await this.syncHistoricalIdEvents(nextFromBlock, nextToBlock);

// Sync old Name Transfer events
await this.syncHistoricalNameEvents(NameRegistryEventType.TRANSFER, lastSyncedBlock, toBlock, this._chunkSize);
// Sync old Name Transfer events
await this.syncHistoricalNameEvents(NameRegistryEventType.TRANSFER, nextFromBlock, nextToBlock);

// We don't need to sync historical Renew events because the expiry
// is pulled when NameRegistryEvents are merged
// We don't need to sync historical Renew events because the expiry
// is pulled when NameRegistryEvents are merged
}
}

this._isHistoricalSyncDone = true;
Expand All @@ -307,124 +341,61 @@ export class EthEventsProvider {
return;
}
this._retryDedupMap.set(blockNumber, true);
await this.syncHistoricalIdEvents(IdRegistryEventType.REGISTER, blockNumber, blockNumber + 1, 1);
await this.syncHistoricalIdEvents(IdRegistryEventType.TRANSFER, blockNumber, blockNumber + 1, 1);
await this.syncHistoricalIdEvents(blockNumber, blockNumber + 1);

// Sync old Name Transfer events
await this.syncHistoricalNameEvents(NameRegistryEventType.TRANSFER, blockNumber, blockNumber + 1, 1);
await this.syncHistoricalNameEvents(NameRegistryEventType.TRANSFER, blockNumber, blockNumber + 1);
}

/**
* Sync old Id events that may have happened before hub was started. We'll put them all
* in the sync queue to be processed later, to make sure we don't process any unconfirmed events.
*/
private async syncHistoricalIdEvents(
type: IdRegistryEventType,
fromBlock: number,
toBlock: number,
batchSize: number,
) {
/*
* How querying blocks in batches works
* We calculate the difference in blocks, for example, lets say we need to sync/cache 769,531 blocks (difference between the contracts FirstBlock, and the latest Goerli block at time of writing, 8418326)
* After that, we divide our difference in blocks by the batchSize. For example, over 769,531 blocks, at a 10,000 block batchSize, we need to run our loop 76.9531 times, which obviously just rounds up to 77 loops
* During this whole process, we're using a for(let i=0;) loop, which means to get the correct from block, we need to calculate new fromBlock's and toBlock's on every loop
* fromBlock: FirstBlock + (loopIndex * batchSize) - Example w/ batchSize 10,000: Run 0 - FirstBlock + 0, Run 1 - FirstBlock + 10,000, Run 2 - FirstBlock + 20,000, etc....
* toBlock: fromBlock + batchSize - Example w/ batchSize 10,000: Run 0: fromBlock + 10,000, Run 1 - fromBlock + 10,000, etc...
*/

// Calculate amount of runs required based on batchSize, and round up to capture all blocks
const numOfRuns = Math.ceil((toBlock - fromBlock) / batchSize);
private async syncHistoricalIdEvents(fromBlock: number, toBlock: number) {
const idFilter = await this._publicClient.createContractEventFilter({
address: GoerliEthConstants.IdRegistryAddress,
abi: IdRegistry.abi,
eventName: "Register",
fromBlock: BigInt(fromBlock),
toBlock: BigInt(toBlock),
strict: true,
});
const idlogsPromise = this._publicClient.getFilterLogs({ filter: idFilter });

const tfrFilter = await this._publicClient.createContractEventFilter({
address: GoerliEthConstants.IdRegistryAddress,
abi: IdRegistry.abi,
eventName: "Transfer",
fromBlock: BigInt(fromBlock),
toBlock: BigInt(toBlock),
strict: true,
});
const tfrlogsPromise = this._publicClient.getFilterLogs({ filter: tfrFilter });

for (let i = 0; i < numOfRuns; i++) {
let nextFromBlock = fromBlock + i * batchSize;
const nextToBlock = nextFromBlock + batchSize;
// Process the idLogs first
await this.processIdRegisterEvents(await idlogsPromise);

if (i > 0) {
// If this isn't our first loop, we need to up the fromBlock by 1, or else we will be re-caching an already cached block.
nextFromBlock += 1;
}

if (type === IdRegistryEventType.REGISTER) {
const filter = await this._publicClient.createContractEventFilter({
address: GoerliEthConstants.IdRegistryAddress,
abi: IdRegistry.abi,
eventName: "Register",
fromBlock: BigInt(nextFromBlock),
toBlock: BigInt(nextToBlock),
strict: true,
});

const logs = await this._publicClient.getFilterLogs({ filter });
await this.processIdRegisterEvents(logs);
} else if (type === IdRegistryEventType.TRANSFER) {
const filter = await this._publicClient.createContractEventFilter({
address: GoerliEthConstants.IdRegistryAddress,
abi: IdRegistry.abi,
eventName: "Transfer",
fromBlock: BigInt(nextFromBlock),
toBlock: BigInt(nextToBlock),
strict: true,
});

const logs = await this._publicClient.getFilterLogs({ filter });
await this.processIdTransferEvents(logs);
}
}
// Then the transfer events
await this.processIdTransferEvents(await tfrlogsPromise);
}

/**
* Sync old Name events that may have happened before hub was started. We'll put them all
* in the sync queue to be processed later, to make sure we don't process any unconfirmed events.
*/
private async syncHistoricalNameEvents(
type: NameRegistryEventType,
fromBlock: number,
toBlock: number,
batchSize: number,
) {
/*
* Querying Blocks in Batches
*
* 1. Calculate the difference between the first and last sync blocks (e.g. 769,531 blocks)
* 2. Divide by batch size and round up to get runs, and iterate with for-loop
* 3. Compute the fromBlock in each run as firstBlock + (loopIndex * batchSize)
* 4. Compute the toBlock in each run as fromBlock + batchSize
* 5. In every run after the first one, increment fromBlock by 1 to avoid duplication
* To sync blocks from 7,648,795 to 8,418,326, the diff is 769,531
* The run size for a 10k batch would be = 76.9531 ~= 77 runs
* For the 1st run, fromBlock = 7,648,795 + (0 * 10,000) = 7,648,795
* For the 1st run, toBlock = 7,648,795 + 10,000 = 7,658,795
* For the 2nd run, fromBlock = 7,648,795 + (1 * 10,000) + 1 = 7,658,796
* For the 2nd run, toBlock = 7,658,796 + 10,000 = 7,668,796
*/

// Calculate amount of runs required based on batchSize, and round up to capture all blocks
const numOfRuns = Math.ceil((toBlock - fromBlock) / batchSize);

for (let i = 0; i < numOfRuns; i++) {
let nextFromBlock = fromBlock + i * batchSize;
const nextToBlock = nextFromBlock + batchSize;

if (i > 0) {
// If this isn't our first loop, we need to up the fromBlock by 1, or else we will be re-caching an already cached block.
nextFromBlock += 1;
}
private async syncHistoricalNameEvents(type: NameRegistryEventType, fromBlock: number, toBlock: number) {
if (type === NameRegistryEventType.TRANSFER) {
const filter = await this._publicClient.createContractEventFilter({
address: GoerliEthConstants.NameRegistryAddress,
abi: NameRegistry.abi,
eventName: "Transfer",
fromBlock: BigInt(fromBlock),
toBlock: BigInt(toBlock),
strict: true,
});

if (type === NameRegistryEventType.TRANSFER) {
const filter = await this._publicClient.createContractEventFilter({
address: GoerliEthConstants.NameRegistryAddress,
abi: NameRegistry.abi,
eventName: "Transfer",
fromBlock: BigInt(nextFromBlock),
toBlock: BigInt(nextToBlock),
strict: true,
});

const logs = await this._publicClient.getFilterLogs({ filter });
await this.processNameTransferEvents(logs);
}
const logs = await this._publicClient.getFilterLogs({ filter });
await this.processNameTransferEvents(logs);
}
}

Expand Down Expand Up @@ -661,7 +632,7 @@ export class EthEventsProvider {
}
idEvents.push(idRegistryEvent);

log.info(
log.debug(
{ event: { to, id: id.toString(), blockNumber } },
`cacheIdRegistryEvent: fid ${id.toString()} assigned to ${to} in block ${blockNumber}`,
);
Expand Down Expand Up @@ -717,7 +688,7 @@ export class EthEventsProvider {
}
nameEvents.push(nameRegistryEvent);

logEvent.info(`cacheNameRegistryEvent: token id ${tokenId.toString()} assigned to ${to} in block ${blockNumber}`);
logEvent.debug(`cacheNameRegistryEvent: token id ${tokenId.toString()} assigned to ${to} in block ${blockNumber}`);

return ok(undefined);
}
Expand Down

0 comments on commit 8d61f5f

Please sign in to comment.