Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Storage RPC #1211

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/young-lobsters-peel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@farcaster/hub-nodejs": minor
"@farcaster/hub-web": minor
"@farcaster/core": minor
"@farcaster/hubble": minor
---

Added storage limits RPC
2 changes: 1 addition & 1 deletion apps/hubble/src/eth/l2EventsProvider.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { FarcasterNetwork, OnChainEventType, StorageRegistryEventType } from "@farcaster/hub-nodejs";
import { FarcasterNetwork, OnChainEventType } from "@farcaster/hub-nodejs";
import { StorageRegistry } from "./abis.js";
import { jestRocksDB } from "../storage/db/jestUtils.js";
import Engine from "../storage/engine/index.js";
Expand Down
19 changes: 16 additions & 3 deletions apps/hubble/src/rpc/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
CastAddMessage,
CastId,
CastRemoveMessage,
DbStats,
FidsResponse,
getServer,
HubAsyncResult,
Expand All @@ -26,15 +27,15 @@ import {
SignerAddMessage,
SignerRemoveMessage,
status,
StorageLimitsResponse,
SyncIds,
DbStats,
SyncStatus,
SyncStatusResponse,
TrieNodeMetadataResponse,
TrieNodeSnapshotResponse,
UserDataAddMessage,
VerificationAddEthAddressMessage,
VerificationRemoveMessage,
SyncStatusResponse,
SyncStatus,
UserNameProof,
UsernameProofsResponse,
OnChainEventResponse,
Expand Down Expand Up @@ -982,6 +983,18 @@ export default class Server {
},
);
},
getCurrentStorageLimitsByFid: async (call, callback) => {
const request = call.request;
const storageLimitsResult = await this.engine?.getCurrentStorageLimitsByFid(request.fid);
storageLimitsResult?.match(
(storageLimits: StorageLimitsResponse) => {
callback(null, storageLimits);
},
(err: HubError) => {
callback(toServiceError(err));
},
);
},
getFids: async (call, callback) => {
const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown");
log.debug({ method: "getFids", req: call.request }, `RPC call from ${peer}`);
Expand Down
39 changes: 0 additions & 39 deletions apps/hubble/src/rpc/test/bulkService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ import {
IdRegistryEvent,
Message,
MessagesResponse,
OnChainEvent,
OnChainEventRequest,
OnChainEventResponse,
OnChainEventType,
ReactionAddMessage,
ReactionRemoveMessage,
SignerAddMessage,
Expand All @@ -28,7 +24,6 @@ import Server from "../server.js";
import { jestRocksDB } from "../../storage/db/jestUtils.js";
import Engine from "../../storage/engine/index.js";
import { MockHub } from "../../test/mocks.js";
import { Ok } from "neverthrow";

const db = jestRocksDB("protobufs.rpc.bulkService.test");
const network = FarcasterNetwork.TESTNET;
Expand Down Expand Up @@ -72,12 +67,6 @@ const assertMessagesMatchResult = (result: HubResult<MessagesResponse>, messages
expect(result._unsafeUnwrap().messages.map((m) => Message.toJSON(m))).toEqual(messages.map((m) => Message.toJSON(m)));
};

const assertEventsMatchResult = (result: HubResult<OnChainEventResponse>, events: OnChainEvent[]) => {
expect(result._unsafeUnwrap().events.map((e) => OnChainEvent.toJSON(e))).toEqual(
events.map((e) => OnChainEvent.toJSON(e)),
);
};

describe("getAllCastMessagesByFid", () => {
let castAdd: CastAddMessage;
let castRemove: CastRemoveMessage;
Expand Down Expand Up @@ -251,31 +240,3 @@ describe("getAllUserDataMessagesByFid", () => {
expect(result._unsafeUnwrap().messages.length).toEqual(0);
});
});

describe("getOnChainEvents", () => {
test("succeeds", async () => {
const idRegistryEvent = Factories.IdRegistryOnChainEvent.build({ fid });
const idRegistryEvent2 = Factories.IdRegistryOnChainEvent.build({ fid: fid + 1 });
const signerEvent = Factories.KeyRegistryOnChainEvent.build({ fid });
const signerEvent2 = Factories.KeyRegistryOnChainEvent.build({ blockNumber: signerEvent.blockNumber + 1, fid });
await expect(engine.mergeOnChainEvent(idRegistryEvent)).resolves.toBeInstanceOf(Ok);
await expect(engine.mergeOnChainEvent(idRegistryEvent2)).resolves.toBeInstanceOf(Ok);
await expect(engine.mergeOnChainEvent(signerEvent)).resolves.toBeInstanceOf(Ok);
await expect(engine.mergeOnChainEvent(signerEvent2)).resolves.toBeInstanceOf(Ok);

const idResult = await client.getOnChainEvents(
OnChainEventRequest.create({ eventType: OnChainEventType.EVENT_TYPE_ID_REGISTER, fid }),
);
assertEventsMatchResult(idResult, [idRegistryEvent]);

const signerResult = await client.getOnChainEvents(
OnChainEventRequest.create({ eventType: OnChainEventType.EVENT_TYPE_SIGNER, fid }),
);
assertEventsMatchResult(signerResult, [signerEvent, signerEvent2]);

const emptyResult = await client.getOnChainEvents(
OnChainEventRequest.create({ eventType: OnChainEventType.EVENT_TYPE_STORAGE_RENT, fid }),
);
expect(emptyResult._unsafeUnwrap().events.length).toEqual(0);
});
});
163 changes: 163 additions & 0 deletions apps/hubble/src/rpc/test/server.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import { jestRocksDB } from "../../storage/db/jestUtils.js";
import {
Factories,
FarcasterNetwork,
FidRequest,
getInsecureHubRpcClient,
HubResult,
HubRpcClient,
IdRegistryEvent,
OnChainEvent,
OnChainEventRequest,
OnChainEventResponse,
OnChainEventType,
SignerAddMessage,
StorageLimit,
StorageLimitsResponse,
StoreType,
} from "@farcaster/hub-nodejs";
import Engine from "../../storage/engine/index.js";
import { MockHub } from "../../test/mocks.js";
import Server from "../server.js";
import SyncEngine from "../../network/sync/syncEngine.js";
import { Ok } from "neverthrow";
import { CAST_PRUNE_SIZE_LIMIT_DEFAULT } from "../../storage/stores/castStore.js";
import { REACTION_PRUNE_SIZE_LIMIT_DEFAULT } from "../../storage/stores/reactionStore.js";
import { LINK_PRUNE_SIZE_LIMIT_DEFAULT } from "../../storage/stores/linkStore.js";
import { USER_DATA_PRUNE_SIZE_LIMIT_DEFAULT } from "../../storage/stores/userDataStore.js";
import { VERIFICATION_PRUNE_SIZE_LIMIT_DEFAULT } from "../../storage/stores/verificationStore.js";

const db = jestRocksDB("protobufs.rpc.server.test");
const network = FarcasterNetwork.TESTNET;
const engine = new Engine(db, network);
const hub = new MockHub(db, engine);

let server: Server;
let client: HubRpcClient;

beforeAll(async () => {
server = new Server(hub, engine, new SyncEngine(hub, db));
const port = await server.start();
client = getInsecureHubRpcClient(`127.0.0.1:${port}`);
});

afterAll(async () => {
client.close();
await server.stop();
await engine.stop();
});

const fid = Factories.Fid.build();
const signer = Factories.Ed25519Signer.build();
const custodySigner = Factories.Eip712Signer.build();

let custodyEvent: IdRegistryEvent;
let signerAdd: SignerAddMessage;

beforeAll(async () => {
const signerKey = (await signer.getSignerKey())._unsafeUnwrap();
const custodySignerKey = (await custodySigner.getSignerKey())._unsafeUnwrap();
custodyEvent = Factories.IdRegistryEvent.build({ fid, to: custodySignerKey });

signerAdd = await Factories.SignerAddMessage.create(
{ data: { fid, network, signerAddBody: { signer: signerKey } } },
{ transient: { signer: custodySigner } },
);
});

describe("server rpc tests", () => {
beforeEach(async () => {
await engine.mergeIdRegistryEvent(custodyEvent);
await engine.mergeMessage(signerAdd);
});

describe("getOnChainEvents", () => {
const assertEventsMatchResult = (result: HubResult<OnChainEventResponse>, events: OnChainEvent[]) => {
expect(result._unsafeUnwrap().events.map((e) => OnChainEvent.toJSON(e))).toEqual(
events.map((e) => OnChainEvent.toJSON(e)),
);
};

test("succeeds", async () => {
const idRegistryEvent = Factories.IdRegistryOnChainEvent.build({ fid });
const idRegistryEvent2 = Factories.IdRegistryOnChainEvent.build({ fid: fid + 1 });
const signerEvent = Factories.SignerOnChainEvent.build({ fid });
const signerEvent2 = Factories.SignerOnChainEvent.build({ blockNumber: signerEvent.blockNumber + 1, fid });
await expect(engine.mergeOnChainEvent(idRegistryEvent)).resolves.toBeInstanceOf(Ok);
await expect(engine.mergeOnChainEvent(idRegistryEvent2)).resolves.toBeInstanceOf(Ok);
await expect(engine.mergeOnChainEvent(signerEvent)).resolves.toBeInstanceOf(Ok);
await expect(engine.mergeOnChainEvent(signerEvent2)).resolves.toBeInstanceOf(Ok);

const idResult = await client.getOnChainEvents(
OnChainEventRequest.create({ eventType: OnChainEventType.EVENT_TYPE_ID_REGISTER, fid }),
);
assertEventsMatchResult(idResult, [idRegistryEvent]);

const signerResult = await client.getOnChainEvents(
OnChainEventRequest.create({ eventType: OnChainEventType.EVENT_TYPE_SIGNER, fid }),
);
assertEventsMatchResult(signerResult, [signerEvent, signerEvent2]);

const emptyResult = await client.getOnChainEvents(
OnChainEventRequest.create({ eventType: OnChainEventType.EVENT_TYPE_STORAGE_RENT, fid }),
);
expect(emptyResult._unsafeUnwrap().events.length).toEqual(0);
});
});

describe("getCurrentStorageLimitsByFid", () => {
test("succeeds for user with no storage", async () => {
const result = await client.getCurrentStorageLimitsByFid(FidRequest.create({ fid }));
expect(result._unsafeUnwrap().limits.map((l) => l.limit)).toEqual([0, 0, 0, 0, 0]);
});

test("succeeds for user with storage", async () => {
const rentEvent = Factories.StorageRentOnChainEvent.build({
fid,
storageRentEventBody: Factories.StorageRentEventBody.build({ units: 1 }),
});
await engine.mergeOnChainEvent(rentEvent);
const result = await client.getCurrentStorageLimitsByFid(FidRequest.create({ fid }));
const storageLimits = StorageLimitsResponse.fromJSON(result._unsafeUnwrap()).limits;
expect(storageLimits).toContainEqual(
StorageLimit.create({ limit: CAST_PRUNE_SIZE_LIMIT_DEFAULT, storeType: StoreType.CASTS }),
);
expect(storageLimits).toContainEqual(
StorageLimit.create({ limit: REACTION_PRUNE_SIZE_LIMIT_DEFAULT, storeType: StoreType.REACTIONS }),
);
expect(storageLimits).toContainEqual(
StorageLimit.create({ limit: LINK_PRUNE_SIZE_LIMIT_DEFAULT, storeType: StoreType.LINKS }),
);
expect(storageLimits).toContainEqual(
StorageLimit.create({ limit: USER_DATA_PRUNE_SIZE_LIMIT_DEFAULT, storeType: StoreType.USER_DATA }),
);
expect(storageLimits).toContainEqual(
StorageLimit.create({ limit: VERIFICATION_PRUNE_SIZE_LIMIT_DEFAULT, storeType: StoreType.VERIFICATIONS }),
);

// add 2 more units
const rentEvent2 = Factories.StorageRentOnChainEvent.build({
fid,
storageRentEventBody: Factories.StorageRentEventBody.build({ units: 2 }),
});
await engine.mergeOnChainEvent(rentEvent2);
const result2 = await client.getCurrentStorageLimitsByFid(FidRequest.create({ fid }));
const newLimits = StorageLimitsResponse.fromJSON(result2._unsafeUnwrap()).limits;
expect(newLimits).toContainEqual(
StorageLimit.create({ limit: CAST_PRUNE_SIZE_LIMIT_DEFAULT * 3, storeType: StoreType.CASTS }),
);
expect(newLimits).toContainEqual(
StorageLimit.create({ limit: REACTION_PRUNE_SIZE_LIMIT_DEFAULT * 3, storeType: StoreType.REACTIONS }),
);
expect(newLimits).toContainEqual(
StorageLimit.create({ limit: LINK_PRUNE_SIZE_LIMIT_DEFAULT * 3, storeType: StoreType.LINKS }),
);
expect(newLimits).toContainEqual(
StorageLimit.create({ limit: USER_DATA_PRUNE_SIZE_LIMIT_DEFAULT * 3, storeType: StoreType.USER_DATA }),
);
expect(newLimits).toContainEqual(
StorageLimit.create({ limit: VERIFICATION_PRUNE_SIZE_LIMIT_DEFAULT * 3, storeType: StoreType.VERIFICATIONS }),
);
});
});
});
2 changes: 1 addition & 1 deletion apps/hubble/src/rpc/test/signerService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ beforeAll(async () => {
{ data: { fid, network, signerAddBody: { signer: signerKey } } },
{ transient: { signer: custodySigner } },
);
onChainSigner = Factories.KeyRegistryOnChainEvent.build({
onChainSigner = Factories.SignerOnChainEvent.build({
fid: fid,
signerEventBody: Factories.SignerEventBody.build({
key: signerKey,
Expand Down
4 changes: 2 additions & 2 deletions apps/hubble/src/storage/db/onChainEvent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ let onChainEvent: OnChainEvent;
let anotherEvent: OnChainEvent;

beforeEach(async () => {
onChainEvent = Factories.KeyRegistryOnChainEvent.build();
onChainEvent = Factories.SignerOnChainEvent.build();
anotherEvent = Factories.IdRegistryOnChainEvent.build();
let txn = db.transaction();
txn = putOnChainEventTransaction(txn, onChainEvent);
Expand All @@ -37,7 +37,7 @@ describe("makeOnChainEventPrimaryKey", () => {

describe("putOnChainEvent", () => {
test("succeeds", async () => {
const onChainEvent = Factories.KeyRegistryOnChainEvent.build();
const onChainEvent = Factories.SignerOnChainEvent.build();
const txn = db.transaction();
putOnChainEventTransaction(txn, onChainEvent);
await db.commit(txn);
Expand Down
10 changes: 5 additions & 5 deletions apps/hubble/src/storage/engine/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ describe("mergeMessage", () => {
test("succeeds with l2 id registry event and on chain signer", async () => {
const idRegistryOnChainEvent = Factories.IdRegistryOnChainEvent.build({ fid });
const signerEventBody = Factories.SignerEventBody.build({ key: signerAdd.data.signerAddBody.signer });
const onChainSignerEvent = Factories.KeyRegistryOnChainEvent.build({ fid, signerEventBody });
const onChainSignerEvent = Factories.SignerOnChainEvent.build({ fid, signerEventBody });
await migratedEngine.mergeOnChainEvent(idRegistryOnChainEvent);
await migratedEngine.mergeOnChainEvent(onChainSignerEvent);

Expand All @@ -770,13 +770,13 @@ describe("mergeMessage", () => {
test("fails if signer is removed on chain", async () => {
const idRegistryOnChainEvent = Factories.IdRegistryOnChainEvent.build({ fid });
const signerEventBody = Factories.SignerEventBody.build({ key: signerAdd.data.signerAddBody.signer });
const onChainSignerEvent = Factories.KeyRegistryOnChainEvent.build({ fid, signerEventBody });
const onChainSignerEvent = Factories.SignerOnChainEvent.build({ fid, signerEventBody });

const signerRemovalBody = Factories.SignerEventBody.build({
eventType: SignerEventType.REMOVE,
key: signerAdd.data.signerAddBody.signer,
});
const signerRemovalEvent = Factories.KeyRegistryOnChainEvent.build({ fid, signerEventBody: signerRemovalBody });
const signerRemovalEvent = Factories.SignerOnChainEvent.build({ fid, signerEventBody: signerRemovalBody });

await migratedEngine.mergeOnChainEvent(idRegistryOnChainEvent);
await migratedEngine.mergeOnChainEvent(onChainSignerEvent);
Expand Down Expand Up @@ -982,7 +982,7 @@ describe("with listeners and workers", () => {
test("revokes messages when onchain signer is removed", async () => {
const idRegistryOnChainEvent = Factories.IdRegistryOnChainEvent.build({ fid });
const signerEventBody = Factories.SignerEventBody.build({ key: signerAdd.data.signerAddBody.signer });
const onChainSignerEvent = Factories.KeyRegistryOnChainEvent.build({ fid, signerEventBody });
const onChainSignerEvent = Factories.SignerOnChainEvent.build({ fid, signerEventBody });

await liveEngine.mergeOnChainEvent(idRegistryOnChainEvent);
await liveEngine.mergeOnChainEvent(onChainSignerEvent);
Expand All @@ -992,7 +992,7 @@ describe("with listeners and workers", () => {
eventType: SignerEventType.REMOVE,
key: signerEventBody.key,
});
const signerRemovalEvent = Factories.KeyRegistryOnChainEvent.build({ fid, signerEventBody: signerRemovalBody });
const signerRemovalEvent = Factories.SignerOnChainEvent.build({ fid, signerEventBody: signerRemovalBody });
await liveEngine.mergeOnChainEvent(signerRemovalEvent);

expect(revokedMessages).toEqual([]);
Expand Down
Loading