Skip to content

Commit

Permalink
stream: support passing generator functions into pipeline()
Browse files Browse the repository at this point in the history
Backport-PR-URL: #31975
PR-URL: #31223
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
  • Loading branch information
ronag authored and codebytere committed Mar 1, 2020
1 parent 4d05508 commit 8ad64b8
Show file tree
Hide file tree
Showing 4 changed files with 633 additions and 31 deletions.
52 changes: 43 additions & 9 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1555,17 +1555,30 @@ const cleanup = finished(rs, (err) => {
});
```

### `stream.pipeline(...streams, callback)`
### `stream.pipeline(source, ...transforms, destination, callback)`
<!-- YAML
added: v10.0.0
-->

* `...streams` {Stream} Two or more streams to pipe between.
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/31223
description: Add support for async generators.
-->

* `source` {Stream|Iterable|AsyncIterable|Function}
* Returns: {Iterable|AsyncIterable}
* `...transforms` {Stream|Function}
* `source` {AsyncIterable}
* Returns: {AsyncIterable}
* `destination` {Stream|Function}
* `source` {AsyncIterable}
* Returns: {AsyncIterable|Promise}
* `callback` {Function} Called when the pipeline is fully done.
* `err` {Error}
* `val` Resolved value of `Promise` returned by `destination`.
* Returns: {Stream}

A module method to pipe between streams forwarding errors and properly cleaning
up and provide a callback when the pipeline is complete.
A module method to pipe between streams and generators forwarding errors and
properly cleaning up and provide a callback when the pipeline is complete.

```js
const { pipeline } = require('stream');
Expand Down Expand Up @@ -1608,6 +1621,28 @@ async function run() {
run().catch(console.error);
```

The `pipeline` API also supports async generators:

```js
const pipeline = util.promisify(stream.pipeline);
const fs = require('fs').promises;

async function run() {
await pipeline(
fs.createReadStream('lowercase.txt'),
async function* (source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
},
fs.createWriteStream('uppercase.txt')
);
console.log('Pipeline succeeded.');
}

run().catch(console.error);
```

`stream.pipeline()` will call `stream.destroy(err)` on all streams except:
* `Readable` streams which have emitted `'end'` or `'close'`.
* `Writable` streams which have emitted `'finish'` or `'close'`.
Expand Down Expand Up @@ -2707,8 +2742,7 @@ const pipeline = util.promisify(stream.pipeline);
const writable = fs.createWriteStream('./file');

(async function() {
const readable = Readable.from(iterable);
await pipeline(readable, writable);
await pipeline(iterable, writable);
})();
```

Expand Down Expand Up @@ -2843,7 +2877,7 @@ contain multi-byte characters.
[`stream.cork()`]: #stream_writable_cork
[`stream.finished()`]: #stream_stream_finished_stream_options_callback
[`stream.pipe()`]: #stream_readable_pipe_destination_options
[`stream.pipeline()`]: #stream_stream_pipeline_streams_callback
[`stream.pipeline()`]: #stream_stream_pipeline_source_transforms_destination_callback
[`stream.uncork()`]: #stream_writable_uncork
[`stream.unpipe()`]: #stream_readable_unpipe_destination
[`stream.wrap()`]: #stream_readable_wrap_stream
Expand Down
213 changes: 191 additions & 22 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,37 @@

const {
ArrayIsArray,
SymbolAsyncIterator,
SymbolIterator
} = primordials;

let eos;

const { once } = require('internal/util');
const {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_RETURN_VALUE,
ERR_INVALID_CALLBACK,
ERR_MISSING_ARGS,
ERR_STREAM_DESTROYED
} = require('internal/errors').codes;

let EE;
let PassThrough;
let createReadableStreamAsyncIterator;

function isRequest(stream) {
return stream && stream.setHeader && typeof stream.abort === 'function';
}

function destroyStream(stream, err) {
// request.destroy just do .end - .abort is what we want
if (isRequest(stream)) return stream.abort();
if (isRequest(stream.req)) return stream.req.abort();
if (typeof stream.destroy === 'function') return stream.destroy(err);
if (typeof stream.close === 'function') return stream.close();
}

function destroyer(stream, reading, writing, callback) {
callback = once(callback);

Expand All @@ -41,19 +57,12 @@ function destroyer(stream, reading, writing, callback) {
if (destroyed) return;
destroyed = true;

// request.destroy just do .end - .abort is what we want
if (isRequest(stream)) return stream.abort();
if (isRequest(stream.req)) return stream.req.abort();
if (typeof stream.destroy === 'function') return stream.destroy(err);
destroyStream(stream, err);

callback(err || new ERR_STREAM_DESTROYED('pipe'));
};
}

function pipe(from, to) {
return from.pipe(to);
}

function popCallback(streams) {
// Streams should never be an empty array. It should always contain at least
// a single stream. Therefore optimize for the average case instead of
Expand All @@ -63,8 +72,89 @@ function popCallback(streams) {
return streams.pop();
}

function isPromise(obj) {
return !!(obj && typeof obj.then === 'function');
}

function isReadable(obj) {
return !!(obj && typeof obj.pipe === 'function');
}

function isWritable(obj) {
return !!(obj && typeof obj.write === 'function');
}

function isStream(obj) {
return isReadable(obj) || isWritable(obj);
}

function isIterable(obj, isAsync) {
if (!obj) return false;
if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function';
if (isAsync === false) return typeof obj[SymbolIterator] === 'function';
return typeof obj[SymbolAsyncIterator] === 'function' ||
typeof obj[SymbolIterator] === 'function';
}

function makeAsyncIterable(val) {
if (isIterable(val)) {
return val;
} else if (isReadable(val)) {
// Legacy streams are not Iterable.
return _fromReadable(val);
} else {
throw new ERR_INVALID_ARG_TYPE(
'val', ['Readable', 'Iterable', 'AsyncIterable'], val);
}
}

async function* _fromReadable(val) {
if (!createReadableStreamAsyncIterator) {
createReadableStreamAsyncIterator =
require('internal/streams/async_iterator');
}

try {
if (typeof val.read !== 'function') {
// createReadableStreamAsyncIterator does not support
// v1 streams. Convert it into a v2 stream.

if (!PassThrough) {
PassThrough = require('_stream_passthrough');
}

const pt = new PassThrough();
val
.on('error', (err) => pt.destroy(err))
.pipe(pt);
yield* createReadableStreamAsyncIterator(pt);
} else {
yield* createReadableStreamAsyncIterator(val);
}
} finally {
destroyStream(val);
}
}

async function pump(iterable, writable, finish) {
if (!EE) {
EE = require('events');
}
try {
for await (const chunk of iterable) {
if (!writable.write(chunk)) {
if (writable.destroyed) return;
await EE.once(writable, 'drain');
}
}
writable.end();
} catch (err) {
finish(err);
}
}

function pipeline(...streams) {
const callback = popCallback(streams);
const callback = once(popCallback(streams));

if (ArrayIsArray(streams[0])) streams = streams[0];

Expand All @@ -73,25 +163,104 @@ function pipeline(...streams) {
}

let error;
const destroys = streams.map(function(stream, i) {
const destroys = [];

function finish(err, val, final) {
if (!error && err) {
error = err;
}

if (error || final) {
for (const destroy of destroys) {
destroy(error);
}
}

if (final) {
callback(error, val);
}
}

function wrap(stream, reading, writing, final) {
destroys.push(destroyer(stream, reading, writing, (err) => {
finish(err, null, final);
}));
}

let ret;
for (let i = 0; i < streams.length; i++) {
const stream = streams[i];
const reading = i < streams.length - 1;
const writing = i > 0;
return destroyer(stream, reading, writing, function(err) {
if (!error) error = err;
if (err) {
for (const destroy of destroys) {
destroy(err);

if (isStream(stream)) {
wrap(stream, reading, writing, !reading);
}

if (i === 0) {
if (typeof stream === 'function') {
ret = stream();
if (!isIterable(ret)) {
throw new ERR_INVALID_RETURN_VALUE(
'Iterable, AsyncIterable or Stream', 'source', ret);
}
} else if (isIterable(stream) || isReadable(stream)) {
ret = stream;
} else {
throw new ERR_INVALID_ARG_TYPE(
'source', ['Stream', 'Iterable', 'AsyncIterable', 'Function'],
stream);
}
if (reading) return;
for (const destroy of destroys) {
destroy();
} else if (typeof stream === 'function') {
ret = makeAsyncIterable(ret);
ret = stream(ret);

if (reading) {
if (!isIterable(ret, true)) {
throw new ERR_INVALID_RETURN_VALUE(
'AsyncIterable', `transform[${i - 1}]`, ret);
}
} else {
if (!PassThrough) {
PassThrough = require('_stream_passthrough');
}

const pt = new PassThrough();
if (isPromise(ret)) {
ret
.then((val) => {
pt.end(val);
finish(null, val, true);
})
.catch((err) => {
finish(err, null, true);
});
} else if (isIterable(ret, true)) {
pump(ret, pt, finish);
} else {
throw new ERR_INVALID_RETURN_VALUE(
'AsyncIterable or Promise', 'destination', ret);
}

ret = pt;
wrap(ret, true, false, true);
}
callback(error);
});
});
} else if (isStream(stream)) {
if (isReadable(ret)) {
ret.pipe(stream);
} else {
ret = makeAsyncIterable(ret);
pump(ret, stream, finish);
}
ret = stream;
} else {
const name = reading ? `transform[${i - 1}]` : 'destination';
throw new ERR_INVALID_ARG_TYPE(
name, ['Stream', 'Function'], ret);
}
}

return streams.reduce(pipe);
return ret;
}

module.exports = pipeline;
Loading

0 comments on commit 8ad64b8

Please sign in to comment.