diff --git a/yarn-project/p2p/src/service/reqresp/interface.ts b/yarn-project/p2p/src/service/reqresp/interface.ts
index 5ae61d0389d..606efc17bc9 100644
--- a/yarn-project/p2p/src/service/reqresp/interface.ts
+++ b/yarn-project/p2p/src/service/reqresp/interface.ts
@@ -16,6 +16,36 @@ export type ReqRespSubProtocol = typeof PING_PROTOCOL | typeof STATUS_PROTOCOL |
  */
 export type ReqRespSubProtocolHandler = (msg: Buffer) => Promise<Buffer>;
 
+/**
+ * A type mapping from subprotocol to its rate limits
+ */
+export type ReqRespSubProtocolRateLimits = Record<ReqRespSubProtocol, ProtocolRateLimitQuota>;
+
+/**
+ * A rate limit quota
+ */
+export interface RateLimitQuota {
+  /**
+   * The time window in ms
+   */
+  quotaTimeMs: number;
+  /**
+   * The number of requests allowed within the time window
+   */
+  quotaCount: number;
+}
+
+export interface ProtocolRateLimitQuota {
+  /**
+   * The rate limit quota for a single peer
+   */
+  peerLimit: RateLimitQuota;
+  /**
+   * The rate limit quota for the global peer set
+   */
+  globalLimit: RateLimitQuota;
+}
+
 /**
  * A type mapping from supprotocol to it's handling funciton
  */
