Skip to content

Commit

Permalink
feat: implement swingStore data export/import in support of state sync
Browse files Browse the repository at this point in the history
Closes #6773
  • Loading branch information
FUDCo committed Feb 27, 2023
1 parent d657882 commit 078b5b7
Show file tree
Hide file tree
Showing 18 changed files with 1,428 additions and 381 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ if (!dirPath) {
if (!isSwingStore(dirPath)) {
throw Error(`${dirPath} does not appear to be a swingstore (no ./data.mdb)`);
}
const { kvStore, streamStore } = openSwingStore(dirPath).kernelStorage;
const { kvStore, transcriptStore } = openSwingStore(dirPath).kernelStorage;
function get(key) {
return kvStore.get(key);
}
Expand Down Expand Up @@ -98,7 +98,7 @@ if (!vatName) {
fs.writeSync(fd, JSON.stringify(first));
fs.writeSync(fd, '\n');

// The streamStore holds concatenated transcripts from all upgraded
// The transcriptStore holds concatenated transcripts from all upgraded
// versions. For each old version, it holds every delivery from
// `startVat` through `stopVat`. For the current version, it holds
// every delivery from `startVat` up through the last delivery
Expand All @@ -123,9 +123,8 @@ if (!vatName) {
console.log(`${transcriptLength} transcript entries`);

let deliveryNum = 0;
const transcriptStream = `transcript-${vatID}`;
const stream = streamStore.readStream(transcriptStream, startPos, endPos);
for (const entry of stream) {
const transcript = transcriptStore.readSpan(vatID, startPos, endPos);
for (const entry of transcript) {
// entry is JSON.stringify({ d, syscalls }), syscall is { d, response }
const t = { transcriptNum, ...JSON.parse(entry) };
// console.log(`t.${deliveryNum} : ${t}`);
Expand Down
27 changes: 4 additions & 23 deletions packages/SwingSet/src/kernel/state/kernelKeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ const enableKernelGC = true;
* @typedef { import('../../types-external.js').KernelSlog } KernelSlog
* @typedef { import('../../types-external.js').ManagerType } ManagerType
* @typedef { import('../../types-external.js').SnapStore } SnapStore
* @typedef { import('../../types-external.js').StreamPosition } StreamPosition
* @typedef { import('../../types-external.js').StreamStore } StreamStore
* @typedef { import('../../types-external.js').TranscriptStore } TranscriptStore
* @typedef { import('../../types-external.js').VatKeeper } VatKeeper
* @typedef { import('../../types-external.js').VatManager } VatManager
*/
Expand Down Expand Up @@ -86,8 +85,6 @@ const enableKernelGC = true;
// $vatSlot is one of: o+$NN/o-$NN/p+$NN/p-$NN/d+$NN/d-$NN
// v$NN.c.$vatSlot = $kernelSlot = ko$NN/kp$NN/kd$NN
// v$NN.nextDeliveryNum = $NN
// v$NN.t.startPosition = $NN // inclusive
// v$NN.t.endPosition = $NN // exclusive
// v$NN.vs.$key = string
// v$NN.meter = m$NN // XXX does this exist?
// v$NN.reapInterval = $NN or 'never'
Expand Down Expand Up @@ -174,7 +171,7 @@ const FIRST_METER_ID = 1n;
* @param {KernelSlog|null} kernelSlog
*/
export default function makeKernelKeeper(kernelStorage, kernelSlog) {
const { kvStore, streamStore, snapStore } = kernelStorage;
const { kvStore, transcriptStore, snapStore } = kernelStorage;

insistStorageAPI(kvStore);

Expand Down Expand Up @@ -1297,11 +1294,11 @@ export default function makeKernelKeeper(kernelStorage, kernelSlog) {
return found;
}
if (!kvStore.has(`${vatID}.o.nextID`)) {
initializeVatState(kvStore, streamStore, vatID);
initializeVatState(kvStore, transcriptStore, vatID);
}
const vk = makeVatKeeper(
kvStore,
streamStore,
transcriptStore,
kernelSlog,
vatID,
addKernelObject,
Expand All @@ -1328,20 +1325,6 @@ export default function makeKernelKeeper(kernelStorage, kernelSlog) {
}

/**
* Cease writing to the vat's transcript.
*
* @param {string} vatID
*/
function closeVatTranscript(vatID) {
insistVatID(vatID);
const transcriptStream = `transcript-${vatID}`;
streamStore.closeStream(transcriptStream);
}

/**
* NOTE: caller is responsible to closeVatTranscript()
* before evicting a VatKeeper.
*
* @param {string} vatID
*/
function evictVatKeeper(vatID) {
Expand Down Expand Up @@ -1448,7 +1431,6 @@ export default function makeKernelKeeper(kernelStorage, kernelSlog) {
if (vk) {
// TODO: find some way to expose the liveSlots internal tables, the
// kernel doesn't see them
closeVatTranscript(vatID);
const vatTable = {
vatID,
state: { transcript: Array.from(vk.getTranscript()) },
Expand Down Expand Up @@ -1615,7 +1597,6 @@ export default function makeKernelKeeper(kernelStorage, kernelSlog) {
provideVatKeeper,
vatIsAlive,
evictVatKeeper,
closeVatTranscript,
cleanupAfterTerminatedVat,
addDynamicVatID,
getDynamicVats,
Expand Down
53 changes: 16 additions & 37 deletions packages/SwingSet/src/kernel/state/vatKeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ import { enumeratePrefixedKeys } from './storageHelper.js';
* @typedef { import('../../types-external.js').ManagerOptions } ManagerOptions
* @typedef { import('../../types-external.js').SnapStore } SnapStore
* @typedef { import('../../types-external.js').SourceOfBundle } SourceOfBundle
* @typedef { import('../../types-external.js').StreamPosition } StreamPosition
* @typedef { import('../../types-external.js').StreamStore } StreamStore
* @typedef { import('../../types-external.js').TranscriptStore } TranscriptStore
* @typedef { import('../../types-external.js').VatManager } VatManager
* @typedef { import('../../types-internal.js').RecordedVatOptions } RecordedVatOptions
* @typedef { import('../../types-external.js').TranscriptEntry } TranscriptEntry
Expand All @@ -36,25 +35,24 @@ const FIRST_DEVICE_ID = 70n;
* Establish a vat's state.
*
* @param {*} kvStore The key-value store in which the persistent state will be kept
* @param {*} streamStore Accompanying stream store
* @param {*} transcriptStore Accompanying transcript store
* @param {string} vatID The vat ID string of the vat in question
* TODO: consider making this part of makeVatKeeper
*/
export function initializeVatState(kvStore, streamStore, vatID) {
export function initializeVatState(kvStore, transcriptStore, vatID) {
kvStore.set(`${vatID}.o.nextID`, `${FIRST_OBJECT_ID}`);
kvStore.set(`${vatID}.p.nextID`, `${FIRST_PROMISE_ID}`);
kvStore.set(`${vatID}.d.nextID`, `${FIRST_DEVICE_ID}`);
kvStore.set(`${vatID}.nextDeliveryNum`, `0`);
kvStore.set(`${vatID}.incarnationNumber`, `1`);
kvStore.set(`${vatID}.t.startPosition`, `${streamStore.STREAM_START}`);
kvStore.set(`${vatID}.t.endPosition`, `${streamStore.STREAM_START}`);
transcriptStore.initTranscript(vatID);
}

/**
* Produce a vat keeper for a vat.
*
* @param {KVStore} kvStore The keyValue store in which the persistent state will be kept
* @param {StreamStore} streamStore Accompanying stream store, for the transcripts
* @param {TranscriptStore} transcriptStore Accompanying transcript store, for the transcripts
* @param {*} kernelSlog
* @param {string} vatID The vat ID string of the vat in question
* @param {*} addKernelObject Kernel function to add a new object to the kernel's
Expand All @@ -76,7 +74,7 @@ export function initializeVatState(kvStore, streamStore, vatID) {
*/
export function makeVatKeeper(
kvStore,
streamStore,
transcriptStore,
kernelSlog,
vatID,
addKernelObject,
Expand All @@ -94,7 +92,6 @@ export function makeVatKeeper(
snapStore = undefined,
) {
insistVatID(vatID);
const transcriptStream = `transcript-${vatID}`;

function getRequired(key) {
const value = kvStore.get(key);
Expand Down Expand Up @@ -475,20 +472,12 @@ export function makeVatKeeper(
/**
* Generator function to return the vat's transcript, one entry at a time.
*
* @param {StreamPosition} [startPos] Optional position to begin reading from
* @param {number} [startPos] Optional position to begin reading from
*
* @yields { TranscriptEntry } a stream of transcript entries
*/
function* getTranscript(startPos) {
if (startPos === undefined) {
startPos = Number(getRequired(`${vatID}.t.startPosition`));
}
const endPos = Number(getRequired(`${vatID}.t.endPosition`));
for (const entry of streamStore.readStream(
transcriptStream,
/** @type { StreamPosition } */ (startPos),
endPos,
)) {
for (const entry of transcriptStore.readSpan(vatID, startPos)) {
yield /** @type { TranscriptEntry } */ (JSON.parse(entry));
}
}
Expand All @@ -499,21 +488,13 @@ export function makeVatKeeper(
* @param {object} entry The transcript entry to append.
*/
function addToTranscript(entry) {
const oldPos = Number(getRequired(`${vatID}.t.endPosition`));
const newPos = streamStore.writeStreamItem(
transcriptStream,
JSON.stringify(entry),
oldPos,
);
kvStore.set(`${vatID}.t.endPosition`, `${newPos}`);
transcriptStore.addItem(vatID, JSON.stringify(entry));
}

/** @returns {StreamPosition} */
/** @returns {number} */
function getTranscriptEndPosition() {
const endPosition =
kvStore.get(`${vatID}.t.endPosition`) ||
assert.fail('missing endPosition');
return Number(endPosition);
const { endPos } = transcriptStore.getCurrentSpanBounds(vatID);
return endPos;
}

function getSnapshotInfo() {
Expand All @@ -540,6 +521,7 @@ export function makeVatKeeper(

const endPosition = getTranscriptEndPosition();
const info = await manager.makeSnapshot(endPosition, snapStore);
transcriptStore.rolloverSpan(vatID);
const {
hash,
uncompressedSize,
Expand Down Expand Up @@ -570,9 +552,7 @@ export function makeVatKeeper(
if (snapStore) {
snapStore.deleteVatSnapshots(vatID);
}

const endPos = getRequired(`${vatID}.t.endPosition`);
kvStore.set(`${vatID}.t.startPosition`, endPos);
transcriptStore.rolloverSpan(vatID);
}

function vatStats() {
Expand All @@ -584,9 +564,8 @@ export function makeVatKeeper(
const objectCount = getCount(`${vatID}.o.nextID`, FIRST_OBJECT_ID);
const promiseCount = getCount(`${vatID}.p.nextID`, FIRST_PROMISE_ID);
const deviceCount = getCount(`${vatID}.d.nextID`, FIRST_DEVICE_ID);
const startCount = Number(getRequired(`${vatID}.t.startPosition`));
const endCount = Number(getRequired(`${vatID}.t.endPosition`));
const transcriptCount = endCount - startCount;
const { startPos, endPos } = transcriptStore.getCurrentSpanBounds(vatID);
const transcriptCount = endPos - startPos;

// TODO: Fix the downstream JSON.stringify to allow the counts to be BigInts
return harden({
Expand Down
1 change: 0 additions & 1 deletion packages/SwingSet/src/kernel/vat-warehouse.js
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,6 @@ export function makeVatWarehouse(kernelKeeper, vatLoader, policyOptions) {
}
ephemeral.vats.delete(vatID);
xlate.delete(vatID);
kernelKeeper.closeVatTranscript(vatID);
kernelKeeper.evictVatKeeper(vatID);

// console.log('evict: shutting down', vatID);
Expand Down
3 changes: 1 addition & 2 deletions packages/SwingSet/src/types-ambient.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@
*/
/**
* @typedef { import('@agoric/swing-store').KVStore } KVStore
* @typedef { import('@agoric/swing-store').StreamStore } StreamStore
* @typedef { import('@agoric/swing-store').StreamPosition } StreamPosition
* @typedef { import('@agoric/swing-store').TranscriptStore } TranscriptStore
* @typedef { import('@agoric/swing-store').SwingStore } SwingStore
* @typedef { import('@agoric/swing-store').SwingStoreKernelStorage } SwingStoreKernelStorage
* @typedef { import('@agoric/swing-store').SwingStoreHostStorage } SwingStoreHostStorage
Expand Down
5 changes: 2 additions & 3 deletions packages/SwingSet/src/types-external.js
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ export {};
* vatSyscallHandler: unknown) => Promise<VatManager>,
* } } VatManagerFactory
* @typedef { { deliver: (delivery: VatDeliveryObject) => Promise<VatDeliveryResult>,
* replayTranscript: (startPos: StreamPosition | undefined) => Promise<number?>,
* replayTranscript: (startPos: number | undefined) => Promise<number?>,
* makeSnapshot?: (endPos: number, ss: SnapStore) => Promise<SnapshotResult>,
* shutdown: () => Promise<void>,
* } } VatManager
Expand Down Expand Up @@ -277,8 +277,7 @@ export {};
* @typedef { import('@agoric/swing-store').KVStore } KVStore
* @typedef { import('@agoric/swing-store').SnapStore } SnapStore
* @typedef { import('@agoric/swing-store').SnapshotResult } SnapshotResult
* @typedef { import('@agoric/swing-store').StreamStore } StreamStore
* @typedef { import('@agoric/swing-store').StreamPosition } StreamPosition
* @typedef { import('@agoric/swing-store').TranscriptStore } TranscriptStore
* @typedef { import('@agoric/swing-store').SwingStore } SwingStore
* @typedef { import('@agoric/swing-store').SwingStoreKernelStorage } SwingStoreKernelStorage
* @typedef { import('@agoric/swing-store').SwingStoreHostStorage } SwingStoreHostStorage
Expand Down
2 changes: 1 addition & 1 deletion packages/SwingSet/test/upgrade/test-upgrade.js
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ test('failed upgrade - explode', async t => {
},
};

const kernelStorage = initSwingStore().kernelStorage;
const { kernelStorage } = initSwingStore();
await initializeSwingset(config, [], kernelStorage);
const c = await makeSwingsetController(kernelStorage);
c.pinVatRoot('bootstrap');
Expand Down
4 changes: 2 additions & 2 deletions packages/SwingSet/test/vat-warehouse/test-reload-snapshot.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ test('vat reload from snapshot', async t => {
const snapshotInfo = snapStore.getSnapshotInfo(vatID);

const start = snapshotInfo ? snapshotInfo.endPos : 0;
const endPosition = kernelStorage.kvStore.get(`${vatID}.t.endPosition`);
const end = Number(endPosition);
const bounds = kernelStorage.transcriptStore.getCurrentSpanBounds(vatID);
const end = bounds.endPos;
return [start, end];
}

Expand Down
1 change: 1 addition & 0 deletions packages/swing-store/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"@agoric/assert": "^0.5.1",
"@agoric/internal": "^0.2.1",
"better-sqlite3": "^7.5.0",
"readline-transform": "^1.0.0",
"tmp": "^0.2.1"
},
"devDependencies": {
Expand Down
4 changes: 2 additions & 2 deletions packages/swing-store/src/hasher.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import { assert } from '@agoric/assert';
import { createHash } from 'crypto';

/**
* @typedef { (initial?: string) => {
* add: (more: string) => void,
* @typedef { (initial?: string | Buffer) => {
* add: (more: string | Buffer) => void,
* finish: () => string,
* }
* } CreateSHA256
Expand Down
Loading

0 comments on commit 078b5b7

Please sign in to comment.