Skip to content

Commit

Permalink
stream: add pipeline and finished
Browse files Browse the repository at this point in the history
PR-URL: #19828
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
mafintosh committed Apr 16, 2018
1 parent 5cc948b commit f64bebf
Show file tree
Hide file tree
Showing 9 changed files with 917 additions and 0 deletions.
6 changes: 6 additions & 0 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -1431,6 +1431,12 @@ An attempt was made to call [`stream.pipe()`][] on a [`Writable`][] stream.

An attempt was made to call [`stream.write()`][] with a `null` chunk.

<a id="ERR_STREAM_PREMATURE_CLOSE"></a>
### ERR_STREAM_PREMATURE_CLOSE

An error returned by `stream.finished()` and `stream.pipeline()`, when a stream
or a pipeline ends non gracefully with no explicit error.

<a id="ERR_STREAM_PUSH_AFTER_EOF"></a>
### ERR_STREAM_PUSH_AFTER_EOF

Expand Down
106 changes: 106 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ There are four fundamental stream types within Node.js:
* [Transform][] - Duplex streams that can modify or transform the data as it
is written and read (for example [`zlib.createDeflate()`][]).

Additionally this module includes the utility functions [pipeline][] and
[finished][].

### Object Mode

All streams created by Node.js APIs operate exclusively on strings and `Buffer`
Expand Down Expand Up @@ -1287,6 +1290,107 @@ implementors should not override this method, but instead implement
[`readable._destroy()`][readable-_destroy].
The default implementation of `_destroy()` for `Transform` also emit `'close'`.

### stream.finished(stream, callback)
<!-- YAML
added: REPLACEME
-->

* `stream` {Stream} A readable and/or writable stream.
* `callback` {Function} A callback function that takes an optional error
argument.

A function to get notified when a stream is no longer readable, writable
or has experienced an error or a premature close event.

```js
const { finished } = require('stream');

const rs = fs.createReadStream('archive.tar');

finished(rs, (err) => {
if (err) {
console.error('Stream failed', err);
} else {
console.log('Stream is done reading');
}
});

rs.resume(); // drain the stream
```

Especially useful in error handling scenarios where a stream is destroyed
prematurely (like an aborted HTTP request), and will not emit `'end'`
or `'finish'`.

The `finished` API is promisify'able as well;

```js
const finished = util.promisify(stream.finished);

const rs = fs.createReadStream('archive.tar');

async function run() {
await finished(rs);
console.log('Stream is done reading');
}

run().catch(console.error);
rs.resume(); // drain the stream
```

### stream.pipeline(...streams[, callback])
<!-- YAML
added: REPLACEME
-->

* `...streams` {Stream} Two or more streams to pipe between.
* `callback` {Function} A callback function that takes an optional error
argument.

A module method to pipe between streams forwarding errors and properly cleaning
up and provide a callback when the pipeline is complete.

```js
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.

// A pipeline to gzip a potentially huge tar file efficiently:

pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
(err) => {
if (err) {
console.error('Pipeline failed', err);
} else {
console.log('Pipeline succeeded');
}
}
);
```

The `pipeline` API is promisify'able as well:

```js
const pipeline = util.promisify(stream.pipeline);

async function run() {
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz')
);
console.log('Pipeline succeeded');
}

run().catch(console.error);
```

## API for Stream Implementers

<!--type=misc-->
Expand Down Expand Up @@ -2397,6 +2501,8 @@ contain multi-byte characters.
[http-incoming-message]: http.html#http_class_http_incomingmessage
[zlib]: zlib.html
[hwm-gotcha]: #stream_highwatermark_discrepancy_after_calling_readable_setencoding
[pipeline]: #stream_stream_pipeline_streams_callback
[finished]: #stream_stream_finished_stream_callback
[stream-_flush]: #stream_transform_flush_callback
[stream-_read]: #stream_readable_read_size_1
[stream-_transform]: #stream_transform_transform_chunk_encoding_callback
Expand Down
1 change: 1 addition & 0 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -961,6 +961,7 @@ E('ERR_STDOUT_CLOSE', 'process.stdout cannot be closed', Error);
E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error);
E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error);
E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError);
E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error);
E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error);
E('ERR_STREAM_UNSHIFT_AFTER_END_EVENT',
'stream.unshift() after end event', Error);
Expand Down
96 changes: 96 additions & 0 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Ported from https://github.com/mafintosh/end-of-stream with
// permission from the author, Mathias Buus (@mafintosh).

'use strict';

const {
ERR_STREAM_PREMATURE_CLOSE
} = require('internal/errors').codes;

function noop() {}

function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function';
}

function once(callback) {
let called = false;
return function(err) {
if (called) return;
called = true;
callback.call(this, err);
};
}

