Skip to content

Commit

Permalink
Merge 2d60bf1 into a876466
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths authored Dec 9, 2021
2 parents a876466 + 2d60bf1 commit 98b058a
Show file tree
Hide file tree
Showing 19 changed files with 614 additions and 11 deletions.
40 changes: 40 additions & 0 deletions packages/cli/src/options/beaconNodeOptions/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ export interface INetworkArgs {
"network.subscribeAllSubnets": boolean;
"network.connectToDiscv5Bootnodes": boolean;
"network.discv5FirstQueryDelayMs": number;
"network.requestCountPeerLimit": number;
"network.blockCountTotalLimit": number;
"network.blockCountPeerLimit": number;
"network.rateTrackerTimeoutMs": number;
}

export function parseArgs(args: INetworkArgs): IBeaconNodeOptions["network"] {
Expand All @@ -30,6 +34,10 @@ export function parseArgs(args: INetworkArgs): IBeaconNodeOptions["network"] {
subscribeAllSubnets: args["network.subscribeAllSubnets"],
connectToDiscv5Bootnodes: args["network.connectToDiscv5Bootnodes"],
discv5FirstQueryDelayMs: args["network.discv5FirstQueryDelayMs"],
requestCountPeerLimit: args["network.requestCountPeerLimit"],
blockCountTotalLimit: args["network.blockCountTotalLimit"],
blockCountPeerLimit: args["network.blockCountPeerLimit"],
rateTrackerTimeoutMs: args["network.rateTrackerTimeoutMs"],
};
}

Expand Down Expand Up @@ -104,4 +112,36 @@ export const options: ICliCommandOptions<INetworkArgs> = {
defaultDescription: String(defaultOptions.network.discv5FirstQueryDelayMs),
group: "network",
},

"network.requestCountPeerLimit": {
type: "number",
description: "Max block req/resp requests per peer per rateTrackerTimeoutMs",
hidden: true,
defaultDescription: String(defaultOptions.network.requestCountPeerLimit),
group: "network",
},

"network.blockCountTotalLimit": {
type: "number",
description: "Max block count requested per rateTrackerTimeoutMs",
hidden: true,
defaultDescription: String(defaultOptions.network.blockCountTotalLimit),
group: "network",
},

"network.blockCountPeerLimit": {
type: "number",
description: "Max block count requested per peer per rateTrackerTimeoutMs",
hidden: true,
defaultDescription: String(defaultOptions.network.blockCountPeerLimit),
group: "network",
},

