Skip to content

Commit

Permalink
WIP import block+blobs coupled
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Nov 16, 2022
1 parent d268815 commit 3d86a02
Show file tree
Hide file tree
Showing 25 changed files with 219 additions and 241 deletions.
14 changes: 11 additions & 3 deletions packages/beacon-node/src/api/impl/beacon/blocks/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import {routes} from "@lodestar/api";

import {computeTimeAtSlot} from "@lodestar/state-transition";
import {SLOTS_PER_HISTORICAL_ROOT} from "@lodestar/params";
import {ForkSeq, SLOTS_PER_HISTORICAL_ROOT} from "@lodestar/params";
import {sleep} from "@lodestar/utils";
import {eip4844} from "@lodestar/types";
import {fromHexString, toHexString} from "@chainsafe/ssz";
import {BlockImport} from "../../../../chain/blocks/types.js";
import {promiseAllMaybeAsync} from "../../../../util/promises.js";
import {BlockError, BlockErrorCode} from "../../../../chain/errors/index.js";
import {OpSource} from "../../../../metrics/validatorMonitor.js";
Expand Down Expand Up @@ -187,13 +189,19 @@ export function getBeaconBlockApi({

metrics?.registerBeaconBlock(OpSource.api, seenTimestampSec, signedBlock.message);

// TODO EIP-4844: Open question if broadcast to both block topic + block_and_blobs topic
const blockForImport: BlockImport =
config.getForkSeq(signedBlock.message.slot) >= ForkSeq.eip4844
? {block: signedBlock, blobs: chain.getBlobsSidecar(signedBlock.message as eip4844.BeaconBlock)}
: {block: signedBlock, blobs: null};

await promiseAllMaybeAsync([
// Send the block, regardless of whether or not it is valid. The API
// specification is very clear that this is the desired behaviour.
() => network.publishBeaconBlockMaybeBlobs(signedBlock),
() => network.publishBeaconBlockMaybeBlobs(blockForImport),

() =>
chain.processBlock(signedBlock).catch((e) => {
chain.processBlock(blockForImport).catch((e) => {
if (e instanceof BlockError && e.type.code === BlockErrorCode.PARENT_UNKNOWN) {
network.events.emit(NetworkEvent.unknownBlockParent, signedBlock, network.peerId.toString());
}
Expand Down
10 changes: 9 additions & 1 deletion packages/beacon-node/src/api/impl/beacon/state/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import {
createCachedBeaconState,
createEmptyEpochContextImmutableData,
PubkeyIndexMap,
BlockExternalData,
ExecutionPayloadStatus,
DataAvailableStatus,
} from "@lodestar/state-transition";
import {BLSPubkey, phase0} from "@lodestar/types";
import {stateTransition, processSlots} from "@lodestar/state-transition";
Expand Down Expand Up @@ -221,7 +224,12 @@ async function getFinalizedState(

// process blocks up to the requested slot
for await (const block of db.blockArchive.valuesStream({gt: state.slot, lte: slot})) {
state = stateTransition(state, block, {
// Replaying finalized blocks, all data is considered valid
const externalData: BlockExternalData = {
executionPayloadStatus: ExecutionPayloadStatus.valid,
dataAvailableStatus: DataAvailableStatus.available,
};
state = stateTransition(state, block, externalData, {
verifyStateRoot: false,
verifyProposer: false,
verifySignatures: false,
Expand Down
14 changes: 12 additions & 2 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {altair, ssz} from "@lodestar/types";
import {MAX_SEED_LOOKAHEAD, SLOTS_PER_EPOCH} from "@lodestar/params";
import {ForkSeq, MAX_SEED_LOOKAHEAD, SLOTS_PER_EPOCH} from "@lodestar/params";
import {toHexString} from "@chainsafe/ssz";
import {
CachedBeaconStateAltair,
Expand Down Expand Up @@ -48,7 +48,7 @@ export async function importBlock(
fullyVerifiedBlock: FullyVerifiedBlock,
opts: ImportBlockOpts
): Promise<void> {
const {block, postState, parentBlockSlot, executionStatus} = fullyVerifiedBlock;
const {block, blobs, postState, parentBlockSlot, executionStatus} = fullyVerifiedBlock;
const pendingEvents = new PendingEvents(this.emitter);

// - Observe attestations
Expand Down Expand Up @@ -305,8 +305,18 @@ export async function importBlock(
// MUST happen before any other block is processed
// This adds the state necessary to process the next block
this.stateCache.add(postState);

await this.db.block.add(block);

if (this.config.getForkSeq(block.message.slot) >= ForkSeq.eip4844) {
if (!blobs) {
throw Error("blobsSidecar not provided for block post eip4844");
}
await this.db.blobsSidecar.add(blobs);

// TODO EIP-4844: Prune old blobs
}

// - head_tracker.register_block(block_root, parent_root, slot)

// - Send event after everything is done
Expand Down
10 changes: 5 additions & 5 deletions packages/beacon-node/src/chain/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import type {BeaconChain} from "../chain.js";
import {verifyBlocksInEpoch} from "./verifyBlock.js";
import {importBlock} from "./importBlock.js";
import {assertLinearChainSegment} from "./utils/chainSegment.js";
import {FullyVerifiedBlock, ImportBlockOpts} from "./types.js";
import {BlockImport, FullyVerifiedBlock, ImportBlockOpts} from "./types.js";
import {verifyBlocksSanityChecks} from "./verifyBlocksSanityChecks.js";
export {ImportBlockOpts} from "./types.js";

Expand All @@ -19,10 +19,10 @@ const QUEUE_MAX_LENGTH = 256;
* BlockProcessor processes block jobs in a queued fashion, one after the other.
*/
export class BlockProcessor {
readonly jobQueue: JobItemQueue<[allForks.SignedBeaconBlock[], ImportBlockOpts], void>;
readonly jobQueue: JobItemQueue<[BlockImport[], ImportBlockOpts], void>;

constructor(chain: BeaconChain, metrics: IMetrics | null, opts: BlockProcessOpts, signal: AbortSignal) {
this.jobQueue = new JobItemQueue<[allForks.SignedBeaconBlock[], ImportBlockOpts], void>(
this.jobQueue = new JobItemQueue<[BlockImport[], ImportBlockOpts], void>(
(job, importOpts) => {
return processBlocks.call(chain, job, {...opts, ...importOpts});
},
Expand All @@ -31,7 +31,7 @@ export class BlockProcessor {
);
}

async processBlocksJob(job: allForks.SignedBeaconBlock[], opts: ImportBlockOpts = {}): Promise<void> {
async processBlocksJob(job: BlockImport[], opts: ImportBlockOpts = {}): Promise<void> {
await this.jobQueue.push(job, opts);
}
}
Expand All @@ -48,7 +48,7 @@ export class BlockProcessor {
*/
export async function processBlocks(
this: BeaconChain,
blocks: allForks.SignedBeaconBlock[],
blocks: BlockImport[],
opts: BlockProcessOpts & ImportBlockOpts
): Promise<void> {
if (blocks.length === 0) {
Expand Down
8 changes: 7 additions & 1 deletion packages/beacon-node/src/chain/blocks/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {MaybeValidExecutionStatus} from "@lodestar/fork-choice";
import {allForks, Slot} from "@lodestar/types";
import {allForks, eip4844, Slot} from "@lodestar/types";

export type BlockImport = {
block: allForks.SignedBeaconBlock;
blobs: eip4844.BlobsSidecar | null;
};

export type ImportBlockOpts = {
/**
Expand Down Expand Up @@ -44,6 +49,7 @@ export type ImportBlockOpts = {
*/
export type FullyVerifiedBlock = {
block: allForks.SignedBeaconBlock;
blobs: eip4844.BlobsSidecar | null;
postState: CachedBeaconStateAllForks;
parentBlockSlot: Slot;
proposerBalanceDelta: number;
Expand Down
9 changes: 5 additions & 4 deletions packages/beacon-node/src/chain/blocks/utils/chainSegment.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import {IChainForkConfig} from "@lodestar/config";
import {allForks, ssz} from "@lodestar/types";
import {ssz} from "@lodestar/types";
import {BlockError, BlockErrorCode} from "../../errors/index.js";
import {BlockImport} from "../types.js";

/**
* Assert this chain segment of blocks is linear with slot numbers and hashes
*/
export function assertLinearChainSegment(config: IChainForkConfig, blocks: allForks.SignedBeaconBlock[]): void {
for (const [i, block] of blocks.entries()) {
const child = blocks[i + 1];
export function assertLinearChainSegment(config: IChainForkConfig, blocks: BlockImport[]): void {
for (const [i, {block}] of blocks.entries()) {
const child = blocks[i + 1].block;
if (child !== undefined) {
// If this block has a child in this chain segment, ensure that its parent root matches
// the root of this block.
Expand Down
10 changes: 5 additions & 5 deletions packages/beacon-node/src/chain/blocks/verifyBlock.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {CachedBeaconStateAllForks, computeEpochAtSlot} from "@lodestar/state-transition";
import {allForks, bellatrix} from "@lodestar/types";
import {bellatrix} from "@lodestar/types";
import {toHexString} from "@chainsafe/ssz";
import {ProtoBlock} from "@lodestar/fork-choice";
import {IChainForkConfig} from "@lodestar/config";
Expand All @@ -8,7 +8,7 @@ import {BlockError, BlockErrorCode} from "../errors/index.js";
import {BlockProcessOpts} from "../options.js";
import {RegenCaller} from "../regen/index.js";
import type {BeaconChain} from "../chain.js";
import {ImportBlockOpts} from "./types.js";
import {BlockImport, ImportBlockOpts} from "./types.js";
import {POS_PANDA_MERGE_TRANSITION_BANNER} from "./utils/pandaMergeTransitionBanner.js";
import {verifyBlocksStateTransitionOnly} from "./verifyBlocksStateTransitionOnly.js";
import {verifyBlocksSignatures} from "./verifyBlocksSignatures.js";
Expand All @@ -28,7 +28,7 @@ import {verifyBlocksExecutionPayload, SegmentExecStatus} from "./verifyBlocksExe
export async function verifyBlocksInEpoch(
this: BeaconChain,
parentBlock: ProtoBlock,
blocks: allForks.SignedBeaconBlock[],
blocks: BlockImport[],
opts: BlockProcessOpts & ImportBlockOpts
): Promise<{
postStates: CachedBeaconStateAllForks[];
Expand All @@ -39,12 +39,12 @@ export async function verifyBlocksInEpoch(
throw Error("Empty partiallyVerifiedBlocks");
}

const block0 = blocks[0];
const block0 = blocks[0].block;
const block0Epoch = computeEpochAtSlot(block0.message.slot);

// Ensure all blocks are in the same epoch
for (let i = 1; i < blocks.length; i++) {
const blockSlot = blocks[i].message.slot;
const blockSlot = blocks[i].block.message.slot;
if (block0Epoch !== computeEpochAtSlot(blockSlot)) {
throw Error(`Block ${i} slot ${blockSlot} not in same epoch ${block0Epoch}`);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import {computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {IChainForkConfig} from "@lodestar/config";
import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
import {allForks, Slot} from "@lodestar/types";
import {Slot} from "@lodestar/types";
import {toHexString} from "@lodestar/utils";
import {IBeaconClock} from "../clock/interface.js";
import {BlockError, BlockErrorCode} from "../errors/index.js";
import {ImportBlockOpts} from "./types.js";
import {BlockImport, ImportBlockOpts} from "./types.js";

/**
* Verifies some early cheap sanity checks on the block before running the full state transition.
Expand All @@ -21,18 +21,19 @@ import {ImportBlockOpts} from "./types.js";
*/
export function verifyBlocksSanityChecks(
chain: {forkChoice: IForkChoice; clock: IBeaconClock; config: IChainForkConfig},
blocks: allForks.SignedBeaconBlock[],
blocks: BlockImport[],
opts: ImportBlockOpts
): {relevantBlocks: allForks.SignedBeaconBlock[]; parentSlots: Slot[]; parentBlock: ProtoBlock | null} {
): {relevantBlocks: BlockImport[]; parentSlots: Slot[]; parentBlock: ProtoBlock | null} {
if (blocks.length === 0) {
throw Error("Empty partiallyVerifiedBlocks");
}

const relevantBlocks: allForks.SignedBeaconBlock[] = [];
const relevantBlocks: BlockImport[] = [];
const parentSlots: Slot[] = [];
let parentBlock: ProtoBlock | null = null;

for (const block of blocks) {
for (const blockImport of blocks) {
const {block} = blockImport;
const blockSlot = block.message.slot;

// Not genesis block
Expand All @@ -59,7 +60,7 @@ export function verifyBlocksSanityChecks(
let parentBlockSlot: Slot;

if (relevantBlocks.length > 0) {
parentBlockSlot = relevantBlocks[relevantBlocks.length - 1].message.slot;
parentBlockSlot = relevantBlocks[relevantBlocks.length - 1].block.message.slot;
} else {
// When importing a block segment, only the first NON-IGNORED block must be known to the fork-choice.
const parentRoot = toHexString(block.message.parentRoot);
Expand Down Expand Up @@ -92,7 +93,7 @@ export function verifyBlocksSanityChecks(
}

// Block is relevant
relevantBlocks.push(block);
relevantBlocks.push(blockImport);
parentSlots.push(parentBlockSlot);
}

Expand Down
7 changes: 5 additions & 2 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import {CheckpointBalancesCache} from "./balancesCache.js";
import {AssembledBlockType, BlockType} from "./produceBlock/index.js";
import {BlobsResultType, BlockAttributes, produceBlockBody} from "./produceBlock/produceBlockBody.js";
import {computeNewStateRoot} from "./produceBlock/computeNewStateRoot.js";
import {BlockImport} from "./blocks/types.js";

export class BeaconChain implements IBeaconChain {
readonly genesisTime: UintNum64;
Expand Down Expand Up @@ -119,6 +120,7 @@ export class BeaconChain implements IBeaconChain {
private successfulExchangeTransition = false;
private readonly exchangeTransitionConfigurationEverySlots: number;

// TODO EIP-4844: Prune data structure every time period, for both old entries
/** Map keyed by executionPayload.blockHash of the block for those blobs */
private readonly producedBlobsCache = new Map<RootHex, eip4844.Blobs>();

Expand Down Expand Up @@ -384,6 +386,7 @@ export class BeaconChain implements IBeaconChain {

// Cache for latter broadcasting
if (blobs.type === BlobsResultType.produced) {
// TODO EIP-4844: Prune data structure for max entries
this.producedBlobsCache.set(blobs.blockHash, blobs.blobs);
}

Expand Down Expand Up @@ -416,11 +419,11 @@ export class BeaconChain implements IBeaconChain {
};
}

async processBlock(block: allForks.SignedBeaconBlock, opts?: ImportBlockOpts): Promise<void> {
async processBlock(block: BlockImport, opts?: ImportBlockOpts): Promise<void> {
return await this.blockProcessor.processBlocksJob([block], opts);
}

async processChainSegment(blocks: allForks.SignedBeaconBlock[], opts?: ImportBlockOpts): Promise<void> {
async processChainSegment(blocks: BlockImport[], opts?: ImportBlockOpts): Promise<void> {
return await this.blockProcessor.processBlocksJob(blocks, opts);
}

Expand Down
6 changes: 3 additions & 3 deletions packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
import {AttestationPool, OpPool, SyncCommitteeMessagePool, SyncContributionAndProofPool} from "./opPools/index.js";
import {LightClientServer} from "./lightClient/index.js";
import {AggregatedAttestationPool} from "./opPools/aggregatedAttestationPool.js";
import {ImportBlockOpts} from "./blocks/types.js";
import {BlockImport, ImportBlockOpts} from "./blocks/types.js";
import {ReprocessController} from "./reprocess.js";
import {SeenAggregatedAttestations} from "./seenCache/seenAggregateAndProof.js";
import {BeaconProposerCache, ProposerPreparationData} from "./beaconProposerCache.js";
Expand Down Expand Up @@ -114,9 +114,9 @@ export interface IBeaconChain {
getBlobsSidecar(beaconBlock: eip4844.BeaconBlock): eip4844.BlobsSidecar;

/** Process a block until complete */
processBlock(block: allForks.SignedBeaconBlock, opts?: ImportBlockOpts): Promise<void>;
processBlock(block: BlockImport, opts?: ImportBlockOpts): Promise<void>;
/** Process a chain of blocks until complete */
processChainSegment(blocks: allForks.SignedBeaconBlock[], opts?: ImportBlockOpts): Promise<void>;
processChainSegment(blocks: BlockImport[], opts?: ImportBlockOpts): Promise<void>;

getStatus(): phase0.Status;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ export async function produceBlockBody<T extends BlockType>(
if (forkName === ForkName.eip4844) {
// Empty blobs for now
(blockBody as eip4844.BeaconBlockBody).blobKzgCommitments = [];
blobs = {blobs: [], blockHash: executionPayloadHeader.blockHash};
blobs = {blobs: [], blockHash: toHex(executionPayloadHeader.blockHash)};
}
}

Expand Down
18 changes: 4 additions & 14 deletions packages/beacon-node/src/db/repositories/blobsSidecar.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import {IChainForkConfig} from "@lodestar/config";
import {Bucket, Db, Repository} from "@lodestar/db";
import {allForks, eip4844, ssz} from "@lodestar/types";
import {getSignedBlockTypeFromBytes} from "../../util/multifork.js";
import {eip4844, ssz} from "@lodestar/types";

/**
* BlobsSidecar by block root (= hash_tree_root(SignedBeaconBlockAndBlobsSidecar.beacon_block.message))
Expand All @@ -10,22 +9,13 @@ import {getSignedBlockTypeFromBytes} from "../../util/multifork.js";
*/
export class BlobsSidecarRepository extends Repository<Uint8Array, eip4844.BlobsSidecar> {
constructor(config: IChainForkConfig, db: Db) {
const type = ssz.phase0.SignedBeaconBlock; // Pick some type but won't be used
super(config, db, Bucket.allForks_block, type);
super(config, db, Bucket.allForks_blobsSidecar, ssz.eip4844.BlobsSidecar);
}

/**
* Id is hashTreeRoot of unsigned BeaconBlock
*/
getId(value: allForks.SignedBeaconBlock): Uint8Array {
return this.config.getForkTypes(value.message.slot).BeaconBlock.hashTreeRoot(value.message);
}

encodeValue(value: allForks.SignedBeaconBlock): Buffer {
return this.config.getForkTypes(value.message.slot).SignedBeaconBlock.serialize(value) as Buffer;
}

decodeValue(data: Buffer): allForks.SignedBeaconBlock {
return getSignedBlockTypeFromBytes(this.config, data).deserialize(data);
getId(value: eip4844.BlobsSidecar): Uint8Array {
return value.beaconBlockRoot;
}
}
Loading

0 comments on commit 3d86a02

Please sign in to comment.