Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generate stats from loadgen run #36

Merged
merged 11 commits into from
Jan 17, 2022
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -159,4 +159,4 @@ RUN mkdir -p $SDK_SRC $OUTPUT_DIR && chown $USER_UID:$USER_GID $SDK_SRC $OUTPUT_

USER $USER_UID:$USER_GID

ENTRYPOINT ["/tini", "--", "/app/start.sh", "--no-reset"]
ENTRYPOINT ["/tini", "--", "/app/start.sh", "--no-reset", "--test-data.docker"]
139 changes: 105 additions & 34 deletions runner/lib/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import { makeTasks as makeTestnetTasks } from './tasks/testnet.js';

import { makeChainMonitor } from './monitor/chain-monitor.js';
import { monitorSlog } from './monitor/slog-monitor.js';
import { monitorLoadgen } from './monitor/loadgen-monitor.js';
import { makeRunStats } from './stats/run.js';
import { makeTimeSource } from './helpers/time.js';

/** @typedef {import('./helpers/async.js').Task} Task */
Expand Down Expand Up @@ -205,13 +207,15 @@ const main = async (progName, rawArgs, powers) => {
throw new Error(`Unexpected profile option: ${argv.profile}`);
}

const { setupTasks, runChain, runClient, runLoadgen } = makeTasks({
spawn,
fs,
findDirByPrefix: findByPrefix,
makeFIFO,
getProcessInfo,
});
const { getEnvInfo, setupTasks, runChain, runClient, runLoadgen } = makeTasks(
{
spawn,
fs,
findDirByPrefix: findByPrefix,
makeFIFO,
getProcessInfo,
},
);

const monitorInterval =
Number(argv.monitorInterval || defaultMonitorIntervalMinutes) * 60 * 1000;
Expand All @@ -228,6 +232,17 @@ const main = async (progName, rawArgs, powers) => {
);
await fsStreamReady(outputStream);

const envInfo = await getEnvInfo({ stdout, stderr });

const runStats = makeRunStats({
metadata: {
profile: argv.profile || 'local',
testnetOrigin,
...envInfo,
testData: argv.testData,
},
});