"network.rateTrackerTimeoutMs": {
type: "number",
description: "Time window to track rate limit in milli seconds",
hidden: true,
defaultDescription: String(defaultOptions.network.rateTrackerTimeoutMs),
group: "network",
},
};
9 changes: 8 additions & 1 deletion packages/cli/test/unit/options/beaconNodeOptions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ describe("options / beaconNodeOptions", () => {
"network.subscribeAllSubnets": true,
"network.connectToDiscv5Bootnodes": true,
"network.discv5FirstQueryDelayMs": 1000,

"network.requestCountPeerLimit": 5,
"network.blockCountTotalLimit": 1000,
"network.blockCountPeerLimit": 500,
"network.rateTrackerTimeoutMs": 60000,
"sync.isSingleNode": true,
"sync.disableProcessAsChainSegment": true,
} as IBeaconNodeArgs;
Expand Down Expand Up @@ -102,6 +105,10 @@ describe("options / beaconNodeOptions", () => {
subscribeAllSubnets: true,
connectToDiscv5Bootnodes: true,
discv5FirstQueryDelayMs: 1000,
requestCountPeerLimit: 5,
blockCountTotalLimit: 1000,
blockCountPeerLimit: 500,
rateTrackerTimeoutMs: 60000,
},
sync: {
isSingleNode: true,
Expand Down
4 changes: 4 additions & 0 deletions packages/lodestar/src/constants/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ export enum RespStatus {
* The responder does not have requested resource. The response payload adheres to the ErrorMessage schema (described below). Note: This response code is only valid as a response to BlocksByRange
*/
RESOURCE_UNAVAILABLE = 3,
/**
* Our node does not have bandwidth to serve requests due to either per-peer quota or total quota.
*/
RATE_LIMITED = 139,
}

export type RpcResponseStatusError = Exclude<RespStatus, RespStatus.SUCCESS>;
Expand Down
5 changes: 5 additions & 0 deletions packages/lodestar/src/metrics/metrics/beacon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ export function createBeaconMetrics(register: RegistryMetricCreator) {
name: "beacon_reqresp_dial_errors_total",
help: "Count total dial errors",
}),
reqRespRateLimitErrors: register.gauge<"tracker">({
name: "beacon_reqresp_rate_limiter_errors_total",
help: "Count rate limiter errors",
labelNames: ["tracker"],
}),

blockProductionTime: register.histogram({
name: "beacon_block_production_seconds",
Expand Down
4 changes: 3 additions & 1 deletion packages/lodestar/src/network/options.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import {ENR, IDiscv5DiscoveryInputOptions} from "@chainsafe/discv5";
import {PeerManagerOpts} from "./peers";
import {defaultRateLimiterOpts, RateLimiterOpts} from "./reqresp/response/rateLimiter";

export interface INetworkOptions extends PeerManagerOpts {
export interface INetworkOptions extends PeerManagerOpts, RateLimiterOpts {
localMultiaddrs: string[];
bootMultiaddrs?: string[];
subscribeAllSubnets?: boolean;
Expand All @@ -23,4 +24,5 @@ export const defaultNetworkOptions: INetworkOptions = {
localMultiaddrs: ["/ip4/0.0.0.0/tcp/9000"],
bootMultiaddrs: [],
discv5: defaultDiscv5Options,
...defaultRateLimiterOpts,
};
1 change: 1 addition & 0 deletions packages/lodestar/src/network/peers/peerManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ export class PeerManager {

this.logger.verbose("peer disconnected", {peer: prettyPrintPeerId(peer), direction, status});
this.networkEventBus.emit(NetworkEvent.peerDisconnected, peer);
this.reqResp.pruneRateLimiterData(peer);
this.metrics?.peerDisconnectedEvent.inc({direction});
};

Expand Down
19 changes: 19 additions & 0 deletions packages/lodestar/src/network/reqresp/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {MetadataController} from "../metadata";
import {INetworkEventBus} from "../events";
import {ReqRespHandlers} from "./handlers";
import {IMetrics} from "../../metrics";
import {RequestTypedContainer} from "./types";

export interface IReqResp {
start(): void;
Expand All @@ -22,6 +23,7 @@ export interface IReqResp {
request: phase0.BeaconBlocksByRangeRequest
): Promise<allForks.SignedBeaconBlock[]>;
beaconBlocksByRoot(peerId: PeerId, request: phase0.BeaconBlocksByRootRequest): Promise<allForks.SignedBeaconBlock[]>;
pruneRateLimiterData(peerId: PeerId): void;
}

export interface IReqRespModules {
Expand Down Expand Up @@ -69,3 +71,20 @@ export type Libp2pStream = {
*/
abort: (err: Error) => void;
};

/**
* Rate limiter interface for inbound and outbound requests.
*/
export interface IRateLimiter {
/**
* Allow to request or response based on rate limit params configured.
*/
allowRequest(peerId: PeerId, requestTyped: RequestTypedContainer): boolean;

/**
* Prune by peer id
*/
prune(peerId: PeerId): void;
start(): void;
stop(): void;
}
65 changes: 65 additions & 0 deletions packages/lodestar/src/network/reqresp/rateTracker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import {MapDef} from "../../util/map";

type RateTrackerOpts = {
limit: number;
timeoutMs: number;
};

const BUCKET_SIZE_MS = 1000;

/**
* The generic rate tracker allows up to `limit` objects in a period of time.
* This could apply to both request count or block count, for both requests and responses.
*/
export class RateTracker {
private requestsWithinWindow = 0;
private limit: number;
private timeoutMs: number;
/** Key as time in second and value as object requested */
private requests: MapDef<number, number>;

constructor(opts: RateTrackerOpts, requests = new MapDef<number, number>(() => 0)) {
this.limit = opts.limit;
this.timeoutMs = opts.timeoutMs;
this.requests = requests;
}

requestObjects(objectCount: number): number {
if (objectCount <= 0) throw Error("Invalid objectCount " + objectCount);
this.prune();
if (this.requestsWithinWindow >= this.limit) {
return 0;
}

this.requestsWithinWindow += objectCount;
const key = Math.floor(Date.now() / BUCKET_SIZE_MS);
const curObjectCount = this.requests.getOrDefault(key);
this.requests.set(key, curObjectCount + objectCount);

return objectCount;
}

getRequestedObjectsWithinWindow(): number {
return this.requestsWithinWindow;
}

private prune(): void {
const now = Date.now();

for (const [timeInSec, count] of this.requests.entries()) {
// reclaim the quota for old requests
if (now - timeInSec * BUCKET_SIZE_MS >= this.timeoutMs) {
this.requestsWithinWindow -= count;
this.requests.delete(timeInSec);
} else {
// Break after the first entry within the timeout window.
// Since the entries are added in order, all the rest will be within the window.
break;
}
}

if (this.requestsWithinWindow < 0) {
this.requestsWithinWindow = 0;
}
}
}
21 changes: 17 additions & 4 deletions packages/lodestar/src/network/reqresp/reqResp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import {ILogger} from "@chainsafe/lodestar-utils";
import {AbortController} from "@chainsafe/abort-controller";
import LibP2p from "libp2p";
import PeerId from "peer-id";
import {timeoutOptions} from "../../constants";
import {IReqResp, IReqRespModules, Libp2pStream} from "./interface";
import {RespStatus, timeoutOptions} from "../../constants";
import {IReqResp, IReqRespModules, IRateLimiter, Libp2pStream} from "./interface";
import {sendRequest} from "./request";
import {handleRequest} from "./response";
import {handleRequest, ResponseError} from "./response";
import {onOutgoingReqRespError} from "./score";
import {IPeerMetadataStore, IPeerRpcScoreStore} from "../peers";
import {assertSequentialBlocksInRange, formatProtocolId} from "./utils";
Expand All @@ -33,6 +33,7 @@ import {
protocolsSupported,
IncomingResponseBody,
} from "./types";
import {InboundRateLimiter, RateLimiterOpts} from "./response/rateLimiter";

export type IReqRespOptions = Partial<typeof timeoutOptions>;

Expand All @@ -49,21 +50,23 @@ export class ReqResp implements IReqResp {
private metadataController: MetadataController;
private peerMetadata: IPeerMetadataStore;
private peerRpcScores: IPeerRpcScoreStore;
private inboundRateLimiter: IRateLimiter;
private networkEventBus: INetworkEventBus;
private controller = new AbortController();
private options?: IReqRespOptions;
private reqCount = 0;
private respCount = 0;
private metrics: IMetrics | null;

constructor(modules: IReqRespModules, options?: IReqRespOptions) {
constructor(modules: IReqRespModules, options: IReqRespOptions & RateLimiterOpts) {
this.config = modules.config;
this.libp2p = modules.libp2p;
this.logger = modules.logger;
this.reqRespHandlers = modules.reqRespHandlers;
this.peerMetadata = modules.peerMetadata;
this.metadataController = modules.metadata;
this.peerRpcScores = modules.peerRpcScores;
this.inboundRateLimiter = new InboundRateLimiter(options, {...modules});
this.networkEventBus = modules.networkEventBus;
this.options = options;
this.metrics = modules.metrics;
Expand All @@ -77,13 +80,15 @@ export class ReqResp implements IReqResp {
(this.getRequestHandler({method, version, encoding}) as unknown) as (props: HandlerProps) => void
);
}
this.inboundRateLimiter.start();
}

stop(): void {
for (const [method, version, encoding] of protocolsSupported) {
this.libp2p.unhandle(formatProtocolId(method, version, encoding));
}
this.controller.abort();
this.inboundRateLimiter.stop();
}

async status(peerId: PeerId, request: phase0.Status): Promise<phase0.Status> {
Expand Down Expand Up @@ -132,6 +137,10 @@ export class ReqResp implements IReqResp {
);
}

pruneRateLimiterData(peerId: PeerId): void {
this.inboundRateLimiter.prune(peerId);
}

// Helper to reduce code duplication
private async sendRequest<T extends IncomingResponseBody | IncomingResponseBody[]>(
peerId: PeerId,
Expand Down Expand Up @@ -213,6 +222,10 @@ export class ReqResp implements IReqResp {
): AsyncIterable<OutgoingResponseBody> {
const requestTyped = {method: protocol.method, body: requestBody} as RequestTypedContainer;

if (requestTyped.method !== Method.Goodbye && !this.inboundRateLimiter.allowRequest(peerId, requestTyped)) {
throw new ResponseError(RespStatus.RATE_LIMITED, "rate limit");
}

switch (requestTyped.method) {
case Method.Ping:
yield this.metadataController.seqNumber;
Expand Down
Loading

0 comments on commit 98b058a

Please sign in to comment.