Skip to content

Commit

Permalink
refactor(runner): move slog based triggers out of monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
mhofman committed Nov 24, 2021
1 parent 230b8de commit 3050fd7
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 44 deletions.
74 changes: 60 additions & 14 deletions runner/lib/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { promisify } from 'util';
import {
pipeline as pipelineCallback,
finished as finishedCallback,
Readable,
PassThrough,
} from 'stream';

import yargsParser from 'yargs-parser';
Expand Down Expand Up @@ -295,6 +297,9 @@ const main = async (progName, rawArgs, powers) => {
);
chainStorageLocation = runChainResult.storageLocation;

const slogLinesStream = Readable.from(runChainResult.slogLines);
const slogLines = new PassThrough({ objectMode: true });

const slogOutput = zlib.createGzip({
level: zlib.constants.Z_BEST_COMPRESSION,
});
Expand All @@ -304,13 +309,48 @@ const main = async (progName, rawArgs, powers) => {
await fsStreamReady(slogOutputWriteStream);
// const slogOutput = slogOutputWriteStream;
// const slogOutputPipeResult = finished(slogOutput);
const slogOutputPipeResult = pipeline(slogOutput, slogOutputWriteStream);
slogLinesStream.pipe(slogLines);
const slogOutputPipeResult = pipeline(
slogLinesStream,
slogOutput,
slogOutputWriteStream,
);

/** @type {import("./sdk/promise-kit.js").PromiseRecord<void>} */
const firstBlockDoneKit = makePromiseKit();
/** @type {(() => void) | null} */
let resolveFirstBlockDone = firstBlockDoneKit.resolve;

/** @type {import("./sdk/promise-kit.js").PromiseRecord<void>} */
const {
promise: chainFirstEmptyBlock,
resolve: resolveFirstEmptyBlock,
} = makePromiseKit();
const firstEmptyBlockKit = makePromiseKit();
/** @type {(() => void) | null} */
let resolveFirstEmptyBlock = firstEmptyBlockKit.resolve;

let blockCount = 0;

const notifier = {
/** @param {{blockHeight: number, slogLines: number}} block */
blockDone(block) {
blockCount += 1;

if (resolveFirstBlockDone) {
resolveFirstBlockDone();
resolveFirstBlockDone = null;
}

if (resolveFirstEmptyBlock) {
if (block.slogLines === 0 || blockCount > 10) {
if (block.slogLines === 0) {
logPerfEvent('stage-first-empty-block', {
block: block.blockHeight,
});
}
resolveFirstEmptyBlock();
resolveFirstEmptyBlock = null;
}
}
},
};

const chainMonitor = makeChainMonitor(runChainResult, {
...makeConsole('monitor-chain', out, err),
Expand All @@ -320,22 +360,28 @@ const main = async (progName, rawArgs, powers) => {
});
chainMonitor.start(monitorInterval);

const slogMonitorDone = monitorSlog(runChainResult, {
...makeConsole('monitor-slog', out, err),
resolveFirstEmptyBlock,
chainMonitor,
localTimeSource: timeSource,
logPerfEvent,
slogOutput,
});
const slogMonitorDone = monitorSlog(
{ slogLines },
{
...makeConsole('monitor-slog', out, err),
notifier,
chainMonitor,
localTimeSource: timeSource,
logPerfEvent,
},
);

await aggregateTryFinally(
async () => {
await orInterrupt(runChainResult.ready);
logPerfEvent('chain-ready');
stageConsole.log('Chain ready');

await orInterrupt(chainFirstEmptyBlock);
await Promise.race([
slogMonitorDone,
orInterrupt(firstBlockDoneKit.promise),
]);
await orInterrupt(firstEmptyBlockKit.promise);

await nextStep(done);
},
Expand Down
36 changes: 6 additions & 30 deletions runner/lib/monitor/slog-monitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,36 +94,23 @@ const slogEventRE = filterSlogEvent([
/**
* @param {Pick<import("../tasks/types.js").RunChainInfo, 'slogLines'>} chainInfo
* @param {Object} param1
* @param {() => void} param1.resolveFirstEmptyBlock
* @param {{blockDone(stats: {blockHeight: number, slogLines: number}): void}} [param1.notifier]
* @param {ReturnType<import("./chain-monitor").makeChainMonitor>} [param1.chainMonitor]
* @param {import("../stats/types.js").LogPerfEvent} param1.logPerfEvent
* @param {import("../helpers/time.js").TimeSource} [param1.localTimeSource]
* @param {import("stream").Writable} [param1.slogOutput]
* @param {Console} param1.console
*/
export const monitorSlog = async (
{ slogLines },
{
resolveFirstEmptyBlock,
chainMonitor,
localTimeSource,
logPerfEvent,
slogOutput,
console,
},
{ notifier, chainMonitor, localTimeSource, logPerfEvent, console },
) => {
/** @type {number | null} */
let slogStart = null;

let slogBlocksSeen = 0;
let slogEmptyBlocksSeen = 0;
let slogLinesInBlock = 0;

for await (const line of slogLines) {
if (slogOutput) {
slogOutput.write(line);
}

if (slogStart == null && chainMonitor) {
// TODO: figure out a better way
// There is a risk we could be late to the party here, with the chain
Expand Down Expand Up @@ -193,12 +180,8 @@ export const monitorSlog = async (
if (!slogBlocksSeen) {
logPerfEvent('stage-first-block', { block: blockHeight });
if (chainMonitor) {
await chainMonitor.logProcessUsage().catch((usageErr) => {
// Abuse first empty block as it will be awaited before monitorChain
// And won't abruptly end our monitor
// @ts-ignore resolving with a rejected promise is still "void" ;)
resolveFirstEmptyBlock(Promise.reject(usageErr));
});
// This will abruptly end the monitor if there is an error
await chainMonitor.logProcessUsage();
}
}
console.log('begin-block', blockHeight);
Expand All @@ -223,15 +206,8 @@ export const monitorSlog = async (
const { blockHeight = 0 } = event;
// Finish line itself doesn't count
slogLinesInBlock -= 1;
if (slogLinesInBlock === 0) {
if (!slogEmptyBlocksSeen) {
logPerfEvent('stage-first-empty-block', {
block: blockHeight,
});
resolveFirstEmptyBlock();
}
slogEmptyBlocksSeen += 1;
}
notifier &&
notifier.blockDone({ blockHeight, slogLines: slogLinesInBlock });

console.log(
'end-block',
Expand Down

0 comments on commit 3050fd7

Please sign in to comment.