diff --git a/iterable.js b/iterable.js index ed21fbe..9d6e94b 100644 --- a/iterable.js +++ b/iterable.js @@ -1,25 +1,35 @@ -export async function * combineAsyncIterators(iterator1, iterator2) { - while (true) { - // eslint-disable-next-line no-await-in-loop - const [result1, result2] = await Promise.all([ - iterator1.next(), - iterator2.next(), - ]); - - if (result1.done && result2.done) { - break; - } +// Merge two async iterators into one +export async function * combineAsyncIterators(...iterators) { + try { + let promises = []; + while (iterators.length > 0) { + promises = iterators.map((iterator, index) => promises[index] ?? getNext(iterator)); + // eslint-disable-next-line no-await-in-loop + const [{value, done}, index] = await Promise.race(promises + .map((promise, index) => Promise.all([promise, index]))); - if (!result1.done) { - yield result1.value; - } + const [iterator] = iterators.splice(index, 1); + promises.splice(index, 1); - if (!result2.done) { - yield result2.value; + if (!done) { + iterators.push(iterator); + yield value; + } } + } finally { + await Promise.all(iterators.map(iterator => iterator.return?.())); } } +const getNext = async iterator => { + try { + return await iterator.next(); + } catch (error) { + await iterator.throw?.(error); + throw error; + } +}; + export async function * lineIterator(stream, resultPromise) { if (!stream) { return; diff --git a/test.js b/test.js index 6ebc35e..9c0b6a7 100644 --- a/test.js +++ b/test.js @@ -170,6 +170,41 @@ test('promise.stderr can be iterated', async t => { t.deepEqual(lines, ['.']); }); +test('promise[Symbol.asyncIterator] can be iterated', async t => { + const promise = nanoSpawn('node', ['-e', ` +console.log("a"); +console.log("b"); +console.error("c"); +console.error("d");`]); + + const lines = await arrayFromAsync(promise); + t.deepEqual(lines, ['a', 'b', 'c', 'd']); + + const {stdout, stderr} = await promise; + t.is(stdout, 'a\nb'); + t.is(stderr, 'c\nd'); +}); + +test.serial('promise iteration can be interleaved', async t => { + const length = 10; + const promise = nanoSpawn('node', ['--input-type=module', '-e', ` +import {setTimeout} from 'node:timers/promises'; + +for (let index = 0; index < ${length}; index += 1) { + console.log("a"); + await setTimeout(10); + console.error("b"); + await setTimeout(10); +}`]); + + const lines = await arrayFromAsync(promise); + t.deepEqual(lines, Array.from({length}, () => ['a', 'b']).flat()); + + const {stdout, stderr} = await promise; + t.is(stdout, Array.from({length}, () => 'a').join('\n')); + t.is(stderr, Array.from({length}, () => 'b').join('\n')); +}); + test('result.stdout is set', async t => { const {stdout, stderr} = await nanoSpawn('node', ['-e', 'console.log(".")']); t.is(stdout, '.'); @@ -210,6 +245,24 @@ test('promise.stderr has no iterations if options.stderr "ignore"', async t => { t.deepEqual(stderrLines, []); }); +test('promise[Symbol.asyncIterator] has iterations if only options.stdout "ignore"', async t => { + const promise = nanoSpawn('node', ['-e', 'console.log("a"); console.error("b");'], {stdout: 'ignore'}); + const lines = await arrayFromAsync(promise); + t.deepEqual(lines, ['b']); +}); + +test('promise[Symbol.asyncIterator] has iterations if only options.stderr "ignore"', async t => { + const promise = nanoSpawn('node', ['-e', 'console.log("a"); console.error("b");'], {stderr: 'ignore'}); + const lines = await arrayFromAsync(promise); + t.deepEqual(lines, ['a']); +}); + +test('promise[Symbol.asyncIterator] has no iterations if only options.stdout + options.stderr "ignore"', async t => { + const promise = nanoSpawn('node', ['-e', 'console.log("a"); console.error("b");'], {stdout: 'ignore', stderr: 'ignore'}); + const lines = await arrayFromAsync(promise); + t.deepEqual(lines, []); +}); + test('result.stdout is undefined if options.stdout "ignore"', async t => { const {stdout, stderr} = await nanoSpawn('node', ['-e', 'console.log("."); console.error(".");'], {stdout: 'ignore'}); t.is(stdout, undefined); @@ -222,6 +275,17 @@ test('result.stderr is undefined if options.stderr "ignore"', async t => { t.is(stderr, undefined); }); +test('promise[Symbol.asyncIterator] is line-wise', async t => { + const promise = nanoSpawn('node', ['--input-type=module', '-e', ` +import {setTimeout} from 'node:timers/promises'; + +process.stdout.write("a\\nb\\n"); +await setTimeout(0); +process.stderr.write("c\\nd\\n");`]); + const lines = await arrayFromAsync(promise); + t.deepEqual(lines, ['a', 'b', 'c', 'd']); +}); + test('promise.stdout handles no newline at the end', async t => { const promise = nanoSpawn('node', ['-e', 'process.stdout.write("a\\nb")']); const lines = await arrayFromAsync(promise.stdout); @@ -329,6 +393,22 @@ test('Handles promise.stderr error', async t => { t.is(cause, error); }); +test('Handles promise.stdout error in promise[Symbol.asyncIterator]', async t => { + const promise = nanoSpawn('node', ['--version']); + const error = new Error(testString); + promise.subprocess.stdout.emit('error', error); + const {cause} = await t.throwsAsync(arrayFromAsync(promise)); + t.is(cause, error); +}); + +test('Handles promise.stderr error in promise[Symbol.asyncIterator]', async t => { + const promise = nanoSpawn('node', ['--version']); + const error = new Error(testString); + promise.subprocess.stderr.emit('error', error); + const {cause} = await t.throwsAsync(arrayFromAsync(promise)); + t.is(cause, error); +}); + test('Handles result.stdout error', async t => { const promise = nanoSpawn('node', ['--version']); const error = new Error(testString); @@ -366,6 +446,28 @@ test.serial('promise.stdout iteration break waits for the subprocess success', a t.is(stdout, 'a\nb'); }); +test.serial('promise[Symbol.asyncIterator] iteration break waits for the subprocess success', async t => { + const promise = nanoSpawn('node', ['-e', 'process.stdin.pipe(process.stdout); console.log("a");']); + let done = false; + globalThis.setTimeout(() => { + t.true(promise.subprocess.stdout.readable); + t.true(promise.subprocess.stdin.writable); + promise.subprocess.stdin.end('b'); + done = true; + }, 1e2); + + // eslint-disable-next-line no-unreachable-loop + for await (const line of promise) { + t.is(line, 'a'); + t.false(done); + break; + } + + t.true(done); + const {stdout} = await promise; + t.is(stdout, 'a\nb'); +}); + test.serial('promise.stdout iteration exception waits for the subprocess success', async t => { const promise = nanoSpawn('node', ['-e', 'process.stdin.pipe(process.stdout); console.log("a");']); let done = false; @@ -392,6 +494,33 @@ test.serial('promise.stdout iteration exception waits for the subprocess success t.is(stdout, 'a\nb'); }); +test.serial('promise[Symbol.asyncIterator] iteration exception waits for the subprocess success', async t => { + const promise = nanoSpawn('node', ['-e', 'process.stdin.pipe(process.stdout); console.log("a");']); + let done = false; + globalThis.setTimeout(() => { + t.true(promise.subprocess.stdout.readable); + t.true(promise.subprocess.stdin.writable); + promise.subprocess.stdin.end('b'); + done = true; + }, 1e2); + + const cause = new Error(testString); + try { + // eslint-disable-next-line no-unreachable-loop + for await (const line of promise) { + t.is(line, 'a'); + t.false(done); + throw cause; + } + } catch (error) { + t.is(error, cause); + } + + t.true(done); + const {stdout} = await promise; + t.is(stdout, 'a\nb'); +}); + test.serial('promise.stdout iteration break waits for the subprocess failure', async t => { const promise = nanoSpawn('node', ['-e', 'process.stdin.once("data", (chunk) => {console.log(chunk.toString()); process.exit(2)}); console.log("a");']); let done = false; @@ -419,6 +548,34 @@ test.serial('promise.stdout iteration break waits for the subprocess failure', a t.is(error.stdout, 'a\nb'); }); +test.serial('promise[Symbol.asyncIterator] iteration break waits for the subprocess failure', async t => { + const promise = nanoSpawn('node', ['-e', 'process.stdin.once("data", (chunk) => {console.log(chunk.toString()); process.exit(2)}); console.log("a");']); + let done = false; + globalThis.setTimeout(() => { + t.true(promise.subprocess.stdout.readable); + t.true(promise.subprocess.stdin.writable); + promise.subprocess.stdin.end('b'); + done = true; + }, 1e2); + + let cause; + try { + // eslint-disable-next-line no-unreachable-loop + for await (const line of promise) { + t.is(line, 'a'); + t.false(done); + break; + } + } catch (error) { + cause = error; + } + + t.true(done); + const error = await t.throwsAsync(promise); + t.is(error, cause); + t.is(error.stdout, 'a\nb'); +}); + test.serial('promise.stdout iteration exception waits for the subprocess failure', async t => { const promise = nanoSpawn('node', ['-e', 'process.stdin.once("data", (chunk) => {console.log(chunk.toString()); process.exit(2)}); console.log("a");']); let done = false; @@ -446,15 +603,32 @@ test.serial('promise.stdout iteration exception waits for the subprocess failure t.is(error.stdout, 'a\nb'); }); -test('promise can be iterated with both stdout and stderr', async t => { - const promise = nanoSpawn('node', ['-e', 'console.log("a"); console.error("b"); console.log("c"); console.error("d");']); +test.serial('promise[Symbol.asyncIterator] iteration exception waits for the subprocess failure', async t => { + const promise = nanoSpawn('node', ['-e', 'process.stdin.once("data", (chunk) => {console.log(chunk.toString()); process.exit(2)}); console.log("a");']); + let done = false; + globalThis.setTimeout(() => { + t.true(promise.subprocess.stdout.readable); + t.true(promise.subprocess.stdin.writable); + promise.subprocess.stdin.end('b'); + done = true; + }, 1e2); - const lines = await arrayFromAsync(promise); - t.deepEqual(lines, ['a', 'b', 'c', 'd']); + const cause = new Error(testString); + try { + // eslint-disable-next-line no-unreachable-loop + for await (const line of promise) { + t.is(line, 'a'); + t.false(done); + throw cause; + } + } catch (error) { + t.is(error, cause); + } - const {stdout, stderr} = await promise; - t.is(stdout, 'a\nc'); - t.is(stderr, 'b\nd'); + t.true(done); + const error = await t.throwsAsync(promise); + t.not(error, cause); + t.is(error.stdout, 'a\nb'); }); test('Returns a promise', async t => {