Skip to content

Commit 129b5b0

Browse files
mcollinajasnell
authored andcommitted
stream: add destroy and _destroy methods.
Adds destroy() and _destroy() methods to Readable, Writable, Duplex and Transform. It also standardizes the behavior and the implementation of destroy(), which has been inconsistent in userland and core. This PR also updates all the subsystems of core to use the new destroy(). PR-URL: #12925 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Calvin Metcalf <calvin.metcalf@gmail.com> Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
1 parent d6e20cb commit 129b5b0

18 files changed

+964
-68
lines changed

doc/api/stream.md

+47-1
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,15 @@ write('hello', () => {
499499

500500
A Writable stream in object mode will always ignore the `encoding` argument.
501501

502+
##### writable.destroy([error])
503+
<!-- YAML
504+
added: REPLACEME
505+
-->
506+
507+
Destroy the stream, and emit the passed error. After this call, the
508+
writible stream has ended. Implementors should not override this method,
509+
but instead implement [`writable._destroy`][writable-_destroy].
510+
502511
### Readable Streams
503512

504513
Readable streams are an abstraction for a *source* from which data is
@@ -1070,6 +1079,16 @@ myReader.on('readable', () => {
10701079
});
10711080
```
10721081

1082+
##### readable.destroy([error])
1083+
<!-- YAML
1084+
added: REPLACEME
1085+
-->
1086+
1087+
Destroy the stream, and emit `'error'`. After this call, the
1088+
readable stream will release any internal resources.
1089+
Implementors should not override this method, but instead implement
1090+
[`readable._destroy`][readable-_destroy].
1091+
10731092
### Duplex and Transform Streams
10741093

10751094
#### Class: stream.Duplex
@@ -1109,6 +1128,16 @@ Examples of Transform streams include:
11091128
* [zlib streams][zlib]
11101129
* [crypto streams][crypto]
11111130

1131+
##### transform.destroy([error])
1132+
<!-- YAML
1133+
added: REPLACEME
1134+
-->
1135+
1136+
Destroy the stream, and emit `'error'`. After this call, the
1137+
transform stream would release any internal resources.
1138+
implementors should not override this method, but instead implement
1139+
[`readable._destroy`][readable-_destroy].
1140+
The default implementation of `_destroy` for `Transform` also emit `'close'`.
11121141

11131142
## API for Stream Implementers
11141143

@@ -1248,6 +1277,8 @@ constructor and implement the `writable._write()` method. The
12481277
[`stream._write()`][stream-_write] method.
12491278
* `writev` {Function} Implementation for the
12501279
[`stream._writev()`][stream-_writev] method.
1280+
* `destroy` {Function} Implementation for the
1281+
[`stream._destroy()`][writable-_destroy] method.
12511282

12521283
For example:
12531284

@@ -1358,6 +1389,15 @@ The `writable._writev()` method is prefixed with an underscore because it is
13581389
internal to the class that defines it, and should never be called directly by
13591390
user programs.
13601391

1392+
#### writable.\_destroy(err, callback)
1393+
<!-- YAML
1394+
added: REPLACEME
1395+
-->
1396+
1397+
* `err` {Error} An error.
1398+
* `callback` {Function} A callback function that takes an optional error argument
1399+
which is invoked when the writable is destroyed.
1400+
13611401
#### Errors While Writing
13621402

13631403
It is recommended that errors occurring during the processing of the
@@ -1428,6 +1468,8 @@ constructor and implement the `readable._read()` method.
14281468
a single value instead of a Buffer of size n. Defaults to `false`
14291469
* `read` {Function} Implementation for the [`stream._read()`][stream-_read]
14301470
method.
1471+
* `destroy` {Function} Implementation for the [`stream._destroy()`][readable-_destroy]
1472+
method.
14311473

14321474
For example:
14331475

@@ -2079,4 +2121,8 @@ readable buffer so there is nothing for a user to consume.
20792121
[stream-read]: #stream_readable_read_size
20802122
[stream-resume]: #stream_readable_resume
20812123
[stream-write]: #stream_writable_write_chunk_encoding_callback
2082-
[zlib]: zlib.html
2124+
[readable-_destroy]: #stream_readable_destroy_err_callback
2125+
[writable-_destroy]: #stream_writable_destroy_err_callback
2126+
[TCP sockets]: net.html#net_class_net_socket
2127+
[Transform]: #stream_class_stream_transform
2128+
[Writable]: #stream_class_stream_writable

lib/_stream_duplex.js

+30
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,33 @@ function onend() {
7676
function onEndNT(self) {
7777
self.end();
7878
}
79+
80+
Object.defineProperty(Duplex.prototype, 'destroyed', {
81+
get() {
82+
if (this._readableState === undefined ||
83+
this._writableState === undefined) {
84+
return false;
85+
}
86+
return this._readableState.destroyed && this._writableState.destroyed;
87+
},
88+
set(value) {
89+
// we ignore the value if the stream
90+
// has not been initialized yet
91+
if (this._readableState === undefined ||
92+
this._writableState === undefined) {
93+
return;
94+
}
95+
96+
// backward compatibility, the user is explicitly
97+
// managing destroyed
98+
this._readableState.destroyed = value;
99+
this._writableState.destroyed = value;
100+
}
101+
});
102+
103+
Duplex.prototype._destroy = function(err, cb) {
104+
this.push(null);
105+
this.end();
106+
107+
process.nextTick(cb, err);
108+
};

lib/_stream_readable.js

+38-2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const Buffer = require('buffer').Buffer;
3030
const util = require('util');
3131
const debug = util.debuglog('stream');
3232
const BufferList = require('internal/streams/BufferList');
33+
const destroyImpl = require('internal/streams/destroy');
3334
var StringDecoder;
3435

3536
util.inherits(Readable, Stream);
@@ -99,6 +100,9 @@ function ReadableState(options, stream) {
99100
this.readableListening = false;
100101
this.resumeScheduled = false;
101102

103+
// has it been destroyed
104+
this.destroyed = false;
105+
102106
// Crypto is kind of old and crusty. Historically, its default string
103107
// encoding is 'binary' so we have to make this configurable.
104108
// Everything else in the universe uses 'utf8', though.
@@ -129,12 +133,44 @@ function Readable(options) {
129133
// legacy
130134
this.readable = true;
131135

132-
if (options && typeof options.read === 'function')
133-
this._read = options.read;
136+
if (options) {
137+
if (typeof options.read === 'function')
138+
this._read = options.read;
139+
140+
if (typeof options.destroy === 'function')
141+
this._destroy = options.destroy;
142+
}
134143

135144
Stream.call(this);
136145
}
137146

147+
Object.defineProperty(Readable.prototype, 'destroyed', {
148+
get() {
149+
if (this._readableState === undefined) {
150+
return false;
151+
}
152+
return this._readableState.destroyed;
153+
},
154+
set(value) {
155+
// we ignore the value if the stream
156+
// has not been initialized yet
157+
if (!this._readableState) {
158+
return;
159+
}
160+
161+
// backward compatibility, the user is explicitly
162+
// managing destroyed
163+
this._readableState.destroyed = value;
164+
}
165+
});
166+
167+
Readable.prototype.destroy = destroyImpl.destroy;
168+
Readable.prototype._undestroy = destroyImpl.undestroy;
169+
Readable.prototype._destroy = function(err, cb) {
170+
this.push(null);
171+
cb(err);
172+
};
173+
138174
// Manually shove something into the read() buffer.
139175
// This returns true if the highWaterMark has not been hit yet,
140176
// similar to how Writable.write() returns true if you should

lib/_stream_transform.js

+8
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,14 @@ Transform.prototype._read = function(n) {
196196
};
197197

198198

199+
Transform.prototype._destroy = function(err, cb) {
200+
Duplex.prototype._destroy.call(this, err, (err2) => {
201+
cb(err2);
202+
this.emit('close');
203+
});
204+
};
205+
206+
199207
function done(stream, er, data) {
200208
if (er)
201209
return stream.emit('error', er);

lib/_stream_writable.js

+34
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ const util = require('util');
3232
const internalUtil = require('internal/util');
3333
const Stream = require('stream');
3434
const Buffer = require('buffer').Buffer;
35+
const destroyImpl = require('internal/streams/destroy');
3536

3637
util.inherits(Writable, Stream);
3738

@@ -66,6 +67,9 @@ function WritableState(options, stream) {
6667
// when 'finish' is emitted
6768
this.finished = false;
6869

70+
// has it been destroyed
71+
this.destroyed = false;
72+
6973
// should we decode strings into buffers before passing to _write?
7074
// this is here so that some node-core streams can optimize string
7175
// handling at a lower level.
@@ -192,6 +196,9 @@ function Writable(options) {
192196

193197
if (typeof options.writev === 'function')
194198
this._writev = options.writev;
199+
200+
if (typeof options.destroy === 'function')
201+
this._destroy = options.destroy;
195202
}
196203

197204
Stream.call(this);
@@ -563,3 +570,30 @@ function onCorkedFinish(corkReq, state, err) {
563570
state.corkedRequestsFree = corkReq;
564571
}
565572
}
573+
574+
Object.defineProperty(Writable.prototype, 'destroyed', {
575+
get() {
576+
if (this._writableState === undefined) {
577+
return false;
578+
}
579+
return this._writableState.destroyed;
580+
},
581+
set(value) {
582+
// we ignore the value if the stream
583+
// has not been initialized yet
584+
if (!this._writableState) {
585+
return;
586+
}
587+
588+
// backward compatibility, the user is explicitly
589+
// managing destroyed
590+
this._writableState.destroyed = value;
591+
}
592+
});
593+
594+
Writable.prototype.destroy = destroyImpl.destroy;
595+
Writable.prototype._undestroy = destroyImpl.undestroy;
596+
Writable.prototype._destroy = function(err, cb) {
597+
this.end();
598+
cb(err);
599+
};

lib/fs.js

+5-6
Original file line numberDiff line numberDiff line change
@@ -2052,11 +2052,10 @@ ReadStream.prototype._read = function(n) {
20522052
};
20532053

20542054

2055-
ReadStream.prototype.destroy = function() {
2056-
if (this.destroyed)
2057-
return;
2058-
this.destroyed = true;
2059-
this.close();
2055+
ReadStream.prototype._destroy = function(err, cb) {
2056+
this.close(function(err2) {
2057+
cb(err || err2);
2058+
});
20602059
};
20612060

20622061

@@ -2223,7 +2222,7 @@ WriteStream.prototype._writev = function(data, cb) {
22232222
};
22242223

22252224

2226-
WriteStream.prototype.destroy = ReadStream.prototype.destroy;
2225+
WriteStream.prototype._destroy = ReadStream.prototype._destroy;
22272226
WriteStream.prototype.close = ReadStream.prototype.close;
22282227

22292228
// There is no shutdown() for files.

lib/internal/process/stdio.js

+8-4
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ function setupStdio() {
1818
function getStdout() {
1919
if (stdout) return stdout;
2020
stdout = createWritableStdioStream(1);
21-
stdout.destroy = stdout.destroySoon = function(er) {
21+
stdout.destroySoon = stdout.destroy;
22+
stdout._destroy = function(er, cb) {
23+
// avoid errors if we already emitted
2224
const errors = lazyErrors();
2325
er = er || new errors.Error('ERR_STDOUT_CLOSE');
24-
stdout.emit('error', er);
26+
cb(er);
2527
};
2628
if (stdout.isTTY) {
2729
process.on('SIGWINCH', () => stdout._refreshSize());
@@ -32,10 +34,12 @@ function setupStdio() {
3234
function getStderr() {
3335
if (stderr) return stderr;
3436
stderr = createWritableStdioStream(2);
35-
stderr.destroy = stderr.destroySoon = function(er) {
37+
stderr.destroySoon = stderr.destroy;
38+
stderr._destroy = function(er, cb) {
39+
// avoid errors if we already emitted
3640
const errors = lazyErrors();
3741
er = er || new errors.Error('ERR_STDERR_CLOSE');
38-
stderr.emit('error', er);
42+
cb(er);
3943
};
4044
if (stderr.isTTY) {
4145
process.on('SIGWINCH', () => stderr._refreshSize());

lib/internal/streams/destroy.js

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
'use strict';
2+
3+
// undocumented cb() API, needed for core, not for public API
4+
function destroy(err, cb) {
5+
const readableDestroyed = this._readableState &&
6+
this._readableState.destroyed;
7+
const writableDestroyed = this._writableState &&
8+
this._writableState.destroyed;
9+
10+
if (readableDestroyed || writableDestroyed) {
11+
if (err && (!this._writableState || !this._writableState.errorEmitted)) {
12+
process.nextTick(emitErrorNT, this, err);
13+
}
14+
return;
15+
}
16+
17+
// we set destroyed to true before firing error callbacks in order
18+
// to make it re-entrance safe in case destroy() is called within callbacks
19+
20+
if (this._readableState) {
21+
this._readableState.destroyed = true;
22+
}
23+
24+
// if this is a duplex stream mark the writable part as destroyed as well
25+
if (this._writableState) {
26+
this._writableState.destroyed = true;
27+
}
28+
29+
this._destroy(err || null, (err) => {
30+
if (!cb && err) {
31+
process.nextTick(emitErrorNT, this, err);
32+
if (this._writableState) {
33+
this._writableState.errorEmitted = true;
34+
}
35+
} else if (cb) {
36+
cb(err);
37+
}
38+
});
39+
}
40+
41+
function undestroy() {
42+
if (this._readableState) {
43+
this._readableState.destroyed = false;
44+
this._readableState.reading = false;
45+
this._readableState.ended = false;
46+
this._readableState.endEmitted = false;
47+
}
48+
49+
if (this._writableState) {
50+
this._writableState.destroyed = false;
51+
this._writableState.ended = false;
52+
this._writableState.ending = false;
53+
this._writableState.finished = false;
54+
this._writableState.errorEmitted = false;
55+
}
56+
}
57+
58+
function emitErrorNT(self, err) {
59+
self.emit('error', err);
60+
}
61+
62+
module.exports = {
63+
destroy,
64+
undestroy
65+
};

0 commit comments

Comments
 (0)