diff --git a/runner/lib/main.js b/runner/lib/main.js index 7d8c2b2..32222fa 100644 --- a/runner/lib/main.js +++ b/runner/lib/main.js @@ -7,6 +7,8 @@ import { promisify } from 'util'; import { pipeline as pipelineCallback, finished as finishedCallback, + Readable, + PassThrough, } from 'stream'; import yargsParser from 'yargs-parser'; @@ -295,6 +297,9 @@ const main = async (progName, rawArgs, powers) => { ); chainStorageLocation = runChainResult.storageLocation; + const slogLinesStream = Readable.from(runChainResult.slogLines); + const slogLines = new PassThrough({ objectMode: true }); + const slogOutput = zlib.createGzip({ level: zlib.constants.Z_BEST_COMPRESSION, }); @@ -304,13 +309,48 @@ const main = async (progName, rawArgs, powers) => { await fsStreamReady(slogOutputWriteStream); // const slogOutput = slogOutputWriteStream; // const slogOutputPipeResult = finished(slogOutput); - const slogOutputPipeResult = pipeline(slogOutput, slogOutputWriteStream); + slogLinesStream.pipe(slogLines); + const slogOutputPipeResult = pipeline( + slogLinesStream, + slogOutput, + slogOutputWriteStream, + ); + + /** @type {import("./sdk/promise-kit.js").PromiseRecord} */ + const firstBlockDoneKit = makePromiseKit(); + /** @type {(() => void) | null} */ + let resolveFirstBlockDone = firstBlockDoneKit.resolve; /** @type {import("./sdk/promise-kit.js").PromiseRecord} */ - const { - promise: chainFirstEmptyBlock, - resolve: resolveFirstEmptyBlock, - } = makePromiseKit(); + const firstEmptyBlockKit = makePromiseKit(); + /** @type {(() => void) | null} */ + let resolveFirstEmptyBlock = firstEmptyBlockKit.resolve; + + let blockCount = 0; + + const notifier = { + /** @param {{blockHeight: number, slogLines: number}} block */ + blockDone(block) { + blockCount += 1; + + if (resolveFirstBlockDone) { + resolveFirstBlockDone(); + resolveFirstBlockDone = null; + } + + if (resolveFirstEmptyBlock) { + if (block.slogLines === 0 || blockCount > 10) { + if (block.slogLines === 0) { + logPerfEvent('stage-first-empty-block', { + block: block.blockHeight, + }); + } + resolveFirstEmptyBlock(); + resolveFirstEmptyBlock = null; + } + } + }, + }; const chainMonitor = makeChainMonitor(runChainResult, { ...makeConsole('monitor-chain', out, err), @@ -320,14 +360,16 @@ const main = async (progName, rawArgs, powers) => { }); chainMonitor.start(monitorInterval); - const slogMonitorDone = monitorSlog(runChainResult, { - ...makeConsole('monitor-slog', out, err), - resolveFirstEmptyBlock, - chainMonitor, - localTimeSource: timeSource, - logPerfEvent, - slogOutput, - }); + const slogMonitorDone = monitorSlog( + { slogLines }, + { + ...makeConsole('monitor-slog', out, err), + notifier, + chainMonitor, + localTimeSource: timeSource, + logPerfEvent, + }, + ); await aggregateTryFinally( async () => { @@ -335,7 +377,11 @@ const main = async (progName, rawArgs, powers) => { logPerfEvent('chain-ready'); stageConsole.log('Chain ready'); - await orInterrupt(chainFirstEmptyBlock); + await Promise.race([ + slogMonitorDone, + orInterrupt(firstBlockDoneKit.promise), + ]); + await orInterrupt(firstEmptyBlockKit.promise); await nextStep(done); }, diff --git a/runner/lib/monitor/slog-monitor.js b/runner/lib/monitor/slog-monitor.js index 18bdceb..6721e36 100644 --- a/runner/lib/monitor/slog-monitor.js +++ b/runner/lib/monitor/slog-monitor.js @@ -94,36 +94,23 @@ const slogEventRE = filterSlogEvent([ /** * @param {Pick} chainInfo * @param {Object} param1 - * @param {() => void} param1.resolveFirstEmptyBlock + * @param {{blockDone(stats: {blockHeight: number, slogLines: number}): void}} [param1.notifier] * @param {ReturnType} [param1.chainMonitor] * @param {import("../stats/types.js").LogPerfEvent} param1.logPerfEvent * @param {import("../helpers/time.js").TimeSource} [param1.localTimeSource] - * @param {import("stream").Writable} [param1.slogOutput] * @param {Console} param1.console */ export const monitorSlog = async ( { slogLines }, - { - resolveFirstEmptyBlock, - chainMonitor, - localTimeSource, - logPerfEvent, - slogOutput, - console, - }, + { notifier, chainMonitor, localTimeSource, logPerfEvent, console }, ) => { /** @type {number | null} */ let slogStart = null; let slogBlocksSeen = 0; - let slogEmptyBlocksSeen = 0; let slogLinesInBlock = 0; for await (const line of slogLines) { - if (slogOutput) { - slogOutput.write(line); - } - if (slogStart == null && chainMonitor) { // TODO: figure out a better way // There is a risk we could be late to the party here, with the chain @@ -193,12 +180,8 @@ export const monitorSlog = async ( if (!slogBlocksSeen) { logPerfEvent('stage-first-block', { block: blockHeight }); if (chainMonitor) { - await chainMonitor.logProcessUsage().catch((usageErr) => { - // Abuse first empty block as it will be awaited before monitorChain - // And won't abruptly end our monitor - // @ts-ignore resolving with a rejected promise is still "void" ;) - resolveFirstEmptyBlock(Promise.reject(usageErr)); - }); + // This will abruptly end the monitor if there is an error + await chainMonitor.logProcessUsage(); } } console.log('begin-block', blockHeight); @@ -223,15 +206,8 @@ export const monitorSlog = async ( const { blockHeight = 0 } = event; // Finish line itself doesn't count slogLinesInBlock -= 1; - if (slogLinesInBlock === 0) { - if (!slogEmptyBlocksSeen) { - logPerfEvent('stage-first-empty-block', { - block: blockHeight, - }); - resolveFirstEmptyBlock(); - } - slogEmptyBlocksSeen += 1; - } + notifier && + notifier.blockDone({ blockHeight, slogLines: slogLinesInBlock }); console.log( 'end-block',