Skip to content

Commit

Permalink
feat: Storage RPC (#1211)
Browse files Browse the repository at this point in the history
* Storage RPC

* added changeset

* updated docs

* Add tests and minor cleanup

* Rename signer on chain event factory for clarity

---------

Co-authored-by: Cassie Heart <cassie@merklemanufactory.com>
  • Loading branch information
sanjayprabhu and CassOnMars authored Aug 4, 2023
1 parent 67e9466 commit 86149d3
Show file tree
Hide file tree
Showing 31 changed files with 966 additions and 1,569 deletions.
8 changes: 8 additions & 0 deletions .changeset/young-lobsters-peel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@farcaster/hub-nodejs": minor
"@farcaster/hub-web": minor
"@farcaster/core": minor
"@farcaster/hubble": minor
---

Added storage limits RPC
2 changes: 1 addition & 1 deletion apps/hubble/src/eth/l2EventsProvider.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { FarcasterNetwork, OnChainEventType, StorageRegistryEventType } from "@farcaster/hub-nodejs";
import { FarcasterNetwork, OnChainEventType } from "@farcaster/hub-nodejs";
import { StorageRegistry } from "./abis.js";
import { jestRocksDB } from "../storage/db/jestUtils.js";
import Engine from "../storage/engine/index.js";
Expand Down
19 changes: 16 additions & 3 deletions apps/hubble/src/rpc/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
CastAddMessage,
CastId,
CastRemoveMessage,
DbStats,
FidsResponse,
getServer,
HubAsyncResult,
Expand All @@ -26,15 +27,15 @@ import {
SignerAddMessage,
SignerRemoveMessage,
status,
StorageLimitsResponse,
SyncIds,
DbStats,
SyncStatus,
SyncStatusResponse,
TrieNodeMetadataResponse,
TrieNodeSnapshotResponse,
UserDataAddMessage,
VerificationAddEthAddressMessage,
VerificationRemoveMessage,
SyncStatusResponse,
SyncStatus,
UserNameProof,
UsernameProofsResponse,
OnChainEventResponse,
Expand Down Expand Up @@ -982,6 +983,18 @@ export default class Server {
},
);
},
getCurrentStorageLimitsByFid: async (call, callback) => {
const request = call.request;
const storageLimitsResult = await this.engine?.getCurrentStorageLimitsByFid(request.fid);
storageLimitsResult?.match(
(storageLimits: StorageLimitsResponse) => {
callback(null, storageLimits);
},
(err: HubError) => {
callback(toServiceError(err));
},
);
},
getFids: async (call, callback) => {
const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown");
log.debug({ method: "getFids", req: call.request }, `RPC call from ${peer}`);
Expand Down
39 changes: 0 additions & 39 deletions apps/hubble/src/rpc/test/bulkService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ import {
IdRegistryEvent,
Message,
MessagesResponse,
OnChainEvent,
OnChainEventRequest,
OnChainEventResponse,
OnChainEventType,
ReactionAddMessage,
ReactionRemoveMessage,
SignerAddMessage,
Expand All @@ -28,7 +24,6 @@ import Server from "../server.js";
import { jestRocksDB } from "../../storage/db/jestUtils.js";
import Engine from "../../storage/engine/index.js";
import { MockHub } from "../../test/mocks.js";
import { Ok } from "neverthrow";

const db = jestRocksDB("protobufs.rpc.bulkService.test");
const network = FarcasterNetwork.TESTNET;
Expand Down Expand Up @@ -72,12 +67,6 @@ const assertMessagesMatchResult = (result: HubResult<MessagesResponse>, messages
expect(result._unsafeUnwrap().messages.map((m) => Message.toJSON(m))).toEqual(messages.map((m) => Message.toJSON(m)));
};

const assertEventsMatchResult = (result: HubResult<OnChainEventResponse>, events: OnChainEvent[]) => {
expect(result._unsafeUnwrap().events.map((e) => OnChainEvent.toJSON(e))).toEqual(
events.map((e) => OnChainEvent.toJSON(e)),
);
};

describe("getAllCastMessagesByFid", () => {
let castAdd: CastAddMessage;
let castRemove: CastRemoveMessage;
Expand Down Expand Up @@ -251,31 +240,3 @@ describe("getAllUserDataMessagesByFid", () => {
expect(result._unsafeUnwrap().messages.length).toEqual(0);
});
});

describe("getOnChainEvents", () => {
test("succeeds", async () => {
const idRegistryEvent = Factories.IdRegistryOnChainEvent.build({ fid });
const idRegistryEvent2 = Factories.IdRegistryOnChainEvent.build({ fid: fid + 1 });
const signerEvent = Factories.KeyRegistryOnChainEvent.build({ fid });
const signerEvent2 = Factories.KeyRegistryOnChainEvent.build({ blockNumber: signerEvent.blockNumber + 1, fid });
await expect(engine.mergeOnChainEvent(idRegistryEvent)).resolves.toBeInstanceOf(Ok);
await expect(engine.mergeOnChainEvent(idRegistryEvent2)).resolves.toBeInstanceOf(Ok);
await expect(engine.mergeOnChainEvent(signerEvent)).resolves.toBeInstanceOf(Ok);
await expect(engine.mergeOnChainEvent(signerEvent2)).resolves.toBeInstanceOf(Ok);

const idResult = await client.getOnChainEvents(
OnChainEventRequest.create({ eventType: OnChainEventType.EVENT_TYPE_ID_REGISTER, fid }),
);
assertEventsMatchResult(idResult, [idRegistryEvent]);

const signerResult = await client.getOnChainEvents(
OnChainEventRequest.create({ eventType: OnChainEventType.EVENT_TYPE_SIGNER, fid }),
);
assertEventsMatchResult(signerResult, [signerEvent, signerEvent2]);

const emptyResult = await client.getOnChainEvents(
OnChainEventRequest.create({ eventType: OnChainEventType.EVENT_TYPE_STORAGE_RENT, fid }),
);
expect(emptyResult._unsafeUnwrap().events.length).toEqual(0);
});
});
163 changes: 163 additions & 0 deletions apps/hubble/src/rpc/test/server.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import { jestRocksDB } from "../../storage/db/jestUtils.js";
import {
Factories,
FarcasterNetwork,
FidRequest,
getInsecureHubRpcClient,
HubResult,
HubRpcClient,
IdRegistryEvent,
OnChainEvent,
OnChainEventRequest,
OnChainEventResponse,
OnChainEventType,
SignerAddMessage,
StorageLimit,
StorageLimitsResponse,
StoreType,
} from "@farcaster/hub-nodejs";
import Engine from "../../storage/engine/index.js";
import { MockHub } from "../../test/mocks.js";
import Server from "../server.js";
import SyncEngine from "../../network/sync/syncEngine.js";
import { Ok } from "neverthrow";
import { CAST_PRUNE_SIZE_LIMIT_DEFAULT } from "../../storage/stores/castStore.js";
import { REACTION_PRUNE_SIZE_LIMIT_DEFAULT } from "../../storage/stores/reactionStore.js";
import { LINK_PRUNE_SIZE_LIMIT_DEFAULT } from "../../storage/stores/linkStore.js";
import { USER_DATA_PRUNE_SIZE_LIMIT_DEFAULT } from "../../storage/stores/userDataStore.js";
import { VERIFICATION_PRUNE_SIZE_LIMIT_DEFAULT } from "../../storage/stores/verificationStore.js";

const db = jestRocksDB("protobufs.rpc.server.test");
const network = FarcasterNetwork.TESTNET;
const engine = new Engine(db, network);
const hub = new MockHub(db, engine);

let server: Server;
let client: HubRpcClient;

beforeAll(async () => {
server = new Server(hub, engine, new SyncEngine(hub, db));
const port = await server.start();
client = getInsecureHubRpcClient(`127.0.0.1:${port}`);
});

afterAll(async () => {
client.close();
await server.stop();
await engine.stop();
});

const fid = Factories.Fid.build();
const signer = Factories.Ed25519Signer.build();
const custodySigner = Factories.Eip712Signer.build();

let custodyEvent: IdRegistryEvent;
let signerAdd: SignerAddMessage;

beforeAll(async () => {
const signerKey = (await signer.getSignerKey())._unsafeUnwrap();
const custodySignerKey = (await custodySigner.getSignerKey())._unsafeUnwrap();
custodyEvent = Factories.IdRegistryEvent.build({ fid, to: custodySignerKey });

signerAdd = await Factories.SignerAddMessage.create(
{ data: { fid, network, signerAddBody: { signer: signerKey } } },
{ transient: { signer: custodySigner } },
);
});

describe("server rpc tests", () => {
beforeEach(async () => {
await engine.mergeIdRegistryEvent(custodyEvent);
await engine.mergeMessage(signerAdd);
});

describe("getOnChainEvents", () => {
const assertEventsMatchResult = (result: HubResult<OnChainEventResponse>, events: OnChainEvent[]) => {
expect(result._unsafeUnwrap().events.map((e) => OnChainEvent.toJSON(e))).toEqual(
events.map((e) => OnChainEvent.toJSON(e)),
);
};

test("succeeds", async () => {
const idRegistryEvent = Factories.IdRegistryOnChainEvent.build({ fid });
const idRegistryEvent2 = Factories.IdRegistryOnChainEvent.build({ fid: fid + 1 });
const signerEvent = Factories.SignerOnChainEvent.build({ fid });
const signerEvent2 = Factories.SignerOnChainEvent.build({ blockNumber: signerEvent.blockNumber + 1, fid });
await expect(engine.mergeOnChainEvent(idRegistryEvent)).resolves.toBeInstanceOf(Ok);
await expect(engine.mergeOnChainEvent(idRegistryEvent2)).resolves.toBeInstanceOf(Ok);
await expect(engine.mergeOnChainEvent(signerEvent)).resolves.toBeInstanceOf(Ok);
await expect(engine.mergeOnChainEvent(signerEvent2)).resolves.toBeInstanceOf(Ok);

const idResult = await client.getOnChainEvents(
OnChainEventRequest.create({ eventType: OnChainEventType.EVENT_TYPE_ID_REGISTER, fid }),
);
assertEventsMatchResult(idResult, [idRegistryEvent]);

const signerResult = await client.getOnChainEvents(
OnChainEventRequest.create({ eventType: OnChainEventType.EVENT_TYPE_SIGNER, fid }),
);
assertEventsMatchResult(signerResult, [signerEvent, signerEvent2]);

const emptyResult = await client.getOnChainEvents(
OnChainEventRequest.create({ eventType: OnChainEventType.EVENT_TYPE_STORAGE_RENT, fid }),
);
expect(emptyResult._unsafeUnwrap().events.length).toEqual(0);
});
});

describe("getCurrentStorageLimitsByFid", () => {
test("succeeds for user with no storage", async () => {
const result = await client.getCurrentStorageLimitsByFid(FidRequest.create({ fid }));
expect(result._unsafeUnwrap().limits.map((l) => l.limit)).toEqual([0, 0, 0, 0, 0]);
});

test("succeeds for user with storage", async () => {
const rentEvent = Factories.StorageRentOnChainEvent.build({
fid,
storageRentEventBody: Factories.StorageRentEventBody.build({ units: 1 }),
});
await engine.mergeOnChainEvent(rentEvent);
const result = await client.getCurrentStorageLimitsByFid(FidRequest.create({ fid }));
const storageLimits = StorageLimitsResponse.fromJSON(result._unsafeUnwrap()).limits;
expect(storageLimits).toContainEqual(
StorageLimit.create({ limit: CAST_PRUNE_SIZE_LIMIT_DEFAULT, storeType: StoreType.CASTS }),
);
expect(storageLimits).toContainEqual(
StorageLimit.create({ limit: REACTION_PRUNE_SIZE_LIMIT_DEFAULT, storeType: StoreType.REACTIONS }),
);
expect(storageLimits).toContainEqual(
StorageLimit.create({ limit: LINK_PRUNE_SIZE_LIMIT_DEFAULT, storeType: StoreType.LINKS }),
);
expect(storageLimits).toContainEqual(
StorageLimit.create({ limit: USER_DATA_PRUNE_SIZE_LIMIT_DEFAULT, storeType: StoreType.USER_DATA }),
);
expect(storageLimits).toContainEqual(
StorageLimit.create({ limit: VERIFICATION_PRUNE_SIZE_LIMIT_DEFAULT, storeType: StoreType.VERIFICATIONS }),
);

// add 2 more units
const rentEvent2 = Factories.StorageRentOnChainEvent.build({
fid,
storageRentEventBody: Factories.StorageRentEventBody.build({ units: 2 }),
});
await engine.mergeOnChainEvent(rentEvent2);
const result2 = await client.getCurrentStorageLimitsByFid(FidRequest.create({ fid }));
const newLimits = StorageLimitsResponse.fromJSON(result2._unsafeUnwrap()).limits;
expect(newLimits).toContainEqual(
StorageLimit.create({ limit: CAST_PRUNE_SIZE_LIMIT_DEFAULT * 3, storeType: StoreType.CASTS }),
);
expect(newLimits).toContainEqual(
StorageLimit.create({ limit: REACTION_PRUNE_SIZE_LIMIT_DEFAULT * 3, storeType: StoreType.REACTIONS }),
);
expect(newLimits).toContainEqual(
StorageLimit.create({ limit: LINK_PRUNE_SIZE_LIMIT_DEFAULT * 3, storeType: StoreType.LINKS }),
);
expect(newLimits).toContainEqual(
StorageLimit.create({ limit: USER_DATA_PRUNE_SIZE_LIMIT_DEFAULT * 3, storeType: StoreType.USER_DATA }),
);
expect(newLimits).toContainEqual(
StorageLimit.create({ limit: VERIFICATION_PRUNE_SIZE_LIMIT_DEFAULT * 3, storeType: StoreType.VERIFICATIONS }),
);
});
});
});
2 changes: 1 addition & 1 deletion apps/hubble/src/rpc/test/signerService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ beforeAll(async () => {
{ data: { fid, network, signerAddBody: { signer: signerKey } } },
{ transient: { signer: custodySigner } },
);
onChainSigner = Factories.KeyRegistryOnChainEvent.build({
onChainSigner = Factories.SignerOnChainEvent.build({
fid: fid,
signerEventBody: Factories.SignerEventBody.build({
key: signerKey,
Expand Down
4 changes: 2 additions & 2 deletions apps/hubble/src/storage/db/onChainEvent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ let onChainEvent: OnChainEvent;
let anotherEvent: OnChainEvent;

beforeEach(async () => {
onChainEvent = Factories.KeyRegistryOnChainEvent.build();
onChainEvent = Factories.SignerOnChainEvent.build();
anotherEvent = Factories.IdRegistryOnChainEvent.build();
let txn = db.transaction();
txn = putOnChainEventTransaction(txn, onChainEvent);
Expand All @@ -37,7 +37,7 @@ describe("makeOnChainEventPrimaryKey", () => {

describe("putOnChainEvent", () => {
test("succeeds", async () => {
const onChainEvent = Factories.KeyRegistryOnChainEvent.build();
const onChainEvent = Factories.SignerOnChainEvent.build();
const txn = db.transaction();
putOnChainEventTransaction(txn, onChainEvent);
await db.commit(txn);
Expand Down
10 changes: 5 additions & 5 deletions apps/hubble/src/storage/engine/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ describe("mergeMessage", () => {
test("succeeds with l2 id registry event and on chain signer", async () => {
const idRegistryOnChainEvent = Factories.IdRegistryOnChainEvent.build({ fid });
const signerEventBody = Factories.SignerEventBody.build({ key: signerAdd.data.signerAddBody.signer });
const onChainSignerEvent = Factories.KeyRegistryOnChainEvent.build({ fid, signerEventBody });
const onChainSignerEvent = Factories.SignerOnChainEvent.build({ fid, signerEventBody });
await migratedEngine.mergeOnChainEvent(idRegistryOnChainEvent);
await migratedEngine.mergeOnChainEvent(onChainSignerEvent);

Expand All @@ -770,13 +770,13 @@ describe("mergeMessage", () => {
test("fails if signer is removed on chain", async () => {
const idRegistryOnChainEvent = Factories.IdRegistryOnChainEvent.build({ fid });
const signerEventBody = Factories.SignerEventBody.build({ key: signerAdd.data.signerAddBody.signer });
const onChainSignerEvent = Factories.KeyRegistryOnChainEvent.build({ fid, signerEventBody });
const onChainSignerEvent = Factories.SignerOnChainEvent.build({ fid, signerEventBody });

const signerRemovalBody = Factories.SignerEventBody.build({
eventType: SignerEventType.REMOVE,
key: signerAdd.data.signerAddBody.signer,
});
const signerRemovalEvent = Factories.KeyRegistryOnChainEvent.build({ fid, signerEventBody: signerRemovalBody });
const signerRemovalEvent = Factories.SignerOnChainEvent.build({ fid, signerEventBody: signerRemovalBody });

await migratedEngine.mergeOnChainEvent(idRegistryOnChainEvent);
await migratedEngine.mergeOnChainEvent(onChainSignerEvent);
Expand Down Expand Up @@ -982,7 +982,7 @@ describe("with listeners and workers", () => {
test("revokes messages when onchain signer is removed", async () => {
const idRegistryOnChainEvent = Factories.IdRegistryOnChainEvent.build({ fid });
const signerEventBody = Factories.SignerEventBody.build({ key: signerAdd.data.signerAddBody.signer });
const onChainSignerEvent = Factories.KeyRegistryOnChainEvent.build({ fid, signerEventBody });
const onChainSignerEvent = Factories.SignerOnChainEvent.build({ fid, signerEventBody });

await liveEngine.mergeOnChainEvent(idRegistryOnChainEvent);
await liveEngine.mergeOnChainEvent(onChainSignerEvent);
Expand All @@ -992,7 +992,7 @@ describe("with listeners and workers", () => {
eventType: SignerEventType.REMOVE,
key: signerEventBody.key,
});
const signerRemovalEvent = Factories.KeyRegistryOnChainEvent.build({ fid, signerEventBody: signerRemovalBody });
const signerRemovalEvent = Factories.SignerOnChainEvent.build({ fid, signerEventBody: signerRemovalBody });
await liveEngine.mergeOnChainEvent(signerRemovalEvent);

expect(revokedMessages).toEqual([]);
Expand Down
Loading

0 comments on commit 86149d3

Please sign in to comment.