diff --git a/packages/beacon-node/src/api/impl/beacon/blocks/index.ts b/packages/beacon-node/src/api/impl/beacon/blocks/index.ts index 51af424c219..12b6b16617c 100644 --- a/packages/beacon-node/src/api/impl/beacon/blocks/index.ts +++ b/packages/beacon-node/src/api/impl/beacon/blocks/index.ts @@ -198,7 +198,7 @@ export function getBeaconBlockApi({ return this.publishBlock(signedBlock, {ignoreIfKnown: true}); }, - async publishBlock(signedBlock, opts?: ImportBlockOpts) { + async publishBlock(signedBlock, opts: ImportBlockOpts = {}) { const seenTimestampSec = Date.now() / 1000; // Simple implementation of a pending block queue. Keeping the block here recycles the API logic, and keeps the @@ -230,7 +230,8 @@ export function getBeaconBlockApi({ () => network.publishBeaconBlockMaybeBlobs(blockForImport) as Promise, () => - chain.processBlock(blockForImport, opts).catch((e) => { + // there is no rush to persist block since we published it to gossip anyway + chain.processBlock(blockForImport, {...opts, eagerPersistBlock: false}).catch((e) => { if (e instanceof BlockError && e.type.code === BlockErrorCode.PARENT_UNKNOWN) { network.events.emit(NetworkEvent.unknownBlockParent, { blockInput: blockForImport, diff --git a/packages/beacon-node/src/chain/blocks/importBlock.ts b/packages/beacon-node/src/chain/blocks/importBlock.ts index ae90ca58c14..b06da6dfcdc 100644 --- a/packages/beacon-node/src/chain/blocks/importBlock.ts +++ b/packages/beacon-node/src/chain/blocks/importBlock.ts @@ -17,8 +17,9 @@ import {ChainEvent, ReorgEventData} from "../emitter.js"; import {REPROCESS_MIN_TIME_TO_NEXT_SLOT_SEC} from "../reprocess.js"; import {RegenCaller} from "../regen/interface.js"; import type {BeaconChain} from "../chain.js"; -import {BlockInputType, FullyVerifiedBlock, ImportBlockOpts, AttestationImportOpt} from "./types.js"; +import {FullyVerifiedBlock, ImportBlockOpts, AttestationImportOpt} from "./types.js"; import {getCheckpointFromState} from "./utils/checkpoint.js"; +import {writeBlockInputToDb} from "./writeBlockInputToDb.js"; /** * Fork-choice allows to import attestations from current (0) or past (1) epoch. @@ -51,7 +52,7 @@ export async function importBlock( opts: ImportBlockOpts ): Promise { const {blockInput, postState, parentBlockSlot, executionStatus} = fullyVerifiedBlock; - const {block, serializedData, source} = blockInput; + const {block, source} = blockInput; const blockRoot = this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message); const blockRootHex = toHexString(blockRoot); const currentEpoch = computeEpochAtSlot(this.forkChoice.getTime()); @@ -60,28 +61,9 @@ export async function importBlock( const blockDelaySec = (fullyVerifiedBlock.seenTimestampSec - postState.genesisTime) % this.config.SECONDS_PER_SLOT; // 1. Persist block to hot DB (pre-emptively) - if (serializedData) { - // skip serializing data if we already have it - this.metrics?.importBlock.persistBlockWithSerializedDataCount.inc(); - await this.db.block.putBinary(this.db.block.getId(block), serializedData); - } else { - this.metrics?.importBlock.persistBlockNoSerializedDataCount.inc(); - await this.db.block.add(block); - } - this.logger.debug("Persisted block to hot DB", { - slot: block.message.slot, - root: blockRootHex, - }); - - if (blockInput.type === BlockInputType.postDeneb) { - const {blobs} = blockInput; - // NOTE: Old blobs are pruned on archive - await this.db.blobsSidecar.add(blobs); - this.logger.debug("Persisted blobsSidecar to hot DB", { - blobsLen: blobs.blobs.length, - slot: blobs.beaconBlockSlot, - root: toHexString(blobs.beaconBlockRoot), - }); + // If eagerPersistBlock = true we do that in verifyBlocksInEpoch to batch all I/O operations to save block time to head + if (!opts.eagerPersistBlock) { + await writeBlockInputToDb.call(this, [blockInput]); } // 2. Import block to fork choice @@ -286,15 +268,18 @@ export async function importBlock( // - Persist state witness // - Use block's syncAggregate if (blockEpoch >= this.config.ALTAIR_FORK_EPOCH) { - try { - this.lightClientServer.onImportBlockHead( - block.message as allForks.AllForksLightClient["BeaconBlock"], - postState as CachedBeaconStateAltair, - parentBlockSlot - ); - } catch (e) { - this.logger.error("Error lightClientServer.onImportBlock", {slot: block.message.slot}, e as Error); - } + // we want to import block asap so do this in the next event loop + setTimeout(() => { + try { + this.lightClientServer.onImportBlockHead( + block.message as allForks.AllForksLightClient["BeaconBlock"], + postState as CachedBeaconStateAltair, + parentBlockSlot + ); + } catch (e) { + this.logger.verbose("Error lightClientServer.onImportBlock", {slot: block.message.slot}, e as Error); + } + }, 0); } } diff --git a/packages/beacon-node/src/chain/blocks/index.ts b/packages/beacon-node/src/chain/blocks/index.ts index a5b98c8f314..60e3342385b 100644 --- a/packages/beacon-node/src/chain/blocks/index.ts +++ b/packages/beacon-node/src/chain/blocks/index.ts @@ -10,6 +10,7 @@ import {importBlock} from "./importBlock.js"; import {assertLinearChainSegment} from "./utils/chainSegment.js"; import {BlockInput, FullyVerifiedBlock, ImportBlockOpts} from "./types.js"; import {verifyBlocksSanityChecks} from "./verifyBlocksSanityChecks.js"; +import {removeEagerlyPersistedBlockInputs} from "./writeBlockInputToDb.js"; export {ImportBlockOpts, AttestationImportOpt} from "./types.js"; const QUEUE_MAX_LENGTH = 256; @@ -141,6 +142,24 @@ export async function processBlocks( } } + // Clean db if we don't have blocks in forkchoice but already persisted them to db + // + // NOTE: this function is awaited to ensure that DB size remains constant, otherwise an attacker may bloat the + // disk with big malicious payloads. Our sequential block importer will wait for this promise before importing + // another block. The removal call error is not propagated since that would halt the chain. + // + // LOG: Because the error is not propagated and there's a risk of db bloat, the error is logged at warn level + // to alert the user of potential db bloat. This error _should_ never happen user must act and report to us + if (opts.eagerPersistBlock) { + await removeEagerlyPersistedBlockInputs.call(this, blocks).catch((e) => { + this.logger.warn( + "Error pruning eagerly imported block inputs, DB may grow in size if this error happens frequently", + {slot: blocks.map((block) => block.block.message.slot).join(",")}, + e + ); + }); + } + throw err; } } diff --git a/packages/beacon-node/src/chain/blocks/types.ts b/packages/beacon-node/src/chain/blocks/types.ts index 0085d7f7bb3..1663ae6c3d1 100644 --- a/packages/beacon-node/src/chain/blocks/types.ts +++ b/packages/beacon-node/src/chain/blocks/types.ts @@ -101,6 +101,8 @@ export type ImportBlockOpts = { validBlobsSidecar?: boolean; /** Seen timestamp seconds */ seenTimestampSec?: number; + /** Set to true if persist block right at verification time */ + eagerPersistBlock?: boolean; }; /** diff --git a/packages/beacon-node/src/chain/blocks/verifyBlock.ts b/packages/beacon-node/src/chain/blocks/verifyBlock.ts index 99f63f2d02f..59dc0288a2d 100644 --- a/packages/beacon-node/src/chain/blocks/verifyBlock.ts +++ b/packages/beacon-node/src/chain/blocks/verifyBlock.ts @@ -4,7 +4,7 @@ import { isStateValidatorsNodesPopulated, DataAvailableStatus, } from "@lodestar/state-transition"; -import {bellatrix} from "@lodestar/types"; +import {WithOptionalBytes, bellatrix} from "@lodestar/types"; import {ForkName} from "@lodestar/params"; import {toHexString} from "@chainsafe/ssz"; import {ProtoBlock} from "@lodestar/fork-choice"; @@ -20,6 +20,7 @@ import {CAPELLA_OWL_BANNER} from "./utils/ownBanner.js"; import {verifyBlocksStateTransitionOnly} from "./verifyBlocksStateTransitionOnly.js"; import {verifyBlocksSignatures} from "./verifyBlocksSignatures.js"; import {verifyBlocksExecutionPayload, SegmentExecStatus} from "./verifyBlocksExecutionPayloads.js"; +import {writeBlockInputToDb} from "./writeBlockInputToDb.js"; /** * Verifies 1 or more blocks are fully valid; from a linear sequence of blocks. @@ -35,7 +36,7 @@ import {verifyBlocksExecutionPayload, SegmentExecStatus} from "./verifyBlocksExe export async function verifyBlocksInEpoch( this: BeaconChain, parentBlock: ProtoBlock, - blocksInput: BlockInput[], + blocksInput: WithOptionalBytes[], dataAvailabilityStatuses: DataAvailableStatus[], opts: BlockProcessOpts & ImportBlockOpts ): Promise<{ @@ -84,6 +85,7 @@ export async function verifyBlocksInEpoch( const abortController = new AbortController(); try { + // batch all I/O operations to reduce overhead const [segmentExecStatus, {postStates, proposerBalanceDeltas}] = await Promise.all([ // Execution payloads verifyBlocksExecutionPayload(this, parentBlock, blocks, preState0, abortController.signal, opts), @@ -101,6 +103,11 @@ export async function verifyBlocksInEpoch( // All signatures at once verifyBlocksSignatures(this.bls, this.logger, this.metrics, preState0, blocks, opts), + + // ideally we want to only persist blocks after verifying them however the reality is there are + // rarely invalid blocks we'll batch all I/O operation here to reduce the overhead if there's + // an error, we'll remove blocks not in forkchoice + opts.eagerPersistBlock ? writeBlockInputToDb.call(this, blocksInput) : Promise.resolve(), ]); if (segmentExecStatus.execAborted === null && segmentExecStatus.mergeBlockFound !== null) { diff --git a/packages/beacon-node/src/chain/blocks/writeBlockInputToDb.ts b/packages/beacon-node/src/chain/blocks/writeBlockInputToDb.ts new file mode 100644 index 00000000000..d5b928d131b --- /dev/null +++ b/packages/beacon-node/src/chain/blocks/writeBlockInputToDb.ts @@ -0,0 +1,81 @@ +import {WithOptionalBytes, allForks, deneb} from "@lodestar/types"; +import {toHex} from "@lodestar/utils"; +import {BeaconChain} from "../chain.js"; +import {BlockInput, BlockInputType} from "./types.js"; + +/** + * Persists block input data to DB. This operation must be eventually completed if a block is imported to the fork-choice. + * Else the node will be in an inconsistent state that can lead to being stuck. + * + * This operation may be performed before, during or after importing to the fork-choice. As long as errors + * are handled properly for eventual consistency. + */ +export async function writeBlockInputToDb( + this: BeaconChain, + blocksInput: WithOptionalBytes[] +): Promise { + const fnPromises: Promise[] = []; + + for (const blockInput of blocksInput) { + const {block, serializedData, type} = blockInput; + const blockRoot = this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message); + const blockRootHex = toHex(blockRoot); + if (serializedData) { + // skip serializing data if we already have it + this.metrics?.importBlock.persistBlockWithSerializedDataCount.inc(); + fnPromises.push(this.db.block.putBinary(this.db.block.getId(block), serializedData)); + } else { + this.metrics?.importBlock.persistBlockNoSerializedDataCount.inc(); + fnPromises.push(this.db.block.add(block)); + } + this.logger.debug("Persist block to hot DB", { + slot: block.message.slot, + root: blockRootHex, + }); + + if (type === BlockInputType.postDeneb) { + const {blobs} = blockInput; + // NOTE: Old blobs are pruned on archive + fnPromises.push(this.db.blobsSidecar.add(blobs)); + this.logger.debug("Persist blobsSidecar to hot DB", { + blobsLen: blobs.blobs.length, + slot: blobs.beaconBlockSlot, + root: toHex(blobs.beaconBlockRoot), + }); + } + } + + await Promise.all(fnPromises); +} + +/** + * Prunes eagerly persisted block inputs only if not known to the fork-choice + */ +export async function removeEagerlyPersistedBlockInputs( + this: BeaconChain, + blockInputs: WithOptionalBytes[] +): Promise { + const blockToRemove: allForks.SignedBeaconBlock[] = []; + const blobsToRemove: deneb.BlobsSidecar[] = []; + + for (const blockInput of blockInputs) { + const {block, type} = blockInput; + const blockRoot = toHex(this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message)); + if (!this.forkChoice.hasBlockHex(blockRoot)) { + blockToRemove.push(block); + + if (type === BlockInputType.postDeneb) { + blobsToRemove.push(blockInput.blobs); + this.db.blobsSidecar.remove(blockInput.blobs).catch((e) => { + this.logger.verbose("Error removing eagerly imported blobsSidecar", {blockRoot}, e); + }); + } + } + } + + await Promise.all([ + // TODO: Batch DB operations not with Promise.all but with level db ops + this.db.block.batchRemove(blockToRemove), + this.db.blobsSidecar.batchRemove(blobsToRemove), + ]); +} diff --git a/packages/beacon-node/src/network/processor/gossipHandlers.ts b/packages/beacon-node/src/network/processor/gossipHandlers.ts index 0aa4090e0b7..dfe42dd96fa 100644 --- a/packages/beacon-node/src/network/processor/gossipHandlers.ts +++ b/packages/beacon-node/src/network/processor/gossipHandlers.ts @@ -150,6 +150,8 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH blsVerifyOnMainThread: true, // to track block process steps seenTimestampSec, + // gossip block is validated, we want to process it asap + eagerPersistBlock: true, }) .then(() => { // Returns the delay between the start of `block.slot` and `current time` diff --git a/packages/beacon-node/src/sync/range/range.ts b/packages/beacon-node/src/sync/range/range.ts index 9d06eaf3890..7739b6950ec 100644 --- a/packages/beacon-node/src/sync/range/range.ts +++ b/packages/beacon-node/src/sync/range/range.ts @@ -187,6 +187,9 @@ export class RangeSync extends (EventEmitter as {new (): RangeSyncEmitter}) { // when this runs, syncing is the most important thing and gossip is not likely to run // so we can utilize worker threads to verify signatures blsVerifyOnMainThread: false, + // we want to be safe to only persist blocks after verifying it to avoid any attacks that may cause our DB + // to grow too much + eagerPersistBlock: false, }; if (this.opts?.disableProcessAsChainSegment) { diff --git a/packages/beacon-node/src/sync/unknownBlock.ts b/packages/beacon-node/src/sync/unknownBlock.ts index 8fa99c6a38b..57d65152ac7 100644 --- a/packages/beacon-node/src/sync/unknownBlock.ts +++ b/packages/beacon-node/src/sync/unknownBlock.ts @@ -196,7 +196,12 @@ export class UnknownBlockSync { // otherwise we can't utilize bls thread pool capacity and Gossip Job Wait Time can't be kept low consistently. // See https://github.com/ChainSafe/lodestar/issues/3792 const res = await wrapError( - this.chain.processBlock(pendingBlock.blockInput, {ignoreIfKnown: true, blsVerifyOnMainThread: true}) + this.chain.processBlock(pendingBlock.blockInput, { + ignoreIfKnown: true, + blsVerifyOnMainThread: true, + // block is validated with correct root, we want to process it as soon as possible + eagerPersistBlock: true, + }) ); pendingBlock.status = PendingBlockStatus.pending;