diff --git a/doc/api/errors.md b/doc/api/errors.md index b9c52ad0d9af89..70ac01de610d6a 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -1449,6 +1449,12 @@ An unspecified or non-specific system error has occurred within the Node.js process. The error object will have an `err.info` object property with additional details. + +### ERR_STREAM_DESTROYED + +A stream method was called that cannot complete because the stream was +destroyed using `stream.destroy()`. + ### ERR_TLS_CERT_ALTNAME_INVALID @@ -1615,11 +1621,6 @@ The fulfilled value of a linking promise is not a `vm.Module` object. The current module's status does not allow for this operation. The specific meaning of the error depends on the specific function. - -### ERR_ZLIB_BINDING_CLOSED - -An attempt was made to use a `zlib` object after it has already been closed. - ### ERR_ZLIB_INITIALIZATION_FAILED diff --git a/doc/api/stream.md b/doc/api/stream.md index 5db990d4d2cc3d..32e368f05f1875 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -543,8 +543,10 @@ added: v8.0.0 * Returns: {this} -Destroy the stream, and emit the passed error. After this call, the -writable stream has ended. Implementors should not override this method, +Destroy the stream, and emit the passed `error` and a `close` event. +After this call, the writable stream has ended and subsequent calls +to `write` / `end` will give an `ERR_STREAM_DESTROYED` error. +Implementors should not override this method, but instead implement [`writable._destroy`][writable-_destroy]. ### Readable Streams @@ -1167,8 +1169,9 @@ myReader.on('readable', () => { added: v8.0.0 --> -Destroy the stream, and emit `'error'`. After this call, the -readable stream will release any internal resources. +Destroy the stream, and emit `'error'` and `close`. After this call, the +readable stream will release any internal resources and subsequent calls +to `push` will be ignored. Implementors should not override this method, but instead implement [`readable._destroy`][readable-_destroy]. @@ -1382,6 +1385,12 @@ constructor and implement the `writable._write()` method. The `writable._writev()` method *may* also be implemented. #### Constructor: new stream.Writable([options]) + * `options` {Object} * `highWaterMark` {number} Buffer level when @@ -1395,6 +1404,8 @@ constructor and implement the `writable._write()` method. The it becomes possible to write JavaScript values other than string, `Buffer` or `Uint8Array` if supported by the stream implementation. Defaults to `false` + * `emitClose` {boolean} Whether or not the stream should emit `close` + after it has been destroyed. Defaults to `true` * `write` {Function} Implementation for the [`stream._write()`][stream-_write] method. * `writev` {Function} Implementation for the diff --git a/lib/_stream_duplex.js b/lib/_stream_duplex.js index 59ce83292789b5..1ccb931260ddbd 100644 --- a/lib/_stream_duplex.js +++ b/lib/_stream_duplex.js @@ -135,10 +135,3 @@ Object.defineProperty(Duplex.prototype, 'destroyed', { this._writableState.destroyed = value; } }); - -Duplex.prototype._destroy = function(err, cb) { - this.push(null); - this.end(); - - process.nextTick(cb, err); -}; diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index ba231ccda903c1..5781dfd471e72d 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -106,6 +106,9 @@ function ReadableState(options, stream) { this.readableListening = false; this.resumeScheduled = false; + // Should close be emitted on destroy. Defaults to true. + this.emitClose = options.emitClose !== false; + // has it been destroyed this.destroyed = false; @@ -177,7 +180,6 @@ Object.defineProperty(Readable.prototype, 'destroyed', { Readable.prototype.destroy = destroyImpl.destroy; Readable.prototype._undestroy = destroyImpl.undestroy; Readable.prototype._destroy = function(err, cb) { - this.push(null); cb(err); }; @@ -236,6 +238,8 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { addChunk(stream, state, chunk, true); } else if (state.ended) { stream.emit('error', new errors.Error('ERR_STREAM_PUSH_AFTER_EOF')); + } else if (state.destroyed) { + return false; } else { state.reading = false; if (state.decoder && !encoding) { diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index a9fcddda2d9c83..b82114ecaecd1d 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -132,7 +132,7 @@ function Transform(options) { } function prefinish() { - if (typeof this._flush === 'function') { + if (typeof this._flush === 'function' && !this._readableState.destroyed) { this._flush((er, data) => { done(this, er, data); }); @@ -194,7 +194,6 @@ Transform.prototype._read = function(n) { Transform.prototype._destroy = function(err, cb) { Duplex.prototype._destroy.call(this, err, (err2) => { cb(err2); - this.emit('close'); }); }; diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 2b7658813599e5..d5cfe07f171324 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -134,6 +134,9 @@ function WritableState(options, stream) { // True if the error was already emitted and should not be thrown again this.errorEmitted = false; + // Should close be emitted on destroy. Defaults to true. + this.emitClose = options.emitClose !== false; + // count buffered requests this.bufferedRequestCount = 0; @@ -390,7 +393,9 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) { state.writecb = cb; state.writing = true; state.sync = true; - if (writev) + if (state.destroyed) + state.onwrite(new errors.Error('ERR_STREAM_DESTROYED', 'write')); + else if (writev) stream._writev(chunk, state.onwrite); else stream._write(chunk, encoding, state.onwrite); @@ -604,7 +609,7 @@ function callFinal(stream, state) { } function prefinish(stream, state) { if (!state.prefinished && !state.finalCalled) { - if (typeof stream._final === 'function') { + if (typeof stream._final === 'function' && !state.destroyed) { state.pendingcb++; state.finalCalled = true; process.nextTick(callFinal, stream, state); @@ -681,6 +686,5 @@ Object.defineProperty(Writable.prototype, 'destroyed', { Writable.prototype.destroy = destroyImpl.destroy; Writable.prototype._undestroy = destroyImpl.undestroy; Writable.prototype._destroy = function(err, cb) { - this.end(); cb(err); }; diff --git a/lib/fs.js b/lib/fs.js index 3771efad10d762..917c3eb3a9f640 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -1929,6 +1929,9 @@ function ReadStream(path, options) { if (options.highWaterMark === undefined) options.highWaterMark = 64 * 1024; + // for backwards compat do not emit close on destroy. + options.emitClose = false; + Readable.call(this, options); // path will be ignored when fd is specified, so it can be falsy @@ -2084,6 +2087,9 @@ function WriteStream(path, options) { options = copyObject(getOptions(options, {})); + // for backwards compat do not emit close on destroy. + options.emitClose = false; + Writable.call(this, options); // path will be ignored when fd is specified, so it can be falsy diff --git a/lib/internal/errors.js b/lib/internal/errors.js index a4a79d671e4938..11f32ccdc17dc9 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -843,6 +843,7 @@ E('ERR_SOCKET_DGRAM_NOT_RUNNING', 'Not running', Error); E('ERR_STDERR_CLOSE', 'process.stderr cannot be closed', Error); E('ERR_STDOUT_CLOSE', 'process.stdout cannot be closed', Error); E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error); +E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed'); E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError); E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error); E('ERR_STREAM_READ_NOT_IMPLEMENTED', '_read() is not implemented', Error); @@ -908,7 +909,6 @@ E('ERR_VM_MODULE_NOT_LINKED', E('ERR_VM_MODULE_NOT_MODULE', 'Provided module is not an instance of Module', Error); E('ERR_VM_MODULE_STATUS', 'Module status %s', Error); -E('ERR_ZLIB_BINDING_CLOSED', 'zlib binding closed', Error); E('ERR_ZLIB_INITIALIZATION_FAILED', 'Initialization failed', Error); function sysError(code, syscall, path, dest, diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 71bb55ee23c89f..f60c6388af6cec 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -1475,6 +1475,7 @@ class Http2Stream extends Duplex { constructor(session, options) { options.allowHalfOpen = true; options.decodeStrings = false; + options.emitClose = false; super(options); this[async_id_symbol] = -1; diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 985332ac4607a8..5d29e182041cdc 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -30,6 +30,7 @@ function destroy(err, cb) { } this._destroy(err || null, (err) => { + process.nextTick(emitCloseNT, this); if (!cb && err) { process.nextTick(emitErrorNT, this, err); if (this._writableState) { @@ -43,6 +44,14 @@ function destroy(err, cb) { return this; } +function emitCloseNT(self) { + if (self._writableState && !self._writableState.emitClose) + return; + if (self._readableState && !self._readableState.emitClose) + return; + self.emit('close'); +} + function undestroy() { if (this._readableState) { this._readableState.destroyed = false; diff --git a/lib/net.js b/lib/net.js index 7583fcb27d1064..f2cb423f3003ea 100644 --- a/lib/net.js +++ b/lib/net.js @@ -232,6 +232,11 @@ function Socket(options) { options = { fd: options }; // Legacy interface. else if (options === undefined) options = {}; + else + options = util._extend({}, options); + + // For backwards compat do not emit close on destroy. + options.emitClose = false; stream.Duplex.call(this, options); diff --git a/lib/zlib.js b/lib/zlib.js index 93f878712add08..4adfd1ffa289fb 100644 --- a/lib/zlib.js +++ b/lib/zlib.js @@ -25,7 +25,6 @@ const { ERR_BUFFER_TOO_LARGE, ERR_INVALID_ARG_TYPE, ERR_OUT_OF_RANGE, - ERR_ZLIB_BINDING_CLOSED, ERR_ZLIB_INITIALIZATION_FAILED } = require('internal/errors').codes; const Transform = require('_stream_transform'); @@ -392,7 +391,7 @@ Zlib.prototype.flush = function flush(kind, callback) { Zlib.prototype.close = function close(callback) { _close(this, callback); - process.nextTick(emitCloseNT, this); + this.destroy(); }; Zlib.prototype._transform = function _transform(chunk, encoding, cb) { @@ -510,7 +509,7 @@ function processChunkSync(self, chunk, flushFlag) { function processChunk(self, chunk, flushFlag, cb) { var handle = self._handle; if (!handle) - return cb(new ERR_ZLIB_BINDING_CLOSED()); + assert(false, 'zlib binding closed'); handle.buffer = chunk; handle.cb = cb; @@ -603,10 +602,6 @@ function _close(engine, callback) { engine._handle = null; } -function emitCloseNT(self) { - self.emit('close'); -} - // generic zlib // minimal 2-byte header function Deflate(opts) { diff --git a/test/parallel/test-net-socket-destroy-send.js b/test/parallel/test-net-socket-destroy-send.js index a602b89253887d..aa587fc2e16896 100644 --- a/test/parallel/test-net-socket-destroy-send.js +++ b/test/parallel/test-net-socket-destroy-send.js @@ -13,14 +13,14 @@ server.listen(0, common.mustCall(function() { // Test destroy returns this, even on multiple calls when it short-circuits. assert.strictEqual(conn, conn.destroy().destroy()); conn.on('error', common.expectsError({ - code: 'ERR_SOCKET_CLOSED', - message: 'Socket is closed', + code: 'ERR_STREAM_DESTROYED', + message: 'Cannot call write after a stream was destroyed', type: Error })); conn.write(Buffer.from('kaboom'), common.expectsError({ - code: 'ERR_SOCKET_CLOSED', - message: 'Socket is closed', + code: 'ERR_STREAM_DESTROYED', + message: 'Cannot call write after a stream was destroyed', type: Error })); server.close(); diff --git a/test/parallel/test-stream-duplex-destroy.js b/test/parallel/test-stream-duplex-destroy.js index 00e334d64b5693..854d29ffc13049 100644 --- a/test/parallel/test-stream-duplex-destroy.js +++ b/test/parallel/test-stream-duplex-destroy.js @@ -13,8 +13,9 @@ const { inherits } = require('util'); duplex.resume(); - duplex.on('end', common.mustCall()); - duplex.on('finish', common.mustCall()); + duplex.on('end', common.mustNotCall()); + duplex.on('finish', common.mustNotCall()); + duplex.on('close', common.mustCall()); duplex.destroy(); assert.strictEqual(duplex.destroyed, true); @@ -29,8 +30,8 @@ const { inherits } = require('util'); const expected = new Error('kaboom'); - duplex.on('end', common.mustCall()); - duplex.on('finish', common.mustCall()); + duplex.on('end', common.mustNotCall()); + duplex.on('finish', common.mustNotCall()); duplex.on('error', common.mustCall((err) => { assert.strictEqual(err, expected); })); @@ -78,6 +79,7 @@ const { inherits } = require('util'); // error is swallowed by the custom _destroy duplex.on('error', common.mustNotCall('no error event')); + duplex.on('close', common.mustCall()); duplex.destroy(expected); assert.strictEqual(duplex.destroyed, true); @@ -159,8 +161,8 @@ const { inherits } = require('util'); }); duplex.resume(); - duplex.on('finish', common.mustCall()); - duplex.on('end', common.mustCall()); + duplex.on('finish', common.mustNotCall()); + duplex.on('end', common.mustNotCall()); duplex.destroy(); assert.strictEqual(duplex.destroyed, true); diff --git a/test/parallel/test-stream-readable-destroy.js b/test/parallel/test-stream-readable-destroy.js index def20d26c34080..026aa8ca1603b8 100644 --- a/test/parallel/test-stream-readable-destroy.js +++ b/test/parallel/test-stream-readable-destroy.js @@ -11,7 +11,7 @@ const { inherits } = require('util'); }); read.resume(); - read.on('end', common.mustCall()); + read.on('close', common.mustCall()); read.destroy(); assert.strictEqual(read.destroyed, true); @@ -25,7 +25,8 @@ const { inherits } = require('util'); const expected = new Error('kaboom'); - read.on('end', common.mustCall()); + read.on('end', common.mustNotCall('no end event')); + read.on('close', common.mustCall()); read.on('error', common.mustCall((err) => { assert.strictEqual(err, expected); })); @@ -47,6 +48,7 @@ const { inherits } = require('util'); const expected = new Error('kaboom'); read.on('end', common.mustNotCall('no end event')); + read.on('close', common.mustCall()); read.on('error', common.mustCall((err) => { assert.strictEqual(err, expected); })); @@ -70,6 +72,7 @@ const { inherits } = require('util'); // error is swallowed by the custom _destroy read.on('error', common.mustNotCall('no error event')); + read.on('close', common.mustCall()); read.destroy(expected); assert.strictEqual(read.destroyed, true); @@ -106,6 +109,7 @@ const { inherits } = require('util'); const fail = common.mustNotCall('no end event'); read.on('end', fail); + read.on('close', common.mustCall()); read.destroy(); @@ -170,7 +174,18 @@ const { inherits } = require('util'); const expected = new Error('kaboom'); + read.on('close', common.mustCall()); read.destroy(expected, common.mustCall(function(err) { assert.strictEqual(expected, err); })); } + +{ + const read = new Readable({ + read() {} + }); + + read.destroy(); + read.push('hi'); + read.on('data', common.mustNotCall()); +} diff --git a/test/parallel/test-stream-transform-destroy.js b/test/parallel/test-stream-transform-destroy.js index c42fe1d6f96d08..47cce87264b5c1 100644 --- a/test/parallel/test-stream-transform-destroy.js +++ b/test/parallel/test-stream-transform-destroy.js @@ -11,9 +11,9 @@ const assert = require('assert'); transform.resume(); - transform.on('end', common.mustCall()); + transform.on('end', common.mustNotCall()); transform.on('close', common.mustCall()); - transform.on('finish', common.mustCall()); + transform.on('finish', common.mustNotCall()); transform.destroy(); } @@ -26,8 +26,8 @@ const assert = require('assert'); const expected = new Error('kaboom'); - transform.on('end', common.mustCall()); - transform.on('finish', common.mustCall()); + transform.on('end', common.mustNotCall()); + transform.on('finish', common.mustNotCall()); transform.on('close', common.mustCall()); transform.on('error', common.mustCall((err) => { assert.strictEqual(err, expected); @@ -49,7 +49,7 @@ const assert = require('assert'); const expected = new Error('kaboom'); transform.on('finish', common.mustNotCall('no finish event')); - transform.on('close', common.mustNotCall('no close event')); + transform.on('close', common.mustCall()); transform.on('error', common.mustCall((err) => { assert.strictEqual(err, expected); })); @@ -69,7 +69,7 @@ const assert = require('assert'); transform.resume(); transform.on('end', common.mustNotCall('no end event')); - transform.on('close', common.mustNotCall('no close event')); + transform.on('close', common.mustCall()); transform.on('finish', common.mustNotCall('no finish event')); // error is swallowed by the custom _destroy @@ -110,7 +110,7 @@ const assert = require('assert'); transform.on('finish', fail); transform.on('end', fail); - transform.on('close', fail); + transform.on('close', common.mustCall()); transform.destroy(); @@ -132,7 +132,7 @@ const assert = require('assert'); cb(expected); }, 1); - transform.on('close', common.mustNotCall('no close event')); + transform.on('close', common.mustCall()); transform.on('finish', common.mustNotCall('no finish event')); transform.on('end', common.mustNotCall('no end event')); transform.on('error', common.mustCall((err) => { diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js index 46c48511177813..565a5564e2bc29 100644 --- a/test/parallel/test-stream-writable-destroy.js +++ b/test/parallel/test-stream-writable-destroy.js @@ -10,7 +10,8 @@ const { inherits } = require('util'); write(chunk, enc, cb) { cb(); } }); - write.on('finish', common.mustCall()); + write.on('finish', common.mustNotCall()); + write.on('close', common.mustCall()); write.destroy(); assert.strictEqual(write.destroyed, true); @@ -23,7 +24,8 @@ const { inherits } = require('util'); const expected = new Error('kaboom'); - write.on('finish', common.mustCall()); + write.on('finish', common.mustNotCall()); + write.on('close', common.mustCall()); write.on('error', common.mustCall((err) => { assert.strictEqual(err, expected); })); @@ -45,6 +47,7 @@ const { inherits } = require('util'); const expected = new Error('kaboom'); write.on('finish', common.mustNotCall('no finish event')); + write.on('close', common.mustCall()); write.on('error', common.mustCall((err) => { assert.strictEqual(err, expected); })); @@ -65,6 +68,7 @@ const { inherits } = require('util'); const expected = new Error('kaboom'); write.on('finish', common.mustNotCall('no finish event')); + write.on('close', common.mustCall()); // error is swallowed by the custom _destroy write.on('error', common.mustNotCall('no error event')); @@ -103,6 +107,7 @@ const { inherits } = require('util'); const fail = common.mustNotCall('no finish event'); write.on('finish', fail); + write.on('close', common.mustCall()); write.destroy(); @@ -123,6 +128,7 @@ const { inherits } = require('util'); cb(expected); }); + write.on('close', common.mustCall()); write.on('finish', common.mustNotCall('no finish event')); write.on('error', common.mustCall((err) => { assert.strictEqual(err, expected); @@ -138,6 +144,7 @@ const { inherits } = require('util'); write(chunk, enc, cb) { cb(); } }); + write.on('close', common.mustCall()); write.on('error', common.mustCall()); write.destroy(new Error('kaboom 1')); @@ -155,7 +162,7 @@ const { inherits } = require('util'); assert.strictEqual(write.destroyed, true); // the internal destroy() mechanism should not be triggered - write.on('finish', common.mustNotCall()); + write.on('close', common.mustNotCall()); write.destroy(); } diff --git a/test/parallel/test-zlib-write-after-close.js b/test/parallel/test-zlib-write-after-close.js index 88d6643da8b994..160971b16bc30c 100644 --- a/test/parallel/test-zlib-write-after-close.js +++ b/test/parallel/test-zlib-write-after-close.js @@ -29,9 +29,9 @@ zlib.gzip('hello', common.mustCall(function(err, out) { common.expectsError( () => unzip.write(out), { - code: 'ERR_ZLIB_BINDING_CLOSED', + code: 'ERR_STREAM_DESTROYED', type: Error, - message: 'zlib binding closed' + message: 'Cannot call write after a stream was destroyed' } ); }));