Skip to content

Commit

Permalink
feat: revoke signers 1hr after custody event (#1177)
Browse files Browse the repository at this point in the history
* feat: revoke signers 1hr after custody event

* add changeset
  • Loading branch information
sanjayprabhu authored Jul 21, 2023
1 parent b9efe14 commit c7ec4ca
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 10 deletions.
5 changes: 5 additions & 0 deletions .changeset/fifty-feet-punch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

feat: revoke signers 1hr after custody event
18 changes: 16 additions & 2 deletions apps/hubble/src/storage/engine/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import { setReferenceDateForTest } from "../../utils/versions.js";
import { getUserNameProof } from "../db/nameRegistryEvent.js";
import { publicClient } from "../../test/utils.js";
import { jest } from "@jest/globals";
import { RevokeMessagesBySignerJobQueue, RevokeMessagesBySignerJobWorker } from "../jobs/revokeMessagesBySignerJob.js";

const db = jestRocksDB("protobufs.engine.test");
const network = FarcasterNetwork.TESTNET;
Expand Down Expand Up @@ -882,9 +883,22 @@ describe("with listeners and workers", () => {
blockNumber: custodyEvent.blockNumber + 1,
});
await liveEngine.mergeIdRegistryEvent(custodyTransfer);
await sleep(200);
// Does not immediately revoke messages, will wait 1 hr
expect(revokedMessages).toEqual([]);
await sleep(200); // Wait for engine to revoke messages
expect(revokedMessages).toEqual([signerAdd, castAdd, reactionAdd, linkAdd]);

// Manually trigger the job
const queue = new RevokeMessagesBySignerJobQueue(db);
const worker = new RevokeMessagesBySignerJobWorker(queue, db, liveEngine);
await worker.processJobs(Date.now() + 1000 * 10 * 60 + 5000);
expect(revokedMessages).toEqual([]); // No messages revoked yet, after 10 mins

// Revokes messages after 1 hr
await worker.processJobs(Date.now() + 1000 * 60 * 60 + 5000);
expect(revokedMessages).toContainEqual(signerAdd);
expect(revokedMessages).toContainEqual(castAdd);
expect(revokedMessages).toContainEqual(reactionAdd);
expect(revokedMessages).toContainEqual(linkAdd);
});

test("revokes messages when SignerAdd is pruned", async () => {
Expand Down
3 changes: 2 additions & 1 deletion apps/hubble/src/storage/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,8 @@ class Engine {
fid: idRegistryEvent.fid,
signer: fromAddress,
});
const enqueueRevoke = await this._revokeSignerQueue.enqueueJob(payload);
const oneHourFromNow = Date.now() + 60 * 60 * 1000;
const enqueueRevoke = await this._revokeSignerQueue.enqueueJob(payload, oneHourFromNow);
if (enqueueRevoke.isErr()) {
log.error(
{ errCode: enqueueRevoke.error.errCode },
Expand Down
24 changes: 17 additions & 7 deletions apps/hubble/src/storage/jobs/revokeMessagesBySignerJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,29 @@ export class RevokeMessagesBySignerJobWorker {
private _db: RocksDB;
private _engine: Engine;
private _status: "working" | "waiting";
private _processJobs: () => Promise<void>;

constructor(queue: RevokeMessagesBySignerJobQueue, db: RocksDB, engine: Engine) {
this._queue = queue;
this._db = db;
this._engine = engine;
this._status = "waiting";

this.processJobs = this.processJobs.bind(this);
this._processJobs = async () => {
await this.processJobs();
};
}

start() {
this._queue.on("enqueueJob", this.processJobs);
this._queue.on("enqueueJob", this._processJobs);
}

stop() {
this._queue.off("enqueueJob", this.processJobs);
this._queue.off("enqueueJob", this._processJobs);
}

async processJobs(): HubAsyncResult<void> {
async processJobs(doBefore?: number): HubAsyncResult<void> {
const doBeforeTs = doBefore || Date.now() + 500; // Add a 500ms buffer for tests
if (this._status === "working") {
return err(new HubError("unavailable", "worker is already processing jobs"));
}
Expand All @@ -50,11 +54,11 @@ export class RevokeMessagesBySignerJobWorker {

this._status = "working";

let nextJob = await this._queue.popNextJob();
let nextJob = await this._queue.popNextJob(doBeforeTs);
while (nextJob.isOk()) {
await this.processJob(nextJob.value);

nextJob = await this._queue.popNextJob();
nextJob = await this._queue.popNextJob(doBeforeTs);
}

this._status = "waiting";
Expand Down Expand Up @@ -142,7 +146,13 @@ export class RevokeMessagesBySignerJobQueue extends TypedEmitter<JobQueueEvents>
return err(result.error);
}

this.emit("enqueueJob", key.value);
if (doAt) {
setTimeout(() => {
this.emit("enqueueJob", key.value);
}, doAt - Date.now() + 1000);
} else {
this.emit("enqueueJob", key.value);
}

return ok(key.value);
}
Expand Down

0 comments on commit c7ec4ca

Please sign in to comment.