Skip to content

Commit

Permalink
fix: Limit calls to subscribe() (#1197)
Browse files Browse the repository at this point in the history
* fix: Limit subscribe() calls by IP

* changeset

* tests

* subscribe auth test
  • Loading branch information
adityapk00 authored Jul 31, 2023
1 parent eae1ba8 commit a1b9ace
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 20 deletions.
5 changes: 5 additions & 0 deletions .changeset/little-fans-sort.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

fix: Limit the number of simultaneous subscribe() streams by IP address
105 changes: 91 additions & 14 deletions apps/hubble/src/rpc/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import SyncEngine from "../network/sync/syncEngine.js";
import Engine from "../storage/engine/index.js";
import { MessagesPage } from "../storage/stores/types.js";
import { logger } from "../utils/logger.js";
import { addressInfoFromParts } from "../utils/p2p.js";
import { addressInfoFromParts, extractIPAddress } from "../utils/p2p.js";
import { RateLimiterAbstract, RateLimiterMemory } from "rate-limiter-flexible";
import {
BufferedStreamWriter,
Expand All @@ -58,6 +58,9 @@ import { RentRegistryEventsResponse } from "@farcaster/hub-nodejs";

const HUBEVENTS_READER_TIMEOUT = 1 * 60 * 60 * 1000; // 1 hour

export const SUBSCRIBE_PERIP_LIMIT = 4; // Max 4 subscriptions per IP
export const SUBSCRIBE_GLOBAL_LIMIT = 4096; // Max 4096 subscriptions globally

export type RpcUsers = Map<string, string[]>;

const log = logger.child({ component: "rpcServer" });
Expand Down Expand Up @@ -174,6 +177,60 @@ export const getRPCUsersFromAuthString = (rpcAuth?: string): Map<string, string[
return rpcUsers;
};

/**
* Limit the number of simultaneous connections to the RPC server by
* a single IP address.
*/
class IpConnectionLimiter {
private perIpLimit: number;
private globalLimit: number;

private ipConnections: Map<string, number>;
private totalConnections: number;

constructor(perIpLimit: number, globalLimit: number) {
this.ipConnections = new Map();

this.perIpLimit = perIpLimit;
this.globalLimit = globalLimit;
this.totalConnections = 0;
}

public addConnection(peerString: string): Result<boolean, Error> {
// Get the IP part of the address
const ip = extractIPAddress(peerString) ?? "unknown";

const connections = this.ipConnections.get(ip) ?? 0;
if (connections >= this.perIpLimit) {
return err(new Error(`Too many connections from this IP: ${ip}`));
}

if (this.totalConnections >= this.globalLimit) {
return err(new Error("Too many connections to this server"));
}

this.ipConnections.set(ip, connections + 1);
this.totalConnections += 1;
return ok(true);
}

public removeConnection(peerString: string) {
// Get the IP part of the address
const ip = extractIPAddress(peerString) ?? "unknown";

const connections = this.ipConnections.get(ip) ?? 0;
if (connections > 0) {
this.ipConnections.set(ip, connections - 1);
this.totalConnections -= 1;
}
}

clear() {
this.ipConnections.clear();
this.totalConnections = 0;
}
}

export default class Server {
private hub: HubInterface | undefined;
private engine: Engine | undefined;
Expand All @@ -188,6 +245,7 @@ export default class Server {

private rpcUsers: RpcUsers;
private submitMessageRateLimiter: RateLimiterMemory;
private subscribeIpLimiter = new IpConnectionLimiter(SUBSCRIBE_PERIP_LIMIT, SUBSCRIBE_GLOBAL_LIMIT);

constructor(
hub?: HubInterface,
Expand Down Expand Up @@ -284,6 +342,10 @@ export default class Server {
return this.incomingConnections > 0;
}

public clearRateLimiters() {
this.subscribeIpLimiter.clear();
}

getImpl = (): HubServiceServer => {
return {
getInfo: (call, callback) => {
Expand Down Expand Up @@ -448,16 +510,10 @@ export default class Server {
},
submitMessage: async (call, callback) => {
// Identify peer that is calling, if available. This is used for rate limiting.
let peer;
const peerResult = Result.fromThrowable(
const peer = Result.fromThrowable(
() => call.getPeer(),
(e) => e,
)();
if (peerResult.isErr()) {
peer = "unavailable"; // Catchall. If peer is unavailable, we will group all of them into one bucket
} else {
peer = peerResult.value;
}
)().unwrapOr("unavailable");

// Check for rate limits
const rateLimitResult = await rateLimitByIp(peer, this.submitMessageRateLimiter);
Expand Down Expand Up @@ -944,8 +1000,29 @@ export default class Server {
},
subscribe: async (stream) => {
const { request } = stream;
const peer = Result.fromThrowable(
() => stream.getPeer(),
(e) => {
log.error({ err: e }, "subscribe: error getting peer");
},
)().unwrapOr("unknown peer:port");

// Check if username/password authenticates. If it does, we'll allow the connection
// regardless of rate limits.
let authorized = false;
if (this.rpcUsers.size > 0) {
authorized = (await authenticateUser(stream.metadata, this.rpcUsers)).unwrapOr(false);
}
const allowed = this.subscribeIpLimiter.addConnection(peer);

log.info({ request }, "subscribe: starting stream");
if (allowed.isOk() || authorized) {
log.info({ r: request, peer }, "subscribe: starting stream");
} else {
log.info({ r: request, peer, err: allowed.error.message }, "subscribe: rejected stream");

stream.destroy(new Error(allowed.error.message));
return;
}

// We'll write using a Buffered Stream Writer
const bufferedStreamWriter = new BufferedStreamWriter(stream);
Expand All @@ -955,10 +1032,6 @@ export default class Server {
bufferedStreamWriter.writeToStream(event);
};

stream.on("cancelled", () => {
stream.destroy();
});

// Register a close listener to remove all listeners before we start sending events
stream.on("close", () => {
this.engine?.eventHandler.off("mergeMessage", eventListener);
Expand All @@ -967,6 +1040,10 @@ export default class Server {
this.engine?.eventHandler.off("mergeIdRegistryEvent", eventListener);
this.engine?.eventHandler.off("mergeNameRegistryEvent", eventListener);
this.engine?.eventHandler.off("mergeUsernameProofEvent", eventListener);

this.subscribeIpLimiter.removeConnection(peer);

log.info({ peer }, "subscribe: stream closed");
});

// If the user wants to start from a specific event, we'll start from there first
Expand Down
71 changes: 65 additions & 6 deletions apps/hubble/src/rpc/test/eventService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import {
ClientReadableStream,
UserNameProof,
isMergeUsernameProofHubEvent,
Metadata,
getAuthMetadata,
} from "@farcaster/hub-nodejs";
import Server from "../server.js";
import Server, { SUBSCRIBE_PERIP_LIMIT } from "../server.js";
import { jestRocksDB } from "../../storage/db/jestUtils.js";
import Engine from "../../storage/engine/index.js";
import { MockHub } from "../../test/mocks.js";
Expand All @@ -34,14 +36,17 @@ const hub = new MockHub(db, engine);
let server: Server;
let client: HubRpcClient;

const rpcUser = "rpcUser";
const rpcPass = "rpcPass";

beforeAll(async () => {
server = new Server(hub, engine);
server = new Server(hub, engine, undefined, undefined, `${rpcUser}:${rpcPass}`);
const port = await server.start();
client = getInsecureHubRpcClient(`127.0.0.1:${port}`);
});

afterAll(async () => {
await server.stop();
await server.stop(true);
await engine.stop();
});

Expand All @@ -62,9 +67,9 @@ beforeEach(async () => {
events = [];
});

afterEach(() => {
afterEach(async () => {
if (stream) {
stream.cancel();
await closeStream(stream);
}
});

Expand All @@ -86,6 +91,9 @@ const setupSubscription = async (
events: [HubEventType, any][],
options: { eventTypes?: HubEventType[]; fromId?: number } = {},
): Promise<ClientReadableStream<HubEvent>> => {
// First, clear the rate limits
server.clearRateLimiters();

const request = SubscribeRequest.create(options);

const streamResult = await client.subscribe(request);
Expand Down Expand Up @@ -119,6 +127,23 @@ const setupSubscription = async (
return stream;
};

const closeStream = async (stream: ClientReadableStream<HubEvent>): Promise<boolean> => {
if (stream.closed) {
return true;
}

return new Promise((resolve) => {
stream.on("close", () => {
resolve(true);
});
stream.on("end", () => {
resolve(true);
});

stream.cancel();
});
};

describe("subscribe", () => {
describe("without type filters", () => {
test("emits events", async () => {
Expand Down Expand Up @@ -199,7 +224,6 @@ describe("subscribe", () => {
await engine.mergeMessage(signerAdd);

stream = await setupSubscription(events, { fromId: 1 });

expect(events).toEqual([
[HubEventType.MERGE_ID_REGISTRY_EVENT, IdRegistryEvent.toJSON(custodyEvent)],
[HubEventType.MERGE_MESSAGE, Message.toJSON(signerAdd)],
Expand All @@ -217,6 +241,41 @@ describe("subscribe", () => {
[HubEventType.MERGE_MESSAGE, Message.toJSON(signerAdd)],
]);
});

test("can't subscribe too many times", async () => {
const streams = [];

// All these should succeed
for (let i = 0; i < SUBSCRIBE_PERIP_LIMIT; i++) {
const stream = await client.subscribe({ eventTypes: [] });
expect(stream.isOk()).toBe(true);
streams.push(stream._unsafeUnwrap());
}

// Assert all are open
for (const stream of streams) {
expect(stream.closed).toBe(false);
}

// This should fail
const overLimitStream = await client.subscribe({ eventTypes: [] });
const result = await new Promise((resolve) => {
overLimitStream._unsafeUnwrap().on("error", (err) => {
resolve(err.message);
});
});

expect(result).toContain("Too many connections");
overLimitStream._unsafeUnwrap().cancel();

// But if we pass rpc auth credentials, it will bypass the limit and succeed
const authStream = await client.subscribe({ eventTypes: [] }, getAuthMetadata(rpcUser, rpcPass));
expect(authStream.isOk()).toBe(true);
expect(authStream._unsafeUnwrap().closed).toBe(false);

// Close all streams
authStream._unsafeUnwrap().cancel();
});
});

describe("with fromId and type filters", () => {
Expand Down
15 changes: 15 additions & 0 deletions apps/hubble/src/utils/p2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,21 @@ export const parseAddress = (multiaddrStr: string): HubResult<Multiaddr> => {
)();
};

/* Extracts the ip part from "ip:port" */
export const extractIPAddress = (peerAddress: string): string | undefined => {
// This regex matches both IPv4 and IPv6 addresses
const ipRegex = /((?:[0-9]{1,3}\.){3}[0-9]{1,3}|(?:[a-fA-F0-9]{1,4}:){7}[a-fA-F0-9]{1,4}):[0-9]+/;

const match = peerAddress.match(ipRegex);

// If the address matches the regex, we remove the port part
if (match) {
return match[0].split(":")[0];
} else {
return undefined;
}
};

/** Checks that the IP address to bind to is valid and that the combined IP, transport, and port multiaddr is valid */
export const checkNodeAddrs = (listenIPAddr: string, listenCombinedAddr: string): HubResult<void> => {
return Result.combine([checkIpAddr(listenIPAddr), checkCombinedAddr(listenCombinedAddr)]).map(() => undefined);
Expand Down

0 comments on commit a1b9ace

Please sign in to comment.