function eos(stream, opts, callback) {
if (typeof opts === 'function') return eos(stream, null, opts);
if (!opts) opts = {};

callback = once(callback || noop);

const ws = stream._writableState;
const rs = stream._readableState;
let readable = opts.readable || (opts.readable !== false && stream.readable);
let writable = opts.writable || (opts.writable !== false && stream.writable);

const onlegacyfinish = () => {
if (!stream.writable) onfinish();
};

const onfinish = () => {
writable = false;
if (!readable) callback.call(stream);
};

const onend = () => {
readable = false;
if (!writable) callback.call(stream);
};

const onerror = (err) => {
callback.call(stream, err);
};

const onclose = () => {
if (readable && !(rs && rs.ended)) {
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
}
if (writable && !(ws && ws.ended)) {
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
}
};

const onrequest = () => {
stream.req.on('finish', onfinish);
};

if (isRequest(stream)) {
stream.on('complete', onfinish);
stream.on('abort', onclose);
if (stream.req) onrequest();
else stream.on('request', onrequest);
} else if (writable && !ws) { // legacy streams
stream.on('end', onlegacyfinish);
stream.on('close', onlegacyfinish);
}

stream.on('end', onend);
stream.on('finish', onfinish);
if (opts.error !== false) stream.on('error', onerror);
stream.on('close', onclose);

return function() {
stream.removeListener('complete', onfinish);
stream.removeListener('abort', onclose);
stream.removeListener('request', onrequest);
if (stream.req) stream.req.removeListener('finish', onfinish);
stream.removeListener('end', onlegacyfinish);
stream.removeListener('close', onlegacyfinish);
stream.removeListener('finish', onfinish);
stream.removeListener('end', onend);
stream.removeListener('error', onerror);
stream.removeListener('close', onclose);
};
}

module.exports = eos;
95 changes: 95 additions & 0 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Ported from https://github.com/mafintosh/pump with
// permission from the author, Mathias Buus (@mafintosh).

'use strict';

const eos = require('internal/streams/end-of-stream');

const {
ERR_MISSING_ARGS,
ERR_STREAM_DESTROYED
} = require('internal/errors').codes;

function once(callback) {
let called = false;
return function(err) {
if (called) return;
called = true;
callback(err);
};
}

function noop() {}

function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function';
}

function destroyer(stream, reading, writing, callback) {
callback = once(callback);

let closed = false;
stream.on('close', () => {
closed = true;
});

eos(stream, { readable: reading, writable: writing }, (err) => {
if (err) return callback(err);
closed = true;
callback();
});

let destroyed = false;
return (err) => {
if (closed) return;
if (destroyed) return;
destroyed = true;

// request.destroy just do .end - .abort is what we want
if (isRequest(stream)) return stream.abort();
if (typeof stream.destroy === 'function') return stream.destroy();

callback(err || new ERR_STREAM_DESTROYED('pipe'));
};
}

function call(fn) {
fn();
}

function pipe(from, to) {
return from.pipe(to);
}

function popCallback(streams) {
if (!streams.length) return noop;
if (typeof streams[streams.length - 1] !== 'function') return noop;
return streams.pop();
}

function pipeline(...streams) {
const callback = popCallback(streams);

if (Array.isArray(streams[0])) streams = streams[0];

if (streams.length < 2) {
throw new ERR_MISSING_ARGS('streams');
}

let error;
const destroys = streams.map(function(stream, i) {
const reading = i < streams.length - 1;
const writing = i > 0;
return destroyer(stream, reading, writing, function(err) {
if (!error) error = err;
if (err) destroys.forEach(call);
if (reading) return;
destroys.forEach(call);
callback(error);
});
});

return streams.reduce(pipe);
}

module.exports = pipeline;
5 changes: 5 additions & 0 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
'use strict';

const { Buffer } = require('buffer');
const pipeline = require('internal/streams/pipeline');
const eos = require('internal/streams/end-of-stream');

// Note: export Stream before Readable/Writable/Duplex/...
// to avoid a cross-reference(require) issues
Expand All @@ -33,6 +35,9 @@ Stream.Duplex = require('_stream_duplex');
Stream.Transform = require('_stream_transform');
Stream.PassThrough = require('_stream_passthrough');

Stream.pipeline = pipeline;
Stream.finished = eos;

// Backwards-compat with node 0.4.x
Stream.Stream = Stream;

Expand Down
2 changes: 2 additions & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@
'lib/internal/streams/legacy.js',
'lib/internal/streams/destroy.js',
'lib/internal/streams/state.js',
'lib/internal/streams/pipeline.js',
'lib/internal/streams/end-of-stream.js',
'lib/internal/wrap_js_stream.js',
'deps/v8/tools/splaytree.js',
'deps/v8/tools/codemap.js',
Expand Down
Loading

0 comments on commit f64bebf

Please sign in to comment.