Skip to content

Commit

Permalink
Improve promise iteration logic (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
ehmicky authored Aug 27, 2024
1 parent 9759b96 commit 5f449e1
Show file tree
Hide file tree
Showing 2 changed files with 207 additions and 23 deletions.
42 changes: 26 additions & 16 deletions iterable.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,35 @@
export async function * combineAsyncIterators(iterator1, iterator2) {
while (true) {
// eslint-disable-next-line no-await-in-loop
const [result1, result2] = await Promise.all([
iterator1.next(),
iterator2.next(),
]);

if (result1.done && result2.done) {
break;
}
// Merge two async iterators into one
export async function * combineAsyncIterators(...iterators) {
try {
let promises = [];
while (iterators.length > 0) {
promises = iterators.map((iterator, index) => promises[index] ?? getNext(iterator));
// eslint-disable-next-line no-await-in-loop
const [{value, done}, index] = await Promise.race(promises
.map((promise, index) => Promise.all([promise, index])));

if (!result1.done) {
yield result1.value;
}
const [iterator] = iterators.splice(index, 1);
promises.splice(index, 1);

if (!result2.done) {
yield result2.value;
if (!done) {
iterators.push(iterator);
yield value;
}
}
} finally {
await Promise.all(iterators.map(iterator => iterator.return?.()));
}
}

const getNext = async iterator => {
try {
return await iterator.next();
} catch (error) {
await iterator.throw?.(error);
throw error;
}
};

export async function * lineIterator(stream, resultPromise) {
if (!stream) {
return;
Expand Down
188 changes: 181 additions & 7 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,41 @@ test('promise.stderr can be iterated', async t => {
t.deepEqual(lines, ['.']);
});

test('promise[Symbol.asyncIterator] can be iterated', async t => {
const promise = nanoSpawn('node', ['-e', `
console.log("a");
console.log("b");
console.error("c");
console.error("d");`]);

const lines = await arrayFromAsync(promise);
t.deepEqual(lines, ['a', 'b', 'c', 'd']);

const {stdout, stderr} = await promise;
t.is(stdout, 'a\nb');
t.is(stderr, 'c\nd');
});

test.serial('promise iteration can be interleaved', async t => {
const length = 10;
const promise = nanoSpawn('node', ['--input-type=module', '-e', `
import {setTimeout} from 'node:timers/promises';
for (let index = 0; index < ${length}; index += 1) {
console.log("a");
await setTimeout(10);
console.error("b");
await setTimeout(10);
}`]);

const lines = await arrayFromAsync(promise);
t.deepEqual(lines, Array.from({length}, () => ['a', 'b']).flat());

const {stdout, stderr} = await promise;
t.is(stdout, Array.from({length}, () => 'a').join('\n'));
t.is(stderr, Array.from({length}, () => 'b').join('\n'));
});

test('result.stdout is set', async t => {
const {stdout, stderr} = await nanoSpawn('node', ['-e', 'console.log(".")']);
t.is(stdout, '.');
Expand Down Expand Up @@ -210,6 +245,24 @@ test('promise.stderr has no iterations if options.stderr "ignore"', async t => {
t.deepEqual(stderrLines, []);
});

test('promise[Symbol.asyncIterator] has iterations if only options.stdout "ignore"', async t => {
const promise = nanoSpawn('node', ['-e', 'console.log("a"); console.error("b");'], {stdout: 'ignore'});
const lines = await arrayFromAsync(promise);
t.deepEqual(lines, ['b']);
});

test('promise[Symbol.asyncIterator] has iterations if only options.stderr "ignore"', async t => {
const promise = nanoSpawn('node', ['-e', 'console.log("a"); console.error("b");'], {stderr: 'ignore'});
const lines = await arrayFromAsync(promise);
t.deepEqual(lines, ['a']);
});

test('promise[Symbol.asyncIterator] has no iterations if only options.stdout + options.stderr "ignore"', async t => {
const promise = nanoSpawn('node', ['-e', 'console.log("a"); console.error("b");'], {stdout: 'ignore', stderr: 'ignore'});
const lines = await arrayFromAsync(promise);
t.deepEqual(lines, []);
});

test('result.stdout is undefined if options.stdout "ignore"', async t => {
const {stdout, stderr} = await nanoSpawn('node', ['-e', 'console.log("."); console.error(".");'], {stdout: 'ignore'});
t.is(stdout, undefined);
Expand All @@ -222,6 +275,17 @@ test('result.stderr is undefined if options.stderr "ignore"', async t => {
t.is(stderr, undefined);
});

test('promise[Symbol.asyncIterator] is line-wise', async t => {
const promise = nanoSpawn('node', ['--input-type=module', '-e', `
import {setTimeout} from 'node:timers/promises';
process.stdout.write("a\\nb\\n");
await setTimeout(0);
process.stderr.write("c\\nd\\n");`]);
const lines = await arrayFromAsync(promise);
t.deepEqual(lines, ['a', 'b', 'c', 'd']);
});

test('promise.stdout handles no newline at the end', async t => {
const promise = nanoSpawn('node', ['-e', 'process.stdout.write("a\\nb")']);
const lines = await arrayFromAsync(promise.stdout);
Expand Down Expand Up @@ -329,6 +393,22 @@ test('Handles promise.stderr error', async t => {
t.is(cause, error);
});

test('Handles promise.stdout error in promise[Symbol.asyncIterator]', async t => {
const promise = nanoSpawn('node', ['--version']);
const error = new Error(testString);
promise.subprocess.stdout.emit('error', error);
const {cause} = await t.throwsAsync(arrayFromAsync(promise));
t.is(cause, error);
});

test('Handles promise.stderr error in promise[Symbol.asyncIterator]', async t => {
const promise = nanoSpawn('node', ['--version']);
const error = new Error(testString);
promise.subprocess.stderr.emit('error', error);
const {cause} = await t.throwsAsync(arrayFromAsync(promise));
t.is(cause, error);
});

test('Handles result.stdout error', async t => {
const promise = nanoSpawn('node', ['--version']);
const error = new Error(testString);
Expand Down Expand Up @@ -366,6 +446,28 @@ test.serial('promise.stdout iteration break waits for the subprocess success', a
t.is(stdout, 'a\nb');
});

test.serial('promise[Symbol.asyncIterator] iteration break waits for the subprocess success', async t => {
const promise = nanoSpawn('node', ['-e', 'process.stdin.pipe(process.stdout); console.log("a");']);
let done = false;
globalThis.setTimeout(() => {
t.true(promise.subprocess.stdout.readable);
t.true(promise.subprocess.stdin.writable);
promise.subprocess.stdin.end('b');
done = true;
}, 1e2);

// eslint-disable-next-line no-unreachable-loop
for await (const line of promise) {
t.is(line, 'a');
t.false(done);
break;
}

t.true(done);
const {stdout} = await promise;
t.is(stdout, 'a\nb');
});

test.serial('promise.stdout iteration exception waits for the subprocess success', async t => {
const promise = nanoSpawn('node', ['-e', 'process.stdin.pipe(process.stdout); console.log("a");']);
let done = false;
Expand All @@ -392,6 +494,33 @@ test.serial('promise.stdout iteration exception waits for the subprocess success
t.is(stdout, 'a\nb');
});

test.serial('promise[Symbol.asyncIterator] iteration exception waits for the subprocess success', async t => {
const promise = nanoSpawn('node', ['-e', 'process.stdin.pipe(process.stdout); console.log("a");']);
let done = false;
globalThis.setTimeout(() => {
t.true(promise.subprocess.stdout.readable);
t.true(promise.subprocess.stdin.writable);
promise.subprocess.stdin.end('b');
done = true;
}, 1e2);

const cause = new Error(testString);
try {
// eslint-disable-next-line no-unreachable-loop
for await (const line of promise) {
t.is(line, 'a');
t.false(done);
throw cause;
}
} catch (error) {
t.is(error, cause);
}

t.true(done);
const {stdout} = await promise;
t.is(stdout, 'a\nb');
});

test.serial('promise.stdout iteration break waits for the subprocess failure', async t => {
const promise = nanoSpawn('node', ['-e', 'process.stdin.once("data", (chunk) => {console.log(chunk.toString()); process.exit(2)}); console.log("a");']);
let done = false;
Expand Down Expand Up @@ -419,6 +548,34 @@ test.serial('promise.stdout iteration break waits for the subprocess failure', a
t.is(error.stdout, 'a\nb');
});

test.serial('promise[Symbol.asyncIterator] iteration break waits for the subprocess failure', async t => {
const promise = nanoSpawn('node', ['-e', 'process.stdin.once("data", (chunk) => {console.log(chunk.toString()); process.exit(2)}); console.log("a");']);
let done = false;
globalThis.setTimeout(() => {
t.true(promise.subprocess.stdout.readable);
t.true(promise.subprocess.stdin.writable);
promise.subprocess.stdin.end('b');
done = true;
}, 1e2);

let cause;
try {
// eslint-disable-next-line no-unreachable-loop
for await (const line of promise) {
t.is(line, 'a');
t.false(done);
break;
}
} catch (error) {
cause = error;
}

t.true(done);
const error = await t.throwsAsync(promise);
t.is(error, cause);
t.is(error.stdout, 'a\nb');
});

test.serial('promise.stdout iteration exception waits for the subprocess failure', async t => {
const promise = nanoSpawn('node', ['-e', 'process.stdin.once("data", (chunk) => {console.log(chunk.toString()); process.exit(2)}); console.log("a");']);
let done = false;
Expand Down Expand Up @@ -446,15 +603,32 @@ test.serial('promise.stdout iteration exception waits for the subprocess failure
t.is(error.stdout, 'a\nb');
});

test('promise can be iterated with both stdout and stderr', async t => {
const promise = nanoSpawn('node', ['-e', 'console.log("a"); console.error("b"); console.log("c"); console.error("d");']);
test.serial('promise[Symbol.asyncIterator] iteration exception waits for the subprocess failure', async t => {
const promise = nanoSpawn('node', ['-e', 'process.stdin.once("data", (chunk) => {console.log(chunk.toString()); process.exit(2)}); console.log("a");']);
let done = false;
globalThis.setTimeout(() => {
t.true(promise.subprocess.stdout.readable);
t.true(promise.subprocess.stdin.writable);
promise.subprocess.stdin.end('b');
done = true;
}, 1e2);

const lines = await arrayFromAsync(promise);
t.deepEqual(lines, ['a', 'b', 'c', 'd']);
const cause = new Error(testString);
try {
// eslint-disable-next-line no-unreachable-loop
for await (const line of promise) {
t.is(line, 'a');
t.false(done);
throw cause;
}
} catch (error) {
t.is(error, cause);
}

const {stdout, stderr} = await promise;
t.is(stdout, 'a\nc');
t.is(stderr, 'b\nd');
t.true(done);
const error = await t.throwsAsync(promise);
t.not(error, cause);
t.is(error.stdout, 'a\nb');
});

test('Returns a promise', async t => {
Expand Down

0 comments on commit 5f449e1

Please sign in to comment.