@@ -1886,16 +1886,14 @@ const { pipeline } = require('stream/promises');
18861886
18871887async function run () {
18881888 const ac = new AbortController ();
1889- const options = {
1890- signal: ac .signal ,
1891- };
1889+ const signal = ac .signal ;
18921890
18931891 setTimeout (() => ac .abort (), 1 );
18941892 await pipeline (
18951893 fs .createReadStream (' archive.tar' ),
18961894 zlib .createGzip (),
18971895 fs .createWriteStream (' archive.tar.gz' ),
1898- options ,
1896+ { signal } ,
18991897 );
19001898}
19011899
@@ -1911,10 +1909,10 @@ const fs = require('fs');
19111909async function run () {
19121910 await pipeline (
19131911 fs .createReadStream (' lowercase.txt' ),
1914- async function * (source ) {
1912+ async function * (source , signal ) {
19151913 source .setEncoding (' utf8' ); // Work with strings rather than `Buffer`s.
19161914 for await (const chunk of source ) {
1917- yield chunk . toUpperCase ( );
1915+ yield await processChunk (chunk, { signal } );
19181916 }
19191917 },
19201918 fs .createWriteStream (' uppercase.txt' )
@@ -1925,6 +1923,28 @@ async function run() {
19251923run ().catch (console .error );
19261924```
19271925
1926+ Remember to handle the ` signal ` argument passed into the async generator.
1927+ Especially in the case where the async generator is the source for the
1928+ pipeline (i.e. first argument) or the pipeline will never complete.
1929+
1930+ ``` js
1931+ const { pipeline } = require (' stream/promises' );
1932+ const fs = require (' fs' );
1933+
1934+ async function run () {
1935+ await pipeline (
1936+ async function * (signal ) {
1937+ await someLongRunningfn ({ signal });
1938+ yield ' asd' ;
1939+ },
1940+ fs .createWriteStream (' uppercase.txt' )
1941+ );
1942+ console .log (' Pipeline succeeded.' );
1943+ }
1944+
1945+ run ().catch (console .error );
1946+ ```
1947+
19281948` stream.pipeline() ` will call ` stream.destroy(err) ` on all streams except:
19291949* ` Readable ` streams which have emitted ` 'end' ` or ` 'close' ` .
19301950* ` Writable ` streams which have emitted ` 'finish' ` or ` 'close' ` .
@@ -3342,13 +3362,20 @@ the `Readable.from()` utility method:
33423362``` js
33433363const { Readable } = require (' stream' );
33443364
3365+ const ac = new AbortController ();
3366+ const signal = ac .signal ;
3367+
33453368async function * generate () {
33463369 yield ' a' ;
3370+ await someLongRunningFn ({ signal });
33473371 yield ' b' ;
33483372 yield ' c' ;
33493373}
33503374
33513375const readable = Readable .from (generate ());
3376+ readable .on (' close' , () => {
3377+ ac .abort ();
3378+ });
33523379
33533380readable .on (' data' , (chunk ) => {
33543381 console .log (chunk);
@@ -3368,21 +3395,31 @@ const { pipeline: pipelinePromise } = require('stream/promises');
33683395
33693396const writable = fs .createWriteStream (' ./file' );
33703397
3398+ const ac = new AbortController ();
3399+ const signal = ac .signal ;
3400+
3401+ const iterator = createIterator ({ signal });
3402+
33713403// Callback Pattern
33723404pipeline (iterator, writable, (err , value ) => {
33733405 if (err) {
33743406 console .error (err);
33753407 } else {
33763408 console .log (value, ' value returned' );
33773409 }
3410+ }).on (' close' , () => {
3411+ ac .abort ();
33783412});
33793413
33803414// Promise Pattern
33813415pipelinePromise (iterator, writable)
33823416 .then ((value ) => {
33833417 console .log (value, ' value returned' );
33843418 })
3385- .catch (console .error );
3419+ .catch ((err ) => {
3420+ console .error (err);
3421+ ac .abort ();
3422+ });
33863423```
33873424
33883425<!-- type=misc-->
0 commit comments