Skip to content

Commit

Permalink
perf: Add a Sync Profiler (#1098)
Browse files Browse the repository at this point in the history
* feat: Add Sync profiler

* changeset

* bugfix

* pretty print

* cleanup
  • Loading branch information
adityapk00 authored Jul 11, 2023
1 parent f9ebc7c commit 850f82f
Show file tree
Hide file tree
Showing 6 changed files with 294 additions and 10 deletions.
5 changes: 5 additions & 0 deletions .changeset/tidy-roses-crash.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

perf: Add a sync profiler
3 changes: 3 additions & 0 deletions apps/hubble/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ app
.option("--admin-server-enabled", "Enable the admin server. (default: disabled)")
.option("--admin-server-host <host>", "The host the admin server should listen on. (default: '127.0.0.1')")
.option("--db-name <name>", "The name of the RocksDB instance")
.option("--profile-sync", "Profile the sync. Will sync the node and exit. (default: disabled)")
.option("--rebuild-sync-trie", "Rebuilds the sync trie before starting")
.option("--resync-eth-events", "Resyncs events from the Farcaster contracts before starting")
.option("--resync-name-events", "Resyncs events from the FName registry server before starting")
Expand Down Expand Up @@ -299,6 +300,7 @@ app
.map((a) => a._unsafeUnwrap());

const rebuildSyncTrie = cliOptions.rebuildSyncTrie ?? hubConfig.rebuildSyncTrie ?? false;
const profileSync = cliOptions.profileSync ?? hubConfig.profileSync ?? false;

const options: HubOptions = {
peerId,
Expand All @@ -322,6 +324,7 @@ app
rocksDBName: cliOptions.dbName ?? hubConfig.dbName,
resetDB,
rebuildSyncTrie,
profileSync,
resyncEthEvents: cliOptions.resyncEthEvents ?? hubConfig.resyncEthEvents ?? false,
resyncNameEvents: cliOptions.resyncNameEvents ?? hubConfig.resyncNameEvents ?? false,
commitLockTimeout: cliOptions.commitLockTimeout ?? hubConfig.commitLockTimeout,
Expand Down
39 changes: 38 additions & 1 deletion apps/hubble/src/hubble.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import { MAINNET_ALLOWED_PEERS } from "./allowedPeers.mainnet.js";
import StoreEventHandler from "./storage/stores/storeEventHandler.js";
import { FNameRegistryClient, FNameRegistryEventsProvider } from "./eth/fnameRegistryEventsProvider.js";
import { GOSSIP_PROTOCOL_VERSION } from "./network/p2p/protocol.js";
import { prettyPrintTable } from "./profile.js";
import packageJson from "./package.json" assert { type: "json" };
import { createPublicClient, http } from "viem";
import { mainnet } from "viem/chains";
Expand Down Expand Up @@ -163,6 +164,9 @@ export interface HubOptions {
/** Resets the DB on start, if true */
resetDB?: boolean;

/** Profile the sync and exit after done */
profileSync?: boolean;

/** Rebuild the sync trie from messages in the DB on startup */
rebuildSyncTrie?: boolean;

Expand Down Expand Up @@ -271,13 +275,46 @@ export class Hub implements HubInterface {
lockMaxPending: options.commitLockMaxPending,
lockTimeout: options.commitLockTimeout,
});

const mainnetClient = createPublicClient({
chain: mainnet,
transport: http(options.ethMainnetRpcUrl, { retryCount: 2 }),
});

this.engine = new Engine(this.rocksDB, options.network, eventHandler, mainnetClient);
this.syncEngine = new SyncEngine(this, this.rocksDB, this.ethRegistryProvider);

const profileSync = options.profileSync ?? false;
this.syncEngine = new SyncEngine(this, this.rocksDB, this.ethRegistryProvider, profileSync);

// If profileSync is true, exit after sync is complete
if (profileSync) {
this.syncEngine.on("syncComplete", async (success) => {
if (success) {
log.info("Sync complete, exiting (profileSync=true)");

const profileLog = logger.child({ component: "SyncProfile" });

const profile = this.syncEngine.getSyncProfile();
if (profile) {
profileLog.info({ wallTimeMs: profile.getSyncDuration() });

for (const [method, p] of profile.getRpcMethodProfiles()) {
profileLog.info({ method, p });
}

// Also write to console for easy copy/paste
console.log("\nLatencies (ms)\n");
console.log(prettyPrintTable(profile.latenciesToPrettyPrintObject()));

console.log("\nData Fetched (bytes)\n");
console.log(prettyPrintTable(profile.resultBytesToPrettyPrintObject()));
}

await this.stop();
process.exit(0);
}
});
}

this.rpcServer = new Server(
this,
Expand Down
30 changes: 23 additions & 7 deletions apps/hubble/src/network/sync/syncEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { sleepWhile } from "../../utils/crypto.js";
import { logger } from "../../utils/logger.js";
import { RootPrefix } from "../../storage/db/types.js";
import { fromFarcasterTime } from "@farcaster/core";
import { SyncEngineProfiler } from "./syncEngineProfiler.js";

// Number of seconds to wait for the network to "settle" before syncing. We will only
// attempt to sync messages that are older than this time.
Expand Down Expand Up @@ -89,6 +90,7 @@ class SyncEngine extends TypedEmitter<SyncEvents> {

private _isSyncing = false;
private _interruptSync = false;
private _syncProfiler?: SyncEngineProfiler;

private currentHubPeerContacts: Map<string, PeerContact> = new Map();
// Map of peerId to last time we attempted to sync with them without merging any new messages succesfully
Expand All @@ -102,13 +104,17 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
// Number of messages since last compaction
private _messagesSinceLastCompaction = 0;

constructor(hub: HubInterface, rocksDb: RocksDB, ethEventsProvider?: EthEventsProvider) {
constructor(hub: HubInterface, rocksDb: RocksDB, ethEventsProvider?: EthEventsProvider, profileSync = false) {
super();

this._db = rocksDb;
this._trie = new MerkleTrie(rocksDb);
this._ethEventsProvider = ethEventsProvider;

if (profileSync) {
this._syncProfiler = new SyncEngineProfiler();
}

this._hub = hub;

this._hub.engine.eventHandler.on("mergeMessage", async (event: MergeMessageHubEvent) => {
Expand Down Expand Up @@ -196,6 +202,10 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
this._interruptSync = false;
}

public getSyncProfile(): SyncEngineProfiler | undefined {
return this._syncProfiler;
}

public isSyncing(): boolean {
return this._isSyncing;
}
Expand Down Expand Up @@ -257,7 +267,7 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
}

const updatedPeerIdString = peerId.toString();
const rpcClient = await hub.getRPCClientForPeer(peerId, peerContact);
let rpcClient = await hub.getRPCClientForPeer(peerId, peerContact);
if (!rpcClient) {
log.warn("Diffsync: Failed to get RPC client for peer, skipping sync");
// If we're unable to reach the peer, remove it from our contact list. We'll retry when it's added back by
Expand All @@ -266,6 +276,12 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
this.emit("syncComplete", false);
return;
}

// If a sync profile is enabled, wrap the rpcClient in a profiler
if (this._syncProfiler) {
rpcClient = this._syncProfiler.profiledRpcClient(rpcClient);
}

try {
// First, get the latest state from the peer
const peerStateResult = await rpcClient.getSyncSnapshotByPrefix(
Expand Down Expand Up @@ -312,18 +328,18 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
if (syncStatus.shouldSync === true) {
log.info({ peerId }, "Diffsync: Syncing with peer");
await this.performSync(updatedPeerIdString, peerState, rpcClient);

log.info({ peerId }, "Diffsync: complete");
this.emit("syncComplete", true);
return;
} else {
log.info({ peerId }, "No need to sync");
this.emit("syncComplete", false);
return;
}

log.info({ peerId }, "Diffsync: complete");
this.emit("syncComplete", false);
return;
} finally {
const closeResult = Result.fromThrowable(
() => rpcClient.close(),
() => rpcClient?.close(),
(e) => e as Error,
)();
if (closeResult.isErr()) {
Expand Down
Loading

0 comments on commit 850f82f

Please sign in to comment.