Skip to content

Commit

Permalink
fix: Use a queue when retrying messages (#1168)
Browse files Browse the repository at this point in the history
* fix: Add a retry queue for failed signer messages

* changeset
  • Loading branch information
adityapk00 authored Jul 20, 2023
1 parent e36fcae commit c1bb21c
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 41 deletions.
5 changes: 5 additions & 0 deletions .changeset/silver-dolls-accept.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

fix: When retrying messages due to failed signers, use a queue
102 changes: 66 additions & 36 deletions apps/hubble/src/network/sync/syncEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,26 @@ type PeerContact = {
peerId: PeerId;
};

type MergeResult = {
class MergeResult {
total: number;
successCount: number;
deferredCount: number;
errCount: number;
};

constructor(total = 0, successCount = 0, deferredCount = 0, errCount = 0) {
this.total = total;
this.successCount = successCount;
this.deferredCount = deferredCount;
this.errCount = errCount;
}

addResult(result: MergeResult) {
this.total += result.total;
this.successCount += result.successCount;
this.deferredCount += result.deferredCount;
this.errCount += result.errCount;
}
}

type SyncStatus = {
isSyncing: boolean;
Expand Down Expand Up @@ -108,6 +122,8 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
private _messagesSinceLastCompaction = 0;
private _isCompacting = false;

private _fidRetryMessageQ = new Map<number, Message[]>();

constructor(
hub: HubInterface,
rocksDb: RocksDB,
Expand Down Expand Up @@ -243,7 +259,6 @@ class SyncEngine extends TypedEmitter<SyncEvents> {

public async diffSyncIfRequired(hub: Hub, peerIdString?: string) {
this.emit("syncStart");
log.info({ peerIdString }, "Diffsync: Starting diff sync");

if (this.currentHubPeerContacts.size === 0) {
log.warn("Diffsync: No peer contacts, skipping sync");
Expand Down Expand Up @@ -339,10 +354,12 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
);

if (syncStatus.shouldSync === true) {
log.info({ peerId }, "Diffsync: Syncing with peer");
await this.performSync(updatedPeerIdString, peerState, rpcClient);
log.info({ peerId }, "Diffsync: Starting Sync with peer");
const start = Date.now();

log.info({ peerId }, "Diffsync: complete");
const result = await this.performSync(updatedPeerIdString, peerState, rpcClient);

log.info({ peerId, result, timeTakenMs: Date.now() - start }, "Diffsync: complete");
this.emit("syncComplete", true);
return;
} else {
Expand Down Expand Up @@ -422,12 +439,15 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
});
}

async performSync(peerId: string, otherSnapshot: TrieSnapshot, rpcClient: HubRpcClient): Promise<boolean> {
async performSync(peerId: string, otherSnapshot: TrieSnapshot, rpcClient: HubRpcClient): Promise<MergeResult> {
log.debug({ peerId }, "Perform sync: Start");

let success = false;
const fullSyncResult = new MergeResult();

try {
this._isSyncing = true;
this._interruptSync = false;
this._fidRetryMessageQ = new Map();

// Get the snapshot of our trie, at the same prefix as the peer's snapshot
const snapshot = await this.getSnapshot(otherSnapshot.prefix);
Expand All @@ -447,13 +467,9 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
"Divergence prefix",
);

const fullSyncResult: MergeResult = { total: 0, successCount: 0, deferredCount: 0, errCount: 0 };
await this.fetchMissingHashesByPrefix(divergencePrefix, rpcClient, async (missingIds: Uint8Array[]) => {
fullSyncResult.total += missingIds.length;
const result = await this.fetchAndMergeMessages(missingIds, rpcClient);
fullSyncResult.successCount += result.successCount;
fullSyncResult.deferredCount += result.deferredCount;
fullSyncResult.errCount += result.errCount;
fullSyncResult.addResult(result);
});
log.info({ syncResult: fullSyncResult }, "Perform sync: Sync Complete");

Expand All @@ -469,16 +485,14 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
);
this._unproductivePeers.set(peerId, new Date());
}

success = true;
}
} catch (e) {
log.warn(e, "Perform sync: Error");
} finally {
this._isSyncing = false;
}

return success;
return fullSyncResult;
}

async getAllMessagesBySyncIds(syncIds: Uint8Array[]): HubAsyncResult<Message[]> {
Expand Down Expand Up @@ -506,12 +520,11 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
}

