Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement rate limit #3454

Merged
merged 11 commits into from
Dec 15, 2021
40 changes: 40 additions & 0 deletions packages/cli/src/options/beaconNodeOptions/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ export interface INetworkArgs {
"network.subscribeAllSubnets": boolean;
"network.connectToDiscv5Bootnodes": boolean;
"network.discv5FirstQueryDelayMs": number;
"network.requestCountPeerLimit": number;
"network.blockCountTotalLimit": number;
"network.blockCountPeerLimit": number;
"network.rateTrackerTimeoutMs": number;
}

export function parseArgs(args: INetworkArgs): IBeaconNodeOptions["network"] {
Expand All @@ -30,6 +34,10 @@ export function parseArgs(args: INetworkArgs): IBeaconNodeOptions["network"] {
subscribeAllSubnets: args["network.subscribeAllSubnets"],
connectToDiscv5Bootnodes: args["network.connectToDiscv5Bootnodes"],
discv5FirstQueryDelayMs: args["network.discv5FirstQueryDelayMs"],
requestCountPeerLimit: args["network.requestCountPeerLimit"],
blockCountTotalLimit: args["network.blockCountTotalLimit"],
blockCountPeerLimit: args["network.blockCountPeerLimit"],
rateTrackerTimeoutMs: args["network.rateTrackerTimeoutMs"],
};
}

Expand Down Expand Up @@ -104,4 +112,36 @@ export const options: ICliCommandOptions<INetworkArgs> = {
defaultDescription: String(defaultOptions.network.discv5FirstQueryDelayMs),
group: "network",
},

"network.requestCountPeerLimit": {
type: "number",
description: "Max block req/resp requests per peer per rateTrackerTimeoutMs",
hidden: true,
defaultDescription: String(defaultOptions.network.requestCountPeerLimit),
group: "network",
},

"network.blockCountTotalLimit": {
type: "number",
description: "Max block count requested per rateTrackerTimeoutMs",
hidden: true,
defaultDescription: String(defaultOptions.network.blockCountTotalLimit),
group: "network",
},

"network.blockCountPeerLimit": {
type: "number",
description: "Max block count requested per peer per rateTrackerTimeoutMs",
hidden: true,
defaultDescription: String(defaultOptions.network.blockCountPeerLimit),
group: "network",
},

