Skip to content

Commit

Permalink
Add Stats (#36)
Browse files Browse the repository at this point in the history
Add the stats infrastructure to collect and summarize loadgen run data
  • Loading branch information
mhofman authored Jan 17, 2022
2 parents 0348471 + 6ff64f6 commit c3900aa
Show file tree
Hide file tree
Showing 18 changed files with 1,715 additions and 64 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -159,4 +159,4 @@ RUN mkdir -p $SDK_SRC $OUTPUT_DIR && chown $USER_UID:$USER_GID $SDK_SRC $OUTPUT_

USER $USER_UID:$USER_GID

ENTRYPOINT ["/tini", "--", "/app/start.sh", "--no-reset"]
ENTRYPOINT ["/tini", "--", "/app/start.sh", "--no-reset", "--test-data.docker"]
139 changes: 105 additions & 34 deletions runner/lib/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import { makeTasks as makeTestnetTasks } from './tasks/testnet.js';

import { makeChainMonitor } from './monitor/chain-monitor.js';
import { monitorSlog } from './monitor/slog-monitor.js';
import { monitorLoadgen } from './monitor/loadgen-monitor.js';
import { makeRunStats } from './stats/run.js';
import { makeTimeSource } from './helpers/time.js';

/** @typedef {import('./helpers/async.js').Task} Task */
Expand Down Expand Up @@ -205,13 +207,15 @@ const main = async (progName, rawArgs, powers) => {
throw new Error(`Unexpected profile option: ${argv.profile}`);
}

const { setupTasks, runChain, runClient, runLoadgen } = makeTasks({
spawn,
fs,
findDirByPrefix: findByPrefix,
makeFIFO,
getProcessInfo,
});
const { getEnvInfo, setupTasks, runChain, runClient, runLoadgen } = makeTasks(
{
spawn,
fs,
findDirByPrefix: findByPrefix,
makeFIFO,
getProcessInfo,
},
);

const monitorInterval =
Number(argv.monitorInterval || defaultMonitorIntervalMinutes) * 60 * 1000;
Expand All @@ -228,6 +232,17 @@ const main = async (progName, rawArgs, powers) => {
);
await fsStreamReady(outputStream);

const envInfo = await getEnvInfo({ stdout, stderr });

const runStats = makeRunStats({
metadata: {
profile: argv.profile || 'local',
testnetOrigin,
...envInfo,
testData: argv.testData,
},
});

/** @type {import('./stats/types.js').LogPerfEvent} */
const logPerfEvent = (eventType, data = {}) => {
outputStream.write(
Expand All @@ -247,20 +262,21 @@ const main = async (progName, rawArgs, powers) => {
};

/**
* @param {Object} param0
* @param {boolean} param0.chainOnly
* @param {number} param0.durationConfig
* @param {unknown} param0.loadgenConfig
* @param {boolean} param0.withMonitor
* @param {boolean} param0.saveStorage
* @param {Object} config
* @param {boolean} config.chainOnly
* @param {number} config.durationConfig
* @param {unknown} config.loadgenConfig
* @param {boolean} config.withMonitor
* @param {boolean} config.saveStorage
*/
const runStage = async ({
chainOnly,
durationConfig,
loadgenConfig,
withMonitor,
saveStorage,
}) => {
const runStage = async (config) => {
const {
chainOnly,
durationConfig,
loadgenConfig,
withMonitor,
saveStorage,
} = config;
/** @type {string | void} */
let chainStorageLocation;
currentStageTimeSource = timeSource.shift();
Expand All @@ -275,16 +291,18 @@ const main = async (progName, rawArgs, powers) => {
logPerfEvent('stage-start');
const stageStart = timeSource.shift();

const stats = runStats.newStage({
stageIndex: currentStage,
stageConfig: config,
});

/** @type {Task} */
const spawnChain = async (nextStep) => {
stageConsole.log('Running chain', {
chainOnly,
durationConfig,
loadgenConfig,
});
stageConsole.log('Running chain', config);
logPerfEvent('run-chain-start');
const runChainResult = await runChain({ stdout: out, stderr: err });
logPerfEvent('run-chain-finish');
stats.recordChainStart(timeSource.getTime());

let chainExited = false;
const done = runChainResult.done.finally(() => {
Expand Down Expand Up @@ -326,20 +344,16 @@ const main = async (progName, rawArgs, powers) => {
/** @type {(() => void) | null} */
let resolveFirstEmptyBlock = firstEmptyBlockKit.resolve;

let blockCount = 0;

const notifier = {
/** @param {{blockHeight: number, slogLines: number}} block */
/** @param {import('./stats/types.js').BlockStats} block */
blockDone(block) {
blockCount += 1;

if (resolveFirstBlockDone) {
resolveFirstBlockDone();
resolveFirstBlockDone = null;
}

if (resolveFirstEmptyBlock) {
if (block.slogLines === 0 || blockCount > 10) {
if (block.slogLines === 0 || stats.blockCount > 10) {
if (block.slogLines === 0) {
logPerfEvent('stage-first-empty-block', {
block: block.blockHeight,
Expand All @@ -364,6 +378,7 @@ const main = async (progName, rawArgs, powers) => {
{ slogLines },
{
...makeConsole('monitor-slog', out, err),
stats,
notifier,
chainMonitor,
localTimeSource: timeSource,
Expand All @@ -374,6 +389,7 @@ const main = async (progName, rawArgs, powers) => {
await aggregateTryFinally(
async () => {
await orInterrupt(runChainResult.ready);
stats.recordChainReady(timeSource.getTime());
logPerfEvent('chain-ready');
stageConsole.log('Chain ready');

Expand Down Expand Up @@ -411,9 +427,9 @@ const main = async (progName, rawArgs, powers) => {
const spawnClient = async (nextStep) => {
stageConsole.log('Running client');
logPerfEvent('run-client-start');
const runClientStart = timeSource.shift();
const runClientResult = await runClient({ stdout: out, stderr: err });
logPerfEvent('run-client-finish');
stats.recordClientStart(timeSource.getTime());

let clientExited = false;
const done = runClientResult.done.finally(() => {
Expand All @@ -424,9 +440,18 @@ const main = async (progName, rawArgs, powers) => {
await aggregateTryFinally(
async () => {
await orInterrupt(runClientResult.ready);
stats.recordClientReady(timeSource.getTime());
logPerfEvent('client-ready', {
duration: runClientStart.now(),
duration: stats.clientInitDuration,
});
if (!runStats.walletDeployEndedAt) {
runStats.recordWalletDeployStart(
/** @type {number} */ (stats.clientStartedAt),
);
runStats.recordWalletDeployEnd(
/** @type {number} */ (stats.clientReadyAt),
);
}

await nextStep(done);
},
Expand All @@ -450,6 +475,7 @@ const main = async (progName, rawArgs, powers) => {
stderr: err,
config: loadgenConfig,
});
stats.recordLoadgenStart(timeSource.getTime());
logPerfEvent('run-loadgen-finish');

let loadgenExited = false;
Expand All @@ -458,10 +484,24 @@ const main = async (progName, rawArgs, powers) => {
logPerfEvent('loadgen-stopped');
});

const monitorLoadgenDone = monitorLoadgen(runLoadgenResult, {
...makeConsole('monitor-loadgen', out, err),
stats,
});

await aggregateTryFinally(
async () => {
await orInterrupt(runLoadgenResult.ready);
stats.recordLoadgenReady(timeSource.getTime());
logPerfEvent('loadgen-ready');
if (!runStats.loadgenDeployEndedAt) {
runStats.recordLoadgenDeployStart(
/** @type {number} */ (stats.loadgenStartedAt),
);
runStats.recordLoadgenDeployEnd(
/** @type {number} */ (stats.loadgenReadyAt),
);
}

await nextStep(done);
},
Expand All @@ -472,6 +512,8 @@ const main = async (progName, rawArgs, powers) => {
runLoadgenResult.stop();
await done;
}

await monitorLoadgenDone;
},
);
};
Expand Down Expand Up @@ -500,6 +542,7 @@ const main = async (progName, rawArgs, powers) => {
stageConsole.log('Stage ready, no time to sleep, moving on');
}
}
stats.recordReady(timeSource.getTime());
logPerfEvent('stage-ready');
await nextStep(sleeping).finally(sleepCancel.resolve);
logPerfEvent('stage-shutdown');
Expand Down Expand Up @@ -530,8 +573,9 @@ const main = async (progName, rawArgs, powers) => {
} else {
tasks.push(stageReady);
}

stats.recordStart(timeSource.getTime());
await sequential(...tasks)((stop) => stop);
stats.recordEnd(timeSource.getTime());
},
async () =>
aggregateTryFinally(
Expand All @@ -554,6 +598,16 @@ const main = async (progName, rawArgs, powers) => {
releaseInterrupt();

logPerfEvent('stage-finish');
stageConsole.log('Live blocks stats:', {
...((stats.blocksSummaries && stats.blocksSummaries.onlyLive) || {
blockCount: 0,
}),
});
stageConsole.log('Cycles stats:', {
...((stats.cyclesSummaries && stats.cyclesSummaries.all) || {
cycleCount: 0,
}),
});
currentStageTimeSource = timeSource;
},
),
Expand All @@ -565,6 +619,7 @@ const main = async (progName, rawArgs, powers) => {
await aggregateTryFinally(
async () => {
const { console: initConsole, out, err } = makeConsole('init');
runStats.recordStart(timeSource.getTime());
logPerfEvent('start', {
cpuTimeOffset,
timeOrigin: timeSource.timeOrigin,
Expand Down Expand Up @@ -665,10 +720,26 @@ const main = async (progName, rawArgs, powers) => {
saveStorage,
});
}

runStats.recordEnd(timeSource.getTime());
},
async () => {
logPerfEvent('finish', { stats: runStats });

outputStream.end();

const { console } = makeConsole('summary');
console.log('Live blocks stats:', {
...(runStats.liveBlocksSummary || {
blockCount: 0,
}),
});
console.log('Cycles stats:', {
...(runStats.cyclesSummary || {
cycleCount: 0,
}),
});

await finished(outputStream);
},
);
Expand Down
31 changes: 31 additions & 0 deletions runner/lib/monitor/loadgen-monitor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/** @typedef {import('../stats/types.js').StageStats} StageStats */

/**
* @param {import("../tasks/types.js").RunLoadgenInfo} loadgenInfo
* @param {Object} param1
* @param {StageStats} param1.stats
* @param {Console} param1.console
*/
export const monitorLoadgen = async ({ taskEvents }, { stats, console }) => {
for await (const event of taskEvents) {
const { time } = event;
switch (event.type) {
case 'start': {
const { task, seq } = event;
console.log('start', task, seq);
const cycle = stats.getOrMakeCycle({ task, seq });
cycle.recordStart(time);
break;
}
case 'finish': {
const { task, seq, success } = event;
console.log('finish', event.task, event.seq);
const cycle = stats.getOrMakeCycle({ task, seq });
cycle.recordEnd(time, success);
break;
}
case 'status':
default:
}
}
};
Loading

0 comments on commit c3900aa

Please sign in to comment.