Skip to content

Commit

Permalink
Revert "Lodestar gossip queues to wrap processRpcMessage() (#3554)" (#…
Browse files Browse the repository at this point in the history
…3745)

This reverts commit fcbc459.
  • Loading branch information
twoeths authored Feb 12, 2022
1 parent 4292978 commit c6fe430
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 82 deletions.
59 changes: 17 additions & 42 deletions packages/lodestar/src/network/gossip/gossipsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,25 @@ import {computeStartSlotAtEpoch} from "@chainsafe/lodestar-beacon-state-transiti

import {IMetrics} from "../../metrics";
import {
GossipJobQueues,
GossipTopic,
GossipTopicMap,
GossipType,
GossipTypeMap,
ValidatorFnsByType,
GossipHandlers,
ProcessRpcMessageFnsByType,
GossipJobQueues,
} from "./interface";
import {getGossipSSZType, GossipTopicCache, stringifyGossipTopic} from "./topic";
import {computeMsgId, encodeMessageData, UncompressCache} from "./encoding";
import {DEFAULT_ENCODING} from "./constants";
import {GossipValidationError} from "./errors";
import {GOSSIP_MAX_SIZE} from "../../constants";
import {createProcessRpcMessageFnsByType, createValidatorFnsByType} from "./validation";
import {createValidatorFnsByType} from "./validation";
import {Map2d, Map2dArr} from "../../util/map";
import pipe from "it-pipe";
import PeerStreams from "libp2p-interfaces/src/pubsub/peer-streams";
import BufferList from "bl";
// import {RPC} from "libp2p-interfaces/src/pubsub/message/rpc";
import {RPC} from "libp2p-gossipsub/src/message/rpc";
import {normalizeInRpcMessage} from "libp2p-interfaces/src/pubsub/utils";

