Skip to content

Commit

Permalink
stream: add promises version to utility functions
Browse files Browse the repository at this point in the history
PR-URL: #33991
Fixes: #33582
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Denys Otrishko <shishugi@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Trivikram Kamat <trivikr.dev@gmail.com>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
  • Loading branch information
rickyes authored and ronag committed Jul 9, 2020
1 parent 6ae1b9c commit 527e214
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 8 deletions.
22 changes: 14 additions & 8 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ Additionally, this module includes the utility functions
[`stream.pipeline()`][], [`stream.finished()`][] and
[`stream.Readable.from()`][].

### Streams Promises API

The `stream/promises` API provides an alternative set of asynchronous utility
functions for streams that return `Promise` objects rather than using
callbacks. The API is accessible via `require('stream/promises')`
or `require('stream').promises`.

### Object mode

All streams created by Node.js APIs operate exclusively on strings and `Buffer`
Expand Down Expand Up @@ -1597,10 +1604,10 @@ Especially useful in error handling scenarios where a stream is destroyed
prematurely (like an aborted HTTP request), and will not emit `'end'`
or `'finish'`.

The `finished` API is promisify-able as well;
The `finished` API provides promise version:

```js
const finished = util.promisify(stream.finished);
const { finished } = require('stream/promises');

const rs = fs.createReadStream('archive.tar');

Expand Down Expand Up @@ -1684,10 +1691,10 @@ pipeline(
);
```

The `pipeline` API is promisify-able as well:
The `pipeline` API provides promise version:

```js
const pipeline = util.promisify(stream.pipeline);
const { pipeline } = require('stream/promises');

async function run() {
await pipeline(
Expand All @@ -1704,7 +1711,7 @@ run().catch(console.error);
The `pipeline` API also supports async generators:

```js
const pipeline = util.promisify(stream.pipeline);
const { pipeline } = require('stream/promises');
const fs = require('fs');

async function run() {
Expand Down Expand Up @@ -2927,9 +2934,9 @@ handling of backpressure and errors. [`stream.pipeline()`][] abstracts away
the handling of backpressure and backpressure-related errors:

```js
const { pipeline } = require('stream');
const util = require('util');
const fs = require('fs');
const { pipeline } = require('stream');
const { pipeline: pipelinePromise } = require('stream/promises');

const writable = fs.createWriteStream('./file');

Expand All @@ -2943,7 +2950,6 @@ pipeline(iterator, writable, (err, value) => {
});

// Promise Pattern
const pipelinePromise = util.promisify(pipeline);
pipelinePromise(iterator, writable)
.then((value) => {
console.log(value, 'value returned');
Expand Down
36 changes: 36 additions & 0 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,21 @@

'use strict';

const {
ObjectDefineProperty,
} = primordials;

const {
promisify: { custom: customPromisify },
} = require('internal/util');

const pipeline = require('internal/streams/pipeline');
const eos = require('internal/streams/end-of-stream');
const internalBuffer = require('internal/buffer');

// Lazy loaded
let promises = null;

// Note: export Stream before Readable/Writable/Duplex/...
// to avoid a cross-reference(require) issues
const Stream = module.exports = require('internal/streams/legacy');
Expand All @@ -38,6 +49,31 @@ Stream.PassThrough = require('_stream_passthrough');
Stream.pipeline = pipeline;
Stream.finished = eos;

ObjectDefineProperty(Stream, 'promises', {
configurable: true,
enumerable: true,
get() {
if (promises === null) promises = require('stream/promises');
return promises;
}
});

ObjectDefineProperty(pipeline, customPromisify, {
enumerable: true,
get() {
if (promises === null) promises = require('stream/promises');
return promises.pipeline;
}
});

ObjectDefineProperty(eos, customPromisify, {
enumerable: true,
get() {
if (promises === null) promises = require('stream/promises');
return promises.finished;
}
});

// Backwards-compat with node 0.4.x
Stream.Stream = Stream;

Expand Down
39 changes: 39 additions & 0 deletions lib/stream/promises.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
'use strict';

const {
Promise,
} = primordials;

let pl;
let eos;

function pipeline(...streams) {
if (!pl) pl = require('internal/streams/pipeline');
return new Promise((resolve, reject) => {
pl(...streams, (err, value) => {

This comment has been minimized.

Copy link
@sam-araiza

sam-araiza Aug 10, 2022

This assumes top-level streams only and disregards subpipes. For example:

StreamAB = StreamA.pipe(StreamB)
await pipeline(Stream1, Stream2, StreamAB)

Promise would be resolved before completion of StreamB in StreamAB. Can potentially "flatten" the list of streams if it's possible to detect if one stream is being piped to another stream before resolving parent level stream.

if (err) {
reject(err);
} else {
resolve(value);
}
});
});
}

function finished(stream, opts) {
if (!eos) eos = require('internal/streams/end-of-stream');
return new Promise((resolve, reject) => {
eos(stream, opts, (err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}

module.exports = {
finished,
pipeline,
};
1 change: 1 addition & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
'lib/readline.js',
'lib/repl.js',
'lib/stream.js',
'lib/stream/promises.js',
'lib/_stream_readable.js',
'lib/_stream_writable.js',
'lib/_stream_duplex.js',
Expand Down
103 changes: 103 additions & 0 deletions test/parallel/test-stream-promises.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
'use strict';

const common = require('../common');
const stream = require('stream');
const {
Readable,
Writable,
promises,
} = stream;
const {
finished,
pipeline,
} = require('stream/promises');
const fs = require('fs');
const assert = require('assert');
const { promisify } = require('util');

assert.strictEqual(promises.pipeline, pipeline);
assert.strictEqual(promises.finished, finished);
assert.strictEqual(pipeline, promisify(stream.pipeline));
assert.strictEqual(finished, promisify(stream.finished));

// pipeline success
{
let finished = false;
const processed = [];
const expected = [
Buffer.from('a'),
Buffer.from('b'),
Buffer.from('c')
];

const read = new Readable({
read() { }
});

const write = new Writable({
write(data, enc, cb) {
processed.push(data);
cb();
}
});

write.on('finish', () => {
finished = true;
});

for (let i = 0; i < expected.length; i++) {
read.push(expected[i]);
}
read.push(null);

pipeline(read, write).then(common.mustCall((value) => {
assert.ok(finished);
assert.deepStrictEqual(processed, expected);
}));
}

// pipeline error
{
const read = new Readable({
read() { }
});

const write = new Writable({
write(data, enc, cb) {
cb();
}
});

read.push('data');
setImmediate(() => read.destroy());

pipeline(read, write).catch(common.mustCall((err) => {
assert.ok(err, 'should have an error');
}));
}

// finished success
{
async function run() {
const rs = fs.createReadStream(__filename);

let ended = false;
rs.resume();
rs.on('end', () => {
ended = true;
});
await finished(rs);
assert(ended);
}

run().then(common.mustCall());
}

// finished error
{
const rs = fs.createReadStream('file-does-not-exist');

assert.rejects(finished(rs), {
code: 'ENOENT'
}).then(common.mustCall());
}

0 comments on commit 527e214

Please sign in to comment.