Skip to content

Commit

Permalink
feat: block network processor when processing current slot block (#5458)
Browse files Browse the repository at this point in the history
* feat: block network processor if processing current slot block

* fix: remove default maxGossipTopicConcurrency value

* fix: move isProcessingCurrentSlotBlock to network processor
  • Loading branch information
twoeths authored May 4, 2023
1 parent 597996a commit a24ada9
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 10 deletions.
3 changes: 2 additions & 1 deletion packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,10 @@ export function createLodestarMetrics(
help: "Total calls to network processor execute work fn",
buckets: [0, 1, 5, 128],
}),
canNotAcceptWork: register.gauge({
canNotAcceptWork: register.gauge<"reason">({
name: "lodestar_network_processor_can_not_accept_work_total",
help: "Total times network processor can not accept work on executeWork",
labelNames: ["reason"],
}),
},

Expand Down
3 changes: 0 additions & 3 deletions packages/beacon-node/src/network/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,5 @@ export const defaultNetworkOptions: NetworkOptions = {
// TODO: this value is 12 per spec, however lodestar has performance issue if there are too many mesh peers
// see https://github.com/ChainSafe/lodestar/issues/5420
gossipsubDHigh: 9,
// TODO: with this value, lodestar drops about 35% of attestation messages on a test mainnet node subscribed to all subnets
// see https://github.com/ChainSafe/lodestar/issues/5441
maxGossipTopicConcurrency: 512,
...defaultGossipHandlerOpts,
};
61 changes: 55 additions & 6 deletions packages/beacon-node/src/network/processor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,24 @@ enum ReprocessRejectReason {
expired = "expired",
}

/**
* Cannot accept work reason for metrics
*/
enum CannotAcceptWorkReason {
/**
* Validating or procesing gossip block at current slot.
*/
processingCurrentSlotBlock = "processing_current_slot_block",
/**
* bls is busy.
*/
bls = "bls_busy",
/**
* regen is busy.
*/
regen = "regen_busy",
}

/**
* Network processor handles the gossip queues and throtles processing to not overload the main thread
* - Decides when to process work and what to process
Expand Down Expand Up @@ -118,6 +136,7 @@ export class NetworkProcessor {
// to be stored in this Map and reprocessed once the block comes
private readonly awaitingGossipsubMessagesByRootBySlot: MapDef<Slot, MapDef<RootHex, Set<PendingGossipsubMessage>>>;
private unknownBlockGossipsubMessagesCount = 0;
private isProcessingCurrentSlotBlock = false;

constructor(modules: NetworkProcessorModules, private readonly opts: NetworkProcessorOpts) {
const {chain, events, logger, metrics} = modules;
Expand Down Expand Up @@ -173,7 +192,8 @@ export class NetworkProcessor {
}

private onPendingGossipsubMessage(message: PendingGossipsubMessage): void {
const extractBlockSlotRootFn = this.extractBlockSlotRootFns[message.topic.type];
const topicType = message.topic.type;
const extractBlockSlotRootFn = this.extractBlockSlotRootFns[topicType];
// check block root of Attestation and SignedAggregateAndProof messages
if (extractBlockSlotRootFn) {
const slotRoot = extractBlockSlotRootFn(message.msg.data);
Expand All @@ -185,11 +205,18 @@ export class NetworkProcessor {
if (slot < this.chain.clock.currentSlot - EARLIEST_PERMISSABLE_SLOT_DISTANCE) {
// TODO: Should report the dropped job to gossip? It will be eventually pruned from the mcache
this.metrics?.networkProcessor.gossipValidationError.inc({
topic: message.topic.type,
topic: topicType,
error: GossipErrorCode.PAST_SLOT,
});
return;
}
if (
slot === this.chain.clock.currentSlot &&
(topicType === GossipType.beacon_block || topicType === GossipType.beacon_block_and_blobs_sidecar)
) {
// in the worse case if the current slot block is not valid, this will be reset in the next slot
this.isProcessingCurrentSlotBlock = true;
}
message.msgSlot = slot;
if (root && !this.chain.forkChoice.hasBlockHex(root)) {
if (this.unknownBlockGossipsubMessagesCount > MAX_QUEUED_UNKNOWN_BLOCK_GOSSIP_OBJECTS) {
Expand Down Expand Up @@ -232,6 +259,7 @@ export class NetworkProcessor {
block: string;
executionOptimistic: boolean;
}): Promise<void> {
this.isProcessingCurrentSlotBlock = false;
const byRootGossipsubMessages = this.awaitingGossipsubMessagesByRootBySlot.getOrDefault(slot);
const waitingGossipsubMessages = byRootGossipsubMessages.getOrDefault(rootHex);
if (waitingGossipsubMessages.size === 0) {
Expand All @@ -258,6 +286,7 @@ export class NetworkProcessor {
}

private onClockSlot(clockSlot: Slot): void {
this.isProcessingCurrentSlotBlock = false;
const nowSec = Date.now() / 1000;
for (const [slot, gossipMessagesByRoot] of this.awaitingGossipsubMessagesByRootBySlot.entries()) {
if (slot < clockSlot) {
Expand All @@ -281,13 +310,14 @@ export class NetworkProcessor {
let jobsSubmitted = 0;

job_loop: while (jobsSubmitted < MAX_JOBS_SUBMITTED_PER_TICK) {
// Check canAcceptWork before calling queue.next() since it consumes the items
const canAcceptWork = this.chain.blsThreadPoolCanAcceptWork() && this.chain.regenCanAcceptWork();
// Check if chain can accept work before calling queue.next() since it consumes the items
const reason = this.checkAcceptWork();

for (const topic of executeGossipWorkOrder) {
// beacon block is guaranteed to be processed immedately
if (!canAcceptWork && !executeGossipWorkOrderObj[topic]?.bypassQueue) {
this.metrics?.networkProcessor.canNotAcceptWork.inc();
// reason !== null means cannot accept work
if (reason !== null && !executeGossipWorkOrderObj[topic]?.bypassQueue) {
this.metrics?.networkProcessor.canNotAcceptWork.inc({reason});
break job_loop;
}
if (
Expand Down Expand Up @@ -320,4 +350,23 @@ export class NetworkProcessor {
this.metrics?.networkProcessor.jobsSubmitted.observe(jobsSubmitted);
}
}

/**
* Return null if chain can accept work, otherwise return the reason why it cannot accept work
*/
private checkAcceptWork(): null | CannotAcceptWorkReason {
if (this.isProcessingCurrentSlotBlock) {
return CannotAcceptWorkReason.processingCurrentSlotBlock;
}

if (!this.chain.blsThreadPoolCanAcceptWork()) {
return CannotAcceptWorkReason.bls;
}

if (!this.chain.regenCanAcceptWork()) {
return CannotAcceptWorkReason.regen;
}

return null;
}
}

0 comments on commit a24ada9

Please sign in to comment.