From c7ec4ca92e896c6bf2ff5ca3102cbf5f64a37092 Mon Sep 17 00:00:00 2001 From: Sanjay Date: Fri, 21 Jul 2023 15:41:13 -0700 Subject: [PATCH] feat: revoke signers 1hr after custody event (#1177) * feat: revoke signers 1hr after custody event * add changeset --- .changeset/fifty-feet-punch.md | 5 ++++ apps/hubble/src/storage/engine/index.test.ts | 18 ++++++++++++-- apps/hubble/src/storage/engine/index.ts | 3 ++- .../storage/jobs/revokeMessagesBySignerJob.ts | 24 +++++++++++++------ 4 files changed, 40 insertions(+), 10 deletions(-) create mode 100644 .changeset/fifty-feet-punch.md diff --git a/.changeset/fifty-feet-punch.md b/.changeset/fifty-feet-punch.md new file mode 100644 index 0000000000..3fdb5d1275 --- /dev/null +++ b/.changeset/fifty-feet-punch.md @@ -0,0 +1,5 @@ +--- +"@farcaster/hubble": patch +--- + +feat: revoke signers 1hr after custody event diff --git a/apps/hubble/src/storage/engine/index.test.ts b/apps/hubble/src/storage/engine/index.test.ts index 93e088b2ba..b47ba668ce 100644 --- a/apps/hubble/src/storage/engine/index.test.ts +++ b/apps/hubble/src/storage/engine/index.test.ts @@ -47,6 +47,7 @@ import { setReferenceDateForTest } from "../../utils/versions.js"; import { getUserNameProof } from "../db/nameRegistryEvent.js"; import { publicClient } from "../../test/utils.js"; import { jest } from "@jest/globals"; +import { RevokeMessagesBySignerJobQueue, RevokeMessagesBySignerJobWorker } from "../jobs/revokeMessagesBySignerJob.js"; const db = jestRocksDB("protobufs.engine.test"); const network = FarcasterNetwork.TESTNET; @@ -882,9 +883,22 @@ describe("with listeners and workers", () => { blockNumber: custodyEvent.blockNumber + 1, }); await liveEngine.mergeIdRegistryEvent(custodyTransfer); + await sleep(200); + // Does not immediately revoke messages, will wait 1 hr expect(revokedMessages).toEqual([]); - await sleep(200); // Wait for engine to revoke messages - expect(revokedMessages).toEqual([signerAdd, castAdd, reactionAdd, linkAdd]); + + // Manually trigger the job + const queue = new RevokeMessagesBySignerJobQueue(db); + const worker = new RevokeMessagesBySignerJobWorker(queue, db, liveEngine); + await worker.processJobs(Date.now() + 1000 * 10 * 60 + 5000); + expect(revokedMessages).toEqual([]); // No messages revoked yet, after 10 mins + + // Revokes messages after 1 hr + await worker.processJobs(Date.now() + 1000 * 60 * 60 + 5000); + expect(revokedMessages).toContainEqual(signerAdd); + expect(revokedMessages).toContainEqual(castAdd); + expect(revokedMessages).toContainEqual(reactionAdd); + expect(revokedMessages).toContainEqual(linkAdd); }); test("revokes messages when SignerAdd is pruned", async () => { diff --git a/apps/hubble/src/storage/engine/index.ts b/apps/hubble/src/storage/engine/index.ts index 6a7e7237e6..e184f14fdc 100644 --- a/apps/hubble/src/storage/engine/index.ts +++ b/apps/hubble/src/storage/engine/index.ts @@ -981,7 +981,8 @@ class Engine { fid: idRegistryEvent.fid, signer: fromAddress, }); - const enqueueRevoke = await this._revokeSignerQueue.enqueueJob(payload); + const oneHourFromNow = Date.now() + 60 * 60 * 1000; + const enqueueRevoke = await this._revokeSignerQueue.enqueueJob(payload, oneHourFromNow); if (enqueueRevoke.isErr()) { log.error( { errCode: enqueueRevoke.error.errCode }, diff --git a/apps/hubble/src/storage/jobs/revokeMessagesBySignerJob.ts b/apps/hubble/src/storage/jobs/revokeMessagesBySignerJob.ts index db77cc8108..0d5c769274 100644 --- a/apps/hubble/src/storage/jobs/revokeMessagesBySignerJob.ts +++ b/apps/hubble/src/storage/jobs/revokeMessagesBySignerJob.ts @@ -22,6 +22,7 @@ export class RevokeMessagesBySignerJobWorker { private _db: RocksDB; private _engine: Engine; private _status: "working" | "waiting"; + private _processJobs: () => Promise; constructor(queue: RevokeMessagesBySignerJobQueue, db: RocksDB, engine: Engine) { this._queue = queue; @@ -29,18 +30,21 @@ export class RevokeMessagesBySignerJobWorker { this._engine = engine; this._status = "waiting"; - this.processJobs = this.processJobs.bind(this); + this._processJobs = async () => { + await this.processJobs(); + }; } start() { - this._queue.on("enqueueJob", this.processJobs); + this._queue.on("enqueueJob", this._processJobs); } stop() { - this._queue.off("enqueueJob", this.processJobs); + this._queue.off("enqueueJob", this._processJobs); } - async processJobs(): HubAsyncResult { + async processJobs(doBefore?: number): HubAsyncResult { + const doBeforeTs = doBefore || Date.now() + 500; // Add a 500ms buffer for tests if (this._status === "working") { return err(new HubError("unavailable", "worker is already processing jobs")); } @@ -50,11 +54,11 @@ export class RevokeMessagesBySignerJobWorker { this._status = "working"; - let nextJob = await this._queue.popNextJob(); + let nextJob = await this._queue.popNextJob(doBeforeTs); while (nextJob.isOk()) { await this.processJob(nextJob.value); - nextJob = await this._queue.popNextJob(); + nextJob = await this._queue.popNextJob(doBeforeTs); } this._status = "waiting"; @@ -142,7 +146,13 @@ export class RevokeMessagesBySignerJobQueue extends TypedEmitter return err(result.error); } - this.emit("enqueueJob", key.value); + if (doAt) { + setTimeout(() => { + this.emit("enqueueJob", key.value); + }, doAt - Date.now() + 1000); + } else { + this.emit("enqueueJob", key.value); + } return ok(key.value); }