Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add historical state regen #6033

Merged
merged 29 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
583bc18
feat: add historical state regen
wemeetagain Oct 11, 2023
8c1790b
chore: wire up metrics
wemeetagain Oct 11, 2023
93c1981
chore: make historical state regen module optional
wemeetagain Oct 13, 2023
06e58af
chore: persist pubkey cache across historical state regen runs
wemeetagain Oct 13, 2023
623b58c
chore: cleanup worker termination
wemeetagain Oct 13, 2023
deba946
Merge branch 'unstable' into cayman/historical-state-regen
wemeetagain Oct 13, 2023
b8d6113
chore: fix worker usage
wemeetagain Oct 23, 2023
0315e1a
fix: swap Level for ClassicLevel for multithreading
matthewkeil Nov 6, 2023
cdf3732
fix: getStateV2 state handling hack
matthewkeil Nov 6, 2023
a8f719d
Merge branch 'unstable' into cayman/historical-state-regen
wemeetagain Jan 22, 2024
6d9d256
chore: update classic-level
wemeetagain Jan 22, 2024
5a812e1
chore: fix build errors
wemeetagain Jan 22, 2024
dabd767
chore: add comments
wemeetagain Jan 22, 2024
51fc29a
chore: fix test worker path
wemeetagain Jan 22, 2024
21bb96b
chore: simplify function naming
wemeetagain Jan 22, 2024
dc45097
chore: optimize getSlotFromOffset
wemeetagain Jan 22, 2024
d32f103
chore: refactor to avoid needless deserialization
wemeetagain Jan 22, 2024
25b1c9f
fix: update metrics names
wemeetagain Jan 22, 2024
b6fd577
feat: add historical state regen dashboard
wemeetagain Jan 22, 2024
e8c9d27
fix: update vm dashboards with historical state worker
wemeetagain Jan 22, 2024
03a241c
chore: fix test data
wemeetagain Jan 22, 2024
6001521
feat: transfer state across worker boundary
wemeetagain Jan 22, 2024
41c8b5d
Merge branch 'unstable' into cayman/historical-state-regen
wemeetagain Jul 9, 2024
7ea9c1a
chore: address some pr comments
wemeetagain Jul 9, 2024
4a297dd
chore: clean module close
wemeetagain Jul 9, 2024
73c1b7c
Merge branch 'unstable' into cayman/historical-state-regen
wemeetagain Jul 16, 2024
cc4197f
Merge branch 'unstable' into cayman/historical-state-regen
wemeetagain Jul 18, 2024
80debf1
Merge branch 'unstable' into cayman/historical-state-regen
wemeetagain Jul 23, 2024
e8f4d97
feat: add metrics
twoeths Jul 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,646 changes: 2,646 additions & 0 deletions dashboards/lodestar_historical_state_regen.json

Large diffs are not rendered by default.

