Skip to content

Commit

Permalink
streams: fix enqueue race condition on esm modules
Browse files Browse the repository at this point in the history
streams: use nextTick on close

fix: lint
  • Loading branch information
RafaelGSS committed Nov 22, 2021
1 parent 8731193 commit 303211b
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 7 deletions.
16 changes: 9 additions & 7 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -1424,13 +1424,15 @@ function readableStreamTee(stream, cloneForBranch2) {
});
},
[kClose]() {
reading = false;
if (!canceled1)
readableStreamDefaultControllerClose(branch1[kState].controller);
if (!canceled2)
readableStreamDefaultControllerClose(branch2[kState].controller);
if (!canceled1 || !canceled2)
cancelPromise.resolve();
process.nextTick(() => {
reading = false;
if (!canceled1)
readableStreamDefaultControllerClose(branch1[kState].controller);
if (!canceled2)
readableStreamDefaultControllerClose(branch2[kState].controller);
if (!canceled1 || !canceled2)
cancelPromise.resolve();
});
},
[kError]() {
reading = false;
Expand Down
33 changes: 33 additions & 0 deletions test/parallel/test-whatwg-readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -1488,6 +1488,39 @@ class Source {
common.mustCall(({ value }) => assert.strictEqual(value, 'hello')));
}

{
// Test tee() with close in the nextTick after enqueue
async function read(stream) {
const chunks = [];
for await (const chunk of stream)
chunks.push(chunk);
return Buffer.concat(chunks).toString();
}

const [r1, r2] = new ReadableStream({
start(controller) {
process.nextTick(() => {
controller.enqueue(new Uint8Array([102, 111, 111, 98, 97, 114]));

process.nextTick(() => {
controller.close();
});
});
}
}).tee();

(async () => {
const [dataReader1, dataReader2] = await Promise.all([
read(r1),
read(r2),
]);

assert.strictEqual(dataReader1, dataReader2);
assert.strictEqual(dataReader1, 'foobar');
assert.strictEqual(dataReader2, 'foobar');
})().then(common.mustCall());
}

{
assert.throws(() => {
readableByteStreamControllerConvertPullIntoDescriptor({
Expand Down

0 comments on commit 303211b

Please sign in to comment.