/** @type {import('./stats/types.js').LogPerfEvent} */
const logPerfEvent = (eventType, data = {}) => {
outputStream.write(
Expand All @@ -247,20 +262,21 @@ const main = async (progName, rawArgs, powers) => {
};

/**
* @param {Object} param0
* @param {boolean} param0.chainOnly
* @param {number} param0.durationConfig
* @param {unknown} param0.loadgenConfig
* @param {boolean} param0.withMonitor
* @param {boolean} param0.saveStorage
* @param {Object} config
* @param {boolean} config.chainOnly
* @param {number} config.durationConfig
* @param {unknown} config.loadgenConfig
* @param {boolean} config.withMonitor
* @param {boolean} config.saveStorage
*/
const runStage = async ({
chainOnly,
durationConfig,
loadgenConfig,
withMonitor,
saveStorage,
}) => {
const runStage = async (config) => {
const {
chainOnly,
durationConfig,
loadgenConfig,
withMonitor,
saveStorage,
} = config;
/** @type {string | void} */
let chainStorageLocation;
currentStageTimeSource = timeSource.shift();
Expand All @@ -275,16 +291,18 @@ const main = async (progName, rawArgs, powers) => {
logPerfEvent('stage-start');
const stageStart = timeSource.shift();

const stats = runStats.newStage({
stageIndex: currentStage,
stageConfig: config,
});

/** @type {Task} */
const spawnChain = async (nextStep) => {
stageConsole.log('Running chain', {
chainOnly,
durationConfig,
loadgenConfig,
});
stageConsole.log('Running chain', config);
logPerfEvent('run-chain-start');
const runChainResult = await runChain({ stdout: out, stderr: err });
logPerfEvent('run-chain-finish');
stats.recordChainStart(timeSource.getTime());

let chainExited = false;
const done = runChainResult.done.finally(() => {
Expand Down Expand Up @@ -326,20 +344,16 @@ const main = async (progName, rawArgs, powers) => {
/** @type {(() => void) | null} */
let resolveFirstEmptyBlock = firstEmptyBlockKit.resolve;

let blockCount = 0;

const notifier = {
/** @param {{blockHeight: number, slogLines: number}} block */
/** @param {import('./stats/types.js').BlockStats} block */
blockDone(block) {
blockCount += 1;

if (resolveFirstBlockDone) {
resolveFirstBlockDone();
resolveFirstBlockDone = null;
}

if (resolveFirstEmptyBlock) {
if (block.slogLines === 0 || blockCount > 10) {
if (block.slogLines === 0 || stats.blockCount > 10) {
if (block.slogLines === 0) {
logPerfEvent('stage-first-empty-block', {
block: block.blockHeight,
Expand All @@ -364,6 +378,7 @@ const main = async (progName, rawArgs, powers) => {
{ slogLines },
{
...makeConsole('monitor-slog', out, err),
stats,
notifier,
chainMonitor,
localTimeSource: timeSource,
Expand All @@ -374,6 +389,7 @@ const main = async (progName, rawArgs, powers) => {
await aggregateTryFinally(
async () => {
await orInterrupt(runChainResult.ready);
stats.recordChainReady(timeSource.getTime());
logPerfEvent('chain-ready');
stageConsole.log('Chain ready');

Expand Down Expand Up @@ -411,9 +427,9 @@ const main = async (progName, rawArgs, powers) => {
const spawnClient = async (nextStep) => {
stageConsole.log('Running client');
logPerfEvent('run-client-start');
const runClientStart = timeSource.shift();
const runClientResult = await runClient({ stdout: out, stderr: err });
logPerfEvent('run-client-finish');
stats.recordClientStart(timeSource.getTime());

let clientExited = false;
const done = runClientResult.done.finally(() => {
Expand All @@ -424,9 +440,18 @@ const main = async (progName, rawArgs, powers) => {
await aggregateTryFinally(
async () => {
await orInterrupt(runClientResult.ready);
stats.recordClientReady(timeSource.getTime());
logPerfEvent('client-ready', {
duration: runClientStart.now(),
duration: stats.clientInitDuration,
});
if (!runStats.walletDeployEndedAt) {
runStats.recordWalletDeployStart(
/** @type {number} */ (stats.clientStartedAt),
);
runStats.recordWalletDeployEnd(
/** @type {number} */ (stats.clientReadyAt),
);
}

await nextStep(done);
},
Expand All @@ -450,6 +475,7 @@ const main = async (progName, rawArgs, powers) => {
stderr: err,
config: loadgenConfig,
});
stats.recordLoadgenStart(timeSource.getTime());
logPerfEvent('run-loadgen-finish');

let loadgenExited = false;
Expand All @@ -458,10 +484,24 @@ const main = async (progName, rawArgs, powers) => {
logPerfEvent('loadgen-stopped');
});

const monitorLoadgenDone = monitorLoadgen(runLoadgenResult, {
...makeConsole('monitor-loadgen', out, err),
stats,
});

await aggregateTryFinally(
async () => {
await orInterrupt(runLoadgenResult.ready);
stats.recordLoadgenReady(timeSource.getTime());
logPerfEvent('loadgen-ready');
if (!runStats.loadgenDeployEndedAt) {
runStats.recordLoadgenDeployStart(
/** @type {number} */ (stats.loadgenStartedAt),
);
runStats.recordLoadgenDeployEnd(
/** @type {number} */ (stats.loadgenReadyAt),
);
}

await nextStep(done);
},
Expand All @@ -472,6 +512,8 @@ const main = async (progName, rawArgs, powers) => {
runLoadgenResult.stop();
await done;
}

await monitorLoadgenDone;
},
);
};
Expand Down Expand Up @@ -500,6 +542,7 @@ const main = async (progName, rawArgs, powers) => {
stageConsole.log('Stage ready, no time to sleep, moving on');
}
}
stats.recordReady(timeSource.getTime());
logPerfEvent('stage-ready');
await nextStep(sleeping).finally(sleepCancel.resolve);
logPerfEvent('stage-shutdown');
Expand Down Expand Up @@ -530,8 +573,9 @@ const main = async (progName, rawArgs, powers) => {
} else {
tasks.push(stageReady);
}

stats.recordStart(timeSource.getTime());
await sequential(...tasks)((stop) => stop);
stats.recordEnd(timeSource.getTime());
},
async () =>
aggregateTryFinally(
Expand All @@ -554,6 +598,16 @@ const main = async (progName, rawArgs, powers) => {
releaseInterrupt();

logPerfEvent('stage-finish');
stageConsole.log('Live blocks stats:', {
...((stats.blocksSummaries && stats.blocksSummaries.onlyLive) || {
blockCount: 0,
}),
});
stageConsole.log('Cycles stats:', {
...((stats.cyclesSummaries && stats.cyclesSummaries.all) || {
cycleCount: 0,
}),
});
currentStageTimeSource = timeSource;
},
),
Expand All @@ -565,6 +619,7 @@ const main = async (progName, rawArgs, powers) => {
await aggregateTryFinally(
async () => {
const { console: initConsole, out, err } = makeConsole('init');
runStats.recordStart(timeSource.getTime());
logPerfEvent('start', {
cpuTimeOffset,
timeOrigin: timeSource.timeOrigin,
Expand Down Expand Up @@ -665,10 +720,26 @@ const main = async (progName, rawArgs, powers) => {
saveStorage,
});
}

runStats.recordEnd(timeSource.getTime());
},
async () => {
logPerfEvent('finish', { stats: runStats });

outputStream.end();

const { console } = makeConsole('summary');
console.log('Live blocks stats:', {
...(runStats.liveBlocksSummary || {
blockCount: 0,
}),
});
console.log('Cycles stats:', {
...(runStats.cyclesSummary || {
cycleCount: 0,
}),
});

await finished(outputStream);
},
);
Expand Down
31 changes: 31 additions & 0 deletions runner/lib/monitor/loadgen-monitor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/** @typedef {import('../stats/types.js').StageStats} StageStats */

/**
* @param {import("../tasks/types.js").RunLoadgenInfo} loadgenInfo
* @param {Object} param1
* @param {StageStats} param1.stats
* @param {Console} param1.console
*/
export const monitorLoadgen = async ({ taskEvents }, { stats, console }) => {
for await (const event of taskEvents) {
const { time } = event;
switch (event.type) {
case 'start': {
const { task, seq } = event;
console.log('start', task, seq);
const cycle = stats.getOrMakeCycle({ task, seq });
cycle.recordStart(time);
break;
}
case 'finish': {
const { task, seq, success } = event;
console.log('finish', event.task, event.seq);
const cycle = stats.getOrMakeCycle({ task, seq });
cycle.recordEnd(time, success);
break;
}
case 'status':
default:
}
}
};
Loading