Skip to content

Commit

Permalink
refactor: change archiving strategy to always store last finalized (#…
Browse files Browse the repository at this point in the history
…5520)

* refac: change archiving strategy to always store last finalized

* update lookup frequency

* lint

* chore: remove console.log in test case

---------

Co-authored-by: Tuyen Nguyen <vutuyen2636@gmail.com>
  • Loading branch information
g11tech and twoeths authored May 23, 2023
1 parent 42c8097 commit 4fd72b2
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 16 deletions.
43 changes: 27 additions & 16 deletions packages/beacon-node/src/chain/archiver/archiveStates.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,38 +35,49 @@ export class StatesArchiver {
/**
* Persist states every some epochs to
* - Minimize disk space, storing the least states possible
* - Minimize the sync progress lost on unexpected crash, storing temp state every few epochs
* - Minimize the sync progress lost on unexpected crash, storing last finalized always
*
* At epoch `e` there will be states peristed at intervals of `PERSIST_STATE_EVERY_EPOCHS` = 32
* and one at `PERSIST_TEMP_STATE_EVERY_EPOCHS` = 1024
* ```
* | | | .
* epoch - 1024*2 epoch - 1024 epoch - 32 epoch
* ```
*
* An extra last finalized state is stored which might be cleaned up in next archiving cycle
* if there is already a previous state in the `PERSIST_STATE_EVERY_EPOCHS` window
*/
async maybeArchiveState(finalized: CheckpointWithHex): Promise<void> {
const lastStoredSlot = await this.db.stateArchive.lastKey();
const lastStoredEpoch = computeEpochAtSlot(lastStoredSlot ?? 0);
const {archiveStateEpochFrequency} = this.opts;

if (finalized.epoch - lastStoredEpoch > Math.min(PERSIST_TEMP_STATE_EVERY_EPOCHS, archiveStateEpochFrequency)) {
await this.archiveState(finalized);
// If the new big window has started, we need to cleanup on the big interval archiveStateEpochFrequency(1024)
// else we need to cleanup within the small PERSIST_TEMP_STATE_EVERY_EPOCHS interval
const frequency =
Math.floor(lastStoredEpoch / archiveStateEpochFrequency) <
Math.floor(finalized.epoch / archiveStateEpochFrequency)
? archiveStateEpochFrequency
: PERSIST_TEMP_STATE_EVERY_EPOCHS;

// Only check the current and previous intervals
const minEpoch = Math.max(
0,
(Math.floor(finalized.epoch / archiveStateEpochFrequency) - 1) * archiveStateEpochFrequency
);
// Only check the current and previous intervals
const minEpoch = Math.max(0, (Math.floor(finalized.epoch / frequency) - 1) * frequency);

const storedStateSlots = await this.db.stateArchive.keys({
lt: computeStartSlotAtEpoch(finalized.epoch),
gte: computeStartSlotAtEpoch(minEpoch),
});
const storedStateSlots = await this.db.stateArchive.keys({
lt: computeStartSlotAtEpoch(finalized.epoch),
gte: computeStartSlotAtEpoch(minEpoch),
});
const stateSlotsToDelete = computeStateSlotsToDelete(storedStateSlots, frequency);

const statesSlotsToDelete = computeStateSlotsToDelete(storedStateSlots, archiveStateEpochFrequency);
if (statesSlotsToDelete.length > 0) {
await this.db.stateArchive.batchDelete(statesSlotsToDelete);
}
// 1. Always archive latest state so as to allow restarts to happen from last finalized.
// It will be cleaned up next cycle even if it results into two states for this interval
//
// 2. Delete the states only after storing the latest finalized so as to prevent any race in non
// availability of finalized state to boot from
//
await this.archiveState(finalized);
if (stateSlotsToDelete.length > 0) {
await this.db.stateArchive.batchDelete(stateSlotsToDelete);
}
}

Expand Down
125 changes: 125 additions & 0 deletions packages/beacon-node/test/unit/chain/archive/maybeArchiveState.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import {expect} from "chai";
import sinon from "sinon";
import {Epoch} from "@lodestar/types";
import {config} from "@lodestar/config/default";
import {CheckpointWithHex} from "@lodestar/fork-choice";
import {computeStartSlotAtEpoch, computeEpochAtSlot} from "@lodestar/state-transition";
import {fromHexString} from "@chainsafe/ssz";
import {ZERO_HASH_HEX, ZERO_HASH} from "../../../../src/constants/index.js";
import {StatesArchiver} from "../../../../src/chain/archiver/archiveStates.js";
import {StubbedChainMutable} from "../../../utils/stub/index.js";
import {testLogger} from "../../../utils/logger.js";
import {BeaconChain, CheckpointStateCache} from "../../../../src/chain/index.js";
import {IBeaconDb} from "../../../../src/index.js";
import {BeaconDb} from "../../../../src/db/index.js";
import {startTmpBeaconDb} from "../../../utils/db.js";
import {generateCachedState} from "../../../utils/state.js";

describe("maybeArchiveState", function () {
const logger = testLogger();
let chainStub: StubbedChainMutable<"checkpointStateCache">;
let db: BeaconDb;
let stateArchiver: StatesArchiver;

beforeEach(async function () {
chainStub = sinon.createStubInstance(BeaconChain);
chainStub.checkpointStateCache = new CheckpointStateCache({});
db = await startTmpBeaconDb(config);
});

afterEach(async () => {
await db.stop();
});

// testcases are array of [finalizedEpochs,expected archivedEpochs]
const testcases: [Epoch[], Epoch[]][] = [
[
[1, 2, 3, 4, 5, 7, 8, 9, 10, 11, 13, 14, 15, 16, 18, 19, 20, 21, 23, 25, 26, 27, 28, 30, 31, 32, 33],
[1, 32, 33],
],
[
[
1, 2, 3, 4, 5, 7, 8, 9, 10, 11, 13, 14, 15, 16, 18, 19, 20, 21, 23, 25, 26, 27, 28, 30, 31, 32, 33, 34, 35, 36,
37, 39, 40, 42, 43, 44,
],
[1, 32, 44],
],
[
[
1, 2, 3, 4, 5, 7, 8, 9, 10, 11, 13, 14, 15, 16, 18, 19, 20, 21, 23, 25, 26, 27, 28, 30, 31, 32, 33, 34, 35, 36,
37, 39, 40, 42, 43, 44, 45, 46, 47, 48, 49, 50, 52, 53, 54, 56, 58, 60, 62, 63, 65, 66, 68, 70, 71, 74, 75, 77,
80, 82, 83, 84, 88, 90, 92, 94, 97, 98, 100, 102,
],
[1, 65, 97, 102],
],
[
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22],
[0, 22],
],
[
[7, 8, 35, 70],
[7, 70],
],
[
[
0, 20, 33, 40, 45, 50, 60, 64, 65, 70, 75, 80, 85, 90, 95, 100, 105, 110, 115, 120, 125, 130, 135, 140, 145,
150, 155, 160, 165, 170, 175, 180, 185, 190,
],
[0, 64, 130, 160, 190],
],
[
[
0, 20, 33, 40, 45, 50, 60, 64, 65, 70, 75, 80, 85, 90, 95, 100, 105, 110, 115, 120, 125, 130, 135, 140, 145,
150, 155, 160, 165, 170, 175, 180, 185, 190, 200,
],
[0, 64, 130, 200],
],
[
[
0, 20, 33, 40, 45, 50, 60, 64, 65, 70, 75, 80, 85, 90, 95, 100, 105, 110, 115, 120, 125, 130, 135, 140, 145,
150, 155, 160, 165, 170, 175, 180, 185, 190, 200, 205, 210, 220,
],
[0, 64, 130, 200, 220],
],
];

testcases.forEach((eachTestcase, i) => {
it(i + 1 + " should archive finalized states and delete correct ones", async function () {
const [finalizedEpochs, archivedStateSlots] = eachTestcase;

const finalizedCP = finalizedEpochs.map((epoch) => {
return {epoch, rootHex: ZERO_HASH_HEX, root: ZERO_HASH};
});

stateArchiver = new StatesArchiver(
chainStub.checkpointStateCache as StatesArchiver["checkpointStateCache"],
db as IBeaconDb,
logger,
{archiveStateEpochFrequency: 64}
);

for (const eachCP of finalizedCP) {
addDummyStateCache(chainStub["checkpointStateCache"], eachCP);
await stateArchiver.maybeArchiveState(eachCP);
chainStub["checkpointStateCache"].pruneFinalized(eachCP.epoch);
}

const finalArchivedStates = await db.stateArchive.keys();
const finalArchivedEpochs = finalArchivedStates.map((eachslot) => {
return computeEpochAtSlot(eachslot);
});
expect(finalArchivedEpochs).to.be.deep.equal(archivedStateSlots);
});
});
});

function addDummyStateCache(
checkpointStateCache: BeaconChain["checkpointStateCache"],
checkpoint: CheckpointWithHex
): void {
const rootCP = {epoch: checkpoint.epoch, root: fromHexString(checkpoint.rootHex)};
const checkpointstate = generateCachedState();
checkpointstate.epochCtx.epoch = checkpoint.epoch;
checkpointstate.slot = computeStartSlotAtEpoch(checkpoint.epoch);
checkpointStateCache.add(rootCP, checkpointstate);
}

0 comments on commit 4fd72b2

Please sign in to comment.