Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add processBlocksInEpoch fn
Browse files Browse the repository at this point in the history
dapplion committed Jun 3, 2021
1 parent e055d02 commit 4b7b122
Showing 1 changed file with 75 additions and 101 deletions.
176 changes: 75 additions & 101 deletions packages/lodestar/src/chain/blocks/process.ts
Original file line number Diff line number Diff line change
@@ -31,10 +31,9 @@ export type BlockProcessModules = {
opts?: BlockProcessOpts;
};

export async function processBlock(
{forkChoice, regen, emitter, checkpointStateCache, bls, metrics, opts}: BlockProcessModules,
job: IBlockJob
): Promise<void> {
export async function processBlock(modules: BlockProcessModules, job: IBlockJob): Promise<void> {
const {forkChoice} = modules;

if (!forkChoice.hasBlock(job.signedBlock.message.parentRoot)) {
throw new BlockError({
code: BlockErrorCode.PARENT_UNKNOWN,
@@ -43,56 +42,11 @@ export async function processBlock(
});
}

try {
const preState = await regen.getPreState(job.signedBlock.message);

await runStateTransition({emitter, forkChoice, metrics}, checkpointStateCache, preState, job);

// Verify signatures after running state transition, so all SyncCommittee signed roots are known at this point.
// We must ensure block.slot <= state.slot before running getAllBlockSignatureSets().
if (!job.validSignatures && !opts?.disableBlsBatchVerify) {
const signatureSets = job.validProposerSignature
? allForks.getAllBlockSignatureSetsExceptProposer(
preState as CachedBeaconState<allForks.BeaconState>,
job.signedBlock
)
: allForks.getAllBlockSignatureSets(preState as CachedBeaconState<allForks.BeaconState>, job.signedBlock);

if (signatureSets.length > 0 && !(await bls.verifySignatureSets(signatureSets))) {
throw new BlockError({
code: BlockErrorCode.INVALID_SIGNATURE,
job,
});
}

job.validProposerSignature = true;
job.validSignatures = true;
}
} catch (e) {
if (e instanceof RegenError) {
throw new BlockError({
code: BlockErrorCode.PRESTATE_MISSING,
job,
});
}

if (e instanceof BlockError) {
throw e;
}

throw new BlockError({
code: BlockErrorCode.BEACON_CHAIN_ERROR,
error: e as Error,
job,
});
}
await processBlocksInEpoch(modules, job, [job.signedBlock]);
}

export async function processChainSegment(
{config, forkChoice, regen, emitter, checkpointStateCache, bls, metrics, opts}: BlockProcessModules,
job: IChainSegmentJob
): Promise<void> {
let importedBlocks = 0;
export async function processChainSegment(modules: BlockProcessModules, job: IChainSegmentJob): Promise<void> {
const {config, forkChoice} = modules;
const blocks = job.signedBlocks;

const firstSegBlock = blocks[0];
@@ -103,7 +57,7 @@ export async function processChainSegment(
code: BlockErrorCode.PARENT_UNKNOWN,
parentRoot: firstSegBlock.message.parentRoot.valueOf() as Uint8Array,
job,
importedBlocks,
importedBlocks: 0,
});
}
}
@@ -120,64 +74,84 @@ export async function processChainSegment(
continue;
}

let importedBlocks = 0;

try {
let preState = await regen.getPreState(firstBlock.message);
for (const block of blocksInEpoch) {
preState = await runStateTransition({emitter, forkChoice, metrics}, checkpointStateCache, preState, {
reprocess: job.reprocess,
prefinalized: job.prefinalized,
signedBlock: block,
validProposerSignature: true,
validSignatures: true,
});
importedBlocks++;
// this avoids keeping our node busy processing blocks
await sleep(0);
await processBlocksInEpoch(modules, job, blocksInEpoch, () => importedBlocks++);
} catch (e) {
if (e instanceof BlockError) {
throw new ChainSegmentError({...e.type, job, importedBlocks});
}

// Verify signatures after running state transition, so all SyncCommittee signed roots are known at this point.
// We must ensure block.slot <= state.slot before running getAllBlockSignatureSets().
if (!job.validSignatures && !opts?.disableBlsBatchVerify) {
const signatureSets: ISignatureSet[] = [];
for (const block of blocksInEpoch) {
signatureSets.push(
...(job.validProposerSignature
? allForks.getAllBlockSignatureSetsExceptProposer(preState, block)
: allForks.getAllBlockSignatureSets(preState as CachedBeaconState<allForks.BeaconState>, block))
);
}

if (signatureSets.length > 0 && !(await bls.verifySignatureSets(signatureSets))) {
throw new ChainSegmentError({
code: BlockErrorCode.INVALID_SIGNATURE,
job,
importedBlocks,
});
}
}
} catch (e) {
if (e instanceof RegenError) {
throw new ChainSegmentError({
code: BlockErrorCode.PRESTATE_MISSING,
job,
importedBlocks,
});
throw new ChainSegmentError({code: BlockErrorCode.BEACON_CHAIN_ERROR, error: e as Error, job, importedBlocks});
}
}
}

async function processBlocksInEpoch(
{config, forkChoice, regen, emitter, checkpointStateCache, bls, metrics, opts}: BlockProcessModules,
job: IChainSegmentJob,
blocksInEpoch: allForks.SignedBeaconBlock[],
onProcessBlock?: (block: allForks.SignedBeaconBlock) => void
): Promise<void> {
const firstBlock = blocksInEpoch[0];
if (!firstBlock) {
return;
}

try {
let preState = await regen.getPreState(firstBlock.message);
for (const block of blocksInEpoch) {
preState = await runStateTransition({emitter, forkChoice, metrics}, checkpointStateCache, preState, {
reprocess: job.reprocess,
prefinalized: job.prefinalized,
signedBlock: block,
validProposerSignature: job.validProposerSignature,
validSignatures: job.validSignatures,
});

// Callback to count processed blocks in processChainSegment
if (onProcessBlock) onProcessBlock(block);

// this avoids keeping our node busy processing blocks
await sleep(0);
}

// Verify signatures after running state transition, so all SyncCommittee signed roots are known at this point.
// We must ensure block.slot <= state.slot before running getAllBlockSignatureSets().
if (!job.validSignatures && !opts?.disableBlsBatchVerify) {
const signatureSets: ISignatureSet[] = [];
for (const block of blocksInEpoch) {
signatureSets.push(
...(job.validProposerSignature
? allForks.getAllBlockSignatureSetsExceptProposer(preState, block)
: allForks.getAllBlockSignatureSets(preState as CachedBeaconState<allForks.BeaconState>, block))
);
}

if (e instanceof BlockError) {
throw new ChainSegmentError({
...e.type,
if (signatureSets.length > 0 && !(await bls.verifySignatureSets(signatureSets))) {
throw new BlockError({
code: BlockErrorCode.INVALID_SIGNATURE,
job,
importedBlocks,
});
}

throw new ChainSegmentError({
code: BlockErrorCode.BEACON_CHAIN_ERROR,
error: e as Error,
}
} catch (e) {
if (e instanceof RegenError) {
throw new BlockError({
code: BlockErrorCode.PRESTATE_MISSING,
job,
importedBlocks,
});
}

if (e instanceof BlockError) {
throw e;
}

throw new BlockError({
code: BlockErrorCode.BEACON_CHAIN_ERROR,
error: e as Error,
job,
});
}
}

0 comments on commit 4b7b122

Please sign in to comment.