Skip to content

Commit

Permalink
feat: Add a status hubble command for friendly reporting of current h…
Browse files Browse the repository at this point in the history
…ub status (#944)

* feat: Suport sync status rpc call

* Add sync status hubble command

* Fix generated file

* Changeset

* Fix isSyncing check

* Rename to status and report db stats as well

* Fix error
  • Loading branch information
sanjayprabhu authored May 8, 2023
1 parent ceac2c1 commit 1236b4e
Show file tree
Hide file tree
Showing 19 changed files with 1,406 additions and 133 deletions.
8 changes: 8 additions & 0 deletions .changeset/shiny-tables-join.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@farcaster/hub-nodejs': patch
'@farcaster/hub-web': patch
'@farcaster/core': patch
'@farcaster/hubble': patch
---

Add a GetSyncStatus rpc call that exposes the hubs sync status with different peers
1 change: 1 addition & 0 deletions apps/hubble/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"identity": "tsx src/cli.ts identity",
"dbreset": "tsx src/cli.ts dbreset",
"console": "tsx src/cli.ts console",
"status": "tsx src/cli.ts status",
"test": "yarn build && NODE_OPTIONS=--experimental-vm-modules jest",
"test:ci": "ENVIRONMENT=test NODE_OPTIONS=--experimental-vm-modules jest --ci --forceExit --coverage"
},
Expand Down
54 changes: 53 additions & 1 deletion apps/hubble/src/cli.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
#!/usr/bin/env node
import { FarcasterNetwork } from '@farcaster/hub-nodejs';
import {
FarcasterNetwork,
getInsecureHubRpcClient,
getSSLHubRpcClient,
HubInfoRequest,
SyncStatusRequest,
} from '@farcaster/hub-nodejs';
import { PeerId } from '@libp2p/interface-peer-id';
import { createEd25519PeerId, createFromProtobuf, exportToProtobuf } from '@libp2p/peer-id-factory';
import { Command } from 'commander';
Expand Down Expand Up @@ -437,6 +443,52 @@ app
.addCommand(createIdCommand)
.addCommand(verifyIdCommand);

app
.command('status')
.description('Reports the db and sync status of the hub')
.option(
'-s, --server <url>',
'Farcaster RPC server address:port to connect to (eg. 127.0.0.1:2283)',
DEFAULT_RPC_CONSOLE
)
.option('--insecure', 'Allow insecure connections to the RPC server', false)
.option('-p, --peerId <peerId>', 'Peer id of the hub to compare with (defaults to bootstrap peers)')
.action(async (cliOptions) => {
let rpcClient;
if (cliOptions.insecure) {
rpcClient = getInsecureHubRpcClient(cliOptions.server);
} else {
rpcClient = getSSLHubRpcClient(cliOptions.server);
}
const infoResult = await rpcClient.getInfo(HubInfoRequest.create({ dbStats: true }));
const syncStatusResult = await rpcClient.getSyncStatus(SyncStatusRequest.create({ peerId: cliOptions.peerId }));
if (syncStatusResult.isErr()) {
logger.error(
{ errCode: syncStatusResult.error.errCode, errMsg: syncStatusResult.error.message },
'Failed to get hub status'
);
exit(1);
} else if (infoResult.isErr()) {
logger.error({ errCode: infoResult.error.errCode, errMsg: infoResult.error.message }, 'Failed to get hub status');
exit(1);
}
const dbStats = infoResult.value.dbStats;
logger.info(
`Hub Version: ${infoResult.value.version} Messages: ${dbStats?.numMessages} FIDs: ${dbStats?.numFidEvents} FNames: ${dbStats?.numFnameEvents}}`
);
for (const peerStatus of syncStatusResult.value.syncStatus) {
const messageDelta = peerStatus.theirMessages - peerStatus.ourMessages;
if (syncStatusResult.value.isSyncing) {
logger.info(`Peer ${peerStatus.peerId}: Sync in progress. (msg delta: ${messageDelta})`);
} else {
logger.info(
`Peer ${peerStatus.peerId}: In Sync: ${peerStatus.inSync} (msg delta: ${messageDelta}, diverged ${peerStatus.divergenceSecondsAgo} seconds ago)`
);
}
}
exit(0);
});

app
.command('console')
.description('Start a REPL console')
Expand Down
2 changes: 1 addition & 1 deletion apps/hubble/src/console/console.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export const startConsole = async (addressString: string, useInsecure: boolean)

// Run the info command to start

