Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: change archiving strategy to always store last finalized #5520

Merged
merged 4 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 27 additions & 16 deletions packages/beacon-node/src/chain/archiver/archiveStates.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,38 +35,49 @@ export class StatesArchiver {
/**
* Persist states every some epochs to
* - Minimize disk space, storing the least states possible
* - Minimize the sync progress lost on unexpected crash, storing temp state every few epochs
* - Minimize the sync progress lost on unexpected crash, storing last finalized always
*
* At epoch `e` there will be states peristed at intervals of `PERSIST_STATE_EVERY_EPOCHS` = 32
* and one at `PERSIST_TEMP_STATE_EVERY_EPOCHS` = 1024
* ```
* | | | .
* epoch - 1024*2 epoch - 1024 epoch - 32 epoch
* ```
*
* An extra last finalized state is stored which might be cleaned up in next archiving cycle
* if there is already a previous state in the `PERSIST_STATE_EVERY_EPOCHS` window
*/
async maybeArchiveState(finalized: CheckpointWithHex): Promise<void> {
const lastStoredSlot = await this.db.stateArchive.lastKey();
const lastStoredEpoch = computeEpochAtSlot(lastStoredSlot ?? 0);
const {archiveStateEpochFrequency} = this.opts;

if (finalized.epoch - lastStoredEpoch > Math.min(PERSIST_TEMP_STATE_EVERY_EPOCHS, archiveStateEpochFrequency)) {
await this.archiveState(finalized);
// If the new big window has started, we need to cleanup on the big interval archiveStateEpochFrequency(1024)
// else we need to cleanup within the small PERSIST_TEMP_STATE_EVERY_EPOCHS interval
const frequency =
Math.floor(lastStoredEpoch / archiveStateEpochFrequency) <
Math.floor(finalized.epoch / archiveStateEpochFrequency)
? archiveStateEpochFrequency
: PERSIST_TEMP_STATE_EVERY_EPOCHS;

// Only check the current and previous intervals
const minEpoch = Math.max(
0,
(Math.floor(finalized.epoch / archiveStateEpochFrequency) - 1) * archiveStateEpochFrequency
);
// Only check the current and previous intervals
const minEpoch = Math.max(0, (Math.floor(finalized.epoch / frequency) - 1) * frequency);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dapplion @g11tech most of the time frequency=32 and we store finalized state every 32 epochs which caused the increased DB size, we should use archiveStateEpochFrequency here to clean more states to keep stable db size like the old strategy

v1.9.0-rc.0
Screenshot 2023-05-31 at 14 28 25

v1.8.0
Screenshot 2023-05-31 at 14 29 00


const storedStateSlots = await this.db.stateArchive.keys({
lt: computeStartSlotAtEpoch(finalized.epoch),
gte: computeStartSlotAtEpoch(minEpoch),
});
const storedStateSlots = await this.db.stateArchive.keys({
lt: computeStartSlotAtEpoch(finalized.epoch),
gte: computeStartSlotAtEpoch(minEpoch),
});
const stateSlotsToDelete = computeStateSlotsToDelete(storedStateSlots, frequency);

const statesSlotsToDelete = computeStateSlotsToDelete(storedStateSlots, archiveStateEpochFrequency);
if (statesSlotsToDelete.length > 0) {
await this.db.stateArchive.batchDelete(statesSlotsToDelete);
}
// 1. Always archive latest state so as to allow restarts to happen from last finalized.
// It will be cleaned up next cycle even if it results into two states for this interval
//
// 2. Delete the states only after storing the latest finalized so as to prevent any race in non
// availability of finalized state to boot from
//
await this.archiveState(finalized);
if (stateSlotsToDelete.length > 0) {
await this.db.stateArchive.batchDelete(stateSlotsToDelete);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import {expect} from "chai";
import sinon from "sinon";
import {Epoch} from "@lodestar/types";
import {config} from "@lodestar/config/default";
import {CheckpointWithHex} from "@lodestar/fork-choice";
import {computeStartSlotAtEpoch, computeEpochAtSlot} from "@lodestar/state-transition";
import {fromHexString} from "@chainsafe/ssz";
import {ZERO_HASH_HEX, ZERO_HASH} from "../../../../src/constants/index.js";
import {StatesArchiver} from "../../../../src/chain/archiver/archiveStates.js";
import {StubbedChainMutable} from "../../../utils/stub/index.js";
import {testLogger} from "../../../utils/logger.js";
import {BeaconChain, CheckpointStateCache} from "../../../../src/chain/index.js";
import {IBeaconDb} from "../../../../src/index.js";
import {BeaconDb} from "../../../../src/db/index.js";
import {startTmpBeaconDb} from "../../../utils/db.js";
import {generateCachedState} from "../../../utils/state.js";

describe("maybeArchiveState", function () {
const logger = testLogger();
let chainStub: StubbedChainMutable<"checkpointStateCache">;
let db: BeaconDb;
let stateArchiver: StatesArchiver;

beforeEach(async function () {
chainStub = sinon.createStubInstance(BeaconChain);
chainStub.checkpointStateCache = new CheckpointStateCache({});
db = await startTmpBeaconDb(config);
});

afterEach(async () => {
await db.stop();
});

// testcases are array of [finalizedEpochs,expected archivedEpochs]
const testcases: [Epoch[], Epoch[]][] = [
[
[1, 2, 3, 4, 5, 7, 8, 9, 10, 11, 13, 14, 15, 16, 18, 19, 20, 21, 23, 25, 26, 27, 28, 30, 31, 32, 33],
[1, 32, 33],
],
[
[
1, 2, 3, 4, 5, 7, 8, 9, 10, 11, 13, 14, 15, 16, 18, 19, 20, 21, 23, 25, 26, 27, 28, 30, 31, 32, 33, 34, 35, 36,
37, 39, 40, 42, 43, 44,
],
[1, 32, 44],
],
[
[
1, 2, 3, 4, 5, 7, 8, 9, 10, 11, 13, 14, 15, 16, 18, 19, 20, 21, 23, 25, 26, 27, 28, 30, 31, 32, 33, 34, 35, 36,
37, 39, 40, 42, 43, 44, 45, 46, 47, 48, 49, 50, 52, 53, 54, 56, 58, 60, 62, 63, 65, 66, 68, 70, 71, 74, 75, 77,
80, 82, 83, 84, 88, 90, 92, 94, 97, 98, 100, 102,
],
[1, 65, 97, 102],
],
[
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22],
[0, 22],
],
[
[7, 8, 35, 70],
[7, 70],
],
[
[
0, 20, 33, 40, 45, 50, 60, 64, 65, 70, 75, 80, 85, 90, 95, 100, 105, 110, 115, 120, 125, 130, 135, 140, 145,
150, 155, 160, 165, 170, 175, 180, 185, 190,
],
[0, 64, 130, 160, 190],
],
[
[
0, 20, 33, 40, 45, 50, 60, 64, 65, 70, 75, 80, 85, 90, 95, 100, 105, 110, 115, 120, 125, 130, 135, 140, 145,
150, 155, 160, 165, 170, 175, 180, 185, 190, 200,
],
[0, 64, 130, 200],
],
[
[
0, 20, 33, 40, 45, 50, 60, 64, 65, 70, 75, 80, 85, 90, 95, 100, 105, 110, 115, 120, 125, 130, 135, 140, 145,
150, 155, 160, 165, 170, 175, 180, 185, 190, 200, 205, 210, 220,
],
[0, 64, 130, 200, 220],
],
];

testcases.forEach((eachTestcase, i) => {
it(i + 1 + " should archive finalized states and delete correct ones", async function () {
const [finalizedEpochs, archivedStateSlots] = eachTestcase;

const finalizedCP = finalizedEpochs.map((epoch) => {
return {epoch, rootHex: ZERO_HASH_HEX, root: ZERO_HASH};
});

stateArchiver = new StatesArchiver(
chainStub.checkpointStateCache as StatesArchiver["checkpointStateCache"],
db as IBeaconDb,
logger,
{archiveStateEpochFrequency: 64}
);

for (const eachCP of finalizedCP) {
addDummyStateCache(chainStub["checkpointStateCache"], eachCP);
await stateArchiver.maybeArchiveState(eachCP);
chainStub["checkpointStateCache"].pruneFinalized(eachCP.epoch);
}

const finalArchivedStates = await db.stateArchive.keys();
const finalArchivedEpochs = finalArchivedStates.map((eachslot) => {
return computeEpochAtSlot(eachslot);
});
expect(finalArchivedEpochs).to.be.deep.equal(archivedStateSlots);
});
});
});

function addDummyStateCache(
checkpointStateCache: BeaconChain["checkpointStateCache"],
checkpoint: CheckpointWithHex
): void {
const rootCP = {epoch: checkpoint.epoch, root: fromHexString(checkpoint.rootHex)};
const checkpointstate = generateCachedState();
checkpointstate.epochCtx.epoch = checkpoint.epoch;
checkpointstate.slot = computeStartSlotAtEpoch(checkpoint.epoch);
checkpointStateCache.add(rootCP, checkpointstate);
}