From 2d13e555bdfaa9b0b3ea6e638e69ec945c9d6c02 Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Wed, 3 May 2023 14:34:49 +0700 Subject: [PATCH 1/3] feat: block network processor if processing current slot block --- packages/beacon-node/src/chain/chain.ts | 15 +++++++- packages/beacon-node/src/chain/interface.ts | 1 + .../src/metrics/metrics/lodestar.ts | 3 +- .../src/network/processor/index.ts | 34 ++++++++++++++++--- .../test/utils/mocks/chain/chain.ts | 4 +++ 5 files changed, 51 insertions(+), 6 deletions(-) diff --git a/packages/beacon-node/src/chain/chain.ts b/packages/beacon-node/src/chain/chain.ts index 9a01cc6d0e5..92198072fe5 100644 --- a/packages/beacon-node/src/chain/chain.ts +++ b/packages/beacon-node/src/chain/chain.ts @@ -133,6 +133,7 @@ export class BeaconChain implements IBeaconChain { private readonly exchangeTransitionConfigurationEverySlots: number; private processShutdownCallback: ProcessShutdownCallback; + private _isProcessingCurrentSlotBlock = false; constructor( opts: IChainOptions, @@ -284,6 +285,10 @@ export class BeaconChain implements IBeaconChain { await this.bls.close(); } + isProcessingCurrentSlotBlock(): boolean { + return this._isProcessingCurrentSlotBlock; + } + regenCanAcceptWork(): boolean { return this.regen.canAcceptWork(); } @@ -451,7 +456,14 @@ export class BeaconChain implements IBeaconChain { } async processBlock(block: BlockInput, opts?: ImportBlockOpts): Promise { - return this.blockProcessor.processBlocksJob([block], opts); + try { + if (block.block.message.slot === this.clock.currentSlot) { + this._isProcessingCurrentSlotBlock = true; + } + await this.blockProcessor.processBlocksJob([block], opts); + } finally { + this._isProcessingCurrentSlotBlock = false; + } } async processChainSegment(blocks: BlockInput[], opts?: ImportBlockOpts): Promise { @@ -647,6 +659,7 @@ export class BeaconChain implements IBeaconChain { private onClockSlot(slot: Slot): void { this.logger.verbose("Clock slot", {slot}); + this._isProcessingCurrentSlotBlock = false; // CRITICAL UPDATE if (this.forkChoice.irrecoverableError) { diff --git a/packages/beacon-node/src/chain/interface.ts b/packages/beacon-node/src/chain/interface.ts index 70111ec4123..78a464202cb 100644 --- a/packages/beacon-node/src/chain/interface.ts +++ b/packages/beacon-node/src/chain/interface.ts @@ -140,6 +140,7 @@ export interface IBeaconChain { persistInvalidSszView(view: TreeView, suffix?: string): void; updateBuilderStatus(clockSlot: Slot): void; + isProcessingCurrentSlotBlock(): boolean; regenCanAcceptWork(): boolean; blsThreadPoolCanAcceptWork(): boolean; } diff --git a/packages/beacon-node/src/metrics/metrics/lodestar.ts b/packages/beacon-node/src/metrics/metrics/lodestar.ts index 6aca4ea6c6e..d5f9cc2e7e2 100644 --- a/packages/beacon-node/src/metrics/metrics/lodestar.ts +++ b/packages/beacon-node/src/metrics/metrics/lodestar.ts @@ -97,9 +97,10 @@ export function createLodestarMetrics( help: "Total calls to network processor execute work fn", buckets: [0, 1, 5, 128], }), - canNotAcceptWork: register.gauge({ + canNotAcceptWork: register.gauge<"reason">({ name: "lodestar_network_processor_can_not_accept_work_total", help: "Total times network processor can not accept work on executeWork", + labelNames: ["reason"], }), }, diff --git a/packages/beacon-node/src/network/processor/index.ts b/packages/beacon-node/src/network/processor/index.ts index 229317358f0..02163f66c4d 100644 --- a/packages/beacon-node/src/network/processor/index.ts +++ b/packages/beacon-node/src/network/processor/index.ts @@ -281,13 +281,14 @@ export class NetworkProcessor { let jobsSubmitted = 0; job_loop: while (jobsSubmitted < MAX_JOBS_SUBMITTED_PER_TICK) { - // Check canAcceptWork before calling queue.next() since it consumes the items - const canAcceptWork = this.chain.blsThreadPoolCanAcceptWork() && this.chain.regenCanAcceptWork(); + // Check if chain can accept work before calling queue.next() since it consumes the items + const reason = checkAcceptWork(this.chain); for (const topic of executeGossipWorkOrder) { // beacon block is guaranteed to be processed immedately - if (!canAcceptWork && !executeGossipWorkOrderObj[topic]?.bypassQueue) { - this.metrics?.networkProcessor.canNotAcceptWork.inc(); + // reason !== null means cannot accept work + if (reason !== null && !executeGossipWorkOrderObj[topic]?.bypassQueue) { + this.metrics?.networkProcessor.canNotAcceptWork.inc({reason}); break job_loop; } if ( @@ -321,3 +322,28 @@ export class NetworkProcessor { } } } + +enum CannotAcceptWorkReason { + processingCurrentSlotBlock = "processing_current_slot_block", + bls = "bls_busy", + regen = "regen_busy", +} + +/** + * Return null if chain can accept work, otherwise return the reason why it cannot accept work + */ +function checkAcceptWork(chain: IBeaconChain): null | CannotAcceptWorkReason { + if (chain.isProcessingCurrentSlotBlock()) { + return CannotAcceptWorkReason.processingCurrentSlotBlock; + } + + if (!chain.blsThreadPoolCanAcceptWork()) { + return CannotAcceptWorkReason.bls; + } + + if (!chain.regenCanAcceptWork()) { + return CannotAcceptWorkReason.regen; + } + + return null; +} diff --git a/packages/beacon-node/test/utils/mocks/chain/chain.ts b/packages/beacon-node/test/utils/mocks/chain/chain.ts index e2dc88ae613..906f55c68fc 100644 --- a/packages/beacon-node/test/utils/mocks/chain/chain.ts +++ b/packages/beacon-node/test/utils/mocks/chain/chain.ts @@ -243,6 +243,10 @@ export class MockBeaconChain implements IBeaconChain { async updateBeaconProposerData(): Promise {} updateBuilderStatus(): void {} + isProcessingCurrentSlotBlock(): boolean { + return false; + } + regenCanAcceptWork(): boolean { return true; } From d8a9884ce4941e085d58775732b714582aaf41b9 Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Wed, 3 May 2023 14:35:36 +0700 Subject: [PATCH 2/3] fix: remove default maxGossipTopicConcurrency value --- packages/beacon-node/src/network/options.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/packages/beacon-node/src/network/options.ts b/packages/beacon-node/src/network/options.ts index d9df8af71df..4a3629d15be 100644 --- a/packages/beacon-node/src/network/options.ts +++ b/packages/beacon-node/src/network/options.ts @@ -40,8 +40,5 @@ export const defaultNetworkOptions: NetworkOptions = { // TODO: this value is 12 per spec, however lodestar has performance issue if there are too many mesh peers // see https://github.com/ChainSafe/lodestar/issues/5420 gossipsubDHigh: 9, - // TODO: with this value, lodestar drops about 35% of attestation messages on a test mainnet node subscribed to all subnets - // see https://github.com/ChainSafe/lodestar/issues/5441 - maxGossipTopicConcurrency: 512, ...defaultGossipHandlerOpts, }; From 7331f4e6a90124f1d2b160fc20ffa439484775a4 Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Thu, 4 May 2023 17:12:59 +0700 Subject: [PATCH 3/3] fix: move isProcessingCurrentSlotBlock to network processor --- packages/beacon-node/src/chain/chain.ts | 15 +--- packages/beacon-node/src/chain/interface.ts | 1 - .../src/network/processor/index.ts | 69 ++++++++++++------- .../test/utils/mocks/chain/chain.ts | 4 -- 4 files changed, 47 insertions(+), 42 deletions(-) diff --git a/packages/beacon-node/src/chain/chain.ts b/packages/beacon-node/src/chain/chain.ts index 92198072fe5..9a01cc6d0e5 100644 --- a/packages/beacon-node/src/chain/chain.ts +++ b/packages/beacon-node/src/chain/chain.ts @@ -133,7 +133,6 @@ export class BeaconChain implements IBeaconChain { private readonly exchangeTransitionConfigurationEverySlots: number; private processShutdownCallback: ProcessShutdownCallback; - private _isProcessingCurrentSlotBlock = false; constructor( opts: IChainOptions, @@ -285,10 +284,6 @@ export class BeaconChain implements IBeaconChain { await this.bls.close(); } - isProcessingCurrentSlotBlock(): boolean { - return this._isProcessingCurrentSlotBlock; - } - regenCanAcceptWork(): boolean { return this.regen.canAcceptWork(); } @@ -456,14 +451,7 @@ export class BeaconChain implements IBeaconChain { } async processBlock(block: BlockInput, opts?: ImportBlockOpts): Promise { - try { - if (block.block.message.slot === this.clock.currentSlot) { - this._isProcessingCurrentSlotBlock = true; - } - await this.blockProcessor.processBlocksJob([block], opts); - } finally { - this._isProcessingCurrentSlotBlock = false; - } + return this.blockProcessor.processBlocksJob([block], opts); } async processChainSegment(blocks: BlockInput[], opts?: ImportBlockOpts): Promise { @@ -659,7 +647,6 @@ export class BeaconChain implements IBeaconChain { private onClockSlot(slot: Slot): void { this.logger.verbose("Clock slot", {slot}); - this._isProcessingCurrentSlotBlock = false; // CRITICAL UPDATE if (this.forkChoice.irrecoverableError) { diff --git a/packages/beacon-node/src/chain/interface.ts b/packages/beacon-node/src/chain/interface.ts index 78a464202cb..70111ec4123 100644 --- a/packages/beacon-node/src/chain/interface.ts +++ b/packages/beacon-node/src/chain/interface.ts @@ -140,7 +140,6 @@ export interface IBeaconChain { persistInvalidSszView(view: TreeView, suffix?: string): void; updateBuilderStatus(clockSlot: Slot): void; - isProcessingCurrentSlotBlock(): boolean; regenCanAcceptWork(): boolean; blsThreadPoolCanAcceptWork(): boolean; } diff --git a/packages/beacon-node/src/network/processor/index.ts b/packages/beacon-node/src/network/processor/index.ts index 02163f66c4d..1169b9a13b3 100644 --- a/packages/beacon-node/src/network/processor/index.ts +++ b/packages/beacon-node/src/network/processor/index.ts @@ -85,6 +85,24 @@ enum ReprocessRejectReason { expired = "expired", } +/** + * Cannot accept work reason for metrics + */ +enum CannotAcceptWorkReason { + /** + * Validating or procesing gossip block at current slot. + */ + processingCurrentSlotBlock = "processing_current_slot_block", + /** + * bls is busy. + */ + bls = "bls_busy", + /** + * regen is busy. + */ + regen = "regen_busy", +} + /** * Network processor handles the gossip queues and throtles processing to not overload the main thread * - Decides when to process work and what to process @@ -118,6 +136,7 @@ export class NetworkProcessor { // to be stored in this Map and reprocessed once the block comes private readonly awaitingGossipsubMessagesByRootBySlot: MapDef>>; private unknownBlockGossipsubMessagesCount = 0; + private isProcessingCurrentSlotBlock = false; constructor(modules: NetworkProcessorModules, private readonly opts: NetworkProcessorOpts) { const {chain, events, logger, metrics} = modules; @@ -173,7 +192,8 @@ export class NetworkProcessor { } private onPendingGossipsubMessage(message: PendingGossipsubMessage): void { - const extractBlockSlotRootFn = this.extractBlockSlotRootFns[message.topic.type]; + const topicType = message.topic.type; + const extractBlockSlotRootFn = this.extractBlockSlotRootFns[topicType]; // check block root of Attestation and SignedAggregateAndProof messages if (extractBlockSlotRootFn) { const slotRoot = extractBlockSlotRootFn(message.msg.data); @@ -185,11 +205,18 @@ export class NetworkProcessor { if (slot < this.chain.clock.currentSlot - EARLIEST_PERMISSABLE_SLOT_DISTANCE) { // TODO: Should report the dropped job to gossip? It will be eventually pruned from the mcache this.metrics?.networkProcessor.gossipValidationError.inc({ - topic: message.topic.type, + topic: topicType, error: GossipErrorCode.PAST_SLOT, }); return; } + if ( + slot === this.chain.clock.currentSlot && + (topicType === GossipType.beacon_block || topicType === GossipType.beacon_block_and_blobs_sidecar) + ) { + // in the worse case if the current slot block is not valid, this will be reset in the next slot + this.isProcessingCurrentSlotBlock = true; + } message.msgSlot = slot; if (root && !this.chain.forkChoice.hasBlockHex(root)) { if (this.unknownBlockGossipsubMessagesCount > MAX_QUEUED_UNKNOWN_BLOCK_GOSSIP_OBJECTS) { @@ -232,6 +259,7 @@ export class NetworkProcessor { block: string; executionOptimistic: boolean; }): Promise { + this.isProcessingCurrentSlotBlock = false; const byRootGossipsubMessages = this.awaitingGossipsubMessagesByRootBySlot.getOrDefault(slot); const waitingGossipsubMessages = byRootGossipsubMessages.getOrDefault(rootHex); if (waitingGossipsubMessages.size === 0) { @@ -258,6 +286,7 @@ export class NetworkProcessor { } private onClockSlot(clockSlot: Slot): void { + this.isProcessingCurrentSlotBlock = false; const nowSec = Date.now() / 1000; for (const [slot, gossipMessagesByRoot] of this.awaitingGossipsubMessagesByRootBySlot.entries()) { if (slot < clockSlot) { @@ -282,7 +311,7 @@ export class NetworkProcessor { job_loop: while (jobsSubmitted < MAX_JOBS_SUBMITTED_PER_TICK) { // Check if chain can accept work before calling queue.next() since it consumes the items - const reason = checkAcceptWork(this.chain); + const reason = this.checkAcceptWork(); for (const topic of executeGossipWorkOrder) { // beacon block is guaranteed to be processed immedately @@ -321,29 +350,23 @@ export class NetworkProcessor { this.metrics?.networkProcessor.jobsSubmitted.observe(jobsSubmitted); } } -} -enum CannotAcceptWorkReason { - processingCurrentSlotBlock = "processing_current_slot_block", - bls = "bls_busy", - regen = "regen_busy", -} + /** + * Return null if chain can accept work, otherwise return the reason why it cannot accept work + */ + private checkAcceptWork(): null | CannotAcceptWorkReason { + if (this.isProcessingCurrentSlotBlock) { + return CannotAcceptWorkReason.processingCurrentSlotBlock; + } -/** - * Return null if chain can accept work, otherwise return the reason why it cannot accept work - */ -function checkAcceptWork(chain: IBeaconChain): null | CannotAcceptWorkReason { - if (chain.isProcessingCurrentSlotBlock()) { - return CannotAcceptWorkReason.processingCurrentSlotBlock; - } + if (!this.chain.blsThreadPoolCanAcceptWork()) { + return CannotAcceptWorkReason.bls; + } - if (!chain.blsThreadPoolCanAcceptWork()) { - return CannotAcceptWorkReason.bls; - } + if (!this.chain.regenCanAcceptWork()) { + return CannotAcceptWorkReason.regen; + } - if (!chain.regenCanAcceptWork()) { - return CannotAcceptWorkReason.regen; + return null; } - - return null; } diff --git a/packages/beacon-node/test/utils/mocks/chain/chain.ts b/packages/beacon-node/test/utils/mocks/chain/chain.ts index 906f55c68fc..e2dc88ae613 100644 --- a/packages/beacon-node/test/utils/mocks/chain/chain.ts +++ b/packages/beacon-node/test/utils/mocks/chain/chain.ts @@ -243,10 +243,6 @@ export class MockBeaconChain implements IBeaconChain { async updateBeaconProposerData(): Promise {} updateBuilderStatus(): void {} - isProcessingCurrentSlotBlock(): boolean { - return false; - } - regenCanAcceptWork(): boolean { return true; }