Expand Down Expand Up @@ -77,8 +77,6 @@ export class Eth2Gossipsub extends Gossipsub {
private readonly msgIdCache = new WeakMap<InMessage, Uint8Array>();

private readonly validatorFnsByType: ValidatorFnsByType;
// this wraps processRpcMessage() function in lodestar gossip queues
private readonly processRpcMessageFnsByType: ProcessRpcMessageFnsByType;

constructor(modules: IGossipsubModules) {
// Gossipsub parameters defined here:
Expand All @@ -101,21 +99,14 @@ export class Eth2Gossipsub extends Gossipsub {
// Note: We use the validator functions as handlers. No handler will be registered to gossipsub.
// libp2p-js layer will emit the message to an EventEmitter that won't be listened by anyone.
// TODO: Force to ensure there's a validatorFunction attached to every received topic.
this.validatorFnsByType = createValidatorFnsByType(gossipHandlers, {
const {validatorFnsByType, jobQueues} = createValidatorFnsByType(gossipHandlers, {
config,
logger,
uncompressCache: this.uncompressCache,
metrics,
signal,
});
// this.processRpcMessageFnsByType has the same logic to libp2p-gossipsub Gossipsub._processRpcMessage
// except that it wraps that logic in a queue
const {processRpcMessagesFnByType, jobQueues} = createProcessRpcMessageFnsByType(
super._processRpcMessage.bind(this),
signal,
metrics
);
this.processRpcMessageFnsByType = processRpcMessagesFnByType;
this.validatorFnsByType = validatorFnsByType;
this.jobQueues = jobQueues;

if (metrics) {
Expand Down Expand Up @@ -219,36 +210,20 @@ export class Eth2Gossipsub extends Gossipsub {
return true;
}

/**
* The same logic to libp2p-gossipsub Gossipsub._processRpcMessage() but we wrap its logic in queues,
* this is the entry point for lodestar gossip queue implementation, see the constructor for how we create the queue.
* libp2p-gossipsub Gossipsub._processRpcMessage() will then call libp2p-interface
* PubsubBaseProtocol._processRpcMessage()
* See https://github.com/ChainSafe/js-libp2p-gossipsub/blob/v0.11.1/ts/index.ts#L417
* which call lodestar Eth2Gossipsub.validate() in the end
* See https://github.com/libp2p/js-libp2p-interfaces/blob/libp2p-interfaces%401.0.1/packages/interfaces/src/pubsub/index.js#L442
*/
async _processRpcMessage(message: InMessage): Promise<void> {
// messages must have a single topicID
const topicStr = Array.isArray(message.topicIDs) ? message.topicIDs[0] : undefined;
if (!topicStr) {
// no need to send this to a queue
// the validate() function will handle message error
return super.validate(message);
}

// Execute the _processRpcMessage in a queue
const topic = this.gossipTopicCache.getTopic(topicStr);
await this.processRpcMessageFnsByType[topic.type](topic, message);
}
// // Snippet of _processRpcMessage from https://github.com/libp2p/js-libp2p-interfaces/blob/92245d66b0073f0a72fed9f7abcf4b533102f1fd/packages/interfaces/src/pubsub/index.js#L442
// async _processRpcMessage(msg: InMessage): Promise<void> {
// try {
// await this.validate(msg);
// } catch (err) {
// this.log("Message is invalid, dropping it. %O", err);
// return;
// }
// }

/**
* This is called from libp2p-interface PubsubBaseProtocol._processRpcMessage()
* See https://github.com/libp2p/js-libp2p-interfaces/blob/libp2p-interfaces%401.0.1/packages/interfaces/src/pubsub/index.js#L449
* @override https://github.com/libp2p/js-libp2p-interfaces/blob/libp2p-interfaces%401.0.1/packages/interfaces/src/pubsub/index.js#L567
* @override https://github.com/ChainSafe/js-libp2p-gossipsub/blob/v0.11.1/ts/index.ts#L436
* Note: this runs inside queues (see _processRpcMessage() above) and does not call super.
* All logic is re-implemented below.
* @override https://github.com/ChainSafe/js-libp2p-gossipsub/blob/3c3c46595f65823fcd7900ed716f43f76c6b355c/ts/index.ts#L436
* @override https://github.com/libp2p/js-libp2p-interfaces/blob/ff3bd10704a4c166ce63135747e3736915b0be8d/src/pubsub/index.js#L513
* Note: this does not call super. All logic is re-implemented below
*/
async validate(message: InMessage): Promise<void> {
try {
Expand Down
8 changes: 1 addition & 7 deletions packages/lodestar/src/network/gossip/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,7 @@ export type GossipValidatorFn = (topic: GossipTopic, message: InMessage, seenTim

export type ValidatorFnsByType = {[K in GossipType]: GossipValidatorFn};

export type ProcessRpcMessageTopicFn = (topic: GossipTopic, message: InMessage) => Promise<void>;

export type ProcessRpcMessageFn = (message: InMessage) => Promise<void>;

export type ProcessRpcMessageFnsByType = {[K in GossipType]: ProcessRpcMessageTopicFn};

export type GossipJobQueues = {[K in GossipType]: JobItemQueue<[GossipTopic, InMessage], void>};
export type GossipJobQueues = {[K in GossipType]: JobItemQueue<[GossipTopic, InMessage, number], void>};

export type GossipHandlerFn = (
object: GossipTypeMap[GossipType],
Expand Down
41 changes: 15 additions & 26 deletions packages/lodestar/src/network/gossip/validation/index.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,24 @@
import {ERR_TOPIC_VALIDATOR_IGNORE, ERR_TOPIC_VALIDATOR_REJECT} from "libp2p-gossipsub/src/constants";
import {InMessage} from "libp2p-interfaces/src/pubsub";
import {AbortSignal} from "@chainsafe/abort-controller";
import {IChainForkConfig} from "@chainsafe/lodestar-config";
import {Json} from "@chainsafe/ssz";
import {ILogger, mapValues} from "@chainsafe/lodestar-utils";
import {IMetrics} from "../../../metrics";
import {getGossipSSZType} from "../topic";
import {
GossipJobQueues,
GossipType,
GossipValidatorFn,
ValidatorFnsByType,
GossipHandlers,
GossipHandlerFn,
ProcessRpcMessageFn,
GossipTopic,
ProcessRpcMessageFnsByType,
GossipJobQueues,
} from "../interface";
import {GossipValidationError} from "../errors";
import {GossipActionError, GossipAction} from "../../../chain/errors";
import {decodeMessageData, UncompressCache} from "../encoding";
import {createValidationQueues} from "./queue";
import {DEFAULT_ENCODING} from "../constants";
import {getGossipAcceptMetadataByType, GetGossipAcceptMetadataFn} from "./onAccept";
import {createProcessRpcMessageQueues} from "./queue";

type ValidatorFnModules = {
config: IChainForkConfig;
Expand All @@ -39,30 +35,23 @@ type ValidatorFnModules = {
export function createValidatorFnsByType(
gossipHandlers: GossipHandlers,
modules: ValidatorFnModules & {signal: AbortSignal}
): ValidatorFnsByType {
return mapValues(gossipHandlers, (gossipHandler, type) => {
): {validatorFnsByType: ValidatorFnsByType; jobQueues: GossipJobQueues} {
const gossipValidatorFns = mapValues(gossipHandlers, (gossipHandler, type) => {
return getGossipValidatorFn(gossipHandler, type, modules);
});
}

/**
* Return ProcessRpcMessageFnsByType for each GossipType, this wraps the parent processRpcMsgFn()
* (in js-libp2p-gossipsub) in a queue so that we only uncompress, compute message id, deserialize
* messages when we execute them.
*/
export function createProcessRpcMessageFnsByType(
processRpcMsgFn: ProcessRpcMessageFn,
signal: AbortSignal,
metrics: IMetrics | null
): {processRpcMessagesFnByType: ProcessRpcMessageFnsByType; jobQueues: GossipJobQueues} {
const jobQueues = createProcessRpcMessageQueues(processRpcMsgFn, signal, metrics);
const processRpcMessagesFnByType = mapValues(jobQueues, (jobQueue) => {
return async function processRpcMessageFnWithQueue(topic: GossipTopic, message: InMessage) {
await jobQueue.push(topic, message);
};
});
const jobQueues = createValidationQueues(gossipValidatorFns, modules.signal, modules.metrics);

const validatorFnsByType = mapValues(
jobQueues,
(jobQueue): GossipValidatorFn => {
return async function gossipValidatorFnWithQueue(topic, gossipMsg, seenTimestampsMs) {
await jobQueue.push(topic, gossipMsg, seenTimestampsMs);
};
}
);

return {processRpcMessagesFnByType, jobQueues};
return {jobQueues, validatorFnsByType};
}

/**
Expand Down
15 changes: 8 additions & 7 deletions packages/lodestar/src/network/gossip/validation/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {InMessage} from "libp2p-interfaces/src/pubsub";
import {mapValues} from "@chainsafe/lodestar-utils";
import {IMetrics} from "../../../metrics";
import {JobItemQueue, JobQueueOpts, QueueType} from "../../../util/queue";
import {GossipJobQueues, GossipTopic, GossipType, ProcessRpcMessageFn} from "../interface";
import {GossipJobQueues, GossipTopic, GossipType, ValidatorFnsByType} from "../interface";

/**
* Numbers from https://github.com/sigp/lighthouse/blob/b34a79dc0b02e04441ba01fd0f304d1e203d877d/beacon_node/network/src/beacon_processor/mod.rs#L69
Expand All @@ -22,10 +22,10 @@ const gossipQueueOpts: {[K in GossipType]: Pick<JobQueueOpts, "maxLength" | "typ
};

/**
* Wraps a _processRpcMessage() function with a queue, to limit the processing of gossip objects by type.
* Wraps a GossipValidatorFn with a queue, to limit the processing of gossip objects by type.
*
* A queue here is essential to protect against DOS attacks, where a peer may send many messages at once.
* Queues also protect the node against overloading. If the node gets busy with an expensive epoch transition,
* Queues also protect the node against overloading. If the node gets bussy with an expensive epoch transition,
* it may buffer too many gossip objects causing an Out of memory (OOM) error. With a queue the node will reject
* new objects to fit its current throughput.
*
Expand All @@ -37,14 +37,15 @@ const gossipQueueOpts: {[K in GossipType]: Pick<JobQueueOpts, "maxLength" | "typ
* By topic is too specific, so by type groups all similar objects in the same queue. All in the same won't allow
* to customize different queue behaviours per object type (see `gossipQueueOpts`).
*/
export function createProcessRpcMessageQueues(
processRpcMsgFn: ProcessRpcMessageFn,
export function createValidationQueues(
gossipValidatorFns: ValidatorFnsByType,
signal: AbortSignal,
metrics: IMetrics | null
): GossipJobQueues {
return mapValues(gossipQueueOpts, (opts, type) => {
return new JobItemQueue<[GossipTopic, InMessage], void>(
(topic, message) => processRpcMsgFn(message),
const gossipValidatorFn = gossipValidatorFns[type];
return new JobItemQueue<[GossipTopic, InMessage, number], void>(
(topic, message, seenTimestampsMs) => gossipValidatorFn(topic, message, seenTimestampsMs),
{signal, ...opts},
metrics
? {
Expand Down

0 comments on commit c6fe430

Please sign in to comment.