Skip to content

Commit

Permalink
Verify signatures on both main thread and worker threads (#3793)
Browse files Browse the repository at this point in the history
* Implement and use BlsMixedVerifier

* useMultiThreadVerifier flag

* Add useMainThread option

* Add blsVerifyAllMultiThread and blsVerifyAllMainThread flags

* Add blsVerifyMainThread flag to block processor

* Remove BlsMixedVerifier

* Add BLS time metrics

* Refactor to verifyOnMainThread and blsVerifyOnMainThread

* Use metric.startTimer()

* Add comments for blsVerifyOnMainThread usage

* Update chain.ts

Co-authored-by: Lion - dapplion <35266934+dapplion@users.noreply.github.com>
  • Loading branch information
twoeths and dapplion authored Mar 2, 2022
1 parent f63ae9d commit bbe818d
Show file tree
Hide file tree
Showing 17 changed files with 123 additions and 28 deletions.
22 changes: 16 additions & 6 deletions packages/cli/src/options/beaconNodeOptions/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import {defaultOptions, IBeaconNodeOptions} from "@chainsafe/lodestar";
import {ICliCommandOptions} from "../../util";

export interface IChainArgs {
"chain.useSingleThreadVerifier": boolean;
"chain.blsVerifyAllMultiThread": boolean;
"chain.blsVerifyAllMainThread": boolean;
"chain.disableBlsBatchVerify": boolean;
"chain.persistInvalidSszObjects": boolean;
"chain.proposerBoostEnabled": boolean;
Expand All @@ -13,7 +14,8 @@ export interface IChainArgs {

export function parseArgs(args: IChainArgs): IBeaconNodeOptions["chain"] {
return {
useSingleThreadVerifier: args["chain.useSingleThreadVerifier"],
blsVerifyAllMultiThread: args["chain.blsVerifyAllMultiThread"],
blsVerifyAllMainThread: args["chain.blsVerifyAllMainThread"],
disableBlsBatchVerify: args["chain.disableBlsBatchVerify"],
persistInvalidSszObjects: args["chain.persistInvalidSszObjects"],
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-explicit-any
Expand All @@ -24,11 +26,19 @@ export function parseArgs(args: IChainArgs): IBeaconNodeOptions["chain"] {
}

export const options: ICliCommandOptions<IChainArgs> = {
"chain.useSingleThreadVerifier": {
"chain.blsVerifyAllMultiThread": {
hidden: true,
type: "boolean",
description: "Disable spawning worker threads for BLS verification, use single thread implementation.",
defaultDescription: String(defaultOptions.chain.useSingleThreadVerifier),
description: "Always use worker threads for BLS verification",
defaultDescription: String(defaultOptions.chain.blsVerifyAllMultiThread),
group: "chain",
},

"chain.blsVerifyAllMainThread": {
hidden: true,
type: "boolean",
description: "Always use main threads for BLS verification",
defaultDescription: String(defaultOptions.chain.blsVerifyAllMainThread),
group: "chain",
},

Expand All @@ -38,7 +48,7 @@ export const options: ICliCommandOptions<IChainArgs> = {
description:
"Do not use BLS batch verify to validate all block signatures at once. \
Will double processing times. Use only for debugging purposes.",
defaultDescription: String(defaultOptions.chain.disableBlsBatchVerify),
defaultDescription: String(defaultOptions.chain.blsVerifyAllMultiThread),
group: "chain",
},

Expand Down
6 changes: 4 additions & 2 deletions packages/cli/test/unit/options/beaconNodeOptions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ describe("options / beaconNodeOptions", () => {
"api.rest.host": "127.0.0.1",
"api.rest.port": 7654,

"chain.useSingleThreadVerifier": true,
"chain.blsVerifyAllMultiThread": true,
"chain.blsVerifyAllMainThread": true,
"chain.disableBlsBatchVerify": true,
"chain.persistInvalidSszObjects": true,
"chain.proposerBoostEnabled": false,
Expand Down Expand Up @@ -71,7 +72,8 @@ describe("options / beaconNodeOptions", () => {
},
},
chain: {
useSingleThreadVerifier: true,
blsVerifyAllMultiThread: true,
blsVerifyAllMainThread: true,
disableBlsBatchVerify: true,
persistInvalidSszObjects: true,
proposerBoostEnabled: false,
Expand Down
4 changes: 4 additions & 0 deletions packages/lodestar/src/chain/blocks/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ export type PartiallyVerifiedBlockFlags = FullyVerifiedBlockFlags & {
* From RangeSync module, we won't attest to this block so it's okay to ignore a SYNCING message from execution layer
*/
fromRangeSync?: boolean;
/**
* Request that this block's signatures be verified on the main thread rather than on the BLS worker threads.
*/
blsVerifyOnMainThread?: boolean;
};

/**
Expand Down
7 changes: 6 additions & 1 deletion packages/lodestar/src/chain/blocks/verifyBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,12 @@ export async function verifyBlockStateTransition(
? allForks.getAllBlockSignatureSetsExceptProposer(postState, block)
: allForks.getAllBlockSignatureSets(postState as CachedBeaconStateAllForks, block);

if (signatureSets.length > 0 && !(await chain.bls.verifySignatureSets(signatureSets))) {
if (
signatureSets.length > 0 &&
!(await chain.bls.verifySignatureSets(signatureSets, {
verifyOnMainThread: partiallyVerifiedBlock?.blsVerifyOnMainThread,
}))
) {
throw new BlockError(block, {code: BlockErrorCode.INVALID_SIGNATURE, state: postState});
}
}
Expand Down
6 changes: 6 additions & 0 deletions packages/lodestar/src/chain/bls/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ export type VerifySignatureOpts = {
* Only non-time critical objects should be marked as batchable, since the pool may hold them for 100ms.
*/
batchable?: boolean;

/**
* Verify signatures on the main thread; use this with care.
* The batchable option is ignored when this is true.
*/
verifyOnMainThread?: boolean;
};

export interface IBlsVerifier {
Expand Down
24 changes: 23 additions & 1 deletion packages/lodestar/src/chain/bls/multithread/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@ import {BlsWorkReq, BlsWorkResult, WorkerData, WorkResultCode} from "./types";
import {chunkifyMaximizeChunkSize, getDefaultPoolSize} from "./utils";
import {ISignatureSet} from "@chainsafe/lodestar-beacon-state-transition";
import {getAggregatedPubkey} from "../utils";
import {verifySignatureSetsMaybeBatch} from "../maybeBatch";

/**
 * Dependencies passed as the `modules` argument to the BlsMultiThreadWorkerPool constructor,
 * which stores each of them on the pool instance.
 */
export type BlsMultiThreadWorkerPoolModules = {
/** Logger kept by the pool. */
logger: ILogger;
/** Metrics registry, or null. The pool reaches metrics through optional chaining, so null records nothing. */
metrics: IMetrics | null;
/** Abort signal kept by the pool. NOTE(review): presumably used to shut the workers down; confirm in the class body. */
signal: AbortSignal;
};

/**
 * Options passed as the first argument to the BlsMultiThreadWorkerPool constructor.
 */
export type BlsMultiThreadWorkerPoolOptions = {
/**
 * When true, verifySignatureSets never takes its main-thread path, even if the caller
 * passes `verifyOnMainThread: true`. Treated as false when omitted.
 */
blsVerifyAllMultiThread?: boolean;
};

/**
* Split big signature sets into smaller sets so they can be sent to multiple workers.
*
Expand Down Expand Up @@ -104,12 +109,14 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
firstPush: number;
timeout: NodeJS.Timeout;
} | null = null;
private blsVerifyAllMultiThread: boolean;

constructor(modules: BlsMultiThreadWorkerPoolModules) {
constructor(options: BlsMultiThreadWorkerPoolOptions, modules: BlsMultiThreadWorkerPoolModules) {
const {logger, metrics, signal} = modules;
this.logger = logger;
this.metrics = metrics;
this.signal = signal;
this.blsVerifyAllMultiThread = options.blsVerifyAllMultiThread ?? false;

// TODO: Allow to customize implementation
const implementation = bls.implementation;
Expand All @@ -136,6 +143,21 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
}

async verifySignatureSets(sets: ISignatureSet[], opts: VerifySignatureOpts = {}): Promise<boolean> {
if (opts.verifyOnMainThread && !this.blsVerifyAllMultiThread) {
const timer = this.metrics?.blsThreadPool.mainThreadDurationInThreadPool.startTimer();
try {
return verifySignatureSetsMaybeBatch(
sets.map((set) => ({
publicKey: getAggregatedPubkey(set),
message: set.signingRoot.valueOf() as Uint8Array,
signature: set.signature,
}))
);
} finally {
if (timer) timer();
}
}

// Split large array of sets into smaller.
// Very helpful when syncing finalized, sync may submit +1000 sets so chunkify allows to distribute to many workers
const results = await Promise.all(
Expand Down
26 changes: 19 additions & 7 deletions packages/lodestar/src/chain/bls/singleThread.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,28 @@
import {ISignatureSet} from "@chainsafe/lodestar-beacon-state-transition";
import {IMetrics} from "../../metrics";
import {IBlsVerifier} from "./interface";
import {verifySignatureSetsMaybeBatch} from "./maybeBatch";
import {getAggregatedPubkey} from "./utils";

export class BlsSingleThreadVerifier implements IBlsVerifier {
private readonly metrics: IMetrics | null;

constructor({metrics = null}: {metrics: IMetrics | null}) {
this.metrics = metrics;
}

async verifySignatureSets(sets: ISignatureSet[]): Promise<boolean> {
return verifySignatureSetsMaybeBatch(
sets.map((set) => ({
publicKey: getAggregatedPubkey(set),
message: set.signingRoot.valueOf() as Uint8Array,
signature: set.signature,
}))
);
const timer = this.metrics?.blsThreadPool.mainThreadDurationInThreadPool.startTimer();
try {
return verifySignatureSetsMaybeBatch(
sets.map((set) => ({
publicKey: getAggregatedPubkey(set),
message: set.signingRoot.valueOf() as Uint8Array,
signature: set.signature,
}))
);
} finally {
if (timer) timer();
}
}
}
7 changes: 4 additions & 3 deletions packages/lodestar/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,10 @@ export class BeaconChain implements IBeaconChain {

const signal = this.abortController.signal;
const emitter = new ChainEventEmitter();
const bls = opts.useSingleThreadVerifier
? new BlsSingleThreadVerifier()
: new BlsMultiThreadWorkerPool({logger, metrics, signal: this.abortController.signal});
// By default, verify signatures on both the main thread and worker threads
const bls = opts.blsVerifyAllMainThread
? new BlsSingleThreadVerifier({metrics})
: new BlsMultiThreadWorkerPool(opts, {logger, metrics, signal: this.abortController.signal});

const clock = new LocalClock({config, emitter, genesisTime: this.genesisTime, signal});
const stateCache = new StateContextCache({metrics});
Expand Down
6 changes: 4 additions & 2 deletions packages/lodestar/src/chain/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import {ForkChoiceOpts} from "./forkChoice";
// eslint-disable-next-line @typescript-eslint/ban-types
export type IChainOptions = BlockProcessOpts &
ForkChoiceOpts & {
useSingleThreadVerifier?: boolean;
blsVerifyAllMainThread?: boolean;
blsVerifyAllMultiThread?: boolean;
persistInvalidSszObjects?: boolean;
persistInvalidSszObjectsDir: string;
};
Expand All @@ -22,7 +23,8 @@ export type BlockProcessOpts = {
};

export const defaultChainOptions: IChainOptions = {
useSingleThreadVerifier: false,
blsVerifyAllMainThread: false,
blsVerifyAllMultiThread: false,
disableBlsBatchVerify: false,
persistInvalidSszObjects: true,
persistInvalidSszObjectsDir: "",
Expand Down
2 changes: 1 addition & 1 deletion packages/lodestar/src/chain/validation/block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ export async function validateGossipBlock(
// [REJECT] The proposer signature, signed_beacon_block.signature, is valid with respect to the proposer_index pubkey.
const signatureSet = allForks.getProposerSignatureSet(blockState, signedBlock);
// Don't batch so verification is not delayed
if (!(await chain.bls.verifySignatureSets([signatureSet]))) {
if (!(await chain.bls.verifySignatureSets([signatureSet], {verifyOnMainThread: true}))) {
throw new BlockGossipError(GossipAction.REJECT, PeerAction.LowToleranceError, {
code: BlockErrorCode.PROPOSAL_SIGNATURE_INVALID,
});
Expand Down
14 changes: 14 additions & 0 deletions packages/lodestar/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,20 @@ export function createLodestarMetrics(
help: "Time from the worker sending the result and the main thread receiving it",
buckets: [0.1],
}),
mainThreadDurationInThreadPool: register.histogram({
name: "lodestar_bls_thread_pool_main_thread_time_seconds",
help: "Time to verify signatures in main thread with thread pool mode",
buckets: [0.1, 1],
}),
},

// BLS time on single thread mode
blsSingleThread: {
singleThreadDuration: register.histogram({
name: "lodestar_bls_single_thread_time_seconds",
help: "Time to verify signatures with single thread mode",
buckets: [0.1, 1],
}),
},

// Sync
Expand Down
8 changes: 7 additions & 1 deletion packages/lodestar/src/network/gossip/handlers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,14 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH
metrics?.registerBeaconBlock(OpSource.gossip, seenTimestampSec, signedBlock.message);

// `validProposerSignature = true`, in gossip validation the proposer signature is checked
// At gossip time, it's critical to keep a good number of mesh peers.
// To do that, the Gossip Job Wait Time should be consistently <3s to avoid the behavior penalties in gossip
// Gossip Job Wait Time depends on the BLS Job Wait Time
// so `blsVerifyOnMainThread = true`: we want to verify signatures immediately without affecting the bls thread pool.
// otherwise we can't utilize bls thread pool capacity and Gossip Job Wait Time can't be kept low consistently.
// See https://github.com/ChainSafe/lodestar/issues/3792
chain
.processBlock(signedBlock, {validProposerSignature: true})
.processBlock(signedBlock, {validProposerSignature: true, blsVerifyOnMainThread: true})
.then(() => {
// Returns the delay between the start of `block.slot` and `current time`
const delaySec = Date.now() / 1000 - (chain.genesisTime + slot * config.SECONDS_PER_SLOT);
Expand Down
3 changes: 3 additions & 0 deletions packages/lodestar/src/sync/range/range.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ export class RangeSync extends (EventEmitter as {new (): RangeSyncEmitter}) {
ignoreIfFinalized: true,
// We won't attest to this block so it's okay to ignore a SYNCING message from execution layer
fromRangeSync: true,
// when this runs, syncing is the most important thing and gossip is not likely to run
// so we can utilize worker threads to verify signatures
blsVerifyOnMainThread: false,
};

if (this.opts?.disableProcessAsChainSegment) {
Expand Down
10 changes: 9 additions & 1 deletion packages/lodestar/src/sync/unknownBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,15 @@ export class UnknownBlockSync {
}

pendingBlock.status = PendingBlockStatus.processing;
const res = await wrapError(this.chain.processBlock(pendingBlock.signedBlock, {ignoreIfKnown: true}));
// At gossip time, it's critical to keep a good number of mesh peers.
// To do that, the Gossip Job Wait Time should be consistently <3s to avoid the behavior penalties in gossip
// Gossip Job Wait Time depends on the BLS Job Wait Time
// so `blsVerifyOnMainThread = true`: we want to verify signatures immediately without affecting the bls thread pool.
// otherwise we can't utilize bls thread pool capacity and Gossip Job Wait Time can't be kept low consistently.
// See https://github.com/ChainSafe/lodestar/issues/3792
const res = await wrapError(
this.chain.processBlock(pendingBlock.signedBlock, {ignoreIfKnown: true, blsVerifyOnMainThread: true})
);
pendingBlock.status = PendingBlockStatus.pending;

if (res.err) this.metrics?.syncUnknownBlock.processedBlocksError.inc();
Expand Down
2 changes: 1 addition & 1 deletion packages/lodestar/test/e2e/chain/bls/multithread.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ describe("chain / bls / multithread queue", function () {
});

async function initializePool(): Promise<BlsMultiThreadWorkerPool> {
const pool = new BlsMultiThreadWorkerPool({logger, metrics: null, signal: controller.signal});
const pool = new BlsMultiThreadWorkerPool({}, {logger, metrics: null, signal: controller.signal});
// Wait until initialized
await pool["waitTillInitialized"]();
return pool;
Expand Down
2 changes: 1 addition & 1 deletion packages/lodestar/test/sim/multiNodeMultiThread.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ function runMultiNodeMultiThreadTest({nodeCount, validatorsPerNode, event, altai
params: {...testParams, ALTAIR_FORK_EPOCH: altairForkEpoch},
options: {
// Don't spawn workers from worker threads
chain: {useSingleThreadVerifier: true},
chain: {blsVerifyAllMainThread: true},
network: {
discv5: {bindAddr: `/ip4/127.0.0.1/udp/${p2pPort}`},
localMultiaddrs: [`/ip4/127.0.0.1/tcp/${p2pPort}`],
Expand Down
2 changes: 1 addition & 1 deletion packages/lodestar/test/utils/validationData/attestation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ export function getAttestationValidData(
forkChoice,
regen,
seenAttesters: new SeenAttesters(),
bls: new BlsSingleThreadVerifier(),
bls: new BlsSingleThreadVerifier({metrics: null}),
waitForBlockOfAttestation: () => Promise.resolve(false),
} as Partial<IBeaconChain>) as IBeaconChain;

Expand Down

0 comments on commit bbe818d

Please sign in to comment.