Skip to content

Commit

Permalink
Merge 11e21f0 into 06b4c2d
Browse files Browse the repository at this point in the history
  • Loading branch information
g11tech authored Nov 3, 2024
2 parents 06b4c2d + 11e21f0 commit a7b8430
Show file tree
Hide file tree
Showing 15 changed files with 663 additions and 35 deletions.
10 changes: 5 additions & 5 deletions packages/beacon-node/src/api/impl/beacon/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,12 @@ export function getBeaconBlockApi({
// specification is very clear that this is the desired behaviour.
//
// i) Publish blobs and block before importing so that network can see them asap
// ii) publish blobs first because
// a) by the times nodes see block, they might decide to pull blobs
// b) they might require more hops to reach recipients in peerDAS kind of setup where
// blobs might need to hop between nodes because of partial subnet subscription
...blobSidecars.map((blobSidecar) => () => network.publishBlobSidecar(blobSidecar)),
// ii) publish block first because
// a) as soon as node sees block they can start processing it while blobs arrive
// b) getting block first allows nodes to use getBlobs from local ELs and save
// import latency and hopefully bandwidth
() => network.publishBeaconBlock(signedBlock) as Promise<unknown>,
...blobSidecars.map((blobSidecar) => () => network.publishBlobSidecar(blobSidecar)),
() =>
// there is no rush to persist block since we published it to gossip anyway
chain
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/blocks/verifyBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ export async function verifyBlocksInEpoch(
} as SegmentExecStatus),

// data availability for the blobs
verifyBlocksDataAvailability(this, blocksInput, opts),
verifyBlocksDataAvailability(this, blocksInput, abortController.signal, opts),

// Run state transition only
// TODO: Ensure it yields to allow flushing to workers and engine API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {ChainForkConfig} from "@lodestar/config";
import {DataAvailabilityStatus} from "@lodestar/fork-choice";
import {computeTimeAtSlot} from "@lodestar/state-transition";
import {UintNum64, deneb} from "@lodestar/types";
import {Logger} from "@lodestar/utils";
import {ErrorAborted, Logger} from "@lodestar/utils";
import {Metrics} from "../../metrics/metrics.js";
import {BlockError, BlockErrorCode} from "../errors/index.js";
import {validateBlobSidecars} from "../validation/blobSidecar.js";
Expand All @@ -27,6 +27,7 @@ const BLOB_AVAILABILITY_TIMEOUT = 12_000;
export async function verifyBlocksDataAvailability(
chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger; metrics: Metrics | null},
blocks: BlockInput[],
signal: AbortSignal,
opts: ImportBlockOpts
): Promise<{
dataAvailabilityStatuses: DataAvailabilityStatus[];
Expand All @@ -43,9 +44,12 @@ export async function verifyBlocksDataAvailability(
const availableBlockInputs: BlockInput[] = [];

for (const blockInput of blocks) {
if (signal.aborted) {
throw new ErrorAborted("verifyBlocksDataAvailability");
}
// Validate status of only not yet finalized blocks, we don't need yet to propogate the status
// as it is not used upstream anywhere
const {dataAvailabilityStatus, availableBlockInput} = await maybeValidateBlobs(chain, blockInput, opts);
const {dataAvailabilityStatus, availableBlockInput} = await maybeValidateBlobs(chain, blockInput, signal, opts);
dataAvailabilityStatuses.push(dataAvailabilityStatus);
availableBlockInputs.push(availableBlockInput);
}
Expand All @@ -69,6 +73,7 @@ export async function verifyBlocksDataAvailability(
async function maybeValidateBlobs(
chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger},
blockInput: BlockInput,
signal: AbortSignal,
opts: ImportBlockOpts
): Promise<{dataAvailabilityStatus: DataAvailabilityStatus; availableBlockInput: BlockInput}> {
switch (blockInput.type) {
Expand All @@ -92,7 +97,7 @@ async function maybeValidateBlobs(
const blobsData =
blockInput.type === BlockInputType.availableData
? blockInput.blockData
: await raceWithCutoff(chain, blockInput, blockInput.cachedData.availabilityPromise);
: await raceWithCutoff(chain, blockInput, blockInput.cachedData.availabilityPromise, signal);
const {blobs} = blobsData;

const {blobKzgCommitments} = (block as deneb.SignedBeaconBlock).message.body;
Expand Down Expand Up @@ -122,16 +127,21 @@ async function maybeValidateBlobs(
async function raceWithCutoff<T>(
chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger},
blockInput: BlockInput,
availabilityPromise: Promise<T>
availabilityPromise: Promise<T>,
signal: AbortSignal
): Promise<T> {
const {block} = blockInput;
const blockSlot = block.message.slot;

const cutoffTime = Math.max(
computeTimeAtSlot(chain.config, blockSlot, chain.genesisTime) * 1000 + BLOB_AVAILABILITY_TIMEOUT - Date.now(),
0
);
const cutoffTimeout = new Promise((_resolve, reject) => setTimeout(reject, cutoffTime));
const cutoffTime =
computeTimeAtSlot(chain.config, blockSlot, chain.genesisTime) * 1000 + BLOB_AVAILABILITY_TIMEOUT - Date.now();
const cutoffTimeout =
cutoffTime > 0
? new Promise((_resolve, reject) => {
setTimeout(() => reject(new Error("Timeout exceeded")), cutoffTime);
signal.addEventListener("abort", () => reject(signal.reason));
})
: Promise.reject(new Error("Cutoff time must be greater than 0"));
chain.logger.debug("Racing for blob availabilityPromise", {blockSlot, cutoffTime});

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ export class HttpRpcError extends Error {
/**
* JSON RPC spec errors https://www.jsonrpc.org/specification#response_object
*/
function parseJsonRpcErrorCode(code: number): string {
export function parseJsonRpcErrorCode(code: number): string {
if (code === -32700) return "Parse request error";
if (code === -32600) return "Invalid request object";
if (code === -32601) return "Method not found";
Expand Down
4 changes: 4 additions & 0 deletions packages/beacon-node/src/execution/engine/disabled.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,8 @@ export class ExecutionEngineDisabled implements IExecutionEngine {
getPayloadBodiesByRange(): Promise<never> {
throw Error("Execution engine disabled");
}

getBlobs(): Promise<never> {
throw Error("Execution engine disabled");
}
}
57 changes: 56 additions & 1 deletion packages/beacon-node/src/execution/engine/http.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import {Logger} from "@lodestar/logger";
import {ForkName, ForkSeq, SLOTS_PER_EPOCH} from "@lodestar/params";
import {ExecutionPayload, ExecutionRequests, Root, RootHex, Wei} from "@lodestar/types";
import {BlobAndProof} from "@lodestar/types/deneb";
import {
ErrorJsonRpcResponse,
HttpRpcError,
IJsonRpcHttpClient,
JsonRpcHttpClientEvent,
ReqOpts,
parseJsonRpcErrorCode,
} from "../../eth1/provider/jsonRpcHttpClient.js";
import {numToQuantity} from "../../eth1/provider/utils.js";
import {bytesToData, numToQuantity} from "../../eth1/provider/utils.js";
import {Metrics} from "../../metrics/index.js";
import {EPOCHS_PER_BATCH} from "../../sync/constants.js";
import {getLodestarClientVersion} from "../../util/metadata.js";
Expand All @@ -31,6 +33,7 @@ import {
EngineApiRpcReturnTypes,
ExecutionPayloadBody,
assertReqSizeLimit,
deserializeBlobAndProofs,
deserializeExecutionPayloadBody,
parseExecutionPayload,
serializeBeaconBlockRoot,
Expand Down Expand Up @@ -111,6 +114,7 @@ const getPayloadOpts: ReqOpts = {routeId: "getPayload"};
*/
export class ExecutionEngineHttp implements IExecutionEngine {
private logger: Logger;
private lastGetBlobsErrorTime = 0;

// The default state is ONLINE, it will be updated to SYNCING once we receive the first payload
// This assumption is better than the OFFLINE state, since we can't be sure if the EL is offline and being offline may trigger some notifications
Expand Down Expand Up @@ -461,6 +465,57 @@ export class ExecutionEngineHttp implements IExecutionEngine {
return response.map(deserializeExecutionPayloadBody);
}

async getBlobs(_fork: ForkName, versionedHashes: VersionedHashes): Promise<(BlobAndProof | null)[]> {
// retry only after a day may be
const GETBLOBS_RETRY_TIMEOUT = 256 * 32 * 12;
const timeNow = Date.now() / 1000;
const timeSinceLastFail = timeNow - this.lastGetBlobsErrorTime;
if (timeSinceLastFail < GETBLOBS_RETRY_TIMEOUT) {
// do not try getblobs since it might not be available
this.logger.debug(
`disabled engine_getBlobsV1 api call since last failed < GETBLOBS_RETRY_TIMEOUT=${GETBLOBS_RETRY_TIMEOUT}`,
timeSinceLastFail
);
throw Error(
`engine_getBlobsV1 call recently failed timeSinceLastFail=${timeSinceLastFail} < GETBLOBS_RETRY_TIMEOUT=${GETBLOBS_RETRY_TIMEOUT}`
);
}

const method = "engine_getBlobsV1";
assertReqSizeLimit(versionedHashes.length, 128);
const versionedHashesHex = versionedHashes.map(bytesToData);
let response = await this.rpc
.fetchWithRetries<EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method]>({
method,
params: [versionedHashesHex],
})
.catch((e) => {
if (e instanceof ErrorJsonRpcResponse && parseJsonRpcErrorCode(e.response.error.code) === "Method not found") {
this.lastGetBlobsErrorTime = timeNow;
this.logger.debug("disabling engine_getBlobsV1 api call since engine responded with method not availeble", {
retryTimeout: GETBLOBS_RETRY_TIMEOUT,
});
}
throw e;
});

// handle nethermind buggy response
// see: https://discord.com/channels/595666850260713488/1293605631785304088/1298956894274060301
if (
(response as unknown as {blobsAndProofs: EngineApiRpcReturnTypes[typeof method]}).blobsAndProofs !== undefined
) {
response = (response as unknown as {blobsAndProofs: EngineApiRpcReturnTypes[typeof method]}).blobsAndProofs;
}

if (response.length !== versionedHashes.length) {
const error = `Invalid engine_getBlobsV1 response length=${response.length} versionedHashes=${versionedHashes.length}`;
this.logger.error(error);
throw Error(error);
}

return response.map(deserializeBlobAndProofs);
}

private async getClientVersion(clientVersion: ClientVersion): Promise<ClientVersion[]> {
const method = "engine_getClientVersionV1";

Expand Down
4 changes: 3 additions & 1 deletion packages/beacon-node/src/execution/engine/interface.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {ForkName} from "@lodestar/params";
import {ExecutionPayload, ExecutionRequests, Root, RootHex, Wei, capella} from "@lodestar/types";
import {Blob, KZGCommitment, KZGProof} from "@lodestar/types/deneb";
import {Blob, BlobAndProof, KZGCommitment, KZGProof} from "@lodestar/types/deneb";

import {DATA} from "../../eth1/provider/utils.js";
import {PayloadId, PayloadIdCache, WithdrawalV1} from "./payloadIdCache.js";
Expand Down Expand Up @@ -179,4 +179,6 @@ export interface IExecutionEngine {
getPayloadBodiesByHash(fork: ForkName, blockHash: DATA[]): Promise<(ExecutionPayloadBody | null)[]>;

getPayloadBodiesByRange(fork: ForkName, start: number, count: number): Promise<(ExecutionPayloadBody | null)[]>;

getBlobs(fork: ForkName, versionedHashes: VersionedHashes): Promise<(BlobAndProof | null)[]>;
}
7 changes: 7 additions & 0 deletions packages/beacon-node/src/execution/engine/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ export class ExecutionEngineMockBackend implements JsonRpcBackend {
engine_getPayloadBodiesByHashV1: this.getPayloadBodiesByHash.bind(this),
engine_getPayloadBodiesByRangeV1: this.getPayloadBodiesByRange.bind(this),
engine_getClientVersionV1: this.getClientVersionV1.bind(this),
engine_getBlobsV1: this.getBlobs.bind(this),
};
}

Expand Down Expand Up @@ -397,6 +398,12 @@ export class ExecutionEngineMockBackend implements JsonRpcBackend {
return [{code: ClientCode.XX, name: "mock", version: "", commit: ""}];
}

private getBlobs(
versionedHashes: EngineApiRpcParamTypes["engine_getBlobsV1"][0]
): EngineApiRpcReturnTypes["engine_getBlobsV1"] {
return versionedHashes.map((_vh) => null);
}

private timestampToFork(timestamp: number): ForkExecution {
if (timestamp > (this.opts.electraForkTimestamp ?? Infinity)) return ForkName.electra;
if (timestamp > (this.opts.denebForkTimestamp ?? Infinity)) return ForkName.deneb;
Expand Down
19 changes: 19 additions & 0 deletions packages/beacon-node/src/execution/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
ForkSeq,
} from "@lodestar/params";
import {ExecutionPayload, ExecutionRequests, Root, Wei, bellatrix, capella, deneb, electra, ssz} from "@lodestar/types";
import {BlobAndProof} from "@lodestar/types/deneb";

import {
DATA,
Expand Down Expand Up @@ -67,6 +68,8 @@ export type EngineApiRpcParamTypes = {
* Object - Instance of ClientVersion
*/
engine_getClientVersionV1: [ClientVersionRpc];

engine_getBlobsV1: [DATA[]];
};

export type PayloadStatus = {
Expand Down Expand Up @@ -109,6 +112,8 @@ export type EngineApiRpcReturnTypes = {
engine_getPayloadBodiesByRangeV1: (ExecutionPayloadBodyRpc | null)[];

engine_getClientVersionV1: ClientVersionRpc[];

engine_getBlobsV1: (BlobAndProofRpc | null)[];
};

type ExecutionPayloadRpcWithValue = {
Expand Down Expand Up @@ -171,6 +176,11 @@ export type DepositRequestsRpc = DATA;
export type WithdrawalRequestsRpc = DATA;
export type ConsolidationRequestsRpc = DATA;

export type BlobAndProofRpc = {
blob: DATA;
proof: DATA;
};

export type VersionedHashesRpc = DATA[];

export type PayloadAttributesRpc = {
Expand Down Expand Up @@ -462,6 +472,15 @@ export function serializeExecutionPayloadBody(data: ExecutionPayloadBody | null)
: null;
}

export function deserializeBlobAndProofs(data: BlobAndProofRpc | null): BlobAndProof | null {
return data
? {
blob: dataToBytes(data.blob, BYTES_PER_FIELD_ELEMENT * FIELD_ELEMENTS_PER_BLOB),
proof: dataToBytes(data.proof, 48),
}
: null;
}

export function assertReqSizeLimit(blockHashesReqCount: number, count: number): void {
if (blockHashesReqCount > count) {
throw new Error(`Requested blocks must not be > ${count}`);
Expand Down
Loading

0 comments on commit a7b8430

Please sign in to comment.