From 303211b37dae2cdd72493a6976a7363fc8642a83 Mon Sep 17 00:00:00 2001 From: RafaelGSS Date: Sat, 20 Nov 2021 23:44:38 -0300 Subject: [PATCH] streams: fix enqueue race condition on esm modules streams: use nextTick on close fix: lint --- lib/internal/webstreams/readablestream.js | 16 +++++----- test/parallel/test-whatwg-readablestream.js | 33 +++++++++++++++++++++ 2 files changed, 42 insertions(+), 7 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index fe9b26b991f04e..b4d989febeca75 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -1424,13 +1424,15 @@ function readableStreamTee(stream, cloneForBranch2) { }); }, [kClose]() { - reading = false; - if (!canceled1) - readableStreamDefaultControllerClose(branch1[kState].controller); - if (!canceled2) - readableStreamDefaultControllerClose(branch2[kState].controller); - if (!canceled1 || !canceled2) - cancelPromise.resolve(); + process.nextTick(() => { + reading = false; + if (!canceled1) + readableStreamDefaultControllerClose(branch1[kState].controller); + if (!canceled2) + readableStreamDefaultControllerClose(branch2[kState].controller); + if (!canceled1 || !canceled2) + cancelPromise.resolve(); + }); }, [kError]() { reading = false; diff --git a/test/parallel/test-whatwg-readablestream.js b/test/parallel/test-whatwg-readablestream.js index c8b82fa9823786..c333584c2b940e 100644 --- a/test/parallel/test-whatwg-readablestream.js +++ b/test/parallel/test-whatwg-readablestream.js @@ -1488,6 +1488,39 @@ class Source { common.mustCall(({ value }) => assert.strictEqual(value, 'hello'))); } +{ + // Test tee() with close in the nextTick after enqueue + async function read(stream) { + const chunks = []; + for await (const chunk of stream) + chunks.push(chunk); + return Buffer.concat(chunks).toString(); + } + + const [r1, r2] = new ReadableStream({ + start(controller) { + process.nextTick(() => { + controller.enqueue(new Uint8Array([102, 111, 111, 98, 97, 114])); + + process.nextTick(() => { + controller.close(); + }); + }); + } + }).tee(); + + (async () => { + const [dataReader1, dataReader2] = await Promise.all([ + read(r1), + read(r2), + ]); + + assert.strictEqual(dataReader1, dataReader2); + assert.strictEqual(dataReader1, 'foobar'); + assert.strictEqual(dataReader2, 'foobar'); + })().then(common.mustCall()); +} + { assert.throws(() => { readableByteStreamControllerConvertPullIntoDescriptor({