public async fetchAndMergeMessages(syncIds: Uint8Array[], rpcClient: HubRpcClient): Promise<MergeResult> {
const empty = { successCount: 0, deferredCount: 0, errCount: 0, total: 0 };
if (syncIds.length === 0) {
return empty;
return new MergeResult(); // empty merge result
}

let result = empty;
let result = new MergeResult();
const messagesResult = await rpcClient.getAllMessagesBySyncIds(
SyncIds.create({ syncIds }),
new Metadata(),
Expand Down Expand Up @@ -549,14 +562,13 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
if (result.error.errCode === "bad_request.validation_failure") {
if (result.error.message.startsWith("invalid signer")) {
// The user's signer was not found. So fetch all signers from the peer and retry.

const retryResult = await this.syncUserAndRetryMessage(msg, rpcClient);
log.warn(
{
fid: msg.data?.fid,
err: result.error.message,
signer: bytesToHexString(msg.signer)._unsafeUnwrap(),
retryResult: retryResult.isOk() ? "ok" : "err",
retryResult,
},
"Unknown signer, fetched all signers from peer",
);
Expand Down Expand Up @@ -598,19 +610,13 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
}

const successCount = mergeResults.filter((r) => r.isOk()).length;
const result = new MergeResult(mergeResults.length, successCount, deferredCount, errCount);

if (mergeResults.length > 0) {
log.info(
{
total: mergeResults.length,
success: successCount,
deferred: deferredCount,
errored: errCount,
},
"Merged messages during sync",
);
log.info(result, "Merged messages during sync");
}

return { total: mergeResults.length, successCount, deferredCount, errCount };
return result;
}

async fetchMissingHashesByPrefix(
Expand Down Expand Up @@ -868,6 +874,20 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
return err(new HubError("bad_request.invalid_param", "Invalid fid while retrying message"));
}

if (this._fidRetryMessageQ.has(fid)) {
// If the queue is empty, this fid has already been retried, so we can skip
if (this._fidRetryMessageQ.get(fid)?.length === 0) {
return err(new HubError("bad_request.invalid_param", "Fid already retried"));
}

// Add the message to the queue
this._fidRetryMessageQ.get(fid)?.push(message);
return ok(this._fidRetryMessageQ.get(fid)?.length ?? 0);
} else {
// Create a new queue for this fid
this._fidRetryMessageQ.set(fid, [message]);
}

// Probably not required to fetch the signer messages, but doing it here means
// sync will complete in one round (prevents messages failing to merge due to missed or out of
// order signer message)
Expand All @@ -883,13 +903,23 @@ class SyncEngine extends TypedEmitter<SyncEvents> {
const results = await Promise.all(
signerMessagesResult.value.messages.map((message) => this._hub.submitMessage(message, "sync")),
);

const messages = this._fidRetryMessageQ.get(fid) ?? [];
this._fidRetryMessageQ.set(fid, []);

if (results.every((r) => r.isErr())) {
return err(new HubError("unavailable.storage_failure", "Failed to merge signer messages"));
return err(new HubError("unavailable.storage_failure", "Failed to merge any signer message"));
} else {
// if at least one signer message was merged, retry the original message
return (await this._hub.submitMessage(message, "sync")).mapErr((e) => {
return new HubError("unavailable.storage_failure", e);
});
// if at least one signer message was merged, retry the messages in the queue
const results = await Promise.all(messages.map(async (message) => this._hub.submitMessage(message, "sync")));

// If any of the messages failed, return a hub error
const firstFailed = results.find((r) => r.isErr());
if (firstFailed) {
return firstFailed;
} else {
return ok(0);
}
}
}
}
Expand Down
5 changes: 1 addition & 4 deletions apps/hubble/src/rpc/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1022,10 +1022,7 @@ export default class Server {
if (process.memoryUsage().rss > rssUsage + RSS_USAGE_THRESHOLD) {
// more than 1G, so we're writing a lot of data to the stream, but the client is not reading it.
// We'll destroy the stream.
const error = new HubError(
"unavailable.network_failure",
`stream memory usage too much for peer: ${stream.getPeer()}`,
);
const error = new HubError("unavailable.network_failure", "stream memory usage too much");
logger.error({ errCode: error.errCode }, error.message);
stream.destroy(error);

Expand Down
7 changes: 6 additions & 1 deletion apps/hubble/src/storage/jobs/validateOrRevokeMessagesJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,17 @@ export class ValidateOrRevokeMessagesJobScheduler {

const numFids = allFids.length;
const scheduledTimePerFidMs = (6 * 60 * 60 * 1000) / numFids; // 6 hours for all FIDs
log.info({ numFids, scheduledTimePerFidMs }, "ValidateOrRevokeMessagesJob: got FIDs");

for (let i = 0; i < numFids; i++) {
const fid = allFids[i] as number;
const numChecked = await this.doJobForFid(fid);
totalMessagesChecked += numChecked.unwrapOr(0);

if (i % 1000 === 0) {
log.info({ fid, totalMessagesChecked }, "ValidateOrRevokeMessagesJob: progress");
}

// If we are running ahead of schedule, sleep for a bit to let the other jobs catch up.
if (Date.now() - start < (i + 1) * scheduledTimePerFidMs) {
await new Promise((resolve) => setTimeout(resolve, scheduledTimePerFidMs));
Expand Down Expand Up @@ -132,7 +137,7 @@ export class ValidateOrRevokeMessagesJobScheduler {
}
},
{},
5 * 60 * 1000, // 5 minutes
15 * 60 * 1000, // 15 minutes
);

return ok(count);
Expand Down

0 comments on commit c1bb21c

Please sign in to comment.