Skip to content

Commit

Permalink
refactor(runner): virtualize time keeping
Browse files Browse the repository at this point in the history
  • Loading branch information
mhofman committed Nov 24, 2021
1 parent 35b972b commit 230b8de
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 42 deletions.
40 changes: 40 additions & 0 deletions runner/lib/helpers/time.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/** @typedef {number} TimeValueS */

/**
* @param {number} [resolution] number of decimal digits
* @param {number} [inputAdjustment] number of decimal digits the input is already shifted
*/
export const makeRounder = (resolution = 6, inputAdjustment = 0) => {
const factor = 10 ** resolution;
const valueFactor = 10 ** (resolution - inputAdjustment);
/**
* @param {number} value
* @returns {number}
*/
return (value) => Math.round(value * valueFactor) / factor;
};

/**
* @param {Object} param0
* @param {Pick<Performance, 'timeOrigin' | 'now'>} param0.performance
* @param {number} [param0.resolution] number of decimal digits
* @param {TimeValueS} [param0.offset] origin offset to apply
*/
export const makeTimeSource = ({
performance,
resolution = 6,
offset: initialOffset = 0,
}) => {
const offsetMs = initialOffset * 1000;
const rounder = makeRounder(resolution, 3);

const timeOrigin = rounder(performance.timeOrigin + offsetMs);
const getTime = () => rounder(performance.timeOrigin + performance.now());
const now = () => rounder(performance.now() - offsetMs);
const shift = (offset = now()) =>
makeTimeSource({ performance, resolution, offset: offset + initialOffset });

return { timeOrigin, getTime, now, shift };
};

/** @typedef {ReturnType<makeTimeSource>} TimeSource */
58 changes: 30 additions & 28 deletions runner/lib/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { makeTasks as makeTestnetTasks } from './tasks/testnet.js';

import { makeChainMonitor } from './monitor/chain-monitor.js';
import { monitorSlog } from './monitor/slog-monitor.js';
import { makeTimeSource } from './helpers/time.js';

/** @typedef {import('./helpers/async.js').Task} Task */

