Skip to content

Commit

Permalink
stream: add errored and closed props
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Nov 2, 2021
1 parent 9bd6d52 commit 998dd7c
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 7 deletions.
44 changes: 43 additions & 1 deletion doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,16 @@ further errors except from `_destroy()` may be emitted as `'error'`.
Implementors should not override this method,
but instead implement [`writable._destroy()`][writable-_destroy].

##### `writable.closed`

<!-- YAML
added: REPLACEME
-->

* {boolean}

Is `true` after `'close'` has been emitted.

##### `writable.destroyed`

<!-- YAML
Expand Down Expand Up @@ -611,6 +621,17 @@ added:
Number of times [`writable.uncork()`][stream-uncork] needs to be
called in order to fully uncork the stream.

##### `writable.writableErrored`

<!-- YAML
added:
REPLACEME
-->

* {Error}

Returns error if the stream has been destroyed with an error.

##### `writable.writableFinished`

<!-- YAML
Expand Down Expand Up @@ -1080,14 +1101,24 @@ further errors except from `_destroy()` may be emitted as `'error'`.
Implementors should not override this method, but instead implement
[`readable._destroy()`][readable-_destroy].

##### `readable.destroyed`
##### `readable.closed`

<!-- YAML
added: v8.0.0
-->

* {boolean}

Is `true` after `'close'` has been emitted.

##### `readable.destroyed`

<!-- YAML
added: REPLACEME
-->

* {boolean}

Is `true` after [`readable.destroy()`][readable-destroy] has been called.

##### `readable.isPaused()`
Expand Down Expand Up @@ -1346,6 +1377,17 @@ added: v12.9.0

Becomes `true` when [`'end'`][] event is emitted.

##### `readable.readableErrored`

<!-- YAML
added:
REPLACEME
-->

* {Error}

Returns error if the stream has been destroyed with an error.

##### `readable.readableFlowing`

<!-- YAML
Expand Down
4 changes: 3 additions & 1 deletion lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ const {
isReadable,
isReadableNodeStream,
isReadableFinished,
isReadableErrored,
isWritable,
isWritableNodeStream,
isWritableFinished,
isWritableErrored,
isNodeStream,
willEmitClose: _willEmitClose,
} = require('internal/streams/utils');
Expand Down Expand Up @@ -110,7 +112,7 @@ function eos(stream, options, callback) {
const onclose = () => {
closed = true;

const errored = wState?.errored || rState?.errored;
const errored = isWritableErrored(stream) || isReadableErrored(stream);

if (errored && typeof errored !== 'boolean') {
return callback.call(stream, errored);
Expand Down
18 changes: 14 additions & 4 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -1239,13 +1239,23 @@ ObjectDefineProperties(Readable.prototype, {
}
},

readableErrored: {
enumerable: false,
get() {
return this._readableState ? this._readableState.errored : null;
}
},

closed: {
get() {
return this._readableState ? this._readableState.closed : false;
}
},

destroyed: {
enumerable: false,
get() {
if (this._readableState === undefined) {
return false;
}
return this._readableState.destroyed;
return this._readableState ? this._readableState.destroyed : false;
},
set(value) {
// We ignore the value if the stream
Expand Down
30 changes: 30 additions & 0 deletions lib/internal/streams/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,39 @@ function isFinished(stream, opts) {
return true;
}

function isWritableErrored(stream) {
if (!isNodeStream(stream)) {
return null;
}

if (stream.writableErrored) {
return stream.writableErrored;
}

return stream._writableState?.errored ?? null;
}

function isReadableErrored(stream) {
if (!isNodeStream(stream)) {
return null;
}

if (stream.readableErrored) {
return stream.readableErrored;
}

return stream._readableState?.errored ?? null;
}

function isClosed(stream) {
if (!isNodeStream(stream)) {
return null;
}

if (typeof stream.closed === 'boolean') {
return stream.closed
}

const wState = stream._writableState;
const rState = stream._readableState;

Expand Down Expand Up @@ -226,11 +254,13 @@ module.exports = {
isReadableNodeStream,
isReadableEnded,
isReadableFinished,
isReadableErrored,
isNodeStream,
isWritable,
isWritableNodeStream,
isWritableEnded,
isWritableFinished,
isWritableErrored,
isServerRequest,
isServerResponse,
willEmitClose,
Expand Down
15 changes: 14 additions & 1 deletion lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,12 @@ function finish(stream, state) {

ObjectDefineProperties(Writable.prototype, {

closed: {
get() {
return this._writableState ? this._writableState.closed : false;
}
},

destroyed: {
get() {
return this._writableState ? this._writableState.destroyed : false;
Expand Down Expand Up @@ -846,7 +852,14 @@ ObjectDefineProperties(Writable.prototype, {
get() {
return this._writableState && this._writableState.length;
}
}
},

writableErrored: {
enumerable: false,
get() {
return this._writableState ? this._writableState.errored : null;
}
},
});

const destroy = destroyImpl.destroy;
Expand Down
4 changes: 4 additions & 0 deletions test/parallel/test-stream-finished.js
Original file line number Diff line number Diff line change
Expand Up @@ -612,8 +612,10 @@ testClosed((opts) => new Writable({ write() {}, ...opts }));
const w = new Writable();
const _err = new Error();
w.destroy(_err);
assert.strictEqual(w.writableErrored, _err);
finished(w, common.mustCall((err) => {
assert.strictEqual(_err, err);
assert.strictEqual(w.closed, true);
finished(w, common.mustCall((err) => {
assert.strictEqual(_err, err);
}));
Expand All @@ -623,7 +625,9 @@ testClosed((opts) => new Writable({ write() {}, ...opts }));
{
const w = new Writable();
w.destroy();
assert.strictEqual(w.writableErrored, null);
finished(w, common.mustCall((err) => {
assert.strictEqual(w.closed, true);
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
finished(w, common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
Expand Down

0 comments on commit 998dd7c

Please sign in to comment.