From c9ed79fe1cac4eb4f2f0717bc8e9e8540769e35b Mon Sep 17 00:00:00 2001 From: Mestery Date: Thu, 23 Sep 2021 19:26:25 +0200 Subject: [PATCH 1/4] stream: support array of streams in promises pipeline Fixes: https://github.com/nodejs/node/issues/40191 --- lib/stream/promises.js | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lib/stream/promises.js b/lib/stream/promises.js index 0db01a8b208d60..714c8f11427e32 100644 --- a/lib/stream/promises.js +++ b/lib/stream/promises.js @@ -1,6 +1,7 @@ 'use strict'; const { + ArrayIsArray, ArrayPrototypePop, Promise, } = primordials; @@ -23,6 +24,11 @@ function pipeline(...streams) { signal = options.signal; } + // pipeline(streams) + if (streams.length === 1 && ArrayIsArray(streams[0])) { + streams = streams[0]; + } + pl(streams, (err, value) => { if (err) { reject(err); From 7af0af2b1ce729899500bfaf90ca8d45b08e4d7c Mon Sep 17 00:00:00 2001 From: Mestery Date: Thu, 23 Sep 2021 21:35:41 +0200 Subject: [PATCH 2/4] fixup! stream: support array of streams in promises pipeline --- lib/internal/streams/pipeline.js | 11 ++--- lib/stream/promises.js | 5 --- test/parallel/test-stream-pipeline.js | 61 +++++++++++++++++++++++++++ 3 files changed, 65 insertions(+), 12 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 012d99de0357f2..8dc4e5792c47d8 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -166,17 +166,14 @@ async function pump(iterable, writable, finish) { } function pipeline(...streams) { - const callback = once(popCallback(streams)); + return pipelineImpl(streams, once(popCallback(streams))); +} - // stream.pipeline(streams, callback) - if (ArrayIsArray(streams[0]) && streams.length === 1) { +function pipelineImpl(streams, callback, opts) { + if (streams.length === 1 && ArrayIsArray(streams[0])) { streams = streams[0]; } - return pipelineImpl(streams, callback); -} - -function pipelineImpl(streams, callback, opts) { if (streams.length < 2) { throw new ERR_MISSING_ARGS('streams'); } diff --git a/lib/stream/promises.js b/lib/stream/promises.js index 714c8f11427e32..3b5f304c7a4472 100644 --- a/lib/stream/promises.js +++ b/lib/stream/promises.js @@ -24,11 +24,6 @@ function pipeline(...streams) { signal = options.signal; } - // pipeline(streams) - if (streams.length === 1 && ArrayIsArray(streams[0])) { - streams = streams[0]; - } - pl(streams, (err, value) => { if (err) { reject(err); diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 4b0f11ea41218a..ea7bc61799e483 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1406,3 +1406,64 @@ const tsp = require('timers/promises'); })); ac.abort(); } + +{ + async function run() { + let finished = false; + const write = new Writable({ + write(data, enc, cb) { + cb(); + } + }); + write.on('finish', () => { + finished = true; + }); + + await pipelinep([Readable.from('Hello World!'), write]); + + assert(finished); + } + + run(); +} + +{ + async function run() { + let finished = false; + let text = ''; + const write = new Writable({ + write(data, enc, cb) { + text += data; + cb(); + } + }); + write.on('finish', () => { + finished = true; + }); + + await pipelinep([Readable.from('Hello World!'), write]); + assert(finished); + assert.strictEqual(text, 'Hello World!'); + } + + run(); +} + +{ + let finished = false; + let text = ''; + const write = new Writable({ + write(data, enc, cb) { + text += data; + cb(); + } + }); + write.on('finish', () => { + finished = true; + }); + + pipeline([Readable.from('Hello World!'), write], common.mustSucceed(() => { + assert(finished); + assert.strictEqual(text, 'Hello World!'); + })); +} From a7f4073f0f7b613be37c6d4325388feec793451e Mon Sep 17 00:00:00 2001 From: Mestery Date: Thu, 23 Sep 2021 21:37:48 +0200 Subject: [PATCH 3/4] fixup! fixup! stream: support array of streams in promises pipeline --- test/parallel/test-stream-pipeline.js | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index ea7bc61799e483..061ef923d03a59 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1407,26 +1407,6 @@ const tsp = require('timers/promises'); ac.abort(); } -{ - async function run() { - let finished = false; - const write = new Writable({ - write(data, enc, cb) { - cb(); - } - }); - write.on('finish', () => { - finished = true; - }); - - await pipelinep([Readable.from('Hello World!'), write]); - - assert(finished); - } - - run(); -} - { async function run() { let finished = false; From dbc3a05b7d315025d864527da2cddc53d0c3b378 Mon Sep 17 00:00:00 2001 From: Mestery Date: Thu, 23 Sep 2021 21:53:43 +0200 Subject: [PATCH 4/4] Update promises.js --- lib/stream/promises.js | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/stream/promises.js b/lib/stream/promises.js index 3b5f304c7a4472..0db01a8b208d60 100644 --- a/lib/stream/promises.js +++ b/lib/stream/promises.js @@ -1,7 +1,6 @@ 'use strict'; const { - ArrayIsArray, ArrayPrototypePop, Promise, } = primordials;