Skip to content

Commit

Permalink
feat: deprecate time based pruning (#1227)
Browse files Browse the repository at this point in the history
  • Loading branch information
sanjayprabhu authored Aug 7, 2023
1 parent 9ae366b commit 2df3849
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 12 deletions.
5 changes: 5 additions & 0 deletions .changeset/tidy-needles-greet.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

feat: Deprecate time-based pruning in sets
36 changes: 29 additions & 7 deletions apps/hubble/src/storage/jobs/pruneMessagesJob.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
import { Ed25519Signer, Factories, FarcasterNetwork, Message, PruneMessageHubEvent } from "@farcaster/hub-nodejs";
import {
Ed25519Signer,
Factories,
FarcasterNetwork,
Message,
MessageType,
PruneMessageHubEvent,
} from "@farcaster/hub-nodejs";
import { jestRocksDB } from "../db/jestUtils.js";
import Engine from "../engine/index.js";
import { seedSigner } from "../engine/seed.js";
Expand All @@ -19,7 +26,13 @@ const seedMessagesFromTimestamp = async (engine: Engine, fid: number, signer: Ed
{ transient: { signer } },
);
const linkAdd = await Factories.LinkAddMessage.create({ data: { fid, timestamp } }, { transient: { signer } });
return engine.mergeMessages([castAdd, reactionAdd, linkAdd]);
const proofs = await Factories.VerificationAddEthAddressMessage.createList(
51,
{ data: { fid, timestamp } },
{ transient: { signer } },
);

return engine.mergeMessages([castAdd, reactionAdd, linkAdd, ...proofs]);
};

let prunedMessages: Message[] = [];
Expand Down Expand Up @@ -77,10 +90,14 @@ describe("doJobs", () => {

const links = await engine.getLinksByFid(fid);
expect(links._unsafeUnwrap().messages.length).toEqual(1);

const verifications = await engine.getVerificationsByFid(fid);
expect(verifications._unsafeUnwrap().messages.length).toEqual(51);
}

const nowOrig = Date.now;
Date.now = () => FARCASTER_EPOCH + (currentTime + 60 * 60 * 24 * 365 + 1) * 1000; // advance 1 year and 1 second
// advance 1 month to get beyond the PRUNE_STOP_TIMESTAMP
Date.now = () => FARCASTER_EPOCH + (currentTime + 60 * 60 * 24 * 30) * 1000;
try {
const result = await scheduler.doJobs();
expect(result._unsafeUnwrap()).toEqual(undefined);
Expand All @@ -89,18 +106,23 @@ describe("doJobs", () => {
}

for (const fid of [fid1, fid2]) {
// These messages are not pruned (under size limit)
const casts = await engine.getCastsByFid(fid);
expect(casts._unsafeUnwrap().messages).toEqual([]);
expect(casts._unsafeUnwrap().messages.length).toEqual(1);

const reactions = await engine.getReactionsByFid(fid);
expect(reactions._unsafeUnwrap().messages).toEqual([]);
expect(reactions._unsafeUnwrap().messages.length).toEqual(1);

// These don't prune based on time
const links = await engine.getLinksByFid(fid);
expect(links._unsafeUnwrap().messages.length).toEqual(1);

// These are pruned based on size
const verifications = await engine.getVerificationsByFid(fid);
expect(verifications._unsafeUnwrap().messages.length).toEqual(50);
}

expect(prunedMessages.length).toEqual(4);
expect(prunedMessages.length).toEqual(2);
expect(prunedMessages.filter((m) => m.data?.type !== MessageType.VERIFICATION_ADD_ETH_ADDRESS)).toEqual([]);
},
15 * 1000,
);
Expand Down
18 changes: 13 additions & 5 deletions apps/hubble/src/storage/stores/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ const deepPartialEquals = <T>(partial: DeepPartial<T>, whole: T) => {
return true;
};

// Cut-off, in Unix epoch milliseconds, after which we stop pruning based on the time limit. This is in
// preparation for the mainnet migration, where storage-based limits apply and there is no time-based pruning.
// NOTE: a date-only ISO 8601 string is parsed as midnight UTC, so this is the same instant on every host
// regardless of the host's local time zone.
const PRUNE_STOP_TIMESTAMP = new Date("2023-08-22").getTime();

export abstract class Store<TAdd extends Message, TRemove extends Message> {
protected _db: RocksDB;
protected _eventHandler: StoreEventHandler;
Expand Down Expand Up @@ -195,8 +199,8 @@ export abstract class Store<TAdd extends Message, TRemove extends Message> {
// rome-ignore lint/suspicious/noExplicitAny: legacy code, avoid using ignore for new code
message as any,
this._postfix,
this._pruneSizeLimit,
this._pruneTimeLimit,
this.pruneSizeLimit,
this.pruneTimeLimit,
);
if (prunableResult.isErr()) {
throw prunableResult.error;
Expand Down Expand Up @@ -264,8 +268,7 @@ export abstract class Store<TAdd extends Message, TRemove extends Message> {
}

// Calculate the timestamp cut-off to prune
const timestampToPrune =
this._pruneTimeLimit === undefined ? undefined : farcasterTime.value - this._pruneTimeLimit;
const timestampToPrune = this.pruneTimeLimit === undefined ? undefined : farcasterTime.value - this.pruneTimeLimit;

// Go over all messages for this fid and postfix
await this._db.forEachIteratorByPrefix(
Expand All @@ -289,7 +292,7 @@ export abstract class Store<TAdd extends Message, TRemove extends Message> {
// Since the TS hash has the first 4 bytes be the timestamp (bigendian), we can use it to prune
// since the iteration will be implicitly sorted by timestamp
if (
count.value <= this._pruneSizeLimit * units.value &&
count.value <= this.pruneSizeLimit * units.value &&
(timestampToPrune === undefined || (message.value.data && message.value.data.timestamp >= timestampToPrune))
) {
return true; // Nothing left to prune
Expand Down Expand Up @@ -331,6 +334,11 @@ export abstract class Store<TAdd extends Message, TRemove extends Message> {
}

/**
 * Effective time limit used for time-based pruning.
 *
 * @returns the configured `_pruneTimeLimit` while the wall clock is at or before `PRUNE_STOP_TIMESTAMP`,
 * and `undefined` once the wall clock has moved past it.
 */
get pruneTimeLimit(): number | undefined {
  // Time-based pruning is being deprecated. All hubs compare their clock against the same fixed
  // timestamp so that they stop pruning by time at a common point of reference and sync is not affected.
  const timeBasedPruningStopped = Date.now() > PRUNE_STOP_TIMESTAMP;
  return timeBasedPruningStopped ? undefined : this._pruneTimeLimit;
}

Expand Down

0 comments on commit 2df3849

Please sign in to comment.