From e77e64e7749f6a5bbd8ba79c2b9736826a2e735c Mon Sep 17 00:00:00 2001 From: RafaelGSS Date: Sat, 20 Nov 2021 23:44:38 -0300 Subject: [PATCH] stream: fix enqueue race condition on esm modules stream: use nextTick on close --- lib/internal/webstreams/readablestream.js | 19 +++++++---- test/parallel/test-whatwg-readablestream.mjs | 36 ++++++++++++++++++++ 2 files changed, 48 insertions(+), 7 deletions(-) create mode 100644 test/parallel/test-whatwg-readablestream.mjs diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index fe9b26b991f04e..8d8a7b533e1b1a 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -1424,13 +1424,18 @@ function readableStreamTee(stream, cloneForBranch2) { }); }, [kClose]() { - reading = false; - if (!canceled1) - readableStreamDefaultControllerClose(branch1[kState].controller); - if (!canceled2) - readableStreamDefaultControllerClose(branch2[kState].controller); - if (!canceled1 || !canceled2) - cancelPromise.resolve(); + // The `process.nextTick()` is not part of the spec. + // This approach was needed to avoid a race condition working with esm + // Further information, see: https://github.com/nodejs/node/issues/39758 + process.nextTick(() => { + reading = false; + if (!canceled1) + readableStreamDefaultControllerClose(branch1[kState].controller); + if (!canceled2) + readableStreamDefaultControllerClose(branch2[kState].controller); + if (!canceled1 || !canceled2) + cancelPromise.resolve(); + }); }, [kError]() { reading = false; diff --git a/test/parallel/test-whatwg-readablestream.mjs b/test/parallel/test-whatwg-readablestream.mjs new file mode 100644 index 00000000000000..a3693f62439ce7 --- /dev/null +++ b/test/parallel/test-whatwg-readablestream.mjs @@ -0,0 +1,36 @@ +import { mustCall } from '../common/index.mjs'; +import { ReadableStream } from 'stream/web'; +import assert from 'assert'; + +{ + // Test tee() with close in the nextTick after enqueue + async function read(stream) { + const chunks = []; + for await (const chunk of stream) + chunks.push(chunk); + return Buffer.concat(chunks).toString(); + } + + const [r1, r2] = new ReadableStream({ + start(controller) { + process.nextTick(() => { + controller.enqueue(new Uint8Array([102, 111, 111, 98, 97, 114])); + + process.nextTick(() => { + controller.close(); + }); + }); + } + }).tee(); + + (async () => { + const [dataReader1, dataReader2] = await Promise.all([ + read(r1), + read(r2), + ]); + + assert.strictEqual(dataReader1, dataReader2); + assert.strictEqual(dataReader1, 'foobar'); + assert.strictEqual(dataReader2, 'foobar'); + })().then(mustCall()); +}