Skip to content

Commit

Permalink
feat: Add more information in the limits response (#1569)
Browse files Browse the repository at this point in the history
* feat: Add more information in the limits response

* changeset

* Fix flaky test
  • Loading branch information
sanjayprabhu authored Dec 4, 2023
1 parent 3784188 commit 704e077
Show file tree
Hide file tree
Showing 12 changed files with 560 additions and 138 deletions.
8 changes: 8 additions & 0 deletions .changeset/rotten-files-poke.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@farcaster/hub-nodejs": patch
"@farcaster/hub-web": patch
"@farcaster/core": patch
"@farcaster/hubble": patch
---

feat: Add usage information to getCurrentStorageLimitsByFid rpc call
88 changes: 74 additions & 14 deletions apps/hubble/src/rpc/test/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { MockHub } from "../../test/mocks.js";
import Server from "../server.js";
import SyncEngine from "../../network/sync/syncEngine.js";
import { Ok } from "neverthrow";
import { sleep } from "../../utils/crypto.js";

const db = jestRocksDB("protobufs.rpc.server.test");
const network = FarcasterNetwork.TESTNET;
Expand All @@ -44,15 +45,16 @@ afterAll(async () => {
await engine.stop();
});

const fid = Factories.Fid.build();
let fid: number;
const signer = Factories.Ed25519Signer.build();
const custodySigner = Factories.Eip712Signer.build();

let custodyEvent: OnChainEvent;
let signerEvent: OnChainEvent;
let storageEvent: OnChainEvent;

beforeAll(async () => {
beforeEach(async () => {
fid = Factories.Fid.build();
const signerKey = (await signer.getSignerKey())._unsafeUnwrap();
const custodySignerKey = (await custodySigner.getSignerKey())._unsafeUnwrap();
custodyEvent = Factories.IdRegistryOnChainEvent.build({ fid }, { transient: { to: custodySignerKey } });
Expand Down Expand Up @@ -105,34 +107,92 @@ describe("server rpc tests", () => {
test("succeeds for user with no storage", async () => {
const result = await client.getCurrentStorageLimitsByFid(FidRequest.create({ fid: fid - 1 }));
// zero storage
expect(result._unsafeUnwrap().units).toEqual(0);
expect(result._unsafeUnwrap().limits.map((l) => l.limit)).toEqual([0, 0, 0, 0, 0, 0]);
});

test("succeeds for user with storage", async () => {
// Add some data, so we can test usage responses
const verification = await Factories.VerificationAddEthAddressMessage.create(
{ data: { fid } },
{ transient: { signer } },
);
const olderCast = await Factories.CastAddMessage.create({ data: { fid } }, { transient: { signer } });
const newerCast = await Factories.CastAddMessage.create(
{ data: { fid, timestamp: olderCast.data.timestamp + 10 } },
{ transient: { signer } },
);

expect(await engine.mergeMessage(verification)).toBeInstanceOf(Ok);
expect(await engine.mergeMessage(olderCast)).toBeInstanceOf(Ok);
expect(await engine.mergeMessage(newerCast)).toBeInstanceOf(Ok);

await sleep(100); // Wait for events to be processed
const result = await client.getCurrentStorageLimitsByFid(FidRequest.create({ fid }));
const storageLimits = StorageLimitsResponse.fromJSON(result._unsafeUnwrap()).limits;

expect((await client.getOnChainEvents({ fid: fid, eventType: 4 }))._unsafeUnwrap().events.length).toEqual(1);

const limitsResponse = StorageLimitsResponse.fromJSON(result._unsafeUnwrap());
expect(limitsResponse.units).toEqual(1);
const storageLimits = limitsResponse.limits;
expect(storageLimits).toContainEqual(
StorageLimit.create({ limit: getDefaultStoreLimit(StoreType.CASTS), storeType: StoreType.CASTS }),
StorageLimit.create({
limit: getDefaultStoreLimit(StoreType.CASTS),
storeType: StoreType.CASTS,
name: "CASTS",
used: 2,
earliestHash: olderCast.hash,
earliestTimestamp: olderCast.data.timestamp,
}),
);
expect(storageLimits).toContainEqual(
StorageLimit.create({ limit: getDefaultStoreLimit(StoreType.REACTIONS), storeType: StoreType.REACTIONS }),
StorageLimit.create({
limit: getDefaultStoreLimit(StoreType.REACTIONS),
storeType: StoreType.REACTIONS,
name: "REACTIONS",
used: 0,
earliestTimestamp: 0,
earliestHash: new Uint8Array(),
}),
);
expect(storageLimits).toContainEqual(
StorageLimit.create({ limit: getDefaultStoreLimit(StoreType.LINKS), storeType: StoreType.LINKS }),
StorageLimit.create({
limit: getDefaultStoreLimit(StoreType.LINKS),
storeType: StoreType.LINKS,
name: "LINKS",
used: 0,
earliestHash: new Uint8Array(),
earliestTimestamp: 0,
}),
);
expect(storageLimits).toContainEqual(
StorageLimit.create({ limit: getDefaultStoreLimit(StoreType.USER_DATA), storeType: StoreType.USER_DATA }),
StorageLimit.create({
limit: getDefaultStoreLimit(StoreType.USER_DATA),
storeType: StoreType.USER_DATA,
name: "USER_DATA",
used: 0,
earliestHash: new Uint8Array(),
earliestTimestamp: 0,
}),
);
expect(storageLimits).toContainEqual(
StorageLimit.create({
limit: getDefaultStoreLimit(StoreType.VERIFICATIONS),
storeType: StoreType.VERIFICATIONS,
name: "VERIFICATIONS",
used: 1,
earliestHash: verification.hash,
earliestTimestamp: verification.data.timestamp,
}),
);
expect(storageLimits).toContainEqual(
StorageLimit.create({
limit: getDefaultStoreLimit(StoreType.USERNAME_PROOFS),
storeType: StoreType.USERNAME_PROOFS,
name: "USERNAME_PROOFS",
used: 0,
earliestHash: new Uint8Array(),
earliestTimestamp: 0,
}),
);

Expand All @@ -143,13 +203,13 @@ describe("server rpc tests", () => {
});
await engine.mergeOnChainEvent(rentEvent2);
const result2 = await client.getCurrentStorageLimitsByFid(FidRequest.create({ fid }));
const newLimits = StorageLimitsResponse.fromJSON(result2._unsafeUnwrap()).limits;
expect(newLimits).toContainEqual(StorageLimit.create({ limit: 5000 * 3, storeType: StoreType.CASTS }));
expect(newLimits).toContainEqual(StorageLimit.create({ limit: 2500 * 3, storeType: StoreType.REACTIONS }));
expect(newLimits).toContainEqual(StorageLimit.create({ limit: 2500 * 3, storeType: StoreType.LINKS }));
expect(newLimits).toContainEqual(StorageLimit.create({ limit: 50 * 3, storeType: StoreType.USER_DATA }));
expect(newLimits).toContainEqual(StorageLimit.create({ limit: 25 * 3, storeType: StoreType.VERIFICATIONS }));
expect(newLimits).toContainEqual(StorageLimit.create({ limit: 5 * 3, storeType: StoreType.USERNAME_PROOFS }));
const limitsResponse2 = StorageLimitsResponse.fromJSON(result2._unsafeUnwrap());
expect(limitsResponse2.units).toEqual(3);
const newLimits = limitsResponse2.limits;
expect(newLimits.length).toEqual(6);
for (const limit of newLimits) {
expect(limit.limit).toEqual(getDefaultStoreLimit(limit.storeType) * 3);
}
});
});
});
8 changes: 8 additions & 0 deletions apps/hubble/src/storage/db/message.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
makeUserKey,
putMessage,
typeToSetPostfix,
unpackTsHash,
} from "./message.js";

const db = jestRocksDB("storage.db.message.test");
Expand Down Expand Up @@ -80,6 +81,13 @@ describe("makeTsHash", () => {
const tsHash2 = makeTsHash(castMessage.data.timestamp, new Uint8Array([...castMessage.hash, 1]));
expect(bytesCompare(tsHash1._unsafeUnwrap(), tsHash2._unsafeUnwrap())).toEqual(-1);
});

test("unpacks tsHash", () => {
const tsHash = makeTsHash(castMessage.data.timestamp, castMessage.hash);
const [timestamp, hash] = unpackTsHash(tsHash._unsafeUnwrap())._unsafeUnwrap();
expect(timestamp).toEqual(castMessage.data.timestamp);
expect(hash).toEqual(castMessage.hash);
});
});

