Skip to content

Commit

Permalink
feat: move to blob_sidecars_by_range/root methods (#5564)
Browse files Browse the repository at this point in the history
* feat: move req/resp to blob_sidecars_by_range/root methods

* fix unit test
  • Loading branch information
g11tech authored May 30, 2023
1 parent 8c05e37 commit 5672d84
Show file tree
Hide file tree
Showing 18 changed files with 406 additions and 339 deletions.
24 changes: 13 additions & 11 deletions packages/beacon-node/src/api/impl/beacon/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,19 @@ import {SLOTS_PER_HISTORICAL_ROOT} from "@lodestar/params";
import {sleep} from "@lodestar/utils";
import {allForks, deneb} from "@lodestar/types";
import {fromHexString, toHexString} from "@chainsafe/ssz";
import {BlockSource, getBlockInput, ImportBlockOpts, BlockInput} from "../../../../chain/blocks/types.js";
import {
BlockSource,
getBlockInput,
ImportBlockOpts,
BlockInput,
blobSidecarsToBlobsSidecar,
} from "../../../../chain/blocks/types.js";
import {promiseAllMaybeAsync} from "../../../../util/promises.js";
import {isOptimisticBlock} from "../../../../util/forkChoice.js";
import {BlockError, BlockErrorCode} from "../../../../chain/errors/index.js";
import {OpSource} from "../../../../metrics/validatorMonitor.js";
import {NetworkEvent} from "../../../../network/index.js";
import {ApiModules} from "../../types.js";
import {ckzg} from "../../../../util/kzg.js";
import {resolveBlockId, toBeaconHeaderResponse} from "./utils.js";

/**
Expand Down Expand Up @@ -213,21 +218,18 @@ export function getBeaconBlockApi({
if (isSignedBlockContents(signedBlockOrContents)) {
// Build a blockInput for post deneb, signedBlobs will be be used in followup PRs
({signedBlock, signedBlobSidecars: signedBlobs} = signedBlockOrContents as SignedBlockContents);
const beaconBlockSlot = signedBlock.message.slot;
const beaconBlockRoot = config.getForkTypes(beaconBlockSlot).BeaconBlock.hashTreeRoot(signedBlock.message);
const blobs = signedBlobs.map((sblob) => sblob.message.blob);
const blobsSidecar = blobSidecarsToBlobsSidecar(
config,
signedBlock,
signedBlobs.map(({message}) => message)
);

blockForImport = getBlockInput.postDeneb(
config,
signedBlock,
BlockSource.api,
// The blobsSidecar will be replaced in the followup PRs with just blobs
{
beaconBlockRoot,
beaconBlockSlot,
blobs,
kzgAggregatedProof: ckzg.computeAggregateKzgProof(blobs),
}
blobsSidecar
);
} else {
signedBlock = signedBlockOrContents as allForks.SignedBeaconBlock;
Expand Down
21 changes: 21 additions & 0 deletions packages/beacon-node/src/chain/blocks/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import {allForks, deneb, Slot, WithOptionalBytes} from "@lodestar/types";
import {ForkSeq, MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS} from "@lodestar/params";
import {ChainForkConfig} from "@lodestar/config";

import {ckzg} from "../../util/kzg.js";

export enum BlockInputType {
preDeneb = "preDeneb",
postDeneb = "postDeneb",
Expand All @@ -29,6 +31,25 @@ export function blockRequiresBlobs(config: ChainForkConfig, blockSlot: Slot, clo
);
}

// TODO DENEB: a helper function to convert blobSidecars to blobsSidecar, to be cleanup on BlockInput
// migration
export function blobSidecarsToBlobsSidecar(
config: ChainForkConfig,
signedBlock: allForks.SignedBeaconBlock,
blobSidecars: deneb.BlobSidecars
): deneb.BlobsSidecar {
const beaconBlockSlot = signedBlock.message.slot;
const beaconBlockRoot = config.getForkTypes(beaconBlockSlot).BeaconBlock.hashTreeRoot(signedBlock.message);
const blobs = blobSidecars.map(({blob}) => blob);
const blobsSidecar = {
beaconBlockRoot,
beaconBlockSlot,
blobs,
kzgAggregatedProof: ckzg.computeAggregateKzgProof(blobs),
};
return blobsSidecar;
}

export const getBlockInput = {
preDeneb(config: ChainForkConfig, block: allForks.SignedBeaconBlock, source: BlockSource): BlockInput {
if (config.getForkSeq(block.message.slot) >= ForkSeq.deneb) {
Expand Down
10 changes: 2 additions & 8 deletions packages/beacon-node/src/network/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,8 @@ export interface INetwork extends INetworkCorePublic {
peerId: PeerIdStr,
request: phase0.BeaconBlocksByRootRequest
): Promise<allForks.SignedBeaconBlock[]>;
sendBlobsSidecarsByRange(
peerId: PeerIdStr,
request: deneb.BlobsSidecarsByRangeRequest
): Promise<deneb.BlobsSidecar[]>;
sendBeaconBlockAndBlobsSidecarByRoot(
peerId: PeerIdStr,
request: deneb.BeaconBlockAndBlobsSidecarByRootRequest
): Promise<deneb.SignedBeaconBlockAndBlobsSidecar[]>;
sendBlobSidecarsByRange(peerId: PeerIdStr, request: deneb.BlobSidecarsByRangeRequest): Promise<deneb.BlobSidecar[]>;
sendBlobSidecarsByRoot(peerId: PeerIdStr, request: deneb.BlobSidecarsByRootRequest): Promise<deneb.BlobSidecar[]>;

// Gossip
publishBeaconBlockMaybeBlobs(blockInput: BlockInput): Promise<number>;
Expand Down
20 changes: 10 additions & 10 deletions packages/beacon-node/src/network/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -471,25 +471,25 @@ export class Network implements INetwork {
);
}

async sendBlobsSidecarsByRange(
async sendBlobSidecarsByRange(
peerId: PeerIdStr,
request: deneb.BlobsSidecarsByRangeRequest
): Promise<deneb.BlobsSidecar[]> {
request: deneb.BlobSidecarsByRangeRequest
): Promise<deneb.BlobSidecar[]> {
return collectMaxResponseTyped(
this.sendReqRespRequest(peerId, ReqRespMethod.BlobsSidecarsByRange, [Version.V1], request),
this.sendReqRespRequest(peerId, ReqRespMethod.BlobSidecarsByRange, [Version.V1], request),
request.count,
responseSszTypeByMethod[ReqRespMethod.BlobsSidecarsByRange]
responseSszTypeByMethod[ReqRespMethod.BlobSidecarsByRange]
);
}

async sendBeaconBlockAndBlobsSidecarByRoot(
async sendBlobSidecarsByRoot(
peerId: PeerIdStr,
request: deneb.BeaconBlockAndBlobsSidecarByRootRequest
): Promise<deneb.SignedBeaconBlockAndBlobsSidecar[]> {
request: deneb.BlobSidecarsByRootRequest
): Promise<deneb.BlobSidecar[]> {
return collectMaxResponseTyped(
this.sendReqRespRequest(peerId, ReqRespMethod.BeaconBlockAndBlobsSidecarByRoot, [Version.V1], request),
this.sendReqRespRequest(peerId, ReqRespMethod.BlobSidecarsByRoot, [Version.V1], request),
request.length,
responseSszTypeByMethod[ReqRespMethod.BeaconBlockAndBlobsSidecarByRoot]
responseSszTypeByMethod[ReqRespMethod.BlobSidecarsByRoot]
);
}

Expand Down
7 changes: 2 additions & 5 deletions packages/beacon-node/src/network/reqresp/ReqRespBeaconNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,8 @@ export class ReqRespBeaconNode extends ReqResp {

if (ForkSeq[fork] >= ForkSeq.deneb) {
protocolsAtFork.push(
[
protocols.BeaconBlockAndBlobsSidecarByRoot(this.config),
this.getHandler(ReqRespMethod.BeaconBlockAndBlobsSidecarByRoot),
],
[protocols.BlobsSidecarsByRange(this.config), this.getHandler(ReqRespMethod.BlobsSidecarsByRange)]
[protocols.BlobSidecarsByRoot(this.config), this.getHandler(ReqRespMethod.BlobSidecarsByRoot)],
[protocols.BlobSidecarsByRange(this.config), this.getHandler(ReqRespMethod.BlobSidecarsByRange)]
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import {BeaconConfig} from "@lodestar/config";
import {deneb, Epoch, phase0} from "@lodestar/types";
import {ChainForkConfig} from "@lodestar/config";
import {deneb, Epoch, phase0, allForks, Slot} from "@lodestar/types";
import {ForkSeq, MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS} from "@lodestar/params";
import {computeEpochAtSlot} from "@lodestar/state-transition";

import {BlockInput, BlockSource, getBlockInput} from "../../chain/blocks/types.js";
import {getEmptyBlobsSidecar} from "../../util/blobs.js";
import {BlockInput, BlockSource, getBlockInput, blobSidecarsToBlobsSidecar} from "../../chain/blocks/types.js";
import {PeerIdStr} from "../../util/peerId.js";
import {INetwork} from "../interface.js";

export async function beaconBlocksMaybeBlobsByRange(
config: BeaconConfig,
config: ChainForkConfig,
network: INetwork,
peerId: PeerIdStr,
request: phase0.BeaconBlocksByRangeRequest,
Expand Down Expand Up @@ -39,63 +38,81 @@ export async function beaconBlocksMaybeBlobsByRange(

// Only request blobs if they are recent enough
else if (computeEpochAtSlot(startSlot) >= currentEpoch - MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS) {
const [blocks, blobsSidecars] = await Promise.all([
const [allBlocks, allBlobSidecars] = await Promise.all([
network.sendBeaconBlocksByRange(peerId, request),
network.sendBlobsSidecarsByRange(peerId, request),
network.sendBlobSidecarsByRange(peerId, request),
]);

const blockInputs: BlockInput[] = [];
let blobSideCarIndex = 0;
let lastMatchedSlot = -1;
return matchBlockWithBlobs(config, allBlocks, allBlobSidecars, endSlot, BlockSource.byRange);
}

// Post Deneb but old blobs
else {
throw Error("Cannot sync blobs outside of blobs prune window");
}
}

// Assumes that the blobs are in the same sequence as blocks, doesn't require block to be sorted
export function matchBlockWithBlobs(
config: ChainForkConfig,
allBlocks: allForks.SignedBeaconBlock[],
allBlobSidecars: deneb.BlobSidecar[],
endSlot: Slot,
blockSource: BlockSource
): BlockInput[] {
const blockInputs: BlockInput[] = [];
let blobSideCarIndex = 0;
let lastMatchedSlot = -1;

// Match blobSideCar with the block as some blocks would have no blobs and hence
// would be omitted from the response. If there are any inconsitencies in the
// response, the validations during import will reject the block and hence this
// entire segment.
//
// Assuming that the blocks and blobs will come in same sorted order
for (let i = 0; i < blocks.length; i++) {
const block = blocks[i];
let blobsSidecar: deneb.BlobsSidecar;
// Match blobSideCar with the block as some blocks would have no blobs and hence
// would be omitted from the response. If there are any inconsitencies in the
// response, the validations during import will reject the block and hence this
// entire segment.
//
// Assuming that the blocks and blobs will come in same sorted order
for (let i = 0; i < allBlocks.length; i++) {
const block = allBlocks[i];
if (config.getForkSeq(block.message.slot) < ForkSeq.deneb) {
blockInputs.push(getBlockInput.preDeneb(config, block, blockSource));
} else {
const blobSidecars: deneb.BlobSidecar[] = [];

if (blobsSidecars[blobSideCarIndex]?.beaconBlockSlot === block.message.slot) {
blobsSidecar = blobsSidecars[blobSideCarIndex];
let blobSidecar: deneb.BlobSidecar;
while ((blobSidecar = allBlobSidecars[blobSideCarIndex])?.slot === block.message.slot) {
blobSidecars.push(blobSidecar);
lastMatchedSlot = block.message.slot;
blobSideCarIndex++;
} else {
// Quick inspect if the blobsSidecar was expected
const blobKzgCommitmentsLen = (block.message.body as deneb.BeaconBlockBody).blobKzgCommitments.length;
if (blobKzgCommitmentsLen !== 0) {
throw Error(
`Missing blobsSidecar for blockSlot=${block.message.slot} with blobKzgCommitmentsLen=${blobKzgCommitmentsLen}`
);
}
blobsSidecar = getEmptyBlobsSidecar(config, block as deneb.SignedBeaconBlock);
}
blockInputs.push(getBlockInput.postDeneb(config, block, BlockSource.byRange, blobsSidecar));
}

// If there are still unconsumed blobs this means that the response was inconsistent
// and matching was wrong and hence we should throw error
if (
blobsSidecars[blobSideCarIndex] !== undefined &&
// If there are no blobs, the blobs request can give 1 block outside the requested range
blobsSidecars[blobSideCarIndex].beaconBlockSlot <= endSlot
) {
throw Error(
`Unmatched blobsSidecars, blocks=${blocks.length}, blobs=${
blobsSidecars.length
} lastMatchedSlot=${lastMatchedSlot}, pending blobsSidecars slots=${blobsSidecars
.slice(blobSideCarIndex)
.map((blb) => blb.beaconBlockSlot)
.join(",")}`
);
// Quick inspect how many blobSidecars was expected
const blobKzgCommitmentsLen = (block.message.body as deneb.BeaconBlockBody).blobKzgCommitments.length;
if (blobKzgCommitmentsLen !== blobSidecars.length) {
throw Error(
`Missing blobSidecars for blockSlot=${block.message.slot} with blobKzgCommitmentsLen=${blobKzgCommitmentsLen} blobSidecars=${blobSidecars.length}`
);
}

// TODO DENEB: cleanup blobSidecars to blobsSidecar conversion on migration of blockInput
const blobsSidecar = blobSidecarsToBlobsSidecar(config, block, blobSidecars);
blockInputs.push(getBlockInput.postDeneb(config, block, blockSource, blobsSidecar));
}
return blockInputs;
}

// Post Deneb but old blobs
else {
throw Error("Cannot sync blobs outside of blobs prune window");
// If there are still unconsumed blobs this means that the response was inconsistent
// and matching was wrong and hence we should throw error
if (
allBlobSidecars[blobSideCarIndex] !== undefined &&
// If there are no blobs, the blobs request can give 1 block outside the requested range
allBlobSidecars[blobSideCarIndex].slot <= endSlot
) {
throw Error(
`Unmatched blobSidecars, blocks=${allBlocks.length}, blobs=${
allBlobSidecars.length
} lastMatchedSlot=${lastMatchedSlot}, pending blobSidecars slots=${allBlobSidecars
.slice(blobSideCarIndex)
.map((blb) => blb.slot)
.join(",")}`
);
}
return blockInputs;
}
Loading

0 comments on commit 5672d84

Please sign in to comment.