Expand Down Expand Up @@ -214,8 +215,10 @@ const main = async (progName, rawArgs, powers) => {
Number(argv.monitorInterval || defaultMonitorIntervalMinutes) * 60 * 1000;

let currentStage = -1;
let currentStageElapsedOffsetNs = 0;
const timeSource = makeTimeSource({ performance });
const cpuTimeOffset = await getCPUTimeOffset();
const cpuTimeSource = timeSource.shift(0 - cpuTimeOffset);
let currentStageTimeSource = timeSource;

const outputStream = fsStream.createWriteStream(
joinPath(outputDir, 'perf.jsonl'),
Expand All @@ -225,13 +228,12 @@ const main = async (progName, rawArgs, powers) => {

/** @type {import('./stats/types.js').LogPerfEvent} */
const logPerfEvent = (eventType, data = {}) => {
const perfNowNs = performance.now() * 1000;
outputStream.write(
JSON.stringify(
{
timestamp: Math.round(perfNowNs) / 1e6,
timestamp: timeSource.now(),
stage: currentStage,
elapsed: Math.round(perfNowNs - currentStageElapsedOffsetNs) / 1e6,
elapsed: currentStageTimeSource.now(),
time: undefined, // Placeholder to put data.time before type if it exists
type: `perf-${eventType}`,
...data,
Expand All @@ -245,21 +247,21 @@ const main = async (progName, rawArgs, powers) => {
/**
* @param {Object} param0
* @param {boolean} param0.chainOnly
* @param {number} param0.duration
* @param {number} param0.durationConfig
* @param {unknown} param0.loadgenConfig
* @param {boolean} param0.withMonitor
* @param {boolean} param0.saveStorage
*/
const runStage = async ({
chainOnly,
duration,
durationConfig,
loadgenConfig,
withMonitor,
saveStorage,
}) => {
/** @type {string | void} */
let chainStorageLocation;
currentStageElapsedOffsetNs = performance.now() * 1000;
currentStageTimeSource = timeSource.shift();

const { out, err } = makeConsole(`stage-${currentStage}`);
const { console: stageConsole } = makeConsole('runner', out, err);
Expand All @@ -269,11 +271,15 @@ const main = async (progName, rawArgs, powers) => {
});

logPerfEvent('stage-start');
const stageStart = performance.now();
const stageStart = timeSource.shift();

/** @type {Task} */
const spawnChain = async (nextStep) => {
stageConsole.log('Running chain', { chainOnly, duration, loadgenConfig });
stageConsole.log('Running chain', {
chainOnly,
durationConfig,
loadgenConfig,
});
logPerfEvent('run-chain-start');
const runChainResult = await runChain({ stdout: out, stderr: err });
logPerfEvent('run-chain-finish');
Expand All @@ -284,8 +290,9 @@ const main = async (progName, rawArgs, powers) => {
logPerfEvent('chain-stopped');
});

currentStageElapsedOffsetNs =
(runChainResult.processInfo.startTimestamp - cpuTimeOffset) * 1e6;
currentStageTimeSource = cpuTimeSource.shift(
runChainResult.processInfo.startTimestamp,
);
chainStorageLocation = runChainResult.storageLocation;

const slogOutput = zlib.createGzip({
Expand All @@ -308,7 +315,7 @@ const main = async (progName, rawArgs, powers) => {
const chainMonitor = makeChainMonitor(runChainResult, {
...makeConsole('monitor-chain', out, err),
logPerfEvent,
cpuTimeOffset,
cpuTimeSource,
dirDiskUsage,
});
chainMonitor.start(monitorInterval);
Expand All @@ -317,6 +324,7 @@ const main = async (progName, rawArgs, powers) => {
...makeConsole('monitor-slog', out, err),
resolveFirstEmptyBlock,
chainMonitor,
localTimeSource: timeSource,
logPerfEvent,
slogOutput,
});
Expand Down Expand Up @@ -357,7 +365,7 @@ const main = async (progName, rawArgs, powers) => {
const spawnClient = async (nextStep) => {
stageConsole.log('Running client');
logPerfEvent('run-client-start');
const runClientStart = performance.now();
const runClientStart = timeSource.shift();
const runClientResult = await runClient({ stdout: out, stderr: err });
logPerfEvent('run-client-finish');

Expand All @@ -371,8 +379,7 @@ const main = async (progName, rawArgs, powers) => {
async () => {
await orInterrupt(runClientResult.ready);
logPerfEvent('client-ready', {
duration:
Math.round((performance.now() - runClientStart) * 1000) / 1e6,
duration: runClientStart.now(),
});

await nextStep(done);
Expand Down Expand Up @@ -429,20 +436,17 @@ const main = async (progName, rawArgs, powers) => {
let sleeping;
/** @type {import("./sdk/promise-kit.js").PromiseRecord<void>} */
const sleepCancel = makePromiseKit();
if (duration < 0) {
if (durationConfig < 0) {
// sleeping forever
sleeping = new Promise(() => {});
stageConsole.log('Stage ready, waiting for end of chain');
} else {
const sleepTime = Math.max(
0,
duration - (performance.now() - stageStart),
);
const sleepTime = Math.max(0, durationConfig - stageStart.now());
if (sleepTime) {
sleeping = sleep(sleepTime, sleepCancel.promise);
sleeping = sleep(sleepTime * 1000, sleepCancel.promise);
stageConsole.log(
'Stage ready, going to sleep for',
Math.round(sleepTime / (1000 * 60)),
Math.round(sleepTime / 60),
'minutes',
);
} else {
Expand Down Expand Up @@ -504,7 +508,7 @@ const main = async (progName, rawArgs, powers) => {
releaseInterrupt();

logPerfEvent('stage-finish');
currentStageElapsedOffsetNs = 0;
currentStageTimeSource = timeSource;
},
),
);
Expand All @@ -517,7 +521,7 @@ const main = async (progName, rawArgs, powers) => {
const { console: initConsole, out, err } = makeConsole('init');
logPerfEvent('start', {
cpuTimeOffset,
timeOrigin: performance.timeOrigin / 1000,
timeOrigin: timeSource.timeOrigin,
// TODO: add other interesting info here
});

Expand Down Expand Up @@ -604,14 +608,12 @@ const main = async (progName, rawArgs, powers) => {
? Number(stageConfig.duration)
: (!(chainOnly && makeTasks === makeLocalChainTasks) &&
sharedStageDurationMinutes) ||
0) *
60 *
1000;
0) * 60;

// eslint-disable-next-line no-await-in-loop
await runStage({
chainOnly,
duration,
durationConfig: duration,
loadgenConfig,
withMonitor,
saveStorage,
Expand Down
11 changes: 3 additions & 8 deletions runner/lib/monitor/chain-monitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
/* eslint-disable no-continue */

import { basename } from 'path';
import { performance } from 'perf_hooks';

import { PromiseAllOrErrors, warnOnRejection } from '../helpers/async.js';

Expand All @@ -13,12 +12,12 @@ const vatIdentifierRE = /^(v\d+):(.*)$/;
* @param {Object} param1
* @param {Console} param1.console
* @param {import('../stats/types.js').LogPerfEvent} param1.logPerfEvent
* @param {number} param1.cpuTimeOffset
* @param {import('../helpers/time.js').TimeSource} param1.cpuTimeSource
* @param {import('../helpers/fs.js').DirDiskUsage} param1.dirDiskUsage
*/
export const makeChainMonitor = (
{ storageLocation, processInfo: kernelProcessInfo },
{ console, logPerfEvent, cpuTimeOffset, dirDiskUsage },
{ console, logPerfEvent, cpuTimeSource, dirDiskUsage },
) => {
/**
* @typedef {{
Expand Down Expand Up @@ -137,11 +136,7 @@ export const makeChainMonitor = (
const { times, memory } = await processInfo.getUsageSnapshot();
logPerfEvent('chain-process-usage', {
...eventData,
real:
Math.round(
performance.now() * 1000 -
(processInfo.startTimestamp - cpuTimeOffset) * 1e6,
) / 1e6,
real: cpuTimeSource.shift(processInfo.startTimestamp).now(),
...times,
...memory,
});
Expand Down
20 changes: 14 additions & 6 deletions runner/lib/monitor/slog-monitor.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
/* global console:off */
/* eslint-disable no-continue */

import { performance } from 'perf_hooks';

import { warnOnRejection } from '../helpers/async.js';

/**
Expand Down Expand Up @@ -99,12 +97,20 @@ const slogEventRE = filterSlogEvent([
* @param {() => void} param1.resolveFirstEmptyBlock
* @param {ReturnType<import("./chain-monitor").makeChainMonitor>} [param1.chainMonitor]
* @param {import("../stats/types.js").LogPerfEvent} param1.logPerfEvent
* @param {import("../helpers/time.js").TimeSource} [param1.localTimeSource]
* @param {import("stream").Writable} [param1.slogOutput]
* @param {Console} param1.console
*/
export const monitorSlog = async (
{ slogLines },
{ resolveFirstEmptyBlock, chainMonitor, logPerfEvent, slogOutput, console },
{
resolveFirstEmptyBlock,
chainMonitor,
localTimeSource,
logPerfEvent,
slogOutput,
console,
},
) => {
/** @type {number | null} */
let slogStart = null;
Expand All @@ -123,7 +129,7 @@ export const monitorSlog = async (
// There is a risk we could be late to the party here, with the chain
// having started some time before us but in reality we usually find
// the process before it starts the kernel
slogStart = performance.now() / 1000;
slogStart = localTimeSource ? localTimeSource.now() : 0;
warnOnRejection(
chainMonitor.logStorageUsage(),
console,
Expand All @@ -138,7 +144,7 @@ export const monitorSlog = async (
// the time and type tested prefix is guaranteed to be single-byte.
if (!slogEventRE.test(line.toString('ascii', 0, 100))) continue;

const localEventTime = performance.timeOrigin + performance.now();
const localEventTime = localTimeSource && localTimeSource.getTime();

/** @type {SlogSupportedEvent} */
let event;
Expand All @@ -149,7 +155,9 @@ export const monitorSlog = async (
continue;
}

const delay = Math.round(localEventTime - event.time * 1000);
const delay = localEventTime
? Math.round((localEventTime - event.time) * 1000)
: 0;

if (delay > 100) {
console.log('slog event', event.type, 'delay', delay, 'ms');
Expand Down

0 comments on commit 230b8de

Please sign in to comment.