318 changes: 270 additions & 48 deletions dashboards/lodestar_vm_host.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion packages/beacon-node/src/api/impl/beacon/state/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export async function getStateResponseWithRegen(
? await chain.getStateByStateRoot(rootOrSlot, {allowRegen: true})
: rootOrSlot >= chain.forkChoice.getFinalizedBlock().slot
? await chain.getStateBySlot(rootOrSlot, {allowRegen: true})
: null; // TODO implement historical state regen
: await chain.getHistoricalStateBySlot(rootOrSlot);

if (!res) {
throw new ApiError(404, `No state found for id '${stateId}'`);
Expand Down
87 changes: 53 additions & 34 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ import {BlockAttributes, produceBlockBody, produceCommonBlockBody} from "./produ
import {computeNewStateRoot} from "./produceBlock/computeNewStateRoot.js";
import {BlockInput} from "./blocks/types.js";
import {SeenAttestationDatas} from "./seenCache/seenAttestationData.js";
import {HistoricalStateRegen} from "./historicalState/index.js";
import {BlockRewards, computeBlockRewards} from "./rewards/blockRewards.js";
import {ShufflingCache} from "./shufflingCache.js";
import {BlockStateCacheImpl} from "./stateCache/blockStateCacheImpl.js";
Expand Down Expand Up @@ -128,6 +129,7 @@ export class BeaconChain implements IBeaconChain {
readonly regen: QueuedStateRegenerator;
readonly lightClientServer?: LightClientServer;
readonly reprocessController: ReprocessController;
readonly historicalStateRegen?: HistoricalStateRegen;

// Ops pool
readonly attestationPool: AttestationPool;
Expand Down Expand Up @@ -185,6 +187,7 @@ export class BeaconChain implements IBeaconChain {
eth1,
executionEngine,
executionBuilder,
historicalStateRegen,
}: {
config: BeaconConfig;
db: IBeaconDb;
Expand All @@ -197,6 +200,7 @@ export class BeaconChain implements IBeaconChain {
eth1: IEth1ForBlockProduction;
executionEngine: IExecutionEngine;
executionBuilder?: IExecutionBuilder;
historicalStateRegen?: HistoricalStateRegen;
}
) {
this.opts = opts;
Expand All @@ -211,6 +215,7 @@ export class BeaconChain implements IBeaconChain {
this.eth1 = eth1;
this.executionEngine = executionEngine;
this.executionBuilder = executionBuilder;
this.historicalStateRegen = historicalStateRegen;
const signal = this.abortController.signal;
const emitter = new ChainEventEmitter();
// by default, verify signatures on both main threads and worker threads
Expand Down Expand Up @@ -418,47 +423,61 @@ export class BeaconChain implements IBeaconChain {
): Promise<{state: BeaconStateAllForks; executionOptimistic: boolean; finalized: boolean} | null> {
const finalizedBlock = this.forkChoice.getFinalizedBlock();

if (slot >= finalizedBlock.slot) {
// request for non-finalized state

if (opts?.allowRegen) {
// Find closest canonical block to slot, then trigger regen
const block = this.forkChoice.getCanonicalBlockClosestLteSlot(slot) ?? finalizedBlock;
const state = await this.regen.getBlockSlotState(
block.blockRoot,
slot,
{dontTransferCache: true},
RegenCaller.restApi
);
return {
if (slot < finalizedBlock.slot) {
// request for finalized state not supported in this API
// fall back to caller to look in db or getHistoricalStateBySlot
return null;
}

if (opts?.allowRegen) {
// Find closest canonical block to slot, then trigger regen
const block = this.forkChoice.getCanonicalBlockClosestLteSlot(slot) ?? finalizedBlock;
const state = await this.regen.getBlockSlotState(
block.blockRoot,
slot,
{dontTransferCache: true},
RegenCaller.restApi
);
return {
state,
executionOptimistic: isOptimisticBlock(block),
finalized: slot === finalizedBlock.slot && finalizedBlock.slot !== GENESIS_SLOT,
};
} else {
// Just check if state is already in the cache. If it's not dialed to the correct slot,
// do not bother in advancing the state. restApiCanTriggerRegen == false means do no work
const block = this.forkChoice.getCanonicalBlockAtSlot(slot);
if (!block) {
return null;
}

const state = this.regen.getStateSync(block.stateRoot);
return (
state && {
state,
executionOptimistic: isOptimisticBlock(block),
finalized: slot === finalizedBlock.slot && finalizedBlock.slot !== GENESIS_SLOT,
};
} else {
// Just check if state is already in the cache. If it's not dialed to the correct slot,
// do not bother in advancing the state. restApiCanTriggerRegen == false means do no work
const block = this.forkChoice.getCanonicalBlockAtSlot(slot);
if (!block) {
return null;
}
);
}
}

const state = this.regen.getStateSync(block.stateRoot);
return (
state && {
state,
executionOptimistic: isOptimisticBlock(block),
finalized: slot === finalizedBlock.slot && finalizedBlock.slot !== GENESIS_SLOT,
}
);
}
} else {
// request for finalized state
async getHistoricalStateBySlot(
slot: number
): Promise<{state: Uint8Array; executionOptimistic: boolean; finalized: boolean} | null> {
const finalizedBlock = this.forkChoice.getFinalizedBlock();

if (slot >= finalizedBlock.slot) {
return null;
wemeetagain marked this conversation as resolved.
Show resolved Hide resolved
}

// do not attempt regen, just check if state is already in DB
const state = await this.db.stateArchive.get(slot);
return state && {state, executionOptimistic: false, finalized: true};
// request for finalized state using historical state regen
const stateSerialized = await this.historicalStateRegen?.getHistoricalState(slot);
if (!stateSerialized) {
return null;
}

return {state: stateSerialized, executionOptimistic: false, finalized: true};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have access to the block here; shouldn't this rather be `executionOptimistic: isOptimisticBlock(finalizedBlock)`? AFAIK a finalized block can still be optimistic.

}

async getStateByStateRoot(
Expand Down
111 changes: 111 additions & 0 deletions packages/beacon-node/src/chain/historicalState/getHistoricalState.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import {
BeaconStateAllForks,
CachedBeaconStateAllForks,
DataAvailableStatus,
ExecutionPayloadStatus,
PubkeyIndexMap,
createCachedBeaconState,
stateTransition,
} from "@lodestar/state-transition";
import {BeaconConfig} from "@lodestar/config";
import {IBeaconDb} from "../../db/index.js";
import {HistoricalStateRegenMetrics, RegenErrorType} from "./types.js";

/**
 * Register in `pubkey2index` every validator pubkey of `state` that the map does not contain yet.
 *
 * Indices below the map's current size are treated as already registered, so only the
 * indices from `pubkey2index.size` up to the validator count are visited.
 */
export function syncPubkeyCache(state: BeaconStateAllForks, pubkey2index: PubkeyIndexMap): void {
  // Resolve the validators view a single time rather than on every iteration
  const validatorList = state.validators;
  const validatorCount = validatorList.length;

  let index = pubkey2index.size;
  while (index < validatorCount) {
    pubkey2index.set(validatorList.getReadonly(index).pubkey, index);
    index++;
  }
}

/**
 * Load the most recent archived BeaconState whose slot is at or before `slot`
 * and wrap it as a cached state.
 *
 * The shared `pubkey2index` map is topped up from the loaded state before the cached
 * state is created, which is why `skipSyncPubkeys` is passed to `createCachedBeaconState`.
 *
 * @throws Error if the state archive holds no state at or before `slot`
 */
export async function getNearestState(
  slot: number,
  config: BeaconConfig,
  db: IBeaconDb,
  pubkey2index: PubkeyIndexMap
): Promise<CachedBeaconStateAllForks> {
  // Reverse scan limited to one entry: yields the highest archived slot <= `slot`
  const archived = await db.stateArchive.values({limit: 1, lte: slot, reverse: true});
  if (archived.length === 0) {
    throw new Error("No near state found in the database");
  }
  const nearest = archived[0];

  syncPubkeyCache(nearest, pubkey2index);

  const immutableData = {config, pubkey2index, index2pubkey: []};
  const cacheOpts = {skipSyncPubkeys: true};
  return createCachedBeaconState(nearest, immutableData, cacheOpts);
}

/**
 * Regenerate the state for a historical `slot` and return it serialized.
 *
 * Steps:
 *  1. load the nearest archived state at or before `slot`
 *  2. replay every archived block in the range (nearest state slot, `slot`] on top of it
 *  3. serialize the resulting state
 *
 * Signature, proposer and state-root verification are disabled during replay. The state
 * root is instead compared manually after each block; a mismatch is only counted in
 * `regenErrorCount` and does not abort the regen.
 *
 * NOTE(review): only blocks are replayed here. If there is no archived block at `slot`
 * (skipped slot), the returned state may be left at an earlier slot - confirm whether it
 * should be advanced to `slot` explicitly.
 *
 * @throws rethrows any error from loading the nearest state or from processing a block,
 *   after incrementing `regenErrorCount` with the matching reason
 */
export async function getHistoricalState(
  slot: number,
  config: BeaconConfig,
  db: IBeaconDb,
  pubkey2index: PubkeyIndexMap,
  metrics?: HistoricalStateRegenMetrics
): Promise<Uint8Array> {
  const stopRegenTimer = metrics?.regenTime.startTimer();

  // 1. Load the starting state
  const stopLoadTimer = metrics?.loadStateTime.startTimer();
  let state: CachedBeaconStateAllForks;
  try {
    state = await getNearestState(slot, config, db, pubkey2index);
  } catch (e) {
    metrics?.regenErrorCount.inc({reason: RegenErrorType.loadState});
    throw e;
  }
  stopLoadTimer?.();

  // 2. Replay archived blocks up to and including the requested slot
  const stopTransitionTimer = metrics?.stateTransitionTime.startTimer();
  let processedBlocks = 0;
  const blockStream = db.blockArchive.valuesStream({gt: state.slot, lte: slot});
  for await (const signedBlock of blockStream) {
    try {
      state = stateTransition(
        state,
        signedBlock,
        {
          verifyProposer: false,
          verifySignatures: false,
          verifyStateRoot: false,
          executionPayloadStatus: ExecutionPayloadStatus.valid,
          dataAvailableStatus: DataAvailableStatus.available,
        },
        metrics
      );
    } catch (e) {
      metrics?.regenErrorCount.inc({reason: RegenErrorType.blockProcessing});
      throw e;
    }
    processedBlocks++;

    // Root mismatch is recorded but deliberately not treated as fatal here
    const computedRoot = state.hashTreeRoot();
    const expectedRoot = signedBlock.message.stateRoot;
    const rootsMatch = Buffer.compare(computedRoot, expectedRoot) === 0;
    if (!rootsMatch) {
      metrics?.regenErrorCount.inc({reason: RegenErrorType.invalidStateRoot});
    }
  }
  metrics?.stateTransitionBlocks.observe(processedBlocks);
  stopTransitionTimer?.();

  // 3. Serialize the result
  const stopSerializeTimer = metrics?.stateSerializationTime.startTimer();
  const serialized = state.serialize();
  stopSerializeTimer?.();

  stopRegenTimer?.();
  return serialized;
}
67 changes: 67 additions & 0 deletions packages/beacon-node/src/chain/historicalState/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import path from "node:path";
import {ModuleThread, Thread, spawn, Worker} from "@chainsafe/threads";
import {chainConfigToJson} from "@lodestar/config";
import {LoggerNode} from "@lodestar/logger/node";
import {
HistoricalStateRegenInitModules,
HistoricalStateRegenModules,
HistoricalStateWorkerApi,
HistoricalStateWorkerData,
} from "./types.js";

// The Worker constructor considers the path relative to the current working directory.
// Under NODE_ENV === "test" the worker is loaded from the lib/ tree instead of "./"
// (presumably the compiled output - confirm against the build layout).
const WORKER_DIR = process.env.NODE_ENV === "test" ? "../../../lib/chain/historicalState" : "./";

/**
 * HistoricalStateRegen limits the damage from recreating historical states
 * by running regen in a separate worker thread.
 *
 * Instances are created through `HistoricalStateRegen.init`, which spawns the worker.
 * Every public method is a thin proxy to the matching worker API call.
 */
export class HistoricalStateRegen implements HistoricalStateWorkerApi {
  private readonly api: ModuleThread<HistoricalStateWorkerApi>;
  private readonly logger: LoggerNode;

  /**
   * @param modules worker API handle plus logger; if `modules.signal` is provided,
   *   the worker is closed once when the signal aborts
   */
  constructor(modules: HistoricalStateRegenModules) {
    this.api = modules.api;
    this.logger = modules.logger;
    modules.signal?.addEventListener(
      "abort",
      () => {
        // Event listeners cannot await: handle the rejection here so a failed close
        // does not surface as an unhandled promise rejection
        this.close().catch((e) => {
          this.logger.error("Error closing historical state worker", {}, e as Error);
        });
      },
      {once: true}
    );
  }

  /**
   * Spawn the historical state worker and return a wrapper around its API.
   *
   * The worker receives a serializable snapshot of the configuration through `workerData`.
   */
  static async init(modules: HistoricalStateRegenInitModules): Promise<HistoricalStateRegen> {
    const workerData: HistoricalStateWorkerData = {
      chainConfigJson: chainConfigToJson(modules.config),
      genesisValidatorsRoot: modules.config.genesisValidatorsRoot,
      genesisTime: modules.opts.genesisTime,
      maxConcurrency: 1,
      maxLength: 50,
      dbLocation: modules.opts.dbLocation,
      metricsEnabled: Boolean(modules.metrics),
      loggerOpts: modules.logger.toOpts(),
    };

    const worker = new Worker(path.join(WORKER_DIR, "worker.js"), {
      workerData,
    } as ConstructorParameters<typeof Worker>[1]);

    const api = await spawn<HistoricalStateWorkerApi>(worker, {
      // A Lodestar Node may do very expensive task at start blocking the event loop and causing
      // the initialization to timeout. The number below is big enough to almost disable the timeout
      timeout: 5 * 60 * 1000,
    });

    return new HistoricalStateRegen({...modules, api});
  }

  /** Fetch the worker's metrics as a string. */
  async scrapeMetrics(): Promise<string> {
    return this.api.scrapeMetrics();
  }

  /**
   * Ask the worker to close, then terminate its thread.
   *
   * The thread is terminated even if the worker's own `close()` rejects, so a failing
   * worker cannot leak its thread; the original rejection is still propagated.
   */
  async close(): Promise<void> {
    try {
      await this.api.close();
    } finally {
      this.logger.debug("Terminating historical state worker");
      await Thread.terminate(this.api);
      this.logger.debug("Terminated historical state worker");
    }
  }

  /**
   * Regenerate the state at `slot` in the worker.
   *
   * @returns the serialized state bytes produced by the worker
   */
  async getHistoricalState(slot: number): Promise<Uint8Array> {
    return this.api.getHistoricalState(slot);
  }
}
54 changes: 54 additions & 0 deletions packages/beacon-node/src/chain/historicalState/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import {ModuleThread} from "@chainsafe/threads";
import {BeaconConfig} from "@lodestar/config";
import {LoggerNode, LoggerNodeOpts} from "@lodestar/logger/node";
import {BeaconStateTransitionMetrics} from "@lodestar/state-transition";
import {Gauge, Histogram} from "@lodestar/utils";
import {Metrics} from "../../metrics/index.js";

/**
 * Inputs of `HistoricalStateRegen.init`, used to build the data handed to the worker.
 */
export type HistoricalStateRegenInitModules = {
// Values copied verbatim into the worker data
opts: {
genesisTime: number;
dbLocation: string;
};
// Serialized with `chainConfigToJson`; also the source of `genesisValidatorsRoot`
config: BeaconConfig;
// Used for main-thread logs; its `toOpts()` result is forwarded to the worker
logger: LoggerNode;
// Only its presence is forwarded to the worker (as `metricsEnabled`)
metrics: Metrics | null;
// When aborted, the regen instance closes the worker
signal?: AbortSignal;
};
/**
 * Constructor input of `HistoricalStateRegen`: the init modules plus the spawned worker API handle.
 */
export type HistoricalStateRegenModules = HistoricalStateRegenInitModules & {
api: ModuleThread<HistoricalStateWorkerApi>;
};

/**
 * Data passed to the historical state worker as `workerData` when it is constructed.
 */
export type HistoricalStateWorkerData = {
// Output of `chainConfigToJson` for the node's config
chainConfigJson: Record<string, string>;
genesisValidatorsRoot: Uint8Array;
genesisTime: number;
// NOTE(review): presumably limits for the worker's job queue (init passes 1 and 50) - confirm in worker.ts
maxConcurrency: number;
maxLength: number;
dbLocation: string;
// True when the main thread was given a metrics instance
metricsEnabled: boolean;
loggerOpts: LoggerNodeOpts;
};

/**
 * API exposed by the historical state worker; `HistoricalStateRegen` implements the same
 * shape and forwards each call to the worker.
 */
export type HistoricalStateWorkerApi = {
close(): Promise<void>;
scrapeMetrics(): Promise<string>;
// Resolves with the serialized state for `slot`
getHistoricalState(slot: number): Promise<Uint8Array>;
};

/**
 * Values of the `reason` label on the `regenErrorCount` metric.
 */
export enum RegenErrorType {
// Loading the nearest archived state failed
loadState = "load_state",
// State root computed after applying a block differs from the block's `stateRoot`
invalidStateRoot = "invalid_state_root",
// `stateTransition` threw while applying a block
blockProcessing = "block_processing",
}

/**
 * Metrics recorded by historical state regen, on top of the state-transition metrics.
 */
export type HistoricalStateRegenMetrics = BeaconStateTransitionMetrics & {
// Duration of a whole `getHistoricalState` call
regenTime: Histogram;
// Duration of loading the nearest archived state
loadStateTime: Histogram;
// Duration of replaying blocks on top of the loaded state
stateTransitionTime: Histogram;
// Number of blocks replayed per regen
stateTransitionBlocks: Histogram;
// Duration of serializing the resulting state
stateSerializationTime: Histogram;
// NOTE(review): not updated in getHistoricalState; presumably maintained by the worker - confirm
regenRequestCount: Gauge;
regenSuccessCount: Gauge;
// Incremented with the `RegenErrorType` that describes the failure
regenErrorCount: Gauge<{reason: RegenErrorType}>;
};
Loading
Loading