Skip to content

Commit

Permalink
feat: Rate limit merges by FID (#1213)
Browse files Browse the repository at this point in the history
* feat: Rate limit merges by FID

* changeset

* tests
  • Loading branch information
adityapk00 authored Aug 4, 2023
1 parent d675af9 commit 1e0979b
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 69 deletions.
5 changes: 5 additions & 0 deletions .changeset/heavy-pants-poke.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

feat: Rate limit merges per FID to the total messages storage available for the FID
29 changes: 6 additions & 23 deletions apps/hubble/src/rpc/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,14 @@ import Engine from "../storage/engine/index.js";
import { MessagesPage } from "../storage/stores/types.js";
import { logger } from "../utils/logger.js";
import { addressInfoFromParts, extractIPAddress } from "../utils/p2p.js";
import { RateLimiterAbstract, RateLimiterMemory } from "rate-limiter-flexible";
import { RateLimiterMemory } from "rate-limiter-flexible";
import {
BufferedStreamWriter,
STREAM_MESSAGE_BUFFER_SIZE,
SLOW_CLIENT_GRACE_PERIOD_MS,
} from "./bufferedStreamWriter.js";
import { sleep } from "../utils/crypto.js";
import { SUBMIT_MESSAGE_RATE_LIMIT, rateLimitByIp } from "../utils/rateLimits.js";

const HUBEVENTS_READER_TIMEOUT = 1 * 60 * 60 * 1000; // 1 hour

Expand All @@ -67,18 +68,6 @@ export type RpcUsers = Map<string, string[]>;

const log = logger.child({ component: "rpcServer" });

export const rateLimitByIp = async (ip: string, limiter: RateLimiterAbstract): HubAsyncResult<boolean> => {
// Get the IP part of the address
const ipPart = ip.split(":")[0] ?? "";

try {
await limiter.consume(ipPart);
return ok(true);
} catch (e) {
return err(new HubError("unavailable", "Too many requests"));
}
};

// Check if the user is authenticated via the metadata
export const authenticateUser = async (metadata: Metadata, rpcUsers: RpcUsers): HubAsyncResult<boolean> => {
// If there is no auth user/pass, we don't need to authenticate
Expand Down Expand Up @@ -276,16 +265,13 @@ export default class Server {
this.grpcServer.addService(HubServiceService, this.getImpl());

// Submit message are rate limited by default to 20k per minute
let rateLimitPerMinute = 20_000;
const rateLimitPerMinute = SUBMIT_MESSAGE_RATE_LIMIT;
if (rpcRateLimit !== undefined && rpcRateLimit >= 0) {
rateLimitPerMinute = rpcRateLimit;
rateLimitPerMinute.points = rpcRateLimit;
}
log.info({ rpcRateLimit }, "RPC rate limit enabled");

this.submitMessageRateLimiter = new RateLimiterMemory({
points: rateLimitPerMinute,
duration: 60,
});
this.submitMessageRateLimiter = new RateLimiterMemory(rateLimitPerMinute);
}

async start(ip = "0.0.0.0", port = 0): Promise<number> {
Expand Down Expand Up @@ -501,10 +487,7 @@ export default class Server {
},
getSyncSnapshotByPrefix: (call, callback) => {
const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown");
log.debug(
{ method: "getSyncSnapshotByPrefix", req: call.request, reqStr: JSON.stringify(call.request) },
`RPC call from ${peer}`,
);
log.debug({ method: "getSyncSnapshotByPrefix", req: call.request }, `RPC call from ${peer}`);

// If someone is asking for our sync snapshot, that means we're getting incoming
// connections
Expand Down
31 changes: 1 addition & 30 deletions apps/hubble/src/rpc/test/rpcAuth.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { RateLimiterMemory } from "rate-limiter-flexible";
import {
Factories,
HubError,
Expand All @@ -10,12 +9,10 @@ import {
HubInfoRequest,
} from "@farcaster/hub-nodejs";
import SyncEngine from "../../network/sync/syncEngine.js";

import Server, { rateLimitByIp } from "../server.js";
import Server from "../server.js";
import { jestRocksDB } from "../../storage/db/jestUtils.js";
import Engine from "../../storage/engine/index.js";
import { MockHub } from "../../test/mocks.js";
import { sleep } from "../../utils/crypto.js";

const db = jestRocksDB("protobufs.rpcAuth.test");
const network = FarcasterNetwork.TESTNET;
Expand Down Expand Up @@ -111,30 +108,4 @@ describe("auth tests", () => {
await authServer.stop();
authClient.close();
});

test("test rate limiting", async () => {
const Limit10PerSecond = new RateLimiterMemory({
points: 10,
duration: 1,
});

// 10 Requests should be fine
for (let i = 0; i < 10; i++) {
const result = await rateLimitByIp("testip:3000", Limit10PerSecond);
expect(result.isOk()).toBeTruthy();
}

// Sleep for 1 second to reset the rate limiter
await sleep(1100);

// 11th+ request should fail
for (let i = 0; i < 20; i++) {
const result = await rateLimitByIp("testip:3000", Limit10PerSecond);
if (i < 10) {
expect(result.isOk()).toBeTruthy();
} else {
expect(result._unsafeUnwrapErr().message).toEqual("Too many requests");
}
}
});
});
31 changes: 31 additions & 0 deletions apps/hubble/src/storage/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ import { normalize } from "viem/ens";
import os from "os";
import UsernameProofStore from "../stores/usernameProofStore.js";
import OnChainEventStore from "../stores/onChainEventStore.js";
import { getRateLimiterForTotalMessages, rateLimitByKey } from "../../utils/rateLimits.js";

const log = logger.child({
component: "Engine",
Expand Down Expand Up @@ -108,6 +109,8 @@ class Engine {
private _revokeSignerQueue: RevokeMessagesBySignerJobQueue;
private _revokeSignerWorker: RevokeMessagesBySignerJobWorker;

private _totalPruneSize: number;

constructor(db: RocksDB, network: FarcasterNetwork, eventHandler?: StoreEventHandler, publicClient?: PublicClient) {
this._db = db;
this._network = network;
Expand All @@ -124,6 +127,19 @@ class Engine {
this._onchainEventsStore = new OnChainEventStore(db, this.eventHandler);
this._usernameProofStore = new UsernameProofStore(db, this.eventHandler);

// Calculate total storage available per unit of store. Note that OnChainEventStore
// is not included in this calculation because it is not pruned.
this._totalPruneSize =
this._linkStore.pruneSizeLimit +
this._reactionStore.pruneSizeLimit +
this._signerStore.pruneSizeLimit +
this._castStore.pruneSizeLimit +
this._userDataStore.pruneSizeLimit +
this._verificationStore.pruneSizeLimit +
this._usernameProofStore.pruneSizeLimit;

log.info({ totalPruneSize: this._totalPruneSize }, "total default storage limit size");

this._revokeSignerQueue = new RevokeMessagesBySignerJobQueue(db);
this._revokeSignerWorker = new RevokeMessagesBySignerJobWorker(this._revokeSignerQueue, db, this);

Expand Down Expand Up @@ -224,6 +240,21 @@ class Engine {
}

async mergeMessage(message: Message): HubAsyncResult<number> {
// Extract the FID that this message was signed by
const fid = message.data?.fid ?? 0;
const storageUnits = await this.eventHandler.getCurrentStorageUnitsForFid(fid);

if (storageUnits.isOk()) {
// We rate limit the number of messages that can be merged per FID
const limiter = getRateLimiterForTotalMessages(storageUnits.value * this._totalPruneSize);

const rateLimitResult = await rateLimitByKey(`${fid}`, limiter);
if (rateLimitResult.isErr()) {
logger.warn({ fid, err: rateLimitResult.error }, "rate limit exceeded for FID");
return err(rateLimitResult.error);
}
}

const validatedMessage = await this.validateMessage(message);
if (validatedMessage.isErr()) {
return err(validatedMessage.error);
Expand Down
9 changes: 1 addition & 8 deletions apps/hubble/src/storage/stores/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,6 @@ export abstract class Store<TAdd extends Message, TRemove extends Message> {
return err(units.error);
}

if (units.value === 0) {
logger.debug({ fid }, "fid has no registered storage, would be pruned");
}

// This is temporary, when all fids are migrated to using storage rent, we'll just use the units directly.
const unitsMultiplier = units.value > 0 ? units.value : 1;

// Require storage cache to be synced to prune
if (cachedCount.isErr()) {
return err(cachedCount.error);
Expand Down Expand Up @@ -296,7 +289,7 @@ export abstract class Store<TAdd extends Message, TRemove extends Message> {
// Since the TS hash has the first 4 bytes be the timestamp (bigendian), we can use it to prune
// since the iteration will be implicitly sorted by timestamp
if (
count.value <= this._pruneSizeLimit * unitsMultiplier &&
count.value <= this._pruneSizeLimit * units.value &&
(timestampToPrune === undefined || (message.value.data && message.value.data.timestamp >= timestampToPrune))
) {
return true; // Nothing left to prune
Expand Down
17 changes: 9 additions & 8 deletions apps/hubble/src/storage/stores/storeEventHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,14 @@ class StoreEventHandler extends TypedEmitter<StoreEvents> {
}

async getCurrentStorageUnitsForFid(fid: number): HubAsyncResult<number> {
return await this._storageCache.getCurrentStorageUnitsForFid(fid);
const units = await this._storageCache.getCurrentStorageUnitsForFid(fid);

if (units.isOk() && units.value === 0) {
logger.debug({ fid }, "fid has no registered storage, would be pruned");
}

// This is temporary, when all fids are migrated to using storage rent, we'll just use the units directly.
return units.map((u) => (u > 0 ? u : 1));
}

async getCacheMessageCount(fid: number, set: UserMessagePostfix): HubAsyncResult<number> {
Expand Down Expand Up @@ -272,18 +279,12 @@ class StoreEventHandler extends TypedEmitter<StoreEvents> {
return err(units.error);
}

if (units.value === 0) {
logger.debug({ fid: message.data.fid }, "fid has no registered storage, would be pruned");
}

const unitsMultiplier = units.value > 0 ? units.value : 1;

const messageCount = await this.getCacheMessageCount(message.data.fid, set);
if (messageCount.isErr()) {
return err(messageCount.error);
}

if (messageCount.value < sizeLimit * unitsMultiplier) {
if (messageCount.value < sizeLimit * units.value) {
return ok(false);
}

Expand Down
67 changes: 67 additions & 0 deletions apps/hubble/src/utils/rateLimits.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { RateLimiterMemory } from "rate-limiter-flexible";
import { getRateLimiterForTotalMessages, rateLimitByIp, rateLimitByKey } from "./rateLimits.js";
import { sleep } from "./crypto.js";

describe("test rate limits", () => {
const Limit10PerSecond = new RateLimiterMemory({
points: 10,
duration: 1,
});

test("test rate limiting", async () => {
// 10 Requests should be fine
for (let i = 0; i < 10; i++) {
const result = await rateLimitByIp("testip:3000", Limit10PerSecond);
expect(result.isOk()).toBeTruthy();
}

// Sleep for 1 second to reset the rate limiter
await sleep(1100);

// 11th+ request should fail
for (let i = 0; i < 20; i++) {
const result = await rateLimitByIp("testip:3000", Limit10PerSecond);
if (i < 10) {
expect(result.isOk()).toBeTruthy();
} else {
expect(result._unsafeUnwrapErr().message).toEqual("Too many requests");
}
}
});

test("test dynamic rate limiting", async () => {
// 10 Requests should be fine for 1st set of messages
const rateLimiter1 = getRateLimiterForTotalMessages(10, 1);
const rateLimiter2 = getRateLimiterForTotalMessages(11, 1);

for (let i = 0; i < 10; i++) {
const result1 = await rateLimitByKey("3000", rateLimiter1);
expect(result1.isOk()).toBeTruthy();

// same key, but different rate limiter should also be fine
const result2 = await rateLimitByKey("3000", rateLimiter2);
expect(result2.isOk()).toBeTruthy();
}

// Sleep for 1 second to reset the rate limiter
await sleep(1100);

// 11th+ request should fail
for (let i = 0; i < 20; i++) {
const result1 = await rateLimitByKey("3000", rateLimiter1);
if (i < 10) {
expect(result1.isOk()).toBeTruthy();
} else {
expect(result1._unsafeUnwrapErr().message).toEqual("Too many requests");
}

// same key, but different rate limiter should pass till the 11th message
const result2 = await rateLimitByKey("3000", rateLimiter2);
if (i < 11) {
expect(result2.isOk()).toBeTruthy();
} else {
expect(result2._unsafeUnwrapErr().message).toEqual("Too many requests");
}
}
});
});
44 changes: 44 additions & 0 deletions apps/hubble/src/utils/rateLimits.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { HubAsyncResult, HubError } from "@farcaster/hub-nodejs";
import { err, ok } from "neverthrow";
import { RateLimiterAbstract, RateLimiterMemory } from "rate-limiter-flexible";

// Number of submit messages (total) that can be merged per 60 seconds
export const SUBMIT_MESSAGE_RATE_LIMIT = {
points: 20_000,
duration: 60,
};

// We keep a map of rate limiters per total messages allowed, since each fid has a different limit
// The totalMessages are always num of storage units purchased * totalPruneSize limit, so there will
// be as many rate limiters as the number of distinct storage units purchased, which is a small number
const rateLimiters = new Map<number, RateLimiterMemory>();
export function getRateLimiterForTotalMessages(totalMessages: number, duration = 60 * 60 * 24): RateLimiterAbstract {
if (rateLimiters.has(totalMessages)) {
return rateLimiters.get(totalMessages) as RateLimiterAbstract;
}

const limiter = new RateLimiterMemory({
points: totalMessages,
duration,
});
rateLimiters.set(totalMessages, limiter);
return limiter;
}

/** Rate limit by IP address */
export const rateLimitByIp = async (ip: string, limiter: RateLimiterAbstract): HubAsyncResult<boolean> => {
// Get the IP part of the address
const ipPart = ip.split(":")[0] ?? "";

return rateLimitByKey(ipPart, limiter);
};

/** Rate limit by key for the limiter */
export const rateLimitByKey = async (fid: string, limiter: RateLimiterAbstract): HubAsyncResult<boolean> => {
try {
await limiter.consume(fid);
return ok(true);
} catch (e) {
return err(new HubError("unavailable", "Too many requests"));
}
};

0 comments on commit 1e0979b

Please sign in to comment.