From 9175ce358e33b2179248b175acdb09a230671cb7 Mon Sep 17 00:00:00 2001 From: Marco Ippolito Date: Mon, 12 Dec 2022 19:48:43 +0100 Subject: [PATCH 1/8] doc: add stream/promises pipeline and finished to doc --- doc/api/stream.md | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/doc/api/stream.md b/doc/api/stream.md index 1726635b55b9d4..25fd83a3f486fc 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -59,6 +59,34 @@ functions for streams that return `Promise` objects rather than using callbacks. The API is accessible via `require('node:stream/promises')` or `require('node:stream').promises`. +### `stream.pipeline(source[, ...transforms], destination, options)` + +### `stream.pipeline(streams, options)` + +* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]} +* `source` {Stream|Iterable|AsyncIterable|Function} + * Returns: {Promise\} +* `...transforms` {Stream|Function} + * `source` {AsyncIterable} + * Returns: {Promise} +* `destination` {Stream|Function} + * `source` {AsyncIterable} + * Returns: {Promise} +* `options` {Object} + * `signal` {AbortSignal} + * `end` {boolean} +* Returns: {Promise} + +### `stream.finished(stream, options)` + +* `stream` {Stream} +* `options` {Object} + * `error` {boolean|undefined} + * `readable` {boolean|undefined} + * `writable` {boolean|undefined} + * `signal`: {AbortSignal|undefined} +* Returns: {Promise} + ### Object mode All streams created by Node.js APIs operate exclusively on strings and `Buffer` From f4336cb10ed16e753593da5cb6dc574a243d6606 Mon Sep 17 00:00:00 2001 From: Marco Ippolito Date: Mon, 12 Dec 2022 20:12:43 +0100 Subject: [PATCH 2/8] doc: fixed correct return type --- doc/api/stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 25fd83a3f486fc..9e88cc7338aabd 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -65,7 +65,7 @@ or `require('node:stream').promises`. * `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]} * `source` {Stream|Iterable|AsyncIterable|Function} - * Returns: {Promise\} + * Returns: {Promise} * `...transforms` {Stream|Function} * `source` {AsyncIterable} * Returns: {Promise} From a02e238eedd487f3f03a63ec6a8f2f5f4ecd20ff Mon Sep 17 00:00:00 2001 From: Marco Ippolito Date: Tue, 13 Dec 2022 17:14:37 +0100 Subject: [PATCH 3/8] doc: improved documentation --- doc/api/stream.md | 321 +++++++++++++++++++++++++++++----------------- 1 file changed, 205 insertions(+), 116 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 9e88cc7338aabd..de6bfba8ae46b8 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -59,25 +59,183 @@ functions for streams that return `Promise` objects rather than using callbacks. The API is accessible via `require('node:stream/promises')` or `require('node:stream').promises`. -### `stream.pipeline(source[, ...transforms], destination, options)` +### `stream.pipeline(source[, ...transforms], destination[, options])` -### `stream.pipeline(streams, options)` +### `stream.pipeline(streams[, options])` + + * `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]} * `source` {Stream|Iterable|AsyncIterable|Function} - * Returns: {Promise} + * Returns: {Promise|AsyncIterable} * `...transforms` {Stream|Function} * `source` {AsyncIterable} - * Returns: {Promise} + * Returns: {Promise|AsyncIterable>} * `destination` {Stream|Function} * `source` {AsyncIterable} - * Returns: {Promise} + * Returns: {Promise|AsyncIterable} * `options` {Object} * `signal` {AbortSignal} * `end` {boolean} -* Returns: {Promise} +* Returns: {Promise} Fulfills when the pipeline is complete. + +```cjs +const { pipeline } = require('node:stream/promises'); +const fs = require('node:fs'); +const zlib = require('node:zlib'); + +async function run() { + await pipeline( + fs.createReadStream('archive.tar'), + zlib.createGzip(), + fs.createWriteStream('archive.tar.gz'), + ); + console.log('Pipeline succeeded.'); +} + +run().catch(console.error); +``` + +```mjs +import { pipeline } from 'node:stream/promises'; +import { createReadStream, createWriteStream } from 'node:fs'; +import { createGzip } from 'node:zlib'; + +async function run() { + await pipeline( + createReadStream('archive.tar'), + createGzip(), + createWriteStream('archive.tar.gz'), + ); + console.log('Pipeline succeeded.'); +} + +run().catch(console.error); +``` + +To use an `AbortSignal`, pass it inside an options object, +as the last argument. +When the signal is aborted, +`destroy` will be called on the underlying pipeline, with an +`AbortError`. + +```cjs +const { pipeline } = require('node:stream/promises'); +const fs = require('node:fs'); +const zlib = require('node:zlib'); + +async function run() { + const ac = new AbortController(); + const signal = ac.signal; + + setTimeout(() => ac.abort(), 1); + await pipeline( + fs.createReadStream('archive.tar'), + zlib.createGzip(), + fs.createWriteStream('archive.tar.gz'), + { signal }, + ); +} + +run().catch(console.error); // AbortError +``` + +```mjs +import { pipeline } from 'node:stream/promises'; +import { createReadStream, createWriteStream } from 'node:fs'; +import { createGzip } from 'node:zlib'; + +async function run() { + const ac = new AbortController(); + const signal = ac.signal; + + setTimeout(() => ac.abort(), 1); + await pipeline( + createReadStream('archive.tar'), + createGzip(), + createWriteStream('archive.tar.gz'), + { signal }, + ); +} + +run().catch(console.error); // AbortError +``` + +The `pipeline` API also supports async generators: + +```cjs +const { pipeline } = require('node:stream/promises'); +const fs = require('node:fs'); + +async function run() { + await pipeline( + fs.createReadStream('lowercase.txt'), + async function* (source, { signal }) { + source.setEncoding('utf8'); // Work with strings rather than `Buffer`s. + for await (const chunk of source) { + yield await processChunk(chunk, { signal }); + } + }, + fs.createWriteStream('uppercase.txt'), + ); + console.log('Pipeline succeeded.'); +} + +run().catch(console.error); +``` + +```mjs +import { pipeline } from 'node:stream/promises'; +import { createReadStream, createWriteStream } from 'node:fs'; + +async function run() { + await pipeline( + createReadStream('lowercase.txt'), + async function* (source, { signal }) { + source.setEncoding('utf8'); // Work with strings rather than `Buffer`s. + for await (const chunk of source) { + yield await processChunk(chunk, { signal }); + } + }, + createWriteStream('uppercase.txt'), + ); + console.log('Pipeline succeeded.'); +} + +run().catch(console.error); +``` + +Remember to handle the `signal` argument passed into the async generator. +Especially in the case where the async generator is the source for the +pipeline (i.e. first argument) or the pipeline will never complete. + +```js +const { pipeline } = require('node:stream/promises'); +const fs = require('node:fs'); + +async function run() { + await pipeline( + async function* ({ signal }) { + await someLongRunningfn({ signal }); + yield 'asd'; + }, + fs.createWriteStream('uppercase.txt'), + ); + console.log('Pipeline succeeded.'); +} + +run().catch(console.error); +``` + +The `pipeline` API provides [callback version][stream-pipeline]: + +### `stream.finished(stream[, options])` -### `stream.finished(stream, options)` + * `stream` {Stream} * `options` {Object} @@ -85,7 +243,40 @@ or `require('node:stream').promises`. * `readable` {boolean|undefined} * `writable` {boolean|undefined} * `signal`: {AbortSignal|undefined} -* Returns: {Promise} +* Returns: {Promise} Fulfills when the stream is no + longer readable or writable. + +```cjs +const { finished } = require('node:stream/promises'); +const fs = require('node:fs'); + +const rs = fs.createReadStream('archive.tar'); + +async function run() { + await finished(rs); + console.log('Stream is done reading.'); +} + +run().catch(console.error); +rs.resume(); // Drain the stream. +``` + +```mjs +import { finished } from 'node:stream/promises'; +import { createReadStream } from 'node:fs'; + +const rs = createReadStream('archive.tar'); + +async function run() { + await finished(rs); + console.log('Stream is done reading.'); +} + +run().catch(console.error); +rs.resume(); // Drain the stream. +``` + +The `finished` API provides [callback version][stream-finished]: ### Object mode @@ -2475,22 +2666,7 @@ Especially useful in error handling scenarios where a stream is destroyed prematurely (like an aborted HTTP request), and will not emit `'end'` or `'finish'`. -The `finished` API provides promise version: - -```js -const { finished } = require('node:stream/promises'); -const fs = require('node:fs'); - -const rs = fs.createReadStream('archive.tar'); - -async function run() { - await finished(rs); - console.log('Stream is done reading.'); -} - -run().catch(console.error); -rs.resume(); // Drain the stream. -``` +The `finished` API provides [promise version][stream-finished-promise]: `stream.finished()` leaves dangling event listeners (in particular `'error'`, `'end'`, `'finish'` and `'close'`) after `callback` has been @@ -2570,98 +2746,7 @@ pipeline( ); ``` -The `pipeline` API provides a promise version, which can also -receive an options argument as the last parameter with a -`signal` {AbortSignal} property. When the signal is aborted, -`destroy` will be called on the underlying pipeline, with an -`AbortError`. - -```js -const { pipeline } = require('node:stream/promises'); -const fs = require('node:fs'); -const zlib = require('node:zlib'); - -async function run() { - await pipeline( - fs.createReadStream('archive.tar'), - zlib.createGzip(), - fs.createWriteStream('archive.tar.gz'), - ); - console.log('Pipeline succeeded.'); -} - -run().catch(console.error); -``` - -To use an `AbortSignal`, pass it inside an options object, -as the last argument: - -```js -const { pipeline } = require('node:stream/promises'); -const fs = require('node:fs'); -const zlib = require('node:zlib'); - -async function run() { - const ac = new AbortController(); - const signal = ac.signal; - - setTimeout(() => ac.abort(), 1); - await pipeline( - fs.createReadStream('archive.tar'), - zlib.createGzip(), - fs.createWriteStream('archive.tar.gz'), - { signal }, - ); -} - -run().catch(console.error); // AbortError -``` - -The `pipeline` API also supports async generators: - -```js -const { pipeline } = require('node:stream/promises'); -const fs = require('node:fs'); - -async function run() { - await pipeline( - fs.createReadStream('lowercase.txt'), - async function* (source, { signal }) { - source.setEncoding('utf8'); // Work with strings rather than `Buffer`s. - for await (const chunk of source) { - yield await processChunk(chunk, { signal }); - } - }, - fs.createWriteStream('uppercase.txt'), - ); - console.log('Pipeline succeeded.'); -} - -run().catch(console.error); -``` - -Remember to handle the `signal` argument passed into the async generator. -Especially in the case where the async generator is the source for the -pipeline (i.e. first argument) or the pipeline will never complete. - -```js -const { pipeline } = require('node:stream/promises'); -const fs = require('node:fs'); - -async function run() { - await pipeline( - async function* ({ signal }) { - await someLongRunningfn({ signal }); - yield 'asd'; - }, - fs.createWriteStream('uppercase.txt'), - ); - console.log('Pipeline succeeded.'); -} - -run().catch(console.error); -``` - +The `pipeline` API provides a [promise version][stream-pipeline-promise]. `stream.pipeline()` will call `stream.destroy(err)` on all streams except: * `Readable` streams which have emitted `'end'` or `'close'`. @@ -4594,7 +4679,11 @@ contain multi-byte characters. [stream-_write]: #writable_writechunk-encoding-callback [stream-_writev]: #writable_writevchunks-callback [stream-end]: #writableendchunk-encoding-callback +[stream-finished]: #streamfinishedstream-options-callback +[stream-finished-promise]: #streamfinishedstream-options [stream-pause]: #readablepause +[stream-pipeline]: #streampipelinesource-transforms-destination-callback +[stream-pipeline-promise]: #streampipelinesource-transforms-destination-options [stream-push]: #readablepushchunk-encoding [stream-read]: #readablereadsize [stream-resume]: #readableresume From 5559f0d2f1e4a3683946afb4531fcbce1fd4eaf2 Mon Sep 17 00:00:00 2001 From: Marco Ippolito Date: Tue, 13 Dec 2022 17:19:55 +0100 Subject: [PATCH 4/8] doc: typo removed > --- doc/api/stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index de6bfba8ae46b8..35e19930726e4e 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -72,7 +72,7 @@ added: v15.0.0 * Returns: {Promise|AsyncIterable} * `...transforms` {Stream|Function} * `source` {AsyncIterable} - * Returns: {Promise|AsyncIterable>} + * Returns: {Promise|AsyncIterable} * `destination` {Stream|Function} * `source` {AsyncIterable} * Returns: {Promise|AsyncIterable} From b545d80aea57ef7a573c3947f52e2c2960081404 Mon Sep 17 00:00:00 2001 From: Marco Ippolito Date: Thu, 15 Dec 2022 09:45:22 +0100 Subject: [PATCH 5/8] doc: top level await and typo --- doc/api/stream.md | 80 ++++++++++++++++++++++++----------------------- 1 file changed, 41 insertions(+), 39 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 35e19930726e4e..1700e3228f0475 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -103,23 +103,17 @@ import { pipeline } from 'node:stream/promises'; import { createReadStream, createWriteStream } from 'node:fs'; import { createGzip } from 'node:zlib'; -async function run() { - await pipeline( - createReadStream('archive.tar'), - createGzip(), - createWriteStream('archive.tar.gz'), - ); - console.log('Pipeline succeeded.'); -} - -run().catch(console.error); +await pipeline( + createReadStream('archive.tar'), + createGzip(), + createWriteStream('archive.tar.gz'), +); +console.log('Pipeline succeeded.'); ``` -To use an `AbortSignal`, pass it inside an options object, -as the last argument. -When the signal is aborted, -`destroy` will be called on the underlying pipeline, with an -`AbortError`. +To use an `AbortSignal`, pass it inside an options object, as the last argument. +When the signal is aborted, `destroy` will be called on the underlying pipeline, +with an `AbortError`. ```cjs const { pipeline } = require('node:stream/promises'); @@ -147,20 +141,19 @@ import { pipeline } from 'node:stream/promises'; import { createReadStream, createWriteStream } from 'node:fs'; import { createGzip } from 'node:zlib'; -async function run() { - const ac = new AbortController(); - const signal = ac.signal; - - setTimeout(() => ac.abort(), 1); +const ac = new AbortController(); +const { signal } = ac; +setImmediate(() => ac.abort()); +try { await pipeline( createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'), { signal }, ); +} catch (err) { + console.error(err); // AbortError } - -run().catch(console.error); // AbortError ``` The `pipeline` API also supports async generators: @@ -190,28 +183,24 @@ run().catch(console.error); import { pipeline } from 'node:stream/promises'; import { createReadStream, createWriteStream } from 'node:fs'; -async function run() { - await pipeline( - createReadStream('lowercase.txt'), - async function* (source, { signal }) { - source.setEncoding('utf8'); // Work with strings rather than `Buffer`s. - for await (const chunk of source) { - yield await processChunk(chunk, { signal }); - } - }, - createWriteStream('uppercase.txt'), - ); - console.log('Pipeline succeeded.'); -} - -run().catch(console.error); +await pipeline( + createReadStream('lowercase.txt'), + async function* (source, { signal }) { + source.setEncoding('utf8'); // Work with strings rather than `Buffer`s. + for await (const chunk of source) { + yield await processChunk(chunk, { signal }); + } + }, + createWriteStream('uppercase.txt'), +); +console.log('Pipeline succeeded.'); ``` Remember to handle the `signal` argument passed into the async generator. Especially in the case where the async generator is the source for the pipeline (i.e. first argument) or the pipeline will never complete. -```js +```cjs const { pipeline } = require('node:stream/promises'); const fs = require('node:fs'); @@ -229,6 +218,19 @@ async function run() { run().catch(console.error); ``` +```mjs +import { pipeline } from 'node:stream/promises'; +import fs from 'node:fs'; +await pipeline( + async function* ({ signal }) { + await someLongRunningfn({ signal }); + yield 'asd'; + }, + fs.createWriteStream('uppercase.txt'), +); +console.log('Pipeline succeeded.'); +``` + The `pipeline` API provides [callback version][stream-pipeline]: ### `stream.finished(stream[, options])` @@ -243,7 +245,7 @@ added: v15.0.0 * `readable` {boolean|undefined} * `writable` {boolean|undefined} * `signal`: {AbortSignal|undefined} -* Returns: {Promise} Fulfills when the stream is no +* Returns: {Promise} Fulfills when the stream is no longer readable or writable. ```cjs From 083825ec6d29f3a3f6d1420f26570bc4b96c5fa7 Mon Sep 17 00:00:00 2001 From: Marco Ippolito Date: Thu, 15 Dec 2022 10:23:42 +0100 Subject: [PATCH 6/8] doc: fix setImmediate Co-authored-by: Antoine du Hamel --- doc/api/stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 1700e3228f0475..f6dfd3df360e60 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -124,7 +124,7 @@ async function run() { const ac = new AbortController(); const signal = ac.signal; - setTimeout(() => ac.abort(), 1); + setImmediate(() => ac.abort()); await pipeline( fs.createReadStream('archive.tar'), zlib.createGzip(), From 50bf95941e0e58773c31c51e2ee9c2c0d6429b55 Mon Sep 17 00:00:00 2001 From: Marco Ippolito Date: Thu, 15 Dec 2022 14:03:05 +0100 Subject: [PATCH 7/8] fix: period instead of : Co-authored-by: Antoine du Hamel --- doc/api/stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index f6dfd3df360e60..6d2e543d240a1a 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2668,7 +2668,7 @@ Especially useful in error handling scenarios where a stream is destroyed prematurely (like an aborted HTTP request), and will not emit `'end'` or `'finish'`. -The `finished` API provides [promise version][stream-finished-promise]: +The `finished` API provides [promise version][stream-finished-promise]. `stream.finished()` leaves dangling event listeners (in particular `'error'`, `'end'`, `'finish'` and `'close'`) after `callback` has been From c93011614c33a643e864e0e6ccade49e90ef7b4e Mon Sep 17 00:00:00 2001 From: Marco Ippolito Date: Thu, 15 Dec 2022 14:03:25 +0100 Subject: [PATCH 8/8] doc: newline Co-authored-by: Antoine du Hamel --- doc/api/stream.md | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/api/stream.md b/doc/api/stream.md index 6d2e543d240a1a..803dfe2a7ce748 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2749,6 +2749,7 @@ pipeline( ``` The `pipeline` API provides a [promise version][stream-pipeline-promise]. + `stream.pipeline()` will call `stream.destroy(err)` on all streams except: * `Readable` streams which have emitted `'end'` or `'close'`.