Skip to content

Commit

Permalink
feat: Add consensus-independent vat transcript archiving configuratio…
Browse files Browse the repository at this point in the history
…n to AG_COSMOS_INIT

Fixes #10036
  • Loading branch information
gibson042 committed Sep 10, 2024
1 parent 21ae513 commit a42c45f
Show file tree
Hide file tree
Showing 14 changed files with 154 additions and 52 deletions.
18 changes: 16 additions & 2 deletions golang/cosmos/x/swingset/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ import (
)

const (
ConfigPrefix = "swingset"
FlagSlogfile = ConfigPrefix + ".slogfile"
ConfigPrefix = "swingset"
FlagSlogfile = ConfigPrefix + ".slogfile"
FlagVatTranscriptArchiveDir = ConfigPrefix + ".vat-transcript-archive-dir"

SnapshotRetentionOptionDebug = "debug"
SnapshotRetentionOptionOperational = "operational"
Expand Down Expand Up @@ -73,6 +74,9 @@ vat-snapshot-retention = "{{ .Swingset.VatSnapshotRetention }}"
# * "default": determined by 'pruning' ("archival" if 'pruning' is "nothing",
# otherwise "operational")
vat-transcript-retention = "{{ .Swingset.VatTranscriptRetention }}"
# Archival of historical (i.e., closed) vat transcript spans to gzipped files.
vat-transcript-archive-dir = "{{ .Swingset.VatTranscriptArchiveDir }}"
`

// SwingsetConfig defines configuration for the SwingSet VM.
Expand Down Expand Up @@ -106,6 +110,10 @@ type SwingsetConfig struct {
// * "default": determined by `pruning` ("archival" if `pruning` is
// "nothing", otherwise "operational")
VatTranscriptRetention string `mapstructure:"vat-transcript-retention" json:"vatTranscriptRetention,omitempty"`

// VatTranscriptArchiveDir controls archival of historical (i.e., closed) vat
// transcript spans to gzipped files.
VatTranscriptArchiveDir string `mapstructure:"vat-transcript-archive-dir" json:"vatTranscriptArchiveDir,omitempty"`
}

var DefaultSwingsetConfig = SwingsetConfig{
Expand Down Expand Up @@ -202,5 +210,11 @@ func SwingsetConfigFromViper(resolvedConfig servertypes.AppOptions) (*SwingsetCo
}
ssConfig.SlogFile = resolvedSlogFile

resolvedTranscriptDir, err := resolvePath(ssConfig.VatTranscriptArchiveDir, FlagVatTranscriptArchiveDir)
if err != nil {
return nil, err
}
ssConfig.VatTranscriptArchiveDir = resolvedTranscriptDir

return ssConfig, nil
}
3 changes: 2 additions & 1 deletion packages/SwingSet/src/kernel/kernel.js
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ export default function buildKernel(
console.log(`kernel terminating vat ${vatID} (failure=${shouldReject})`);
let critical = false;
insistCapData(info);
await null;
// ISSUE: terminate stuff in its own crank like creation?
// TODO: if a static vat terminates, panic the kernel?
// TODO: guard against somebody telling vatAdmin to kill a vat twice
Expand All @@ -287,7 +288,7 @@ export default function buildKernel(
// remove vatID from the list of live vats, and mark for deletion
kernelKeeper.deleteVatID(vatID);
kernelKeeper.markVatAsTerminated(vatID);
kernelKeeper.removeVatFromSwingStoreExports(vatID);
await kernelKeeper.removeVatFromSwingStoreExports(vatID);
for (const kpid of deadPromises) {
resolveToError(kpid, makeError('vat terminated'), vatID);
}
Expand Down
4 changes: 2 additions & 2 deletions packages/SwingSet/src/kernel/state/kernelKeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -930,13 +930,13 @@ export default function makeKernelKeeper(
kvStore.set(`${kernelSlot}.data.slots`, capdata.slots.join(','));
}

function removeVatFromSwingStoreExports(vatID) {
async function removeVatFromSwingStoreExports(vatID) {
// Delete primary swingstore records for this vat, in preparation
// for (slow) deletion. After this, swingstore exports will omit
// this vat. This is called from the kernel's terminateVat, which
// initiates (but does not complete) deletion.
snapStore.stopUsingLastSnapshot(vatID);
transcriptStore.stopUsingTranscript(vatID);
await transcriptStore.stopUsingTranscript(vatID);
}

/**
Expand Down
4 changes: 2 additions & 2 deletions packages/SwingSet/src/kernel/state/vatKeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ export function makeVatKeeper(
addToTranscript(makeSaveSnapshotItem(snapshotID));

// then start a new transcript span
transcriptStore.rolloverSpan(vatID);
await transcriptStore.rolloverSpan(vatID);

// then push a load-snapshot entry, so that the current span
// always starts with an initialize-worker or load-snapshot
Expand Down Expand Up @@ -715,7 +715,7 @@ export function makeVatKeeper(
return transcriptStore.deleteVatTranscripts(vatID, budget);
}

function beginNewIncarnation() {
async function beginNewIncarnation() {
if (snapStore) {
snapStore.stopUsingLastSnapshot(vatID);
}
Expand Down
46 changes: 31 additions & 15 deletions packages/cosmic-swingset/src/chain-main.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
// @ts-check

import { resolve as pathResolve } from 'path';
import path from 'node:path';
import v8 from 'node:v8';
import process from 'node:process';
import fs from 'node:fs';
import fsPromises from 'node:fs/promises';
import { performance } from 'perf_hooks';
import { resolve as importMetaResolve } from 'import-meta-resolve';
import tmpfs from 'tmp';
import { performance } from 'node:perf_hooks';
import { fork } from 'node:child_process';
import { resolve as importMetaResolve } from 'import-meta-resolve';
import tmp from 'tmp';

import { Fail, q } from '@endo/errors';
import { E } from '@endo/far';
Expand All @@ -33,6 +33,7 @@ import { makeShutdown } from '@agoric/internal/src/node/shutdown.js';
import * as STORAGE_PATH from '@agoric/internal/src/chain-storage-paths.js';
import * as ActionType from '@agoric/internal/src/action-types.js';
import { BridgeId, CosmosInitKeyToBridgeId } from '@agoric/internal';
import { makeArchiveTranscript } from '@agoric/swing-store';
import {
makeBufferedStorage,
makeReadCachingStorage,
Expand Down Expand Up @@ -76,6 +77,7 @@ const toNumber = specimen => {
* @property {number} [maxVatsOnline]
* @property {'debug' | 'operational'} [vatSnapshotRetention]
* @property {'archival' | 'operational'} [vatTranscriptRetention]
* @property {string} [vatTranscriptArchiveDir]
*/
const SwingsetConfigShape = M.splitRecord(
// All known properties are optional, but unknown properties are not allowed.
Expand All @@ -85,6 +87,7 @@ const SwingsetConfigShape = M.splitRecord(
maxVatsOnline: M.number(),
vatSnapshotRetention: M.or('debug', 'operational'),
vatTranscriptRetention: M.or('archival', 'operational'),
vatTranscriptArchiveDir: M.string(),
},
{},
);
Expand Down Expand Up @@ -159,8 +162,8 @@ const makePrefixedBridgeStorage = (
return fromBridgeStringValue(ret);
},
set: (key, value) => {
const path = `${prefix}${key}`;
const entry = [path, toBridgeStringValue(value)];
const fullPath = `${prefix}${key}`;
const entry = [fullPath, toBridgeStringValue(value)];
call(
stringify({
method: setterMethod,
Expand All @@ -169,8 +172,8 @@ const makePrefixedBridgeStorage = (
);
},
delete: key => {
const path = `${prefix}${key}`;
const entry = [path];
const fullPath = `${prefix}${key}`;
const entry = [fullPath];
call(
stringify({
method: setterMethod,
Expand Down Expand Up @@ -315,8 +318,12 @@ export default async function main(progname, args, { env, homedir, agcc }) {
/** @type {CosmosSwingsetConfig} */
const swingsetConfig = harden(initAction.resolvedConfig || {});
validateSwingsetConfig(swingsetConfig);
const { slogfile, vatSnapshotRetention, vatTranscriptRetention } =
swingsetConfig;
const {
slogfile,
vatSnapshotRetention,
vatTranscriptRetention,
vatTranscriptArchiveDir,
} = swingsetConfig;
const keepSnapshots = vatSnapshotRetention
? vatSnapshotRetention !== 'operational'
: ['1', 'true'].includes(XSNAP_KEEP_SNAPSHOTS);
Expand Down Expand Up @@ -471,14 +478,14 @@ export default async function main(progname, args, { env, homedir, agcc }) {
const swingStoreTraceFile = processValue.getPath({
envName: 'SWING_STORE_TRACE',
flagName: 'trace-store',
trueValue: pathResolve(stateDBDir, 'store-trace.log'),
trueValue: path.resolve(stateDBDir, 'store-trace.log'),
});

const nodeHeapSnapshots = Number.parseInt(NODE_HEAP_SNAPSHOTS, 10);

let lastCommitTime = 0;
let commitCallsSinceLastSnapshot = NaN;
const snapshotBaseDir = pathResolve(stateDBDir, 'node-heap-snapshots');
const snapshotBaseDir = path.resolve(stateDBDir, 'node-heap-snapshots');

if (nodeHeapSnapshots >= 0) {
fs.mkdirSync(snapshotBaseDir, { recursive: true });
Expand Down Expand Up @@ -514,7 +521,7 @@ export default async function main(progname, args, { env, homedir, agcc }) {
) {
commitCallsSinceLastSnapshot = 0;
heapSnapshot = `Heap-${process.pid}-${Date.now()}.heapsnapshot`;
const snapshotPath = pathResolve(snapshotBaseDir, heapSnapshot);
const snapshotPath = path.resolve(snapshotBaseDir, heapSnapshot);
v8.writeHeapSnapshot(snapshotPath);
heapSnapshotTime = performance.now() - t3;
}
Expand All @@ -537,6 +544,14 @@ export default async function main(progname, args, { env, homedir, agcc }) {
}
};

const archiveTranscript = vatTranscriptArchiveDir
? makeArchiveTranscript(vatTranscriptArchiveDir, {
fs,
path,
tmp,
})
: undefined;

const s = await launch({
actionQueueStorage,
highPriorityQueueStorage,
Expand All @@ -556,6 +571,7 @@ export default async function main(progname, args, { env, homedir, agcc }) {
swingStoreTraceFile,
keepSnapshots,
keepTranscripts,
archiveTranscript,
afterCommitCallback,
swingsetConfig,
});
Expand Down Expand Up @@ -608,7 +624,7 @@ export default async function main(progname, args, { env, homedir, agcc }) {
);
return performStateSyncImport(options, {
fs: { ...fs, ...fsPromises },
pathResolve,
pathResolve: path.resolve,
log: null,
});
}
Expand All @@ -632,7 +648,7 @@ export default async function main(progname, args, { env, homedir, agcc }) {
stateSyncExport = exportData;

await new Promise((resolve, reject) => {
tmpfs.dir(
tmp.dir(
{
prefix: `agd-state-sync-${blockHeight}-`,
unsafeCleanup: true,
Expand Down
2 changes: 2 additions & 0 deletions packages/cosmic-swingset/src/launch-chain.js
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ export async function launch({
swingStoreExportCallback,
keepSnapshots,
keepTranscripts,
archiveTranscript,
afterCommitCallback = async () => ({}),
swingsetConfig,
}) {
Expand Down Expand Up @@ -375,6 +376,7 @@ export async function launch({
exportCallback: swingStoreExportSyncCallback,
keepSnapshots,
keepTranscripts,
archiveTranscript,
});
const { kvStore, commit } = hostStorage;

Expand Down
44 changes: 44 additions & 0 deletions packages/swing-store/src/archiver.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { finished as streamFinishedCallback, Readable } from 'node:stream';
import { promisify } from 'node:util';
import { createGzip } from 'node:zlib';
import { withDeferredCleanup } from '@agoric/internal/src/node/utils.js';

const streamFinished = promisify(streamFinishedCallback);

/*
* @param {string} dirPath
* @param {object} powers
* @param {Pick<import('fs'), 'createWriteStream' | 'mkdirSync' | 'renameSync'>} powers.fs
* @param {Pick<import('path'), 'join'>} powers.path
* @param {Pick<import('tmp'), 'fileSync'>} powers.tmp
*/
export const makeArchiveTranscript = (dirPath, powers) => {
const { fs, path, tmp } = powers;
fs.mkdirSync(dirPath, { recursive: true });
const archiveTranscript = (spanName, entries) => {
const destPath = path.join(dirPath, `${spanName}.gz`);
return withDeferredCleanup(async addCleanup => {
const {
name: tmpName,
fd,
removeCallback,
} = tmp.fileSync({
prefix: spanName,
postfix: '.gz',
detachDescriptor: true,
});
addCleanup(() => removeCallback());
const writer = fs.createWriteStream('', { fd, flush: true });
const gzip = createGzip();
gzip.pipe(writer);
const reader = Readable.from(entries);
const destroyReader = promisify(reader.destroy.bind(reader));
addCleanup(() => destroyReader(null));
reader.pipe(gzip);
await streamFinished(gzip);
fs.renameSync(tmpName, destPath);
});
};
return archiveTranscript;
};
harden(makeArchiveTranscript);
2 changes: 2 additions & 0 deletions packages/swing-store/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ export { initSwingStore, openSwingStore, isSwingStore } from './swingStore.js';
export { makeSwingStoreExporter } from './exporter.js';
export { importSwingStore } from './importer.js';

export { makeArchiveTranscript } from './archiver.js';

// temporary, for the benefit of SwingSet/misc-tools/replay-transcript.js
export { makeSnapStore } from './snapStore.js';
// and less temporary, for SwingSet/test/vat-warehouse/test-reload-snapshot.js
Expand Down
4 changes: 3 additions & 1 deletion packages/swing-store/src/swingStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ export function makeSwingStore(dirPath, forceReset, options = {}) {
filePath = ':memory:';
}

const { traceFile, keepSnapshots, keepTranscripts } = options;
const { traceFile, keepSnapshots, keepTranscripts, archiveTranscript } =
options;

let traceOutput = traceFile
? fs.createWriteStream(path.resolve(traceFile), {
Expand Down Expand Up @@ -297,6 +298,7 @@ export function makeSwingStore(dirPath, forceReset, options = {}) {
noteExport,
{
keepTranscripts,
archiveTranscript,
},
);
const { dumpSnapshots, ...snapStore } = makeSnapStore(
Expand Down
Loading

0 comments on commit a42c45f

Please sign in to comment.