diff --git a/yarn-project/p2p/src/service/reqresp/rate_limiter/index.ts b/yarn-project/p2p/src/service/reqresp/rate_limiter/index.ts
new file mode 100644
index 00000000000..0645376861b
--- /dev/null
+++ b/yarn-project/p2p/src/service/reqresp/rate_limiter/index.ts
@@ -0,0 +1 @@
+export { RequestResponseRateLimiter } from './rate_limiter.js';
diff --git a/yarn-project/p2p/src/service/reqresp/rate_limiter/rate_limiter.test.ts b/yarn-project/p2p/src/service/reqresp/rate_limiter/rate_limiter.test.ts
new file mode 100644
index 00000000000..e35a59d3a08
--- /dev/null
+++ b/yarn-project/p2p/src/service/reqresp/rate_limiter/rate_limiter.test.ts
@@ -0,0 +1,175 @@
+import { jest } from '@jest/globals';
+import { type PeerId } from '@libp2p/interface';
+
+import { PING_PROTOCOL, type ReqRespSubProtocolRateLimits, TX_REQ_PROTOCOL } from '../interface.js';
+import { RequestResponseRateLimiter } from './rate_limiter.js';
+
+class 
MockPeerId { + private id: string; + constructor(id: string) { + this.id = id; + } + public toString(): string { + return this.id; + } +} + +const makePeer = (id: string): PeerId => { + return new MockPeerId(id) as unknown as PeerId; +}; + +describe('rate limiter', () => { + let rateLimiter: RequestResponseRateLimiter; + + beforeEach(() => { + jest.useFakeTimers(); + const config = { + [TX_REQ_PROTOCOL]: { + // One request every 200ms + peerLimit: { + quotaCount: 5, + quotaTimeMs: 1000, + }, + // One request every 100ms + globalLimit: { + quotaCount: 10, + quotaTimeMs: 1000, + }, + }, + } as ReqRespSubProtocolRateLimits; // force type as we will not provide descriptions of all protocols + rateLimiter = new RequestResponseRateLimiter(config); + }); + + afterEach(() => { + jest.useRealTimers(); + rateLimiter.stop(); + }); + + it('Should allow requests within a peer limit', () => { + const peerId = makePeer('peer1'); + // Expect to allow a burst of 5, then not allow + for (let i = 0; i < 5; i++) { + expect(rateLimiter.allow(TX_REQ_PROTOCOL, peerId)).toBe(true); + } + expect(rateLimiter.allow(TX_REQ_PROTOCOL, peerId)).toBe(false); + + // Smooth requests + for (let i = 0; i < 5; i++) { + jest.advanceTimersByTime(200); + expect(rateLimiter.allow(TX_REQ_PROTOCOL, peerId)).toBe(true); + } + expect(rateLimiter.allow(TX_REQ_PROTOCOL, peerId)).toBe(false); + + // Reset after quota has passed + jest.advanceTimersByTime(1000); + // Second burst + for (let i = 0; i < 5; i++) { + expect(rateLimiter.allow(TX_REQ_PROTOCOL, peerId)).toBe(true); + } + expect(rateLimiter.allow(TX_REQ_PROTOCOL, peerId)).toBe(false); + }); + + it('Should allow requests within the global limit', () => { + // Initial burst + for (let i = 0; i < 10; i++) { + expect(rateLimiter.allow(TX_REQ_PROTOCOL, makePeer(`peer${i}`))).toBe(true); + } + expect(rateLimiter.allow(TX_REQ_PROTOCOL, makePeer('nolettoinno'))).toBe(false); + + // Smooth requests + for (let i = 0; i < 10; i++) { + jest.advanceTimersByTime(100); 
+ expect(rateLimiter.allow(TX_REQ_PROTOCOL, makePeer(`peer${i}`))).toBe(true); + } + expect(rateLimiter.allow(TX_REQ_PROTOCOL, makePeer('nolettoinno'))).toBe(false); + + // Reset after quota has passed + jest.advanceTimersByTime(1000); + // Second burst + for (let i = 0; i < 10; i++) { + expect(rateLimiter.allow(TX_REQ_PROTOCOL, makePeer(`peer${i}`))).toBe(true); + } + expect(rateLimiter.allow(TX_REQ_PROTOCOL, makePeer('nolettoinno'))).toBe(false); + }); + + it('Should reset after quota has passed', () => { + const peerId = makePeer('peer1'); + for (let i = 0; i < 5; i++) { + expect(rateLimiter.allow(TX_REQ_PROTOCOL, peerId)).toBe(true); + } + expect(rateLimiter.allow(TX_REQ_PROTOCOL, peerId)).toBe(false); + jest.advanceTimersByTime(1000); + expect(rateLimiter.allow(TX_REQ_PROTOCOL, peerId)).toBe(true); + }); + + it('Should handle multiple protocols separately', () => { + const config = { + [TX_REQ_PROTOCOL]: { + peerLimit: { + quotaCount: 5, + quotaTimeMs: 1000, + }, + globalLimit: { + quotaCount: 10, + quotaTimeMs: 1000, + }, + }, + [PING_PROTOCOL]: { + peerLimit: { + quotaCount: 2, + quotaTimeMs: 1000, + }, + globalLimit: { + quotaCount: 4, + quotaTimeMs: 1000, + }, + }, + } as ReqRespSubProtocolRateLimits; + const multiProtocolRateLimiter = new RequestResponseRateLimiter(config); + + const peerId = makePeer('peer1'); + + // Protocol 1 + for (let i = 0; i < 5; i++) { + expect(multiProtocolRateLimiter.allow(TX_REQ_PROTOCOL, peerId)).toBe(true); + } + expect(multiProtocolRateLimiter.allow(TX_REQ_PROTOCOL, peerId)).toBe(false); + + // Protocol 2 + for (let i = 0; i < 2; i++) { + expect(multiProtocolRateLimiter.allow(PING_PROTOCOL, peerId)).toBe(true); + } + expect(multiProtocolRateLimiter.allow(PING_PROTOCOL, peerId)).toBe(false); + + multiProtocolRateLimiter.stop(); + }); + + it('Should allow requests if no rate limiter is configured', () => { + const rateLimiter = new RequestResponseRateLimiter({} as ReqRespSubProtocolRateLimits); + 
expect(rateLimiter.allow(TX_REQ_PROTOCOL, makePeer('peer1'))).toBe(true); + }); + + it('Should smooth out spam', () => { + const requests = 1000; + const peers = 100; + let allowedRequests = 0; + + for (let i = 0; i < requests; i++) { + const peerId = makePeer(`peer${i % peers}`); + if (rateLimiter.allow(TX_REQ_PROTOCOL, peerId)) { + allowedRequests++; + } + jest.advanceTimersByTime(5); + } + // With 1000 iterations of 5ms per iteration, we run over 5 seconds + // With the configuration of 5 requests per second per peer and 10 requests per second globally, we expect: + // most of the allowed request to come through the global limit + // This sets a floor of 50 requests per second, but allowing for the initial burst, we expect there to be upto an additional 10 requests + + // (upon running a few times we actually see 59) + const expectedRequestsFloor = 50; + const expectedRequestsCeiling = 60; + expect(allowedRequests).toBeGreaterThan(expectedRequestsFloor); + expect(allowedRequests).toBeLessThan(expectedRequestsCeiling); + }); +}); diff --git a/yarn-project/p2p/src/service/reqresp/rate_limiter/rate_limiter.ts b/yarn-project/p2p/src/service/reqresp/rate_limiter/rate_limiter.ts new file mode 100644 index 00000000000..d6bf348df42 --- /dev/null +++ b/yarn-project/p2p/src/service/reqresp/rate_limiter/rate_limiter.ts @@ -0,0 +1,198 @@ +/** + * @attribution Rate limiter approach implemented in the lodestar ethereum 2 client. + * Rationale is that if it was good enough for them, then it should be good enough for us. + * https://github.com/ChainSafe/lodestar + */ +import { type PeerId } from '@libp2p/interface'; + +import { type ReqRespSubProtocol, type ReqRespSubProtocolRateLimits } from '../interface.js'; +import { DEFAULT_RATE_LIMITS } from './rate_limits.js'; + +// Check for disconnected peers every 10 minutes +const CHECK_DISCONNECTED_PEERS_INTERVAL_MS = 10 * 60 * 1000; + +/** + * GCRARateLimiter: A Generic Cell Rate Algorithm (GCRA) based rate limiter. 
+ * + * How it works: + * 1. The rate limiter allows a certain number of operations (quotaCount) within a specified + * time interval (quotaTimeMs). + * 2. It uses a "virtual scheduling time" (VST) to determine when the next operation should be allowed. + * 3. When an operation is requested, the limiter checks if enough time has passed since the last + * allowed operation. + * 4. If sufficient time has passed, the operation is allowed, and the VST is updated. + * 5. If not enough time has passed, the operation is denied. + * + * The limiter also allows for short bursts of activity, as long as the overall rate doesn't exceed + * the specified quota over time. + * + * Usage example: + * ``` + * const limiter = new GCRARateLimiter(100, 60000); // 100 operations per minute + * ``` + */ +export class GCRARateLimiter { + // Virtual scheduling time: i.e. the time at which we should allow the next request + private vst: number; + // The interval at which we emit a new token + private readonly emissionInterval: number; + // The interval over which we limit the number of requests + private readonly limitInterval: number; + + /** + * @param quotaCount - The number of requests to allow over the limit interval + * @param quotaTimeMs - The time interval over which the quotaCount applies + */ + constructor(quotaCount: number, quotaTimeMs: number) { + this.emissionInterval = quotaTimeMs / quotaCount; + this.limitInterval = quotaTimeMs; + this.vst = Date.now(); + } + + allow(): boolean { + const now = Date.now(); + + const newVst = Math.max(this.vst, now) + this.emissionInterval; + if (newVst - now <= this.limitInterval) { + this.vst = newVst; + return true; + } + + return false; + } +} + +interface PeerRateLimiter { + // The rate limiter for this peer + limiter: GCRARateLimiter; + // The last time the peer was accessed - used to determine if the peer is still connected + lastAccess: number; +} + +/** + * SubProtocolRateLimiter: A rate limiter for managing request rates on a 
per-peer and global basis for a specific subprotocol.
+ *
+ * This class provides a two-tier rate limiting system:
+ * 1. A global rate limit for all requests across all peers for this subprotocol.
+ * 2. Individual rate limits for each peer.
+ *
+ * How it works:
+ * - When a request comes in, it first checks against the global rate limit.
+ * - If the global limit allows, it then checks against the specific peer's rate limit.
+ * - The request is only allowed if both the global and peer-specific limits allow it.
+ * - It automatically creates and manages rate limiters for new peers as they make requests.
+ * - It periodically cleans up rate limiters for inactive peers to conserve memory.
+ *
+ * Note: Remember to call `start()` to begin the cleanup process and `stop()` when shutting down to clear the cleanup interval.
+ */
+export class SubProtocolRateLimiter {
+  private peerLimiters: Map<string, PeerRateLimiter> = new Map();
+  private globalLimiter: GCRARateLimiter;
+  private readonly peerQuotaCount: number;
+  private readonly peerQuotaTimeMs: number;
+
+  constructor(peerQuotaCount: number, peerQuotaTimeMs: number, globalQuotaCount: number, globalQuotaTimeMs: number) {
+    this.peerLimiters = new Map();
+    this.globalLimiter = new GCRARateLimiter(globalQuotaCount, globalQuotaTimeMs);
+    this.peerQuotaCount = peerQuotaCount;
+    this.peerQuotaTimeMs = peerQuotaTimeMs;
+  }
+
+  allow(peerId: PeerId): boolean {
+    if (!this.globalLimiter.allow()) {
+      return false;
+    }
+
+    const peerIdStr = peerId.toString();
+    let peerLimiter: PeerRateLimiter | undefined = this.peerLimiters.get(peerIdStr);
+    if (!peerLimiter) {
+      // Create a limiter for this peer
+      peerLimiter = {
+        limiter: new GCRARateLimiter(this.peerQuotaCount, this.peerQuotaTimeMs),
+        lastAccess: Date.now(),
+      };
+      this.peerLimiters.set(peerIdStr, peerLimiter);
+    } else {
+      peerLimiter.lastAccess = Date.now();
+    }
+    return peerLimiter.limiter.allow();
+  }
+
+  cleanupInactivePeers() {
+    const now = Date.now();
+    
this.peerLimiters.forEach((peerLimiter, peerId) => { + if (now - peerLimiter.lastAccess > CHECK_DISCONNECTED_PEERS_INTERVAL_MS) { + this.peerLimiters.delete(peerId); + } + }); + } +} + +/** + * RequestResponseRateLimiter. + * + * A rate limiter that is protocol aware, then peer aware. + * SubProtocols can have their own global / peer level rate limits. + * + * How it works: + * - Initializes with a set of rate limit configurations for different subprotocols. + * - Creates a separate SubProtocolRateLimiter for each configured subprotocol. + * - When a request comes in, it routes the rate limiting decision to the appropriate subprotocol limiter. + * + * Usage: + * ``` + * const rateLimits = { + * subprotocol1: { peerLimit: { quotaCount: 10, quotaTimeMs: 1000 }, globalLimit: { quotaCount: 100, quotaTimeMs: 1000 } }, + * subprotocol2: { peerLimit: { quotaCount: 5, quotaTimeMs: 1000 }, globalLimit: { quotaCount: 50, quotaTimeMs: 1000 } } + * }; + * const limiter = new RequestResponseRateLimiter(rateLimits); + * + * Note: Ensure to call `stop()` when shutting down to properly clean up all subprotocol limiters. 
+ */
+export class RequestResponseRateLimiter {
+  private subProtocolRateLimiters: Map<ReqRespSubProtocol, SubProtocolRateLimiter>;
+
+  private cleanupInterval: NodeJS.Timeout | undefined = undefined;
+
+  constructor(rateLimits: ReqRespSubProtocolRateLimits = DEFAULT_RATE_LIMITS) {
+    this.subProtocolRateLimiters = new Map();
+
+    for (const [subProtocol, protocolLimits] of Object.entries(rateLimits)) {
+      this.subProtocolRateLimiters.set(
+        subProtocol as ReqRespSubProtocol,
+        new SubProtocolRateLimiter(
+          protocolLimits.peerLimit.quotaCount,
+          protocolLimits.peerLimit.quotaTimeMs,
+          protocolLimits.globalLimit.quotaCount,
+          protocolLimits.globalLimit.quotaTimeMs,
+        ),
+      );
+    }
+  }
+
+  start() {
+    this.cleanupInterval = setInterval(() => {
+      this.cleanupInactivePeers();
+    }, CHECK_DISCONNECTED_PEERS_INTERVAL_MS);
+  }
+
+  allow(subProtocol: ReqRespSubProtocol, peerId: PeerId): boolean {
+    const limiter = this.subProtocolRateLimiters.get(subProtocol);
+    if (!limiter) {
+      // TODO: maybe throw an error here if no rate limiter is configured? 
+ return true; + } + return limiter.allow(peerId); + } + + cleanupInactivePeers() { + this.subProtocolRateLimiters.forEach(limiter => limiter.cleanupInactivePeers()); + } + + /** + * Make sure to call destroy on each of the sub protocol rate limiters when cleaning up + */ + stop() { + clearInterval(this.cleanupInterval); + } +} diff --git a/yarn-project/p2p/src/service/reqresp/rate_limiter/rate_limits.ts b/yarn-project/p2p/src/service/reqresp/rate_limiter/rate_limits.ts new file mode 100644 index 00000000000..836505ad386 --- /dev/null +++ b/yarn-project/p2p/src/service/reqresp/rate_limiter/rate_limits.ts @@ -0,0 +1,35 @@ +import { PING_PROTOCOL, type ReqRespSubProtocolRateLimits, STATUS_PROTOCOL, TX_REQ_PROTOCOL } from '../interface.js'; + +// TODO(md): these defaults need to be tuned +export const DEFAULT_RATE_LIMITS: ReqRespSubProtocolRateLimits = { + [PING_PROTOCOL]: { + peerLimit: { + quotaTimeMs: 1000, + quotaCount: 5, + }, + globalLimit: { + quotaTimeMs: 1000, + quotaCount: 10, + }, + }, + [STATUS_PROTOCOL]: { + peerLimit: { + quotaTimeMs: 1000, + quotaCount: 5, + }, + globalLimit: { + quotaTimeMs: 1000, + quotaCount: 10, + }, + }, + [TX_REQ_PROTOCOL]: { + peerLimit: { + quotaTimeMs: 1000, + quotaCount: 5, + }, + globalLimit: { + quotaTimeMs: 1000, + quotaCount: 10, + }, + }, +}; diff --git a/yarn-project/p2p/src/service/reqresp/reqresp.test.ts b/yarn-project/p2p/src/service/reqresp/reqresp.test.ts index ecd9a6824e9..7c57d3fa845 100644 --- a/yarn-project/p2p/src/service/reqresp/reqresp.test.ts +++ b/yarn-project/p2p/src/service/reqresp/reqresp.test.ts @@ -72,6 +72,30 @@ describe('ReqResp', () => { await stopNodes(nodes); }); + it('Should hit a rate limit if too many requests are made in quick succession', async () => { + const nodes = await createNodes(2); + + await startNodes(nodes); + + // Spy on the logger to make sure the error message is logged + const loggerSpy = jest.spyOn((nodes[1].req as any).logger, 'warn'); + + await sleep(500); + await 
connectToPeers(nodes); + await sleep(500); + + // Default rate is set at 1 every 200 ms; so this should fire a few times + for (let i = 0; i < 10; i++) { + await nodes[0].req.sendRequestToPeer(nodes[1].p2p.peerId, PING_PROTOCOL, Buffer.from('ping')); + } + + // Make sure the error message is logged + const errorMessage = `Rate limit exceeded for ${PING_PROTOCOL} from ${nodes[0].p2p.peerId.toString()}`; + expect(loggerSpy).toHaveBeenCalledWith(errorMessage); + + await stopNodes(nodes); + }); + describe('TX REQ PROTOCOL', () => { it('Can request a Tx from TxHash', async () => { const tx = mockTx(); diff --git a/yarn-project/p2p/src/service/reqresp/reqresp.ts b/yarn-project/p2p/src/service/reqresp/reqresp.ts index 8c6e19e5d40..51f7a12eb8a 100644 --- a/yarn-project/p2p/src/service/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/service/reqresp/reqresp.ts @@ -14,6 +14,7 @@ import { type ReqRespSubProtocol, type ReqRespSubProtocolHandlers, } from './interface.js'; +import { RequestResponseRateLimiter } from './rate_limiter/rate_limiter.js'; /** * The Request Response Service @@ -35,12 +36,15 @@ export class ReqResp { private individualRequestTimeoutMs: number; private subProtocolHandlers: ReqRespSubProtocolHandlers = DEFAULT_SUB_PROTOCOL_HANDLERS; + private rateLimiter: RequestResponseRateLimiter; constructor(config: P2PReqRespConfig, protected readonly libp2p: Libp2p) { this.logger = createDebugLogger('aztec:p2p:reqresp'); this.overallRequestTimeoutMs = config.overallRequestTimeoutMs; this.individualRequestTimeoutMs = config.individualRequestTimeoutMs; + + this.rateLimiter = new RequestResponseRateLimiter(); } /** @@ -52,6 +56,7 @@ export class ReqResp { for (const subProtocol of Object.keys(this.subProtocolHandlers)) { await this.libp2p.handle(subProtocol, this.streamHandler.bind(this, subProtocol as ReqRespSubProtocol)); } + this.rateLimiter.start(); } /** @@ -62,6 +67,7 @@ export class ReqResp { for (const protocol of Object.keys(this.subProtocolHandlers)) { await 
this.libp2p.unhandle(protocol); } + this.rateLimiter.stop(); await this.libp2p.stop(); this.abortController.abort(); } @@ -167,8 +173,16 @@ export class ReqResp { * * @param param0 - The incoming stream data */ - private async streamHandler(protocol: ReqRespSubProtocol, { stream }: IncomingStreamData) { + private async streamHandler(protocol: ReqRespSubProtocol, { stream, connection }: IncomingStreamData) { // Store a reference to from this for the async generator + if (!this.rateLimiter.allow(protocol, connection.remotePeer)) { + this.logger.warn(`Rate limit exceeded for ${protocol} from ${connection.remotePeer}`); + + // TODO(#8483): handle changing peer scoring for failed rate limit, maybe differentiate between global and peer limits here when punishing + await stream.close(); + return; + } + const handler = this.subProtocolHandlers[protocol]; try {