"network.rateTrackerTimeoutMs": {
type: "number",
twoeths marked this conversation as resolved.
Show resolved Hide resolved
description: "Time window to track rate limit in milli seconds",
hidden: true,
defaultDescription: String(defaultOptions.network.rateTrackerTimeoutMs),
group: "network",
},
};
9 changes: 8 additions & 1 deletion packages/cli/test/unit/options/beaconNodeOptions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ describe("options / beaconNodeOptions", () => {
"network.subscribeAllSubnets": true,
"network.connectToDiscv5Bootnodes": true,
"network.discv5FirstQueryDelayMs": 1000,

"network.requestCountPeerLimit": 5,
"network.blockCountTotalLimit": 1000,
"network.blockCountPeerLimit": 500,
"network.rateTrackerTimeoutMs": 60000,
"sync.isSingleNode": true,
"sync.disableProcessAsChainSegment": true,
} as IBeaconNodeArgs;
Expand Down Expand Up @@ -102,6 +105,10 @@ describe("options / beaconNodeOptions", () => {
subscribeAllSubnets: true,
connectToDiscv5Bootnodes: true,
discv5FirstQueryDelayMs: 1000,
requestCountPeerLimit: 5,
blockCountTotalLimit: 1000,
blockCountPeerLimit: 500,
rateTrackerTimeoutMs: 60000,
},
sync: {
isSingleNode: true,
Expand Down
4 changes: 4 additions & 0 deletions packages/lodestar/src/constants/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ export enum RespStatus {
* The responder does not have requested resource. The response payload adheres to the ErrorMessage schema (described below). Note: This response code is only valid as a response to BlocksByRange
*/
RESOURCE_UNAVAILABLE = 3,
/**
* Our node does not have bandwidth to serve requests due to either per-peer quota or total quota.
*/
RATE_LIMITED = 139,
}

export type RpcResponseStatusError = Exclude<RespStatus, RespStatus.SUCCESS>;
Expand Down
5 changes: 5 additions & 0 deletions packages/lodestar/src/metrics/metrics/beacon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ export function createBeaconMetrics(register: RegistryMetricCreator) {
name: "beacon_reqresp_dial_errors_total",
help: "Count total dial errors",
}),
reqRespRateLimitErrors: register.gauge<"tracker">({
name: "beacon_reqresp_rate_limiter_errors_total",
help: "Count rate limiter errors",
labelNames: ["tracker"],
}),

blockProductionTime: register.histogram({
name: "beacon_block_production_seconds",
Expand Down
4 changes: 3 additions & 1 deletion packages/lodestar/src/network/options.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import {ENR, IDiscv5DiscoveryInputOptions} from "@chainsafe/discv5";
import {PeerManagerOpts} from "./peers";
import {defaultRateLimiterOpts, RateLimiterOpts} from "./reqresp/response/rateLimiter";

export interface INetworkOptions extends PeerManagerOpts {
export interface INetworkOptions extends PeerManagerOpts, RateLimiterOpts {
localMultiaddrs: string[];
bootMultiaddrs?: string[];
subscribeAllSubnets?: boolean;
Expand All @@ -23,4 +24,5 @@ export const defaultNetworkOptions: INetworkOptions = {
localMultiaddrs: ["/ip4/0.0.0.0/tcp/9000"],
bootMultiaddrs: [],
discv5: defaultDiscv5Options,
...defaultRateLimiterOpts,
};
1 change: 1 addition & 0 deletions packages/lodestar/src/network/peers/peerManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ export class PeerManager {

this.logger.verbose("peer disconnected", {peer: prettyPrintPeerId(peer), direction, status});
this.networkEventBus.emit(NetworkEvent.peerDisconnected, peer);
this.reqResp.pruneRateLimiterData(peer);
this.metrics?.peerDisconnectedEvent.inc({direction});
};

Expand Down
19 changes: 19 additions & 0 deletions packages/lodestar/src/network/reqresp/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {MetadataController} from "../metadata";
import {INetworkEventBus} from "../events";
import {ReqRespHandlers} from "./handlers";
import {IMetrics} from "../../metrics";
import {RequestTypedContainer} from "./types";

export interface IReqResp {
start(): void;
Expand All @@ -22,6 +23,7 @@ export interface IReqResp {
request: phase0.BeaconBlocksByRangeRequest
): Promise<allForks.SignedBeaconBlock[]>;
beaconBlocksByRoot(peerId: PeerId, request: phase0.BeaconBlocksByRootRequest): Promise<allForks.SignedBeaconBlock[]>;
pruneRateLimiterData(peerId: PeerId): void;
}

export interface IReqRespModules {
Expand Down Expand Up @@ -69,3 +71,20 @@ export type Libp2pStream = {
*/
abort: (err: Error) => void;
};

/**
* Rate limiter interface for inbound and outbound requests.
*/
export interface IRateLimiter {
/**
* Allow to request or response based on rate limit params configured.
*/
allowRequest(peerId: PeerId, requestTyped: RequestTypedContainer): boolean;

/**
* Prune by peer id
*/
prune(peerId: PeerId): void;
start(): void;
stop(): void;
}
65 changes: 65 additions & 0 deletions packages/lodestar/src/network/reqresp/rateTracker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import {MapDef} from "../../util/map";

type RateTrackerOpts = {
limit: number;
timeoutMs: number;
};

const BUCKET_SIZE_MS = 1000;

/**
* The generic rate tracker allows up to `limit` objects in a period of time.
* This could apply to both request count or block count, for both requests and responses.
*/
export class RateTracker {
private requestsWithinWindow = 0;
private limit: number;
private timeoutMs: number;
/** Key as time in second and value as object requested */
private requests: MapDef<number, number>;

constructor(opts: RateTrackerOpts, requests = new MapDef<number, number>(() => 0)) {
this.limit = opts.limit;
this.timeoutMs = opts.timeoutMs;
this.requests = requests;
}

requestObjects(objectCount: number): number {
if (objectCount <= 0) throw Error("Invalid objectCount " + objectCount);
this.prune();
if (this.requestsWithinWindow >= this.limit) {
return 0;
}

this.requestsWithinWindow += objectCount;
const key = Math.floor(Date.now() / BUCKET_SIZE_MS);
const curObjectCount = this.requests.getOrDefault(key);
this.requests.set(key, curObjectCount + objectCount);

return objectCount;
}

getRequestedObjectsWithinWindow(): number {
return this.requestsWithinWindow;
}

private prune(): void {
const now = Date.now();

for (const [timeInSec, count] of this.requests.entries()) {
// reclaim the quota for old requests
if (now - timeInSec * BUCKET_SIZE_MS >= this.timeoutMs) {
this.requestsWithinWindow -= count;
this.requests.delete(timeInSec);
} else {
// Break after the first entry within the timeout window.
// Since the entries are added in order, all the rest will be within the window.
break;
}
twoeths marked this conversation as resolved.
Show resolved Hide resolved
}

if (this.requestsWithinWindow < 0) {
this.requestsWithinWindow = 0;
}
}
}
21 changes: 17 additions & 4 deletions packages/lodestar/src/network/reqresp/reqResp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import {ILogger} from "@chainsafe/lodestar-utils";
import {AbortController} from "@chainsafe/abort-controller";
import LibP2p from "libp2p";
import PeerId from "peer-id";
import {timeoutOptions} from "../../constants";
import {IReqResp, IReqRespModules, Libp2pStream} from "./interface";
import {RespStatus, timeoutOptions} from "../../constants";
import {IReqResp, IReqRespModules, IRateLimiter, Libp2pStream} from "./interface";
import {sendRequest} from "./request";
import {handleRequest} from "./response";
import {handleRequest, ResponseError} from "./response";
import {onOutgoingReqRespError} from "./score";
import {IPeerMetadataStore, IPeerRpcScoreStore} from "../peers";
import {assertSequentialBlocksInRange, formatProtocolId} from "./utils";
Expand All @@ -33,6 +33,7 @@ import {
protocolsSupported,
IncomingResponseBody,
} from "./types";
import {InboundRateLimiter, RateLimiterOpts} from "./response/rateLimiter";

export type IReqRespOptions = Partial<typeof timeoutOptions>;

Expand All @@ -49,21 +50,23 @@ export class ReqResp implements IReqResp {
private metadataController: MetadataController;
private peerMetadata: IPeerMetadataStore;
private peerRpcScores: IPeerRpcScoreStore;
private inboundRateLimiter: IRateLimiter;
private networkEventBus: INetworkEventBus;
private controller = new AbortController();
private options?: IReqRespOptions;
private reqCount = 0;
private respCount = 0;
private metrics: IMetrics | null;

constructor(modules: IReqRespModules, options?: IReqRespOptions) {
constructor(modules: IReqRespModules, options: IReqRespOptions & RateLimiterOpts) {
this.config = modules.config;
this.libp2p = modules.libp2p;
this.logger = modules.logger;
this.reqRespHandlers = modules.reqRespHandlers;
this.peerMetadata = modules.peerMetadata;
this.metadataController = modules.metadata;
this.peerRpcScores = modules.peerRpcScores;
this.inboundRateLimiter = new InboundRateLimiter(options, {...modules});
this.networkEventBus = modules.networkEventBus;
this.options = options;
this.metrics = modules.metrics;
Expand All @@ -77,13 +80,15 @@ export class ReqResp implements IReqResp {
(this.getRequestHandler({method, version, encoding}) as unknown) as (props: HandlerProps) => void
);
}
this.inboundRateLimiter.start();
}

stop(): void {
for (const [method, version, encoding] of protocolsSupported) {
this.libp2p.unhandle(formatProtocolId(method, version, encoding));
}
this.controller.abort();
this.inboundRateLimiter.stop();
}

async status(peerId: PeerId, request: phase0.Status): Promise<phase0.Status> {
Expand Down Expand Up @@ -132,6 +137,10 @@ export class ReqResp implements IReqResp {
);
}

pruneRateLimiterData(peerId: PeerId): void {
this.inboundRateLimiter.prune(peerId);
}

// Helper to reduce code duplication
private async sendRequest<T extends IncomingResponseBody | IncomingResponseBody[]>(
peerId: PeerId,
Expand Down Expand Up @@ -213,6 +222,10 @@ export class ReqResp implements IReqResp {
): AsyncIterable<OutgoingResponseBody> {
const requestTyped = {method: protocol.method, body: requestBody} as RequestTypedContainer;

if (requestTyped.method !== Method.Goodbye && !this.inboundRateLimiter.allowRequest(peerId, requestTyped)) {
throw new ResponseError(RespStatus.RATE_LIMITED, "rate limit");
}

switch (requestTyped.method) {
case Method.Ping:
yield this.metadataController.seqNumber;
Expand Down
Loading