Skip to content

Commit

Permalink
Merge pull request #7561 from Agoric/mhofman/6943-force-reload
Browse files Browse the repository at this point in the history
feat(SwingSet): force reload worker from snapshot
  • Loading branch information
mergify[bot] authored Apr 30, 2023
2 parents 1265a79 + a1e0d09 commit 317cfb6
Show file tree
Hide file tree
Showing 13 changed files with 184 additions and 50 deletions.
1 change: 1 addition & 0 deletions packages/SwingSet/misc-tools/replay-transcript.js
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ async function replay(transcriptFile) {
const { hash, compressSeconds: saveSeconds } = await manager.makeSnapshot(
lastTranscriptNum,
snapStore,
false, // Do not restart, we'll do that ourselves if needed
);
fs.writeSync(
snapshotActivityFd,
Expand Down
2 changes: 1 addition & 1 deletion packages/SwingSet/src/controller/controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ export async function makeSwingsetController(
* slogCallbacks?: unknown;
* slogSender?: import('@agoric/telemetry').SlogSender;
* testTrackDecref?: unknown;
* warehousePolicy?: { maxVatsOnline?: number };
* warehousePolicy?: import('../types-external.js').VatWarehousePolicy;
* }} runtimeOptions
* @param {Record<string, unknown>} deviceEndowments
* @typedef { import('@agoric/swing-store').KVStore } KVStore
Expand Down
78 changes: 67 additions & 11 deletions packages/SwingSet/src/controller/startXSnap.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,32 @@ import { xsnap, recordXSnap } from '@agoric/xsnap';

const NETSTRING_MAX_CHUNK_SIZE = 12_000_000;

/**
* @typedef {object} StartXSnapInitFromBundlesDetails
* @property {'bundles'} from
*
* TODO: Move bundleIDs here
*/

/**
* @typedef {object} StartXSnapInitFromSnapshotStreamDetails
* @property {'snapshotStream'} from
* @property {AsyncIterable<Uint8Array>} snapshotStream
* @property {string} [snapshotDescription]
*/

/**
* @typedef {object} StartXSnapInitFromSnapStoreDetails
* @property {'snapStore'} from
* @property {string} vatID
*
* TODO: transition to direct snapshot stream, and remove this option
*/

/** @typedef {StartXSnapInitFromBundlesDetails | StartXSnapInitFromSnapshotStreamDetails | StartXSnapInitFromSnapStoreDetails} StartXSnapInitDetails */

/** @typedef {ReturnType<typeof makeStartXSnap>} StartXSnap */

/**
* @param {{
* bundleHandler: import('./bundle-handler.js').BundleHandler,
Expand Down Expand Up @@ -62,32 +88,62 @@ export function makeStartXSnap(options) {
netstringMaxChunkSize: NETSTRING_MAX_CHUNK_SIZE,
};

/** @param {StartXSnapInitDetails} initDetails */
function getSnapshotLoadOptions(initDetails) {
switch (initDetails.from) {
case 'bundles':
return undefined;

case 'snapStore': {
if (!snapStore) {
// Fallback to load from bundles
return undefined;
}

const { vatID } = initDetails;
const { hash: snapshotID, snapPos } = snapStore.getSnapshotInfo(vatID);
// console.log('startXSnap from', { snapshotID });
const snapshotStream = snapStore.loadSnapshot(vatID);
const snapshotDescription = `${vatID}-${snapPos}-${snapshotID}`;
return { snapshotStream, snapshotDescription };
}

case 'snapshotStream': {
const { snapshotStream, snapshotDescription } = initDetails;
return { snapshotStream, snapshotDescription };
}

default:
// @ts-expect-error exhaustive check
throw Fail`Unexpected xsnap init type ${initDetails.type}`;
}
}

/**
* @param {string} vatID
* @param {string} name
* @param {object} details
* @param {import('../types-external.js').BundleID[]} details.bundleIDs
* @param {(request: Uint8Array) => Promise<Uint8Array>} details.handleCommand
* @param {boolean} [details.metered]
* @param {boolean} [details.reload]
* @param {StartXSnapInitDetails} [details.init]
*/
async function startXSnap(
vatID,
name,
{ bundleIDs, handleCommand, metered, reload = false },
{
bundleIDs,
handleCommand,
metered,
init: initDetails = { from: 'bundles' },
},
) {
const meterOpts = metered ? {} : { meteringLimit: 0 };
if (snapStore && reload) {
const { hash: snapshotID, snapPos } = snapStore.getSnapshotInfo(vatID);
// console.log('startXSnap from', { snapshotID });
const snapshotStream = snapStore.loadSnapshot(vatID);
const snapshotLoadOpts = getSnapshotLoadOptions(initDetails);
if (snapshotLoadOpts) {
// eslint-disable-next-line @jessie.js/no-nested-await
const xs = await doXSnap({
snapshotStream,
// TODO
snapshotDescription: `${vatID}-${snapPos}-${snapshotID}`,
name,
handleCommand,
...snapshotLoadOpts,
...meterOpts,
...xsnapOpts,
});
Expand Down
10 changes: 8 additions & 2 deletions packages/SwingSet/src/kernel/state/vatKeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -547,16 +547,21 @@ export function makeVatKeeper(
* Store a snapshot, if given a snapStore.
*
* @param {VatManager} manager
* @param {boolean} [restartWorker]
* @returns {Promise<void>}
*/
async function saveSnapshot(manager) {
async function saveSnapshot(manager, restartWorker) {
if (!snapStore || !manager.makeSnapshot) {
return;
}

// tell the manager to save a heap snapshot to the snapStore
const endPosition = getTranscriptEndPosition();
const info = await manager.makeSnapshot(endPosition, snapStore);
const info = await manager.makeSnapshot(
endPosition,
snapStore,
restartWorker,
);

const {
hash: snapshotID,
Expand Down Expand Up @@ -586,6 +591,7 @@ export function makeVatKeeper(
compressedSize,
compressSeconds,
endPosition,
restartWorker,
});
}

Expand Down
5 changes: 3 additions & 2 deletions packages/SwingSet/src/kernel/vat-loader/manager-helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
* @typedef {import('@agoric/swingset-liveslots').VatSyscallObject} VatSyscallObject
* @typedef {import('@agoric/swingset-liveslots').VatSyscallResult} VatSyscallResult
* @typedef {import('../../types-internal.js').VatManager} VatManager
* @typedef {import('../../types-internal.js').MakeSnapshot} MakeSnapshot
*/

// We use vat-centric terminology here, so "inbound" means "into a vat",
Expand Down Expand Up @@ -57,7 +58,7 @@ import {
/**
*
* @typedef { { getManager: (shutdown: () => Promise<void>,
* makeSnapshot?: (snapPos: number, ss: SnapStore) => Promise<SnapshotResult>) => VatManager,
* makeSnapshot?: MakeSnapshot) => VatManager,
* syscallFromWorker: (vso: VatSyscallObject) => VatSyscallResult,
* setDeliverToWorker: (dtw: unknown) => void,
* } } ManagerKit
Expand Down Expand Up @@ -170,7 +171,7 @@ function makeManagerKit(retainSyscall = false) {
/**
*
* @param { () => Promise<void>} shutdown
* @param {(snapPos: number, ss: SnapStore) => Promise<SnapshotResult>} [makeSnapshot]
* @param {MakeSnapshot} [makeSnapshot]
* @returns {VatManager}
*/
function getManager(shutdown, makeSnapshot) {
Expand Down
98 changes: 77 additions & 21 deletions packages/SwingSet/src/kernel/vat-loader/manager-subprocess-xsnap.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { synchronizedTee } from '@agoric/internal';
import { assert, Fail, q } from '@agoric/assert';
import { ExitCode } from '@agoric/xsnap/api.js';
import { makeManagerKit } from './manager-helper.js';
Expand Down Expand Up @@ -26,16 +27,31 @@ function parentLog(first, ...args) {
const encoder = new TextEncoder();
const decoder = new TextDecoder();

/** @param { (item: Tagged) => unknown } [handleUpstream] */
const makeRevokableHandleCommandKit = handleUpstream => {
/**
* @param {Uint8Array} msg
* @returns {Promise<Uint8Array>}
*/
const handleCommand = async msg => {
// parentLog('handleCommand', { length: msg.byteLength });
if (!handleUpstream) {
throw Fail`Worker received command after revocation`;
}
const tagged = handleUpstream(JSON.parse(decoder.decode(msg)));
return encoder.encode(JSON.stringify(tagged));
};
const revoke = () => {
handleUpstream = undefined;
};
return harden({ handleCommand, revoke });
};

/**
* @param {{
* kernelKeeper: KernelKeeper,
* kernelSlog: KernelSlog,
* startXSnap: (vatID: string, name: string,
* details: {
* bundleIDs: BundleID[],
* handleCommand: AsyncHandler,
* metered?: boolean,
* reload?: boolean } ) => Promise<XSnap>,
* startXSnap: import('../../controller/startXSnap.js').StartXSnap,
* testLog: (...args: unknown[]) => void,
* }} tools
* @returns {VatManagerFactory}
Expand Down Expand Up @@ -110,13 +126,6 @@ export function makeXsSubprocessFactory({
}
}

/** @type { (msg: Uint8Array) => Promise<Uint8Array> } */
async function handleCommand(msg) {
// parentLog('handleCommand', { length: msg.byteLength });
const tagged = handleUpstream(JSON.parse(decoder.decode(msg)));
return encoder.encode(JSON.stringify(tagged));
}

const vatKeeper = kernelKeeper.provideVatKeeper(vatID);
const snapshotInfo = vatKeeper.getSnapshotInfo();
if (snapshotInfo) {
Expand All @@ -128,12 +137,15 @@ export function makeXsSubprocessFactory({
// a shell-escape attack
const argName = `${vatID}:${vatName !== undefined ? vatName : ''}`;

/** @type {ReturnType<typeof makeRevokableHandleCommandKit> | undefined} */
let handleCommandKit = makeRevokableHandleCommandKit(handleUpstream);

// start the worker and establish a connection
const worker = await startXSnap(vatID, argName, {
let worker = await startXSnap(argName, {
bundleIDs,
handleCommand,
handleCommand: handleCommandKit.handleCommand,
metered,
reload: !!snapshotInfo,
init: snapshotInfo && { from: 'snapStore', vatID },
});

/** @type { (item: Tagged) => Promise<WorkerResults> } */
Expand Down Expand Up @@ -215,19 +227,63 @@ export function makeXsSubprocessFactory({
mk.setDeliverToWorker(deliverToWorker);

function shutdown() {
handleCommandKit?.revoke();
handleCommandKit = undefined;
return worker.close().then(_ => undefined);
}
/**
* @param {number} snapPos
* @param {SnapStore} snapStore
* @param {boolean} [restartWorker]
* @returns {Promise<SnapshotResult>}
*/
function makeSnapshot(snapPos, snapStore) {
return snapStore.saveSnapshot(
vatID,
snapPos,
worker.makeSnapshotStream(`${vatID}-${snapPos}`),
async function makeSnapshot(snapPos, snapStore, restartWorker) {
const snapshotDescription = `${vatID}-${snapPos}`;
const snapshotStream = worker.makeSnapshotStream(snapshotDescription);

if (!restartWorker) {
return snapStore.saveSnapshot(vatID, snapPos, snapshotStream);
}

/** @type {AsyncGenerator<Uint8Array, void, void>[]} */
const [restartWorkerStream, snapStoreSaveStream] = synchronizedTee(
snapshotStream,
2,
);

handleCommandKit?.revoke();
handleCommandKit = makeRevokableHandleCommandKit(handleUpstream);

let snapshotResults;
const closeP = worker.close();
[worker, snapshotResults] = await Promise.all([
startXSnap(argName, {
bundleIDs,
handleCommand: handleCommandKit.handleCommand,
metered,
init: {
from: 'snapshotStream',
snapshotStream: restartWorkerStream,
snapshotDescription,
},
}),
snapStore.saveSnapshot(vatID, snapPos, snapStoreSaveStream),
]);
await closeP;

/** @type {Partial<import('@agoric/swing-store/src/snapStore.js').SnapshotInfo>} */
const reloadSnapshotInfo = {
snapPos,
hash: snapshotResults.hash,
};

kernelSlog.write({
type: 'heap-snapshot-load',
vatID,
...reloadSnapshotInfo,
});

return snapshotResults;
}

return mk.getManager(shutdown, makeSnapshot);
Expand Down
9 changes: 6 additions & 3 deletions packages/SwingSet/src/kernel/vat-warehouse.js
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ export function makeVatWarehouse({
panic,
warehousePolicy,
}) {
const { maxVatsOnline = 50 } = warehousePolicy || {};
const { maxVatsOnline = 50, restartWorkerOnSnapshot = true } =
warehousePolicy || {};
// Often a large contract evaluation is among the first few deliveries,
// so let's do a snapshot after just a few deliveries.
const snapshotInitial = kernelKeeper.getSnapshotInitial();
Expand Down Expand Up @@ -603,8 +604,10 @@ export function makeVatWarehouse({
// vatKeeper.saveSnapshot() pushes a save-snapshot transcript
// entry, then starts a new transcript span, then pushes a
// load-snapshot entry, so that the current span always starts
// with an initialize-snapshot or load-snapshot pseudo-delivery
await vatKeeper.saveSnapshot(manager);
// with an initialize-snapshot or load-snapshot pseudo-delivery,
// regardless of whether the worker was restarted from snapshot
// or not.
await vatKeeper.saveSnapshot(manager, restartWorkerOnSnapshot);
return true;
}

Expand Down
1 change: 1 addition & 0 deletions packages/SwingSet/src/types-external.js
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ export {};
*
* @typedef {object} VatWarehousePolicy
* @property { number } [maxVatsOnline] Limit the number of simultaneous workers
* @property { boolean } [restartWorkerOnSnapshot] Reload worker immediately upon snapshot creation
*/

/**
Expand Down
4 changes: 3 additions & 1 deletion packages/SwingSet/src/types-internal.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ export {};
* bundle?: Bundle,
* }} ManagerOptions
*
* @typedef {(snapPos: number, ss: SnapStore, restartWorker?: boolean) => Promise<SnapshotResult>} MakeSnapshot
*
* @typedef { { deliver: (delivery: VatDeliveryObject, vatSyscallHandler: VatSyscallHandler)
* => Promise<VatDeliveryResult>,
* makeSnapshot?: undefined | ((snapPos: number, ss: SnapStore) => Promise<SnapshotResult>),
* makeSnapshot?: undefined | MakeSnapshot,
* shutdown: () => Promise<void>,
* } } VatManager
* @typedef { { createFromBundle: (vatID: string,
Expand Down
3 changes: 3 additions & 0 deletions packages/SwingSet/test/test-controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,9 @@ test('static vats are unmetered on XS', async t => {
limited.push(args.includes('-l'));
return spawn(command, args, options);
},
warehousePolicy: {
restartWorkerOnSnapshot: false,
},
},
);
t.teardown(c.shutdown);
Expand Down
Loading

0 comments on commit 317cfb6

Please sign in to comment.