Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: add pipeline() and finished() promises version #33991

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ Additionally, this module includes the utility functions
[`stream.pipeline()`][], [`stream.finished()`][] and
[`stream.Readable.from()`][].

### Streams Promises API

The `stream/promises` API provides an alternative set of asynchronous utility
functions for streams that return `Promise` objects rather than using
callbacks. The API is accessible via `require('stream/promises')`
or `require('stream').promises`.

### Object mode

All streams created by Node.js APIs operate exclusively on strings and `Buffer`
Expand Down Expand Up @@ -1563,10 +1570,10 @@ Especially useful in error handling scenarios where a stream is destroyed
prematurely (like an aborted HTTP request), and will not emit `'end'`
or `'finish'`.

The `finished` API is promisify-able as well;
The `finished` API provides promise version:

```js
const finished = util.promisify(stream.finished);
const { finished } = require('stream/promises');

const rs = fs.createReadStream('archive.tar');

Expand Down Expand Up @@ -1648,10 +1655,10 @@ pipeline(
);
```

The `pipeline` API is promisify-able as well:
The `pipeline` API provides promise version:

```js
const pipeline = util.promisify(stream.pipeline);
const { pipeline } = require('stream/promises');

async function run() {
await pipeline(
Expand All @@ -1668,7 +1675,7 @@ run().catch(console.error);
The `pipeline` API also supports async generators:

```js
const pipeline = util.promisify(stream.pipeline);
const { pipeline } = require('stream/promises');
const fs = require('fs');

async function run() {
Expand Down Expand Up @@ -2891,9 +2898,9 @@ handling of backpressure and errors. [`stream.pipeline()`][] abstracts away
the handling of backpressure and backpressure-related errors:

```js
const { pipeline } = require('stream');
const util = require('util');
const fs = require('fs');
const { pipeline } = require('stream');
const { pipeline: pipelinePromise } = require('stream/promises');
rickyes marked this conversation as resolved.
Show resolved Hide resolved

const writable = fs.createWriteStream('./file');

Expand All @@ -2907,7 +2914,6 @@ pipeline(iterator, writable, (err, value) => {
});

// Promise Pattern
const pipelinePromise = util.promisify(pipeline);
pipelinePromise(iterator, writable)
.then((value) => {
console.log(value, 'value returned');
Expand Down
36 changes: 36 additions & 0 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,21 @@

'use strict';

const {
ObjectDefineProperty,
} = primordials;

const {
promisify: { custom: customPromisify },
} = require('internal/util');

const pipeline = require('internal/streams/pipeline');
rickyes marked this conversation as resolved.
Show resolved Hide resolved
const eos = require('internal/streams/end-of-stream');
const internalBuffer = require('internal/buffer');

// Lazy loaded
let promises = null;

// Note: export Stream before Readable/Writable/Duplex/...
// to avoid a cross-reference(require) issues
const Stream = module.exports = require('internal/streams/legacy');
Expand All @@ -38,6 +49,31 @@ Stream.PassThrough = require('_stream_passthrough');
Stream.pipeline = pipeline;
Stream.finished = eos;

ObjectDefineProperty(Stream, 'promises', {
configurable: true,
enumerable: true,
get() {
if (promises === null) promises = require('stream/promises');
return promises;
}
});
ronag marked this conversation as resolved.
Show resolved Hide resolved

ObjectDefineProperty(pipeline, customPromisify, {
enumerable: true,
get() {
if (promises === null) promises = require('stream/promises');
return promises.pipeline;
}
});

ObjectDefineProperty(eos, customPromisify, {
enumerable: true,
get() {
if (promises === null) promises = require('stream/promises');
return promises.finished;
}
});

// Backwards-compat with node 0.4.x
Stream.Stream = Stream;

Expand Down
39 changes: 39 additions & 0 deletions lib/stream/promises.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
'use strict';

const {
Promise,
} = primordials;

let pl;
let eos;

function pipeline(...streams) {
if (!pl) pl = require('internal/streams/pipeline');
return new Promise((resolve, reject) => {
pl(...streams, (err, value) => {
if (err) {
reject(err);
} else {
resolve(value);
}
});
});
}

function finished(stream, opts) {
if (!eos) eos = require('internal/streams/end-of-stream');
return new Promise((resolve, reject) => {
eos(stream, opts, (err) => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}

module.exports = {
finished,
pipeline,
};
1 change: 1 addition & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
'lib/readline.js',
'lib/repl.js',
'lib/stream.js',
'lib/stream/promises.js',
'lib/_stream_readable.js',
'lib/_stream_writable.js',
'lib/_stream_duplex.js',
Expand Down
103 changes: 103 additions & 0 deletions test/parallel/test-stream-promises.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
'use strict';

const common = require('../common');
const stream = require('stream');
const {
Readable,
Writable,
promises,
} = stream;
const {
finished,
pipeline,
} = require('stream/promises');
const fs = require('fs');
const assert = require('assert');
const { promisify } = require('util');

assert.strictEqual(promises.pipeline, pipeline);
assert.strictEqual(promises.finished, finished);
assert.strictEqual(pipeline, promisify(stream.pipeline));
assert.strictEqual(finished, promisify(stream.finished));

// pipeline success
{
let finished = false;
const processed = [];
const expected = [
Buffer.from('a'),
Buffer.from('b'),
Buffer.from('c')
];

const read = new Readable({
read() { }
});

const write = new Writable({
write(data, enc, cb) {
processed.push(data);
cb();
}
});

write.on('finish', () => {
finished = true;
});

for (let i = 0; i < expected.length; i++) {
read.push(expected[i]);
}
read.push(null);

pipeline(read, write).then(common.mustCall((value) => {
assert.ok(finished);
assert.deepStrictEqual(processed, expected);
}));
}

// pipeline error
{
const read = new Readable({
read() { }
});

const write = new Writable({
write(data, enc, cb) {
cb();
}
});

read.push('data');
setImmediate(() => read.destroy());

pipeline(read, write).catch(common.mustCall((err) => {
assert.ok(err, 'should have an error');
}));
}

// finished success
{
async function run() {
const rs = fs.createReadStream(__filename);

let ended = false;
rs.resume();
rs.on('end', () => {
ended = true;
});
await finished(rs);
assert(ended);
}

run().then(common.mustCall());
}

// finished error
{
const rs = fs.createReadStream('file-does-not-exist');

assert.rejects(finished(rs), {
code: 'ENOENT'
}).then(common.mustCall());
}