Skip to content

Commit

Permalink
feat: move transcripts out of key-value store and into stream stores
Browse files Browse the repository at this point in the history
  • Loading branch information
FUDCo committed May 29, 2021
1 parent fa7ff5e commit a128e93
Show file tree
Hide file tree
Showing 23 changed files with 303 additions and 188 deletions.
2 changes: 2 additions & 0 deletions packages/SwingSet/bin/extract-transcript-from-kerneldb.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// XXX this is wrong; it needs to use the swingstore instead of opening the LMDB
// file directly, then use stream store reads to get the transcript entries.
import lmdb from 'node-lmdb';
import process from 'process';
import fs from 'fs';
Expand Down
4 changes: 2 additions & 2 deletions packages/SwingSet/src/kernel/initializeKernel.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ function makeVatRootObjectSlot() {
export function initializeKernel(config, hostStorage, verbose = false) {
const logStartup = verbose ? console.debug : () => 0;

const { kvStore } = hostStorage;
const { kvStore, streamStore } = hostStorage;
insistStorageAPI(kvStore);
const { enhancedCrankBuffer, commitCrank } = wrapStorage(kvStore);

const kernelKeeper = makeKernelKeeper(enhancedCrankBuffer);
const kernelKeeper = makeKernelKeeper(enhancedCrankBuffer, streamStore);

const wasInitialized = kernelKeeper.getInitialized();
assert(!wasInitialized);
Expand Down
8 changes: 6 additions & 2 deletions packages/SwingSet/src/kernel/kernel.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,19 @@ export default function buildKernel(
const { verbose, defaultManagerType = 'local' } = kernelOptions;
const logStartup = verbose ? console.debug : () => 0;

const { kvStore } = hostStorage;
const { kvStore, streamStore } = hostStorage;
insistStorageAPI(kvStore);
const { enhancedCrankBuffer, abortCrank, commitCrank } = wrapStorage(kvStore);

const kernelSlog = writeSlogObject
? makeSlogger(slogCallbacks, writeSlogObject)
: makeDummySlogger(slogCallbacks, makeConsole);

const kernelKeeper = makeKernelKeeper(enhancedCrankBuffer, kernelSlog);
const kernelKeeper = makeKernelKeeper(
enhancedCrankBuffer,
streamStore,
kernelSlog,
);

const meterManager = makeMeterManager(replaceGlobalMeter);

Expand Down
9 changes: 5 additions & 4 deletions packages/SwingSet/src/kernel/state/kernelKeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ const enableKernelPromiseGC = true;
// $R is 'R' when reachable, '_' when merely recognizable
// $vatSlot is one of: o+$NN/o-$NN/p+$NN/p-$NN/d+$NN/d-$NN
// v$NN.c.$vatSlot = $kernelSlot = ko$NN/kp$NN/kd$NN
// v$NN.t.$NN = JSON(transcript entry)
// v$NN.t.nextID = $NN
// v$NN.t.endPosition = $NN
// v$NN.vs.$key = string

// d$NN.o.nextID = $NN
Expand Down Expand Up @@ -100,7 +99,7 @@ const FIRST_DEVNODE_ID = 30n;
const FIRST_PROMISE_ID = 40n;
const FIRST_CRANK_NUMBER = 0n;

export default function makeKernelKeeper(kvStore, kernelSlog) {
export default function makeKernelKeeper(kvStore, streamStore, kernelSlog) {
insistEnhancedStorageAPI(kvStore);

function getRequired(key) {
Expand Down Expand Up @@ -668,11 +667,12 @@ export default function makeKernelKeeper(kvStore, kernelSlog) {
function allocateVatKeeper(vatID) {
insistVatID(vatID);
if (!kvStore.has(`${vatID}.o.nextID`)) {
initializeVatState(kvStore, vatID);
initializeVatState(kvStore, streamStore, vatID);
}
assert(!ephemeral.vatKeepers.has(vatID), X`vatID ${vatID} already defined`);
const vk = makeVatKeeper(
kvStore,
streamStore,
kernelSlog,
vatID,
addKernelObject,
Expand Down Expand Up @@ -757,6 +757,7 @@ export default function makeKernelKeeper(kvStore, kernelSlog) {
if (vk) {
// TODO: find some way to expose the liveSlots internal tables, the
// kernel doesn't see them
vk.closeTranscript();
const vatTable = {
vatID,
state: { transcript: Array.from(vk.getTranscript()) },
Expand Down
58 changes: 43 additions & 15 deletions packages/SwingSet/src/kernel/state/vatKeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,30 @@ import { kdebug } from '../kdebug';
const FIRST_OBJECT_ID = 50n;
const FIRST_PROMISE_ID = 60n;
const FIRST_DEVICE_ID = 70n;
const FIRST_TRANSCRIPT_ID = 0n;

/**
* Establish a vat's state.
*
* @param {*} kvStore The storage in which the persistent state will be kept
* @param {*} kvStore The key-value store in which the persistent state will be kept
* @param {*} streamStore Accompanying stream store
* @param {string} vatID The vat ID string of the vat in question
* TODO: consider making this part of makeVatKeeper
*/
export function initializeVatState(kvStore, vatID) {
export function initializeVatState(kvStore, streamStore, vatID) {
kvStore.set(`${vatID}.o.nextID`, `${FIRST_OBJECT_ID}`);
kvStore.set(`${vatID}.p.nextID`, `${FIRST_PROMISE_ID}`);
kvStore.set(`${vatID}.d.nextID`, `${FIRST_DEVICE_ID}`);
kvStore.set(`${vatID}.t.nextID`, `${FIRST_TRANSCRIPT_ID}`);
kvStore.set(
`${vatID}.t.endPosition`,
`${JSON.stringify(streamStore.STREAM_START)}`,
);
}

/**
* Produce a vat keeper for a vat.
*
* @param {*} kvStore The storage in which the persistent state will be kept
* @param {*} kvStore The keyValue store in which the persistent state will be kept
* @param {*} streamStore Accompanying stream store, for the transcripts
* @param {*} kernelSlog
* @param {string} vatID The vat ID string of the vat in question
* @param {*} addKernelObject Kernel function to add a new object to the kernel's
Expand All @@ -50,6 +54,7 @@ export function initializeVatState(kvStore, vatID) {
*/
export function makeVatKeeper(
kvStore,
streamStore,
kernelSlog,
vatID,
addKernelObject,
Expand All @@ -61,6 +66,7 @@ export function makeVatKeeper(
getCrankNumber,
) {
insistVatID(vatID);
const transcriptStream = `transcript-${vatID}`;

function setSourceAndOptions(source, options) {
// take care with API change
Expand Down Expand Up @@ -292,22 +298,42 @@ export function makeVatKeeper(

/**
* Generator function to return the vat's transcript, one entry at a time.
*
* @param {Object?} startPos Optional position to begin reading from
*
* @yields {string} a stream of transcript entries
*/
function* getTranscript() {
for (const value of kvStore.getPrefixedValues(`${vatID}.t.`)) {
yield JSON.parse(value);
function* getTranscript(startPos = streamStore.STREAM_START) {
const endPos = JSON.parse(kvStore.get(`${vatID}.t.endPosition`));
for (const entry of streamStore.readStream(
transcriptStream,
startPos,
endPos,
)) {
yield JSON.parse(entry);
}
}

/**
* Append a message to the vat's transcript.
* Append an entry to the vat's transcript.
*
* @param {string} msg The message to append.
* @param {Object} entry The transcript entry to append.
*/
function addToTranscript(entry) {
const oldPos = JSON.parse(kvStore.get(`${vatID}.t.endPosition`));
const newPos = streamStore.writeStreamItem(
transcriptStream,
JSON.stringify(entry),
oldPos,
);
kvStore.set(`${vatID}.t.endPosition`, `${JSON.stringify(newPos)}`);
}

/**
* Cease writing to the vat's transcript.
*/
function addToTranscript(msg) {
const id = Nat(BigInt(kvStore.get(`${vatID}.t.nextID`)));
kvStore.set(`${vatID}.t.nextID`, `${id + 1n}`);
kvStore.set(`${vatID}.t.${id}`, JSON.stringify(msg));
function closeTranscript() {
streamStore.closeStream(transcriptStream);
}

function vatStats() {
Expand All @@ -319,7 +345,8 @@ export function makeVatKeeper(
const objectCount = getCount(`${vatID}.o.nextID`, FIRST_OBJECT_ID);
const promiseCount = getCount(`${vatID}.p.nextID`, FIRST_PROMISE_ID);
const deviceCount = getCount(`${vatID}.d.nextID`, FIRST_DEVICE_ID);
const transcriptCount = getCount(`${vatID}.t.nextID`, FIRST_TRANSCRIPT_ID);
const transcriptCount = JSON.parse(kvStore.get(`${vatID}.t.endPosition`))
.itemCount;

// TODO: Fix the downstream JSON.stringify to allow the counts to be BigInts
return harden({
Expand Down Expand Up @@ -363,6 +390,7 @@ export function makeVatKeeper(
deleteCListEntriesForKernelSlots,
getTranscript,
addToTranscript,
closeTranscript,
vatStats,
dumpState,
});
Expand Down
2 changes: 1 addition & 1 deletion packages/SwingSet/src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@
*
* @typedef { { d: VatDeliveryObject, syscalls: VatSyscallObject[] } } TranscriptEntry
* @typedef { { transcriptCount: number } } VatStats
* @typedef { { getTranscript: () => TranscriptEntry[],
* @typedef { { getTranscript: (startPos?: Object) => TranscriptEntry[],
* vatStats: () => VatStats,
* } } VatKeeper
* @typedef { { getVatKeeper: (vatID: string) => VatKeeper } } KernelKeeper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ async function runOneTest(t, explosion, managerType) {
await c.run();
t.is(JSON.parse(kvStore.get('vat.dynamicIDs')).length, 1);
t.is(kvStore.get(`${root}.owner`), vatID);
t.is(Array.from(kvStore.getKeys(`${vatID}`, `${vatID}/`)).length, 12);
t.is(Array.from(kvStore.getKeys(`${vatID}`, `${vatID}/`)).length, 10);
// neverKPID should still be unresolved
t.is(kvStore.get(`${neverKPID}.state`), 'unresolved');

Expand Down
5 changes: 3 additions & 2 deletions packages/SwingSet/test/test-clist.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import { wrapStorage } from '../src/kernel/state/storageWrapper';

test(`clist reachability`, async t => {
const slog = makeDummySlogger({});
const { enhancedCrankBuffer: s } = wrapStorage(initSwingStore().kvStore);
const hostStorage = initSwingStore();
const { enhancedCrankBuffer: s } = wrapStorage(hostStorage.kvStore);

const kk = makeKernelKeeper(s, slog);
const kk = makeKernelKeeper(s, hostStorage.streamStore, slog);
kk.createStartingKernelState('local');
const vatID = kk.allocateUnusedVatID();
const vk = kk.allocateVatKeeper(vatID);
Expand Down
2 changes: 1 addition & 1 deletion packages/SwingSet/test/test-devices.js
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ test.serial('device state', async t => {
const d3 = c1.deviceNameToID('d3');
await c1.run();
t.deepEqual(c1.dump().log, ['undefined', 'w+r', 'called', 'got {"s":"new"}']);
const s = getAllState(hostStorage.kvStore);
const s = getAllState(hostStorage).kvStuff;
t.deepEqual(JSON.parse(s[`${d3}.deviceState`]), capargs({ s: 'new' }));
t.deepEqual(JSON.parse(s[`${d3}.o.nextID`]), 10);
});
Expand Down
1 change: 1 addition & 0 deletions packages/SwingSet/test/test-gc-transcript.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ function setup(storedTranscript = []) {
getTranscript() {
return storedTranscript;
},
closeTranscript() {},
};
const kernelKeeper = {
getVatKeeper() {
Expand Down
Loading

0 comments on commit a128e93

Please sign in to comment.