describe("putMessage", () => {
Expand Down
19 changes: 18 additions & 1 deletion apps/hubble/src/storage/db/message.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import { bytesIncrement, CastId, HubError, HubResult, Message, MessageData, MessageType } from "@farcaster/hub-nodejs";
import { err, ok, Result, ResultAsync } from "neverthrow";
import RocksDB, { Iterator, Transaction } from "./rocksdb.js";
import { FID_BYTES, RootPrefix, TRUE_VALUE, UserMessagePostfix, UserMessagePostfixMax, UserPostfix } from "./types.js";
import {
FID_BYTES,
RootPrefix,
TRUE_VALUE,
TSHASH_LENGTH,
UserMessagePostfix,
UserMessagePostfixMax,
UserPostfix,
} from "./types.js";
import { MessagesPage, PAGE_SIZE_MAX, PageOptions } from "../stores/types.js";

export const makeFidKey = (fid: number): Buffer => {
Expand Down Expand Up @@ -74,6 +82,15 @@ export const makeTsHash = (timestamp: number, hash: Uint8Array): HubResult<Uint8
return ok(new Uint8Array(buffer));
};

export const unpackTsHash = (tsHash: Uint8Array): HubResult<[number, Uint8Array]> => {
if (tsHash.length !== TSHASH_LENGTH) {
return err(new HubError("bad_request.invalid_param", "invalid tsHash length"));
}
const timestamp = tsHash.slice(0, 4);
const hash = tsHash.slice(4);
return ok([Buffer.from(timestamp).readUInt32BE(0), hash]);
};

export const typeToSetPostfix = (type: MessageType): UserMessagePostfix => {
if (type === MessageType.CAST_ADD || type === MessageType.CAST_REMOVE) {
return UserPostfix.CastMessage;
Expand Down
24 changes: 23 additions & 1 deletion apps/hubble/src/storage/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import {
RevokeMessagesBySignerJobPayload,
SignerEventType,
SignerOnChainEvent,
StorageLimit,
StorageLimitsResponse,
StoreType,
UserDataAddMessage,
UserDataType,
UserNameProof,
Expand Down Expand Up @@ -717,8 +719,28 @@ class Engine extends TypedEmitter<EngineEvents> {
return err(units.error);
}

const storeLimits = getStoreLimits(units.value);
const limits: StorageLimit[] = [];
for (const limit of storeLimits) {
const usageResult = await this.eventHandler.getUsage(fid, limit.storeType);
if (usageResult.isErr()) {
log.warn({ err: usageResult.error }, `error getting usage for storage limit for ${fid} and ${limit.storeType}`);
continue;
}
limits.push(
StorageLimit.create({
limit: limit.limit,
name: StoreType[limit.storeType],
used: usageResult.value.used,
earliestTimestamp: usageResult.value.earliestTimestamp,
earliestHash: usageResult.value.earliestHash,
storeType: limit.storeType,
}),
);
}
return ok({
limits: getStoreLimits(units.value),
units: units.value,
limits: limits,
});
}

Expand Down
35 changes: 33 additions & 2 deletions apps/hubble/src/storage/stores/storeEventHandler.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { CastAddMessage, Factories, FARCASTER_EPOCH, HubEvent, HubEventType } from "@farcaster/hub-nodejs";
import { CastAddMessage, Factories, FARCASTER_EPOCH, HubEvent, HubEventType, StoreType } from "@farcaster/hub-nodejs";
import { ok, Result } from "neverthrow";
import { jestRocksDB } from "../db/jestUtils.js";
import { getMessage, makeTsHash, putMessage, putMessageTransaction } from "../db/message.js";
import { UserPostfix } from "../db/types.js";
import StoreEventHandler, { HubEventArgs, HubEventIdGenerator } from "./storeEventHandler.js";
import { sleep } from "../../utils/crypto.js";
import { getFarcasterTime, extractEventTimestamp } from "@farcaster/core";
import { extractEventTimestamp, getFarcasterTime } from "@farcaster/core";
import OnChainEventStore from "./onChainEventStore.js";
import CastStore from "./castStore.js";

const db = jestRocksDB("stores.storeEventHandler.test");
const eventHandler = new StoreEventHandler(db);
Expand Down Expand Up @@ -206,3 +207,33 @@ describe("getCurrentStorageUnitsForFid", () => {
expect(await eventHandler.getCurrentStorageUnitsForFid(fid)).toEqual(ok(storageEvent.storageRentEventBody.units));
});
});

describe("getUsage", () => {
test("returns 0 if no messages", async () => {
const usage = await eventHandler.getUsage(message.data.fid, StoreType.CASTS);
expect(usage.isOk()).toBeTruthy();
expect(usage._unsafeUnwrap().used).toEqual(0);
expect(usage._unsafeUnwrap().earliestTimestamp).toEqual(0);
expect(usage._unsafeUnwrap().earliestHash.length).toEqual(0);
});

test("returns actual usage based on messages", async () => {
const fid = Factories.Fid.build();
const storageEvent = Factories.StorageRentOnChainEvent.build({ fid });
const onChainEventStore = new OnChainEventStore(db, eventHandler);
await onChainEventStore.mergeOnChainEvent(storageEvent);

const castStore = new CastStore(db, eventHandler);
const newCast = await Factories.CastAddMessage.create({ data: { fid } });
const olderCast = await Factories.CastAddMessage.create({
data: { fid, timestamp: newCast.data.timestamp - 10 },
});
await castStore.merge(newCast);
await castStore.merge(olderCast);

const usage = (await eventHandler.getUsage(newCast.data.fid, StoreType.CASTS))._unsafeUnwrap();
expect(usage.used).toEqual(2);
expect(usage.earliestTimestamp).toEqual(olderCast.data.timestamp);
expect(usage.earliestHash).toEqual(olderCast.hash);
});
});
Loading

0 comments on commit 704e077

Please sign in to comment.