Skip to content

Commit

Permalink
stream: construct
Browse files Browse the repository at this point in the history
Provide a standardized way of asynchronously creating and
initializing resources before performing any work.

Refs: #29314

PR-URL: #29656
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Denys Otrishko <shishugi@gmail.com>
  • Loading branch information
ronag committed May 27, 2020
1 parent 9949a2e commit fb8cc72
Show file tree
Hide file tree
Showing 5 changed files with 544 additions and 20 deletions.
171 changes: 164 additions & 7 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,7 @@ added: v9.3.0

* {number}

Return the value of `highWaterMark` passed when constructing this
`Writable`.
Return the value of `highWaterMark` passed when creating this `Writable`.

##### `writable.writableLength`
<!-- YAML
Expand Down Expand Up @@ -1193,8 +1192,7 @@ added: v9.3.0

* {number}

Returns the value of `highWaterMark` passed when constructing this
`Readable`.
Returns the value of `highWaterMark` passed when creating this `Readable`.

##### `readable.readableLength`
<!-- YAML
Expand Down Expand Up @@ -1792,7 +1790,7 @@ expectations.
added: v1.2.0
-->

For many simple cases, it is possible to construct a stream without relying on
For many simple cases, it is possible to create a stream without relying on
inheritance. This can be accomplished by directly creating instances of the
`stream.Writable`, `stream.Readable`, `stream.Duplex` or `stream.Transform`
objects and passing appropriate methods as constructor options.
Expand All @@ -1801,8 +1799,14 @@ objects and passing appropriate methods as constructor options.
const { Writable } = require('stream');

const myWritable = new Writable({
construct(callback) {
// Initialize state and load resources...
},
write(chunk, encoding, callback) {
// ...
},
destroy() {
// Free resources...
}
});
```
Expand Down Expand Up @@ -1861,6 +1865,8 @@ changes:
[`stream._destroy()`][writable-_destroy] method.
* `final` {Function} Implementation for the
[`stream._final()`][stream-_final] method.
* `construct` {Function} Implementation for the
[`stream._construct()`][writable-_construct] method.
* `autoDestroy` {boolean} Whether this stream should automatically call
`.destroy()` on itself after ending. **Default:** `true`.

Expand Down Expand Up @@ -1906,6 +1912,56 @@ const myWritable = new Writable({
});
```

#### `writable._construct(callback)`
<!-- YAML
added: REPLACEME
-->

* `callback` {Function} Call this function (optionally with an error
argument) when the stream has finished initializing.

The `_construct()` method MUST NOT be called directly. It may be implemented
by child classes, and if so, will be called by the internal `Writable`
class methods only.

This optional function will be called in a tick after the stream constructor
has returned, delaying any `_write`, `_final` and `_destroy` calls until
`callback` is called. This is useful to initialize state or asynchronously
initialize resources before the stream can be used.

```js
const { Writable } = require('stream');
const fs = require('fs');

class WriteStream extends Writable {
constructor(filename) {
super();
this.filename = filename;
this.fd = fd;
}
_construct(callback) {
fs.open(this.filename, (fd, err) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_write(chunk, encoding, callback) {
fs.write(this.fd, chunk, callback);
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
```

#### `writable._write(chunk, encoding, callback)`
<!-- YAML
changes:
Expand Down Expand Up @@ -2130,6 +2186,8 @@ changes:
method.
* `destroy` {Function} Implementation for the
[`stream._destroy()`][readable-_destroy] method.
* `construct` {Function} Implementation for the
[`stream._construct()`][readable-_construct] method.
* `autoDestroy` {boolean} Whether this stream should automatically call
`.destroy()` on itself after ending. **Default:** `true`.

Expand Down Expand Up @@ -2172,6 +2230,63 @@ const myReadable = new Readable({
});
```

#### `readable._construct(callback)`
<!-- YAML
added: REPLACEME
-->

* `callback` {Function} Call this function (optionally with an error
argument) when the stream has finished initializing.

The `_construct()` method MUST NOT be called directly. It may be implemented
by child classes, and if so, will be called by the internal `Readable`
class methods only.

This optional function will be called by the stream constructor,
delaying any `_read` and `_destroy` calls until `callback` is called. This is
useful to initialize state or asynchronously initialize resources before the
stream can be used.

```js
const { Readable } = require('stream');
const fs = require('fs');