const info = await rpcClient.getInfo(HubInfoRequest.create({ syncStats: true }), new Metadata(), {
const info = await rpcClient.getInfo(HubInfoRequest.create({ dbStats: true }), new Metadata(), {
deadline: Date.now() + 2000,
});

Expand Down
1 change: 1 addition & 0 deletions apps/hubble/src/hubble.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ export interface HubInterface {
getHubState(): HubAsyncResult<HubState>;
putHubState(hubState: HubState): HubAsyncResult<void>;
gossipContactInfo(): HubAsyncResult<void>;
getRPCClientForPeer(peerId: PeerId, peer: ContactInfoContent): Promise<HubRpcClient | undefined>;
}

export interface HubOptions {
Expand Down
13 changes: 11 additions & 2 deletions apps/hubble/src/network/p2p/gossipNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,12 @@ export class GossipNode extends TypedEmitter<NodeEvents> {
private _node?: Libp2p;
private _periodicPeerCheckJob?: PeriodicPeerCheckScheduler;
private _network: FarcasterNetwork;
private _bootstrapPeerIds: Set<string>;

constructor(network?: FarcasterNetwork) {
super();
this._network = network ?? FarcasterNetwork.NONE;
this._bootstrapPeerIds = new Set<string>();
}

/** Returns the PeerId (public key) of this node */
Expand Down Expand Up @@ -259,13 +261,16 @@ export class GossipNode extends TypedEmitter<NodeEvents> {
}

/** Connect to a peer Gossip Node using a specific address */
async connectAddress(address: Multiaddr): Promise<HubResult<void>> {
async connectAddress(address: Multiaddr, isBootstrapNode = false): Promise<HubResult<void>> {
log.debug({ identity: this.identity, address }, `Attempting to connect to address ${address}`);
try {
const conn = await this._node?.dial(address);

if (conn) {
log.info({ identity: this.identity, address }, `Connected to peer at address: ${address}`);
if (isBootstrapNode) {
this._bootstrapPeerIds.add(conn.remotePeer.toString());
}
return ok(undefined);
}
} catch (error: any) {
Expand Down Expand Up @@ -336,6 +341,10 @@ export class GossipNode extends TypedEmitter<NodeEvents> {
return [this.primaryTopic(), this.contactInfoTopic()];
}

get bootstrapPeerIds(): Set<string> {
return this._bootstrapPeerIds;
}

//TODO: Needs better typesafety
static encodeMessage(message: GossipMessage): HubResult<Uint8Array> {
return ok(GossipMessage.encode(message).finish());
Expand Down Expand Up @@ -363,7 +372,7 @@ export class GossipNode extends TypedEmitter<NodeEvents> {
/* Attempts to dial all the addresses in the bootstrap list */
public async bootstrap(bootstrapAddrs: Multiaddr[]): Promise<HubResult<void>> {
if (bootstrapAddrs.length == 0) return ok(undefined);
const results = await Promise.all(bootstrapAddrs.map((addr) => this.connectAddress(addr)));
const results = await Promise.all(bootstrapAddrs.map((addr) => this.connectAddress(addr, true)));

const finalResults = Result.combineWithAllErrors(results) as Result<void[], HubError[]>;
if (finalResults.isErr() && finalResults.error.length == bootstrapAddrs.length) {
Expand Down
3 changes: 1 addition & 2 deletions apps/hubble/src/network/sync/syncEngine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ describe('SyncEngine', () => {
expect(shouldSync.isOk()).toBeTruthy();
expect(shouldSync._unsafeUnwrap().isSyncing).toBeTruthy();
expect(shouldSync._unsafeUnwrap().shouldSync).toBeFalsy();
expect(shouldSync._unsafeUnwrap().ourSnapshot).toBeUndefined();
called = true;

// Return an empty child map so sync will finish with a noop
Expand Down Expand Up @@ -332,7 +331,7 @@ describe('SyncEngine', () => {
await engine.mergeMessage(signerAdd);
await addMessagesWithTimestamps([167, 169]);

const stats = await syncEngine.getSyncStats();
const stats = await syncEngine.getDbStats();
expect(stats.numFids).toEqual(1);
expect(stats.numFnames).toEqual(2);
expect(stats.numMessages).toEqual(3);
Expand Down
113 changes: 84 additions & 29 deletions apps/hubble/src/network/sync/syncEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ import { TypedEmitter } from 'tiny-typed-emitter';
import { EthEventsProvider } from '~/eth/ethEventsProvider';
import { Hub, HubInterface } from '~/hubble';
import { MerkleTrie, NodeMetadata } from '~/network/sync/merkleTrie';
import { SyncId, timestampToPaddedTimestampPrefix } from '~/network/sync/syncId';
import { prefixToTimestamp, SyncId, timestampToPaddedTimestampPrefix } from '~/network/sync/syncId';
import { TrieSnapshot } from '~/network/sync/trieNode';
import { getManyMessages } from '~/storage/db/message';
import RocksDB from '~/storage/db/rocksdb';
import { sleepWhile } from '~/utils/crypto';
import { logger } from '~/utils/logger';
import { RootPrefix } from '~/storage/db/types';
import { fromFarcasterTime } from '@farcaster/core';

// Number of seconds to wait for the network to "settle" before syncing. We will only
// attempt to sync messages that are older than this time.
Expand Down Expand Up @@ -65,13 +66,16 @@ type MergeResult = {

type SyncStatus = {
isSyncing: boolean;
inSync: 'true' | 'false' | 'unknown';
inSync: 'true' | 'false' | 'unknown' | 'blocked';
shouldSync: boolean;
theirSnapshot: TrieSnapshot;
ourSnapshot?: TrieSnapshot;
ourSnapshot: TrieSnapshot;
divergencePrefix: string;
divergenceSecondsAgo: number;
lastBadSync: number;
};

type SyncStats = {
type DbStats = {
numMessages: number;
numFids: number;
numFnames: number;
Expand Down Expand Up @@ -297,7 +301,9 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
ourMessages: syncStatus.ourSnapshot?.numMessages,
peerNetwork: peerContact.network,
peerVersion: peerContact.hubVersion,
lastBadSync: this._unproductivePeers.get(peerIdString)?.getTime(),
divergencePrefix: syncStatus.divergencePrefix,
divergenceSeconds: syncStatus.divergenceSecondsAgo,
lastBadSync: syncStatus.lastBadSync,
},
'SyncStatus' // Search for this string in the logs to get summary of sync status
);
Expand Down Expand Up @@ -326,38 +332,65 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
}

public async syncStatus(peerId: string, theirSnapshot: TrieSnapshot): HubAsyncResult<SyncStatus> {
if (this._isSyncing) {
log.info('shouldSync: already syncing');
return ok({ isSyncing: true, inSync: 'unknown', shouldSync: false, theirSnapshot });
}

const lastBadSync = this._unproductivePeers.get(peerId);
if (lastBadSync && Date.now() < lastBadSync.getTime() + BAD_PEER_BLOCK_TIMEOUT) {
log.info(`shouldSync: bad peer (blocked until ${lastBadSync.getTime() + BAD_PEER_BLOCK_TIMEOUT})`);
return ok({ isSyncing: false, inSync: 'false', shouldSync: false, theirSnapshot });
}

const ourSnapshotResult = await this.getSnapshot(theirSnapshot.prefix);

if (ourSnapshotResult.isErr()) {
return err(ourSnapshotResult.error);
} else {
const ourSnapshot = ourSnapshotResult.value;
const excludedHashesMatch =
ourSnapshot.excludedHashes.length === theirSnapshot.excludedHashes.length &&
// NOTE: `index` is controlled by `every` and so not at risk of object injection.
// eslint-disable-next-line security/detect-object-injection
ourSnapshot.excludedHashes.every((value, index) => value === theirSnapshot.excludedHashes[index]);
}
const ourSnapshot = ourSnapshotResult.value;

log.info({ excludedHashesMatch }, `shouldSync: excluded hashes`);
if (this._isSyncing) {
return ok({
isSyncing: true,
inSync: 'unknown',
shouldSync: false,
theirSnapshot,
ourSnapshot,
divergencePrefix: '',
divergenceSecondsAgo: -1,
lastBadSync: -1,
});
}

if (lastBadSync && Date.now() < lastBadSync.getTime() + BAD_PEER_BLOCK_TIMEOUT) {
return ok({
isSyncing: false,
inSync: excludedHashesMatch ? 'true' : 'false',
shouldSync: !excludedHashesMatch,
ourSnapshot,
inSync: 'blocked',
shouldSync: false,
theirSnapshot,
ourSnapshot,
divergencePrefix: '',
divergenceSecondsAgo: -1,
lastBadSync: lastBadSync.getTime(),
});
}

const excludedHashesMatch =
ourSnapshot.excludedHashes.length === theirSnapshot.excludedHashes.length &&
// NOTE: `index` is controlled by `every` and so not at risk of object injection.
// eslint-disable-next-line security/detect-object-injection
ourSnapshot.excludedHashes.every((value, index) => value === theirSnapshot.excludedHashes[index]);

const divergencePrefix = Buffer.from(this.getDivergencePrefix(ourSnapshot, theirSnapshot.excludedHashes)).toString(
'ascii'
);
const divergedAt = fromFarcasterTime(prefixToTimestamp(divergencePrefix));
let divergenceSecondsAgo = -1;
if (divergedAt.isOk()) {
divergenceSecondsAgo = Math.floor((Date.now() - divergedAt.value) / 1000);
}

return ok({
isSyncing: false,
inSync: excludedHashesMatch ? 'true' : 'false',
shouldSync: !excludedHashesMatch,
ourSnapshot,
theirSnapshot,
divergencePrefix,
divergenceSecondsAgo,
lastBadSync: lastBadSync?.getTime() ?? -1,
});
}

async performSync(peerId: string, otherSnapshot: TrieSnapshot, rpcClient: HubRpcClient): Promise<boolean> {
Expand All @@ -371,7 +404,7 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
log.warn({ errCode: snapshot.error.errCode }, `Error performing sync: ${snapshot.error.message}}`);
} else {
const ourSnapshot = snapshot.value;
const divergencePrefix = await this.getDivergencePrefix(ourSnapshot, otherSnapshot.excludedHashes);
const divergencePrefix = this.getDivergencePrefix(ourSnapshot, otherSnapshot.excludedHashes);
log.info(
{
divergencePrefix: Buffer.from(divergencePrefix).toString('ascii'),
Expand Down Expand Up @@ -423,7 +456,7 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
* @param prefix - the prefix of the external trie.
* @param otherExcludedHashes - the excluded hashes of the external trie.
*/
async getDivergencePrefix(ourSnapshot: TrieSnapshot, otherExcludedHashes: string[]): Promise<Uint8Array> {
getDivergencePrefix(ourSnapshot: TrieSnapshot, otherExcludedHashes: string[]): Uint8Array {
const { prefix, excludedHashes } = ourSnapshot;

for (let i = 0; i < prefix.length; i++) {
Expand Down Expand Up @@ -661,7 +694,7 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
return false;
}

public async getSyncStats(): Promise<SyncStats> {
public async getDbStats(): Promise<DbStats> {
let numFids = 0,
numFnames = 0;

Expand All @@ -686,6 +719,28 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
};
}

public async getSyncStatusForPeer(peerId: string, hub: HubInterface): HubAsyncResult<SyncStatus> {
const c = this.currentHubPeerContacts.get(peerId);
if (!c?.peerId || !c?.contactInfo) {
return err(new HubError('unavailable.network_failure', `No contact info for peer ${peerId}`));
}
const rpcClient = await hub.getRPCClientForPeer(c?.peerId, c?.contactInfo);
if (!rpcClient) {
return err(new HubError('unavailable.network_failure', `Could not create a RPC client for peer ${peerId}`));
}
const peerStateResult = await rpcClient.getSyncSnapshotByPrefix(
TrieNodePrefix.create({ prefix: new Uint8Array() }),
new Metadata(),
rpcDeadline()
);
if (peerStateResult.isErr()) {
return err(peerStateResult.error);
}

const theirSnapshot = peerStateResult.value;
return this.syncStatus(peerId, theirSnapshot);
}

public get shouldCompactDb(): boolean {
return this._messagesSinceLastCompaction > COMPACTION_THRESHOLD;
}
Expand Down
6 changes: 5 additions & 1 deletion apps/hubble/src/network/sync/syncId.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,8 @@ const timestampToPaddedTimestampPrefix = (timestamp: number): string => {
return Math.floor(timestamp).toString().padStart(TIMESTAMP_LENGTH, '0');
};

export { SyncId, timestampToPaddedTimestampPrefix, TIMESTAMP_LENGTH, HASH_LENGTH };
const prefixToTimestamp = (prefix: string): number => {
return parseInt(prefix.padEnd(TIMESTAMP_LENGTH, '0'), 10);
};

export { SyncId, timestampToPaddedTimestampPrefix, prefixToTimestamp, TIMESTAMP_LENGTH, HASH_LENGTH };
Loading

0 comments on commit 1236b4e

Please sign in to comment.