Skip to content

Commit

Permalink
Add result.pipedFrom (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
ehmicky authored Sep 7, 2024
1 parent e9f9d3a commit d722a9f
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 57 deletions.
9 changes: 3 additions & 6 deletions source/context.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import process from 'node:process';
import {stripVTControlCharacters} from 'node:util';

export const getContext = ({start, command}, raw) => ({
start: start ?? process.hrtime.bigint(),
command: [
command,
raw.map(part => getCommandPart(stripVTControlCharacters(part))).join(' '),
].filter(Boolean).join(' | '),
export const getContext = raw => ({
start: process.hrtime.bigint(),
command: raw.map(part => getCommandPart(stripVTControlCharacters(part))).join(' '),
state: {stdout: '', stderr: '', output: ''},
});

Expand Down
2 changes: 2 additions & 0 deletions source/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ export type Result = {
command: string;

durationMs: number;

pipedFrom?: Result | SubprocessError;
};

export type SubprocessError = Error & Result & {
Expand Down
12 changes: 5 additions & 7 deletions source/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,21 @@ import {getResult} from './result.js';
import {handlePipe} from './pipe.js';
import {lineIterator, combineAsyncIterators} from './iterable.js';

export default function nanoSpawn(first, second = [], third = {}) {
const [file, previous] = Array.isArray(first) ? first : [first, {}];
const [commandArguments, options] = Array.isArray(second) ? [second, third] : [[], second];

const context = getContext(previous, [file, ...commandArguments]);
export default function nanoSpawn(file, second, third, previous) {
const [commandArguments = [], options = {}] = Array.isArray(second) ? [second, third] : [[], second];
const context = getContext([file, ...commandArguments]);
const spawnOptions = getOptions(options);
const nodeChildProcess = spawnSubprocess(file, commandArguments, spawnOptions, context);
let subprocess = getResult(nodeChildProcess, spawnOptions, context);
Object.assign(subprocess, {nodeChildProcess});
subprocess = previous.subprocess === undefined ? subprocess : handlePipe(previous, subprocess);
subprocess = previous ? handlePipe(previous, subprocess) : subprocess;

const stdout = lineIterator(subprocess, context, 'stdout');
const stderr = lineIterator(subprocess, context, 'stderr');
return Object.assign(subprocess, {
stdout,
stderr,
[Symbol.asyncIterator]: () => combineAsyncIterators(stdout, stderr),
pipe: (file, second, third) => nanoSpawn([file, {...context, subprocess}], second, third),
pipe: (file, second, third) => nanoSpawn(file, second, third, subprocess),
});
}
6 changes: 6 additions & 0 deletions source/index.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ try {
expectType<string>(result.output);
expectType<string>(result.command);
expectType<number>(result.durationMs);
expectType<Result | SubprocessError | undefined>(result.pipedFrom);
expectType<Result | SubprocessError | undefined>(result.pipedFrom?.pipedFrom);
expectType<number | undefined>(result.pipedFrom?.durationMs);
expectNotAssignable<Error>(result);
expectError(result.exitCode);
expectError(result.signalName);
Expand All @@ -31,6 +34,9 @@ try {
expectType<string>(subprocessError.output);
expectType<string>(subprocessError.command);
expectType<number>(subprocessError.durationMs);
expectType<Result | SubprocessError | undefined>(subprocessError.pipedFrom);
expectType<Result | SubprocessError | undefined>(subprocessError.pipedFrom?.pipedFrom);
expectType<number | undefined>(subprocessError.pipedFrom?.durationMs);
expectAssignable<Error>(subprocessError);
expectType<number | undefined>(subprocessError.exitCode);
expectType<string | undefined>(subprocessError.signalName);
Expand Down
18 changes: 11 additions & 7 deletions source/pipe.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
import {pipeline} from 'node:stream/promises';

export const handlePipe = (previous, subprocess) => Object.assign(runProcesses([previous.subprocess, subprocess]), subprocess);
export const handlePipe = (previous, subprocess) => Object.assign(runProcesses([previous, subprocess]), subprocess);

const runProcesses = async subprocesses => {
// Ensure both subprocesses have exited before resolving, and that we handle errors from both
const returns = await Promise.allSettled([pipeStreams(subprocesses), ...subprocesses]);
const [[from, to]] = await Promise.all([Promise.allSettled(subprocesses), pipeStreams(subprocesses)]);

// If both subprocesses fail, throw source error to use a predictable order and avoid race conditions
const error = returns.map(({reason}) => reason).find(Boolean);
if (error) {
throw error;
// If both subprocesses fail, throw destination error to use a predictable order and avoid race conditions
if (to.reason) {
to.reason.pipedFrom = from.reason ?? from.value;
throw to.reason;
}

if (from.reason) {
throw from.reason;
}

return returns[2].value;
return {...to.value, pipedFrom: from.value};
};

const pipeStreams = async subprocesses => {
Expand Down
99 changes: 62 additions & 37 deletions test/pipe.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,35 @@ import {

const testFixtureUrl = new URL('test.txt', FIXTURES_URL);

const getPipeSize = command => command.split(' | ').length;

test('.pipe() success', async t => {
const {stdout, output, command, durationMs} = await nanoSpawn(...nodePrintStdout).pipe(...nodeToUpperCase);
const first = nanoSpawn(...nodePrintStdout);
const {stdout, output, durationMs, pipedFrom} = await first.pipe(...nodeToUpperCase);
const firstResult = await first;
t.is(firstResult.pipedFrom, undefined);
t.is(pipedFrom, firstResult);
t.is(stdout, testUpperCase);
t.is(output, stdout);
t.is(getPipeSize(command), 2);
assertDurationMs(t, durationMs);
});

test('.pipe() source fails', async t => {
const error = await t.throwsAsync(nanoSpawn(...nodePrintFail).pipe(...nodeToUpperCase));
assertFail(t, error);
t.is(error.stdout, testString);
t.is(error.output, error.stdout);
t.is(getPipeSize(error.command), 1);
const first = nanoSpawn(...nodePrintFail);
const secondError = await t.throwsAsync(first.pipe(...nodeToUpperCase));
const firstError = await t.throwsAsync(first);
t.is(firstError, secondError);
t.is(secondError.pipedFrom, undefined);
assertFail(t, secondError);
t.is(secondError.stdout, testString);
t.is(secondError.output, secondError.stdout);
});

test('.pipe() source fails due to child_process invalid option', async t => {
const error = await t.throwsAsync(nanoSpawn(...nodePrintStdout, earlyErrorOptions).pipe(...nodeToUpperCase));
assertEarlyError(t, error);
t.is(getPipeSize(error.command), 1);
const first = nanoSpawn(...nodePrintStdout, earlyErrorOptions);
const secondError = await t.throwsAsync(first.pipe(...nodeToUpperCase));
const firstError = await t.throwsAsync(first);
assertEarlyError(t, secondError);
t.is(firstError, secondError);
t.is(secondError.pipedFrom, undefined);
});

test('.pipe() source fails due to stream error', async t => {
Expand All @@ -69,21 +76,33 @@ test('.pipe() source fails due to stream error', async t => {
const cause = new Error(testString);
const nodeChildProcess = await first.nodeChildProcess;
nodeChildProcess.stdout.destroy(cause);
const error = await t.throwsAsync(second);
assertErrorEvent(t, error, cause);
const secondError = await t.throwsAsync(second);
const firstError = await t.throwsAsync(first);
assertErrorEvent(t, secondError, cause);
assertErrorEvent(t, firstError, cause);
t.is(firstError.pipedFrom, undefined);
t.is(secondError.pipedFrom, firstError);
});

test('.pipe() destination fails', async t => {
const error = await t.throwsAsync(nanoSpawn(...nodePrintStdout).pipe(...nodeToUpperCaseFail));
assertFail(t, error);
t.is(error.stdout, testUpperCase);
t.is(getPipeSize(error.command), 2);
const first = nanoSpawn(...nodePrintStdout);
const secondError = await t.throwsAsync(first.pipe(...nodeToUpperCaseFail));
const firstResult = await first;
assertFail(t, secondError);
t.is(firstResult.pipedFrom, undefined);
t.is(secondError.pipedFrom, firstResult);
t.is(firstResult.stdout, testString);
t.is(secondError.stdout, testUpperCase);
});

test('.pipe() destination fails due to child_process invalid option', async t => {
const error = await t.throwsAsync(nanoSpawn(...nodePrintStdout).pipe(...nodeToUpperCase, earlyErrorOptions));
assertEarlyError(t, error);
t.is(getPipeSize(error.command), 2);
const first = nanoSpawn(...nodePrintStdout);
const secondError = await t.throwsAsync(first.pipe(...nodeToUpperCase, earlyErrorOptions));
const firstResult = await first;
assertEarlyError(t, secondError);
t.is(firstResult.pipedFrom, undefined);
t.is(secondError.pipedFrom, undefined);
t.is(firstResult.stdout, testString);
});

test('.pipe() destination fails due to stream error', async t => {
Expand All @@ -92,15 +111,26 @@ test('.pipe() destination fails due to stream error', async t => {
const cause = new Error(testString);
const nodeChildProcess = await second.nodeChildProcess;
nodeChildProcess.stdin.destroy(cause);
const error = await t.throwsAsync(second);
assertErrorEvent(t, error, cause);
const secondError = await t.throwsAsync(second);
const firstError = await t.throwsAsync(first);
assertErrorEvent(t, secondError, cause);
assertErrorEvent(t, firstError, cause);
t.is(firstError.pipedFrom, undefined);
t.is(secondError.pipedFrom, firstError);
});

test('.pipe() source and destination fail', async t => {
const error = await t.throwsAsync(nanoSpawn(...nodePrintFail).pipe(...nodeToUpperCaseFail));
assertFail(t, error);
t.is(error.stdout, testString);
t.is(getPipeSize(error.command), 1);
const first = nanoSpawn(...nodePrintFail);
const secondError = await t.throwsAsync(first.pipe(...nodeToUpperCaseFail));
const firstError = await t.throwsAsync(first);
assertFail(t, firstError);
assertFail(t, secondError);
t.is(firstError.pipedFrom, undefined);
t.is(secondError.pipedFrom, firstError);
t.is(firstError.stdout, testString);
t.is(firstError.output, firstError.stdout);
t.is(secondError.stdout, testUpperCase);
t.is(secondError.output, secondError.stdout);
});

test('.pipe().pipe() success', async t => {
Expand All @@ -111,10 +141,7 @@ test('.pipe().pipe() success', async t => {
t.is(firstResult.output, firstResult.stdout);
t.is(secondResult.stdout, testDoubleUpperCase);
t.is(secondResult.output, secondResult.stdout);
t.is(getPipeSize(firstResult.command), 2);
t.is(getPipeSize(secondResult.command), 3);
assertDurationMs(t, firstResult.durationMs);
t.true(secondResult.durationMs > firstResult.durationMs);
});

test('.pipe().pipe() first source fail', async t => {
Expand All @@ -125,7 +152,6 @@ test('.pipe().pipe() first source fail', async t => {
t.is(firstError, secondError);
t.is(firstError.stdout, testString);
t.is(firstError.output, firstError.stdout);
t.is(getPipeSize(firstError.command), 1);
});

test('.pipe().pipe() second source fail', async t => {
Expand All @@ -136,7 +162,6 @@ test('.pipe().pipe() second source fail', async t => {
t.is(firstError, secondError);
t.is(firstError.stdout, testUpperCase);
t.is(firstError.output, firstError.stdout);
t.is(getPipeSize(firstError.command), 2);
});

test('.pipe().pipe() destination fail', async t => {
Expand All @@ -148,8 +173,6 @@ test('.pipe().pipe() destination fail', async t => {
t.is(firstResult.output, firstResult.stdout);
t.is(secondError.stdout, testDoubleUpperCase);
t.is(secondError.output, secondError.stdout);
t.is(getPipeSize(firstResult.command), 2);
t.is(getPipeSize(secondError.command), 3);
assertDurationMs(t, firstResult.durationMs);
});

Expand All @@ -158,10 +181,12 @@ test('.pipe().pipe() all fail', async t => {
const secondError = await t.throwsAsync(first.pipe(...nodeDoubleFail));
const firstError = await t.throwsAsync(first);
assertFail(t, firstError);
t.is(firstError, secondError);
t.is(firstError.stdout, testString);
assertFail(t, secondError);
t.is(secondError.pipedFrom, firstError);
t.is(firstError.stdout, testUpperCase);
t.is(firstError.output, firstError.stdout);
t.is(getPipeSize(firstError.command), 1);
t.is(secondError.stdout, testDoubleUpperCase);
t.is(secondError.output, secondError.stdout);
});

// Cannot guarantee that `cat` exists on Windows
Expand Down

0 comments on commit d722a9f

Please sign in to comment.