Skip to content

Commit

Permalink
feat: batch io operations when verifying & importing block (#5473)
Browse files Browse the repository at this point in the history
* feat: verify and import block - batch all I/O operations

* feat: import block to LightClientServer in the next event loop

* review PR

* fix: rename removeEagerlyPersistedBlockInputs()

* feat: add eagerPersistBlock flag

---------

Co-authored-by: dapplion <35266934+dapplion@users.noreply.github.com>
  • Loading branch information
twoeths and dapplion authored May 18, 2023
1 parent c780a1d commit 3da2287
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 38 deletions.
5 changes: 3 additions & 2 deletions packages/beacon-node/src/api/impl/beacon/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ export function getBeaconBlockApi({
return this.publishBlock(signedBlock, {ignoreIfKnown: true});
},

async publishBlock(signedBlock, opts?: ImportBlockOpts) {
async publishBlock(signedBlock, opts: ImportBlockOpts = {}) {
const seenTimestampSec = Date.now() / 1000;

// Simple implementation of a pending block queue. Keeping the block here recycles the API logic, and keeps the
Expand Down Expand Up @@ -230,7 +230,8 @@ export function getBeaconBlockApi({
() => network.publishBeaconBlockMaybeBlobs(blockForImport) as Promise<unknown>,

() =>
chain.processBlock(blockForImport, opts).catch((e) => {
// there is no rush to persist block since we published it to gossip anyway
chain.processBlock(blockForImport, {...opts, eagerPersistBlock: false}).catch((e) => {
if (e instanceof BlockError && e.type.code === BlockErrorCode.PARENT_UNKNOWN) {
network.events.emit(NetworkEvent.unknownBlockParent, {
blockInput: blockForImport,
Expand Down
51 changes: 18 additions & 33 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ import {ChainEvent, ReorgEventData} from "../emitter.js";
import {REPROCESS_MIN_TIME_TO_NEXT_SLOT_SEC} from "../reprocess.js";
import {RegenCaller} from "../regen/interface.js";
import type {BeaconChain} from "../chain.js";
import {BlockInputType, FullyVerifiedBlock, ImportBlockOpts, AttestationImportOpt} from "./types.js";
import {FullyVerifiedBlock, ImportBlockOpts, AttestationImportOpt} from "./types.js";
import {getCheckpointFromState} from "./utils/checkpoint.js";
import {writeBlockInputToDb} from "./writeBlockInputToDb.js";

/**
* Fork-choice allows to import attestations from current (0) or past (1) epoch.
Expand Down Expand Up @@ -51,7 +52,7 @@ export async function importBlock(
opts: ImportBlockOpts
): Promise<void> {
const {blockInput, postState, parentBlockSlot, executionStatus} = fullyVerifiedBlock;
const {block, serializedData, source} = blockInput;
const {block, source} = blockInput;
const blockRoot = this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message);
const blockRootHex = toHexString(blockRoot);
const currentEpoch = computeEpochAtSlot(this.forkChoice.getTime());
Expand All @@ -60,28 +61,9 @@ export async function importBlock(
const blockDelaySec = (fullyVerifiedBlock.seenTimestampSec - postState.genesisTime) % this.config.SECONDS_PER_SLOT;

// 1. Persist block to hot DB (pre-emptively)
if (serializedData) {
// skip serializing data if we already have it
this.metrics?.importBlock.persistBlockWithSerializedDataCount.inc();
await this.db.block.putBinary(this.db.block.getId(block), serializedData);
} else {
this.metrics?.importBlock.persistBlockNoSerializedDataCount.inc();
await this.db.block.add(block);
}
this.logger.debug("Persisted block to hot DB", {
slot: block.message.slot,
root: blockRootHex,
});

if (blockInput.type === BlockInputType.postDeneb) {
const {blobs} = blockInput;
// NOTE: Old blobs are pruned on archive
await this.db.blobsSidecar.add(blobs);
this.logger.debug("Persisted blobsSidecar to hot DB", {
blobsLen: blobs.blobs.length,
slot: blobs.beaconBlockSlot,
root: toHexString(blobs.beaconBlockRoot),
});
// If eagerPersistBlock = true we do that in verifyBlocksInEpoch to batch all I/O operations to save block time to head
if (!opts.eagerPersistBlock) {
await writeBlockInputToDb.call(this, [blockInput]);
}

// 2. Import block to fork choice
Expand Down Expand Up @@ -286,15 +268,18 @@ export async function importBlock(
// - Persist state witness
// - Use block's syncAggregate
if (blockEpoch >= this.config.ALTAIR_FORK_EPOCH) {
try {
this.lightClientServer.onImportBlockHead(
block.message as allForks.AllForksLightClient["BeaconBlock"],
postState as CachedBeaconStateAltair,
parentBlockSlot
);
} catch (e) {
this.logger.error("Error lightClientServer.onImportBlock", {slot: block.message.slot}, e as Error);
}
// we want to import block asap so do this in the next event loop
setTimeout(() => {
try {
this.lightClientServer.onImportBlockHead(
block.message as allForks.AllForksLightClient["BeaconBlock"],
postState as CachedBeaconStateAltair,
parentBlockSlot
);
} catch (e) {
this.logger.verbose("Error lightClientServer.onImportBlock", {slot: block.message.slot}, e as Error);
}
}, 0);
}
}

Expand Down
19 changes: 19 additions & 0 deletions packages/beacon-node/src/chain/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {importBlock} from "./importBlock.js";
import {assertLinearChainSegment} from "./utils/chainSegment.js";
import {BlockInput, FullyVerifiedBlock, ImportBlockOpts} from "./types.js";
import {verifyBlocksSanityChecks} from "./verifyBlocksSanityChecks.js";
import {removeEagerlyPersistedBlockInputs} from "./writeBlockInputToDb.js";
export {ImportBlockOpts, AttestationImportOpt} from "./types.js";

const QUEUE_MAX_LENGTH = 256;
Expand Down Expand Up @@ -141,6 +142,24 @@ export async function processBlocks(
}
}

// Clean db if we don't have blocks in forkchoice but already persisted them to db
//
// NOTE: this function is awaited to ensure that DB size remains constant, otherwise an attacker may bloat the
// disk with big malicious payloads. Our sequential block importer will wait for this promise before importing
// another block. The removal call error is not propagated since that would halt the chain.
//
// LOG: Because the error is not propagated and there's a risk of db bloat, the error is logged at warn level
// to alert the user of potential db bloat. This error _should_ never happen user must act and report to us
if (opts.eagerPersistBlock) {
await removeEagerlyPersistedBlockInputs.call(this, blocks).catch((e) => {
this.logger.warn(
"Error pruning eagerly imported block inputs, DB may grow in size if this error happens frequently",
{slot: blocks.map((block) => block.block.message.slot).join(",")},
e
);
});
}

throw err;
}
}
Expand Down
2 changes: 2 additions & 0 deletions packages/beacon-node/src/chain/blocks/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ export type ImportBlockOpts = {
validBlobsSidecar?: boolean;
/** Seen timestamp seconds */
seenTimestampSec?: number;
/** Set to true if persist block right at verification time */
eagerPersistBlock?: boolean;
};

/**
Expand Down
11 changes: 9 additions & 2 deletions packages/beacon-node/src/chain/blocks/verifyBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
isStateValidatorsNodesPopulated,
DataAvailableStatus,
} from "@lodestar/state-transition";
import {bellatrix} from "@lodestar/types";
import {WithOptionalBytes, bellatrix} from "@lodestar/types";
import {ForkName} from "@lodestar/params";
import {toHexString} from "@chainsafe/ssz";
import {ProtoBlock} from "@lodestar/fork-choice";
Expand All @@ -20,6 +20,7 @@ import {CAPELLA_OWL_BANNER} from "./utils/ownBanner.js";
import {verifyBlocksStateTransitionOnly} from "./verifyBlocksStateTransitionOnly.js";
import {verifyBlocksSignatures} from "./verifyBlocksSignatures.js";
import {verifyBlocksExecutionPayload, SegmentExecStatus} from "./verifyBlocksExecutionPayloads.js";
import {writeBlockInputToDb} from "./writeBlockInputToDb.js";

/**
* Verifies 1 or more blocks are fully valid; from a linear sequence of blocks.
Expand All @@ -35,7 +36,7 @@ import {verifyBlocksExecutionPayload, SegmentExecStatus} from "./verifyBlocksExe
export async function verifyBlocksInEpoch(
this: BeaconChain,
parentBlock: ProtoBlock,
blocksInput: BlockInput[],
blocksInput: WithOptionalBytes<BlockInput>[],
dataAvailabilityStatuses: DataAvailableStatus[],
opts: BlockProcessOpts & ImportBlockOpts
): Promise<{
Expand Down Expand Up @@ -84,6 +85,7 @@ export async function verifyBlocksInEpoch(
const abortController = new AbortController();

try {
// batch all I/O operations to reduce overhead
const [segmentExecStatus, {postStates, proposerBalanceDeltas}] = await Promise.all([
// Execution payloads
verifyBlocksExecutionPayload(this, parentBlock, blocks, preState0, abortController.signal, opts),
Expand All @@ -101,6 +103,11 @@ export async function verifyBlocksInEpoch(

// All signatures at once
verifyBlocksSignatures(this.bls, this.logger, this.metrics, preState0, blocks, opts),

// ideally we want to only persist blocks after verifying them however the reality is there are
// rarely invalid blocks we'll batch all I/O operation here to reduce the overhead if there's
// an error, we'll remove blocks not in forkchoice
opts.eagerPersistBlock ? writeBlockInputToDb.call(this, blocksInput) : Promise.resolve(),
]);

if (segmentExecStatus.execAborted === null && segmentExecStatus.mergeBlockFound !== null) {
Expand Down
81 changes: 81 additions & 0 deletions packages/beacon-node/src/chain/blocks/writeBlockInputToDb.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import {WithOptionalBytes, allForks, deneb} from "@lodestar/types";
import {toHex} from "@lodestar/utils";
import {BeaconChain} from "../chain.js";
import {BlockInput, BlockInputType} from "./types.js";

/**
* Persists block input data to DB. This operation must be eventually completed if a block is imported to the fork-choice.
* Else the node will be in an inconsistent state that can lead to being stuck.
*
* This operation may be performed before, during or after importing to the fork-choice. As long as errors
* are handled properly for eventual consistency.
*/
export async function writeBlockInputToDb(
this: BeaconChain,
blocksInput: WithOptionalBytes<BlockInput>[]
): Promise<void> {
const fnPromises: Promise<void>[] = [];

for (const blockInput of blocksInput) {
const {block, serializedData, type} = blockInput;
const blockRoot = this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message);
const blockRootHex = toHex(blockRoot);
if (serializedData) {
// skip serializing data if we already have it
this.metrics?.importBlock.persistBlockWithSerializedDataCount.inc();
fnPromises.push(this.db.block.putBinary(this.db.block.getId(block), serializedData));
} else {
this.metrics?.importBlock.persistBlockNoSerializedDataCount.inc();
fnPromises.push(this.db.block.add(block));
}
this.logger.debug("Persist block to hot DB", {
slot: block.message.slot,
root: blockRootHex,
});

if (type === BlockInputType.postDeneb) {
const {blobs} = blockInput;
// NOTE: Old blobs are pruned on archive
fnPromises.push(this.db.blobsSidecar.add(blobs));
this.logger.debug("Persist blobsSidecar to hot DB", {
blobsLen: blobs.blobs.length,
slot: blobs.beaconBlockSlot,
root: toHex(blobs.beaconBlockRoot),
});
}
}

await Promise.all(fnPromises);
}

/**
* Prunes eagerly persisted block inputs only if not known to the fork-choice
*/
export async function removeEagerlyPersistedBlockInputs(
this: BeaconChain,
blockInputs: WithOptionalBytes<BlockInput>[]
): Promise<void> {
const blockToRemove: allForks.SignedBeaconBlock[] = [];
const blobsToRemove: deneb.BlobsSidecar[] = [];

for (const blockInput of blockInputs) {
const {block, type} = blockInput;
const blockRoot = toHex(this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message));
if (!this.forkChoice.hasBlockHex(blockRoot)) {
blockToRemove.push(block);

if (type === BlockInputType.postDeneb) {
blobsToRemove.push(blockInput.blobs);
this.db.blobsSidecar.remove(blockInput.blobs).catch((e) => {
this.logger.verbose("Error removing eagerly imported blobsSidecar", {blockRoot}, e);
});
}
}
}

await Promise.all([
// TODO: Batch DB operations not with Promise.all but with level db ops
this.db.block.batchRemove(blockToRemove),
this.db.blobsSidecar.batchRemove(blobsToRemove),
]);
}
2 changes: 2 additions & 0 deletions packages/beacon-node/src/network/processor/gossipHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH
blsVerifyOnMainThread: true,
// to track block process steps
seenTimestampSec,
// gossip block is validated, we want to process it asap
eagerPersistBlock: true,
})
.then(() => {
// Returns the delay between the start of `block.slot` and `current time`
Expand Down
3 changes: 3 additions & 0 deletions packages/beacon-node/src/sync/range/range.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ export class RangeSync extends (EventEmitter as {new (): RangeSyncEmitter}) {
// when this runs, syncing is the most important thing and gossip is not likely to run
// so we can utilize worker threads to verify signatures
blsVerifyOnMainThread: false,
// we want to be safe to only persist blocks after verifying it to avoid any attacks that may cause our DB
// to grow too much
eagerPersistBlock: false,
};

if (this.opts?.disableProcessAsChainSegment) {
Expand Down
7 changes: 6 additions & 1 deletion packages/beacon-node/src/sync/unknownBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,12 @@ export class UnknownBlockSync {
// otherwise we can't utilize bls thread pool capacity and Gossip Job Wait Time can't be kept low consistently.
// See https://github.com/ChainSafe/lodestar/issues/3792
const res = await wrapError(
this.chain.processBlock(pendingBlock.blockInput, {ignoreIfKnown: true, blsVerifyOnMainThread: true})
this.chain.processBlock(pendingBlock.blockInput, {
ignoreIfKnown: true,
blsVerifyOnMainThread: true,
// block is validated with correct root, we want to process it as soon as possible
eagerPersistBlock: true,
})
);
pendingBlock.status = PendingBlockStatus.pending;

Expand Down

0 comments on commit 3da2287

Please sign in to comment.