Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add peer scoring to req resp rate limits #8633

Merged
merged 5 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions yarn-project/p2p/src/mocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { bootstrap } from '@libp2p/bootstrap';
import { tcp } from '@libp2p/tcp';
import { type Libp2p, type Libp2pOptions, createLibp2p } from 'libp2p';

import { type PeerManager } from '../service/peer_manager.js';
import { type P2PReqRespConfig } from '../service/reqresp/config.js';
import { pingHandler, statusHandler } from '../service/reqresp/handlers.js';
import {
Expand Down Expand Up @@ -60,8 +61,8 @@ export const MOCK_SUB_PROTOCOL_HANDLERS: ReqRespSubProtocolHandlers = {
* @param numberOfNodes - the number of nodes to create
* @returns An array of the created nodes
*/
export const createNodes = async (numberOfNodes: number): Promise<ReqRespNode[]> => {
return await Promise.all(Array.from({ length: numberOfNodes }, () => createReqResp()));
export const createNodes = async (peerManager: PeerManager, numberOfNodes: number): Promise<ReqRespNode[]> => {
return await Promise.all(Array.from({ length: numberOfNodes }, () => createReqResp(peerManager)));
};

// TODO: think about where else this can go
Expand All @@ -79,13 +80,13 @@ export const stopNodes = async (nodes: ReqRespNode[]): Promise<void> => {
};

// Create a req resp node, exposing the underlying p2p node
export const createReqResp = async (): Promise<ReqRespNode> => {
export const createReqResp = async (peerManager: PeerManager): Promise<ReqRespNode> => {
const p2p = await createLibp2pNode();
const config: P2PReqRespConfig = {
overallRequestTimeoutMs: 4000,
individualRequestTimeoutMs: 2000,
};
const req = new ReqResp(config, p2p);
const req = new ReqResp(config, p2p, peerManager);
return {
p2p,
req,
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/p2p/src/service/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ export class LibP2PService implements P2PService {
return this.peerManager.getPeerScore(peerId);
};
this.node.services.pubsub.score.params.appSpecificWeight = 10;
this.reqresp = new ReqResp(config, node);
this.reqresp = new ReqResp(config, node, this.peerManager);

this.blockReceivedCallback = (block: BlockProposal): Promise<BlockAttestation | undefined> => {
this.logger.verbose(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { jest } from '@jest/globals';
import { type PeerId } from '@libp2p/interface';
import { type MockProxy, mock } from 'jest-mock-extended';

import { type PeerManager } from '../../peer_manager.js';
import { PeerErrorSeverity } from '../../peer_scoring.js';
import { PING_PROTOCOL, type ReqRespSubProtocolRateLimits, TX_REQ_PROTOCOL } from '../interface.js';
import { RequestResponseRateLimiter } from './rate_limiter.js';

Expand All @@ -20,6 +23,7 @@ const makePeer = (id: string): PeerId => {

describe('rate limiter', () => {
let rateLimiter: RequestResponseRateLimiter;
let peerManager: MockProxy<PeerManager>;

beforeEach(() => {
jest.useFakeTimers();
Expand All @@ -37,7 +41,10 @@ describe('rate limiter', () => {
},
},
} as ReqRespSubProtocolRateLimits; // force type as we will not provide descriptions of all protocols
rateLimiter = new RequestResponseRateLimiter(config);

peerManager = mock<PeerManager>();

rateLimiter = new RequestResponseRateLimiter(peerManager, config);
});

afterEach(() => {
Expand Down Expand Up @@ -67,29 +74,33 @@ describe('rate limiter', () => {
expect(rateLimiter.allow(TX_REQ_PROTOCOL, peerId)).toBe(true);
}
expect(rateLimiter.allow(TX_REQ_PROTOCOL, peerId)).toBe(false);

// Spy on the peer manager and check that penalizePeer is called
expect(peerManager.penalizePeer).toHaveBeenCalledWith(peerId, PeerErrorSeverity.MidToleranceError);
});

it('Should allow requests within the global limit', () => {
// Initial burst
const falingPeer = makePeer('nolettoinno');
for (let i = 0; i < 10; i++) {
expect(rateLimiter.allow(TX_REQ_PROTOCOL, makePeer(`peer${i}`))).toBe(true);
}
expect(rateLimiter.allow(TX_REQ_PROTOCOL, makePeer('nolettoinno'))).toBe(false);
expect(rateLimiter.allow(TX_REQ_PROTOCOL, falingPeer)).toBe(false);

// Smooth requests
for (let i = 0; i < 10; i++) {
jest.advanceTimersByTime(100);
expect(rateLimiter.allow(TX_REQ_PROTOCOL, makePeer(`peer${i}`))).toBe(true);
}
expect(rateLimiter.allow(TX_REQ_PROTOCOL, makePeer('nolettoinno'))).toBe(false);
expect(rateLimiter.allow(TX_REQ_PROTOCOL, falingPeer)).toBe(false);

// Reset after quota has passed
jest.advanceTimersByTime(1000);
// Second burst
for (let i = 0; i < 10; i++) {
expect(rateLimiter.allow(TX_REQ_PROTOCOL, makePeer(`peer${i}`))).toBe(true);
}
expect(rateLimiter.allow(TX_REQ_PROTOCOL, makePeer('nolettoinno'))).toBe(false);
expect(rateLimiter.allow(TX_REQ_PROTOCOL, falingPeer)).toBe(false);
});

it('Should reset after quota has passed', () => {
Expand Down Expand Up @@ -125,7 +136,7 @@ describe('rate limiter', () => {
},
},
} as ReqRespSubProtocolRateLimits;
const multiProtocolRateLimiter = new RequestResponseRateLimiter(config);
const multiProtocolRateLimiter = new RequestResponseRateLimiter(peerManager, config);

const peerId = makePeer('peer1');

Expand All @@ -145,7 +156,7 @@ describe('rate limiter', () => {
});

it('Should allow requests if no rate limiter is configured', () => {
const rateLimiter = new RequestResponseRateLimiter({} as ReqRespSubProtocolRateLimits);
const rateLimiter = new RequestResponseRateLimiter(peerManager, {} as ReqRespSubProtocolRateLimits);
expect(rateLimiter.allow(TX_REQ_PROTOCOL, makePeer('peer1'))).toBe(true);
});

Expand Down
37 changes: 30 additions & 7 deletions yarn-project/p2p/src/service/reqresp/rate_limiter/rate_limiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/
import { type PeerId } from '@libp2p/interface';

import { type PeerManager } from '../../peer_manager.js';
import { PeerErrorSeverity } from '../../peer_scoring.js';
import { type ReqRespSubProtocol, type ReqRespSubProtocolRateLimits } from '../interface.js';
import { DEFAULT_RATE_LIMITS } from './rate_limits.js';

Expand Down Expand Up @@ -69,6 +71,12 @@ interface PeerRateLimiter {
lastAccess: number;
}

enum RateLimitStatus {
Allowed,
DeniedGlobal,
DeniedPeer,
}

/**
* SubProtocolRateLimiter: A rate limiter for managing request rates on a per-peer and global basis for a specific subprotocol.
*
Expand Down Expand Up @@ -98,9 +106,9 @@ export class SubProtocolRateLimiter {
this.peerQuotaTimeMs = peerQuotaTimeMs;
}

allow(peerId: PeerId): boolean {
allow(peerId: PeerId): RateLimitStatus {
if (!this.globalLimiter.allow()) {
return false;
return RateLimitStatus.DeniedGlobal;
}

const peerIdStr = peerId.toString();
Expand All @@ -115,7 +123,11 @@ export class SubProtocolRateLimiter {
} else {
peerLimiter.lastAccess = Date.now();
}
return peerLimiter.limiter.allow();
const peerLimitAllowed = peerLimiter.limiter.allow();
if (!peerLimitAllowed) {
return RateLimitStatus.DeniedPeer;
}
return RateLimitStatus.Allowed;
}

cleanupInactivePeers() {
Expand All @@ -138,14 +150,16 @@ export class SubProtocolRateLimiter {
* - Initializes with a set of rate limit configurations for different subprotocols.
* - Creates a separate SubProtocolRateLimiter for each configured subprotocol.
* - When a request comes in, it routes the rate limiting decision to the appropriate subprotocol limiter.
* - Peers who exceed their peer rate limits will be penalised by the peer manager.
*
* Usage:
* ```
* const peerManager = new PeerManager(...);
* const rateLimits = {
* subprotocol1: { peerLimit: { quotaCount: 10, quotaTimeMs: 1000 }, globalLimit: { quotaCount: 100, quotaTimeMs: 1000 } },
* subprotocol2: { peerLimit: { quotaCount: 5, quotaTimeMs: 1000 }, globalLimit: { quotaCount: 50, quotaTimeMs: 1000 } }
* };
* const limiter = new RequestResponseRateLimiter(rateLimits);
* const limiter = new RequestResponseRateLimiter(peerManager, rateLimits);
*
* Note: Ensure to call `stop()` when shutting down to properly clean up all subprotocol limiters.
*/
Expand All @@ -154,7 +168,7 @@ export class RequestResponseRateLimiter {

private cleanupInterval: NodeJS.Timeout | undefined = undefined;

constructor(rateLimits: ReqRespSubProtocolRateLimits = DEFAULT_RATE_LIMITS) {
constructor(private peerManager: PeerManager, rateLimits: ReqRespSubProtocolRateLimits = DEFAULT_RATE_LIMITS) {
this.subProtocolRateLimiters = new Map();

for (const [subProtocol, protocolLimits] of Object.entries(rateLimits)) {
Expand All @@ -179,10 +193,19 @@ export class RequestResponseRateLimiter {
allow(subProtocol: ReqRespSubProtocol, peerId: PeerId): boolean {
const limiter = this.subProtocolRateLimiters.get(subProtocol);
if (!limiter) {
// TODO: maybe throw an error here if no rate limiter is configured?
return true;
}
return limiter.allow(peerId);
const rateLimitStatus = limiter.allow(peerId);

switch (rateLimitStatus) {
case RateLimitStatus.DeniedPeer:
this.peerManager.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dang that was easy 👍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spyros did some good work

return false;
case RateLimitStatus.DeniedGlobal:
return false;
default:
return true;
}
}

cleanupInactivePeers() {
Expand Down
24 changes: 16 additions & 8 deletions yarn-project/p2p/src/service/reqresp/reqresp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,26 @@ import { TxHash, mockTx } from '@aztec/circuit-types';
import { sleep } from '@aztec/foundation/sleep';

import { describe, expect, it, jest } from '@jest/globals';
import { type MockProxy, mock } from 'jest-mock-extended';

import { CollectiveReqRespTimeoutError, IndiviualReqRespTimeoutError } from '../../errors/reqresp.error.js';
import { MOCK_SUB_PROTOCOL_HANDLERS, connectToPeers, createNodes, startNodes, stopNodes } from '../../mocks/index.js';
import { type PeerManager } from '../peer_manager.js';
import { PING_PROTOCOL, TX_REQ_PROTOCOL } from './interface.js';

// The Req Resp protocol should allow nodes to dial specific peers
// and ask for specific data that they missed via the traditional gossip protocol.
describe('ReqResp', () => {
let peerManager: MockProxy<PeerManager>;

beforeEach(() => {
peerManager = mock<PeerManager>();
});

it('Should perform a ping request', async () => {
// Create two nodes
// They need to discover each other
const nodes = await createNodes(2);
const nodes = await createNodes(peerManager, 2);
const { req: pinger } = nodes[0];

await startNodes(nodes);
Expand All @@ -32,7 +40,7 @@ describe('ReqResp', () => {
});

it('Should handle gracefully if a peer connected peer is offline', async () => {
const nodes = await createNodes(2);
const nodes = await createNodes(peerManager, 2);

const { req: pinger } = nodes[0];
const { req: ponger } = nodes[1];
Expand All @@ -53,7 +61,7 @@ describe('ReqResp', () => {
});

it('Should request from a later peer if other peers are offline', async () => {
const nodes = await createNodes(4);
const nodes = await createNodes(peerManager, 4);

await startNodes(nodes);
await sleep(500);
Expand All @@ -73,7 +81,7 @@ describe('ReqResp', () => {
});

it('Should hit a rate limit if too many requests are made in quick succession', async () => {
const nodes = await createNodes(2);
const nodes = await createNodes(peerManager, 2);

await startNodes(nodes);

Expand Down Expand Up @@ -110,7 +118,7 @@ describe('ReqResp', () => {
return Promise.resolve(Uint8Array.from(Buffer.from('')));
};

const nodes = await createNodes(2);
const nodes = await createNodes(peerManager, 2);

await startNodes(nodes, protocolHandlers);
await sleep(500);
Expand All @@ -133,7 +141,7 @@ describe('ReqResp', () => {
return Promise.resolve(Uint8Array.from(Buffer.from('')));
};

const nodes = await createNodes(2);
const nodes = await createNodes(peerManager, 2);

await startNodes(nodes, protocolHandlers);
await sleep(500);
Expand All @@ -147,7 +155,7 @@ describe('ReqResp', () => {
});

it('Should hit individual timeout if nothing is returned over the stream', async () => {
const nodes = await createNodes(2);
const nodes = await createNodes(peerManager, 2);

await startNodes(nodes);

Expand Down Expand Up @@ -175,7 +183,7 @@ describe('ReqResp', () => {
});

it('Should hit collective timeout if nothing is returned over the stream from multiple peers', async () => {
const nodes = await createNodes(4);
const nodes = await createNodes(peerManager, 4);

await startNodes(nodes);

Expand Down
5 changes: 3 additions & 2 deletions yarn-project/p2p/src/service/reqresp/reqresp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { type Libp2p } from 'libp2p';
import { type Uint8ArrayList } from 'uint8arraylist';

import { CollectiveReqRespTimeoutError, IndiviualReqRespTimeoutError } from '../../errors/reqresp.error.js';
import { type PeerManager } from '../peer_manager.js';
import { type P2PReqRespConfig } from './config.js';
import {
DEFAULT_SUB_PROTOCOL_HANDLERS,
Expand Down Expand Up @@ -38,13 +39,13 @@ export class ReqResp {
private subProtocolHandlers: ReqRespSubProtocolHandlers = DEFAULT_SUB_PROTOCOL_HANDLERS;
private rateLimiter: RequestResponseRateLimiter;

constructor(config: P2PReqRespConfig, protected readonly libp2p: Libp2p) {
constructor(config: P2PReqRespConfig, protected readonly libp2p: Libp2p, peerManager: PeerManager) {
this.logger = createDebugLogger('aztec:p2p:reqresp');

this.overallRequestTimeoutMs = config.overallRequestTimeoutMs;
this.individualRequestTimeoutMs = config.individualRequestTimeoutMs;

this.rateLimiter = new RequestResponseRateLimiter();
this.rateLimiter = new RequestResponseRateLimiter(peerManager);
}

/**
Expand Down
Loading