class ReadStream extends Readable {
constructor(filename) {
super();
this.filename = filename;
this.fd = null;
}
_construct(callback) {
fs.open(this.filename, (fd, err) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_read(n) {
const buf = Buffer.alloc(n);
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
if (err) {
this.destroy(err);
} else {
this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null);
}
});
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
```

#### `readable._read(size)`
<!-- YAML
added: v0.9.4
Expand Down Expand Up @@ -2427,6 +2542,46 @@ const myDuplex = new Duplex({
});
```

When using pipeline:

```js
const { Transform, pipeline } = require('stream');
const fs = require('fs');

pipeline(
fs.createReadStream('object.json')
.setEncoding('utf-8'),
new Transform({
decodeStrings: false, // Accept string input rather than Buffers
construct(callback) {
this.data = '';
callback();
},
transform(chunk, encoding, callback) {
this.data += chunk;
callback();
},
flush(callback) {
try {
// Make sure is valid json.
JSON.parse(this.data);
this.push(this.data);
} catch (err) {
callback(err);
}
}
}),
fs.createWriteStream('valid-object.json'),
(err) => {
if (err) {
console.error('failed', err);
} else {
console.log('completed');
}
}
);
```

#### An Example Duplex Stream

The following illustrates a simple example of a `Duplex` stream that wraps a
Expand Down Expand Up @@ -2706,8 +2861,8 @@ unhandled post-destroy errors.

#### Creating Readable Streams with Async Generators

We can construct a Node.js Readable Stream from an asynchronous generator
using the `Readable.from()` utility method:
A Node.js Readable Stream can be created from an asynchronous generator using
the `Readable.from()` utility method:

```js
const { Readable } = require('stream');
Expand Down Expand Up @@ -2960,6 +3115,7 @@ contain multi-byte characters.
[http-incoming-message]: http.html#http_class_http_incomingmessage
[hwm-gotcha]: #stream_highwatermark_discrepancy_after_calling_readable_setencoding
[object-mode]: #stream_object_mode
[readable-_construct]: #stream_readable_construct_callback
[readable-_destroy]: #stream_readable_destroy_err_callback
[readable-destroy]: #stream_readable_destroy_error
[stream-_final]: #stream_writable_final_callback
Expand All @@ -2976,6 +3132,7 @@ contain multi-byte characters.
[stream-uncork]: #stream_writable_uncork
[stream-write]: #stream_writable_write_chunk_encoding_callback
[Stream Three States]: #stream_three_states
[writable-_construct]: #stream_writable_construct_callback
[writable-_destroy]: #stream_writable_destroy_err_callback
[writable-destroy]: #stream_writable_destroy_error
[writable-new]: #stream_constructor_new_stream_writable_options
Expand Down
24 changes: 19 additions & 5 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ function ReadableState(options, stream, isDuplex) {
this.endEmitted = false;
this.reading = false;

// Stream is still being constructed and cannot be
// destroyed until construction finished or failed.
// Async construction is opt in, therefore we start as
// constructed.
this.constructed = true;

// A flag to be able to tell if the event 'readable'/'data' is emitted
// immediately, or on a later tick. We set this to true at first, because
// any actions that shouldn't happen until "later" should generally also
Expand Down Expand Up @@ -197,9 +203,16 @@ function Readable(options) {

if (typeof options.destroy === 'function')
this._destroy = options.destroy;

if (typeof options.construct === 'function')
this._construct = options.construct;
}

Stream.call(this, options);

destroyImpl.construct(this, () => {
maybeReadMore(this, this._readableState);
});
}

Readable.prototype.destroy = destroyImpl.destroy;
Expand Down Expand Up @@ -461,11 +474,12 @@ Readable.prototype.read = function(n) {
}

// However, if we've ended, then there's no point, if we're already
// reading, then it's unnecessary, and if we're destroyed or errored,
// then it's not allowed.
if (state.ended || state.reading || state.destroyed || state.errored) {
// reading, then it's unnecessary, if we're constructing we have to wait,
// and if we're destroyed or errored, then it's not allowed,
if (state.ended || state.reading || state.destroyed || state.errored ||
!state.constructed) {
doRead = false;
debug('reading or ended', doRead);
debug('reading, ended or constructing', doRead);
} else if (doRead) {
debug('do read');
state.reading = true;
Expand Down Expand Up @@ -587,7 +601,7 @@ function emitReadable_(stream) {
// However, if we're not ended, or reading, and the length < hwm,
// then go ahead and try to read some more preemptively.
function maybeReadMore(stream, state) {
if (!state.readingMore) {
if (!state.readingMore && state.constructed) {
state.readingMore = true;
process.nextTick(maybeReadMore_, stream, state);
}
Expand Down
27 changes: 25 additions & 2 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ function WritableState(options, stream, isDuplex) {
// this must be 0 before 'finish' can be emitted.
this.pendingcb = 0;

// Stream is still being constructed and cannot be
// destroyed until construction finished or failed.
// Async construction is opt in, therefore we start as
// constructed.
this.constructed = true;

// Emit prefinish if the only thing we're waiting for is _write cbs
// This is relevant for synchronous Transform streams.
this.prefinished = false;
Expand Down Expand Up @@ -249,9 +255,22 @@ function Writable(options) {

if (typeof options.final === 'function')
this._final = options.final;

if (typeof options.construct === 'function')
this._construct = options.construct;
}

Stream.call(this, options);

destroyImpl.construct(this, () => {
const state = this._writableState;

if (!state.writing) {
clearBuffer(this, state);
}

finishMaybe(this, state);
});
}

// Otherwise people can pipe Writable streams, which is just wrong.
Expand Down Expand Up @@ -342,7 +361,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {

state.length += len;

if (state.writing || state.corked || state.errored) {
if (state.writing || state.corked || state.errored || !state.constructed) {
state.buffered.push({ chunk, encoding, callback });
if (state.allBuffers && encoding !== 'buffer') {
state.allBuffers = false;
Expand Down Expand Up @@ -492,7 +511,10 @@ function errorBuffer(state, err) {

// If there's something in the buffer waiting, then process it.
function clearBuffer(stream, state) {
if (state.corked || state.bufferProcessing || state.destroyed) {
if (state.corked ||
state.bufferProcessing ||
state.destroyed ||
!state.constructed) {
return;
}

Expand Down Expand Up @@ -600,6 +622,7 @@ Writable.prototype.end = function(chunk, encoding, cb) {

function needFinish(state) {
return (state.ending &&
state.constructed &&
state.length === 0 &&
!state.errored &&
state.buffered.length === 0 &&
Expand Down
Loading

0 comments on commit fb8cc72

Please sign in to comment.