From 21db5dd358f27ee8d352cc1ba9049994c4589345 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 29 Mar 2021 11:20:57 +0200 Subject: [PATCH] Revert "http: align with stream.Writable" This reverts commit e2f5bb7574e7e8189a3e38fc4d00c1213fd7160f. Reverted as it caused a significant performance regression. See: https://github.com/nodejs/node/issues/37937 --- lib/_http_client.js | 52 ++- lib/_http_outgoing.js | 331 ++++++++---------- lib/_http_server.js | 20 +- lib/internal/http.js | 1 + test/parallel/test-http-agent-keepalive.js | 17 +- .../test-http-outgoing-end-multiple.js | 20 +- ...http-outgoing-message-capture-rejection.js | 2 +- test/parallel/test-stream-pipeline.js | 34 ++ 8 files changed, 232 insertions(+), 245 deletions(-) diff --git a/lib/_http_client.js b/lib/_http_client.js index f293e3e6486f72..78532e9e5cb57d 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -58,7 +58,7 @@ const Agent = require('_http_agent'); const { Buffer } = require('buffer'); const { defaultTriggerAsyncIdScope } = require('internal/async_hooks'); const { URL, urlToHttpOptions, searchParamsSymbol } = require('internal/url'); -const { kOutHeaders } = require('internal/http'); +const { kOutHeaders, kNeedDrain } = require('internal/http'); const { connResetException, codes } = require('internal/errors'); const { ERR_HTTP_HEADERS_SENT, @@ -98,7 +98,7 @@ class HTTPClientAsyncResource { } function ClientRequest(input, options, cb) { - FunctionPrototypeCall(OutgoingMessage, this, { autoDestroy: false }); + FunctionPrototypeCall(OutgoingMessage, this); if (typeof input === 'string') { const urlStr = input; @@ -304,7 +304,7 @@ function ClientRequest(input, options, cb) { if (typeof optsWithoutSignal.createConnection === 'function') { const oncreate = once((err, socket) => { if (err) { - process.nextTick(() => emitError(this, err)); + process.nextTick(() => this.emit('error', err)); } else { this.onSocket(socket); } @@ -373,8 +373,8 @@ function emitAbortNT(req) { function ondrain() { const msg = this._httpMessage; - if (msg && !msg.finished && msg._writableState.needDrain) { - msg._writableState.needDrain = false; + if (msg && !msg.finished && msg[kNeedDrain]) { + msg[kNeedDrain] = false; msg.emit('drain'); } } @@ -400,7 +400,8 @@ function socketCloseListener() { if (!res.complete) { res.destroy(connResetException('aborted')); } - emitClose(req); + req._closed = true; + req.emit('close'); if (!res.aborted && res.readable) { res.push(null); } @@ -410,9 +411,10 @@ function socketCloseListener() { // receive a response. The error needs to // fire on the request. req.socket._hadError = true; - emitError(req, connResetException('socket hang up')); + req.emit('error', connResetException('socket hang up')); } - emitClose(req); + req._closed = true; + req.emit('close'); } // Too bad. That output wasn't getting written. @@ -436,7 +438,7 @@ function socketErrorListener(err) { // For Safety. Some additional errors might fire later on // and we need to make sure we don't double-fire the error event. req.socket._hadError = true; - emitError(req, err); + req.emit('error', err); } const parser = socket.parser; @@ -460,7 +462,7 @@ function socketOnEnd() { // If we don't have a response then we know that the socket // ended prematurely and we need to emit an error on the request. req.socket._hadError = true; - emitError(req, connResetException('socket hang up')); + req.emit('error', connResetException('socket hang up')); } if (parser) { parser.finish(); @@ -483,7 +485,7 @@ function socketOnData(d) { freeParser(parser, req, socket); socket.destroy(); req.socket._hadError = true; - emitError(req, ret); + req.emit('error', ret); } else if (parser.incoming && parser.incoming.upgrade) { // Upgrade (if status code 101) or CONNECT const bytesParsed = ret; @@ -515,7 +517,9 @@ function socketOnData(d) { socket.readableFlowing = null; req.emit(eventName, res, socket, bodyHead); - emitClose(req); + req.destroyed = true; + req._closed = true; + req.emit('close'); } else { // Requested Upgrade or used CONNECT method, but have no handler. socket.destroy(); @@ -700,7 +704,8 @@ function requestOnPrefinish() { } function emitFreeNT(req) { - emitClose(req); + req._closed = true; + req.emit('close'); if (req.socket) { req.socket.emit('free'); } @@ -781,10 +786,10 @@ function onSocketNT(req, socket, err) { err = connResetException('socket hang up'); } if (err) { - emitError(req, err); + req.emit('error', err); } req._closed = true; - emitClose(req); + req.emit('close'); } if (socket) { @@ -864,23 +869,6 @@ function setSocketTimeout(sock, msecs) { } } -function emitError(req, err) { - req.destroyed = true; - req._writableState.errored = err; - // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 - err.stack; // eslint-disable-line no-unused-expressions - req._writableState.errorEmitted = true; - req.emit('error', err); -} - -function emitClose(req) { - req.destroyed = true; - req._closed = true; - req._writableState.closed = true; - req._writableState.closeEmitted = true; - req.emit('close'); -} - ClientRequest.prototype.setNoDelay = function setNoDelay(noDelay) { this._deferToConnect('setNoDelay', [noDelay]); }; diff --git a/lib/_http_outgoing.js b/lib/_http_outgoing.js index 74e011d0ca9b21..d3904e7c41fc9c 100644 --- a/lib/_http_outgoing.js +++ b/lib/_http_outgoing.js @@ -28,7 +28,6 @@ const { ArrayPrototypeJoin, ArrayPrototypePush, ArrayPrototypeUnshift, - Error, FunctionPrototype, FunctionPrototypeBind, FunctionPrototypeCall, @@ -48,9 +47,9 @@ const { const { getDefaultHighWaterMark } = require('internal/streams/state'); const assert = require('internal/assert'); const EE = require('events'); -const { Stream, Writable } = require('stream'); +const Stream = require('stream'); const internalUtil = require('internal/util'); -const { kOutHeaders, utcDate } = require('internal/http'); +const { kOutHeaders, utcDate, kNeedDrain } = require('internal/http'); const { Buffer } = require('buffer'); const common = require('_http_common'); const checkIsHttpToken = common._checkIsHttpToken; @@ -79,12 +78,11 @@ const { } = require('internal/errors'); const { validateString } = require('internal/validators'); const { isUint8Array } = require('internal/util/types'); -const { construct, destroy } = require('internal/streams/destroy'); const HIGH_WATER_MARK = getDefaultHighWaterMark(); const { CRLF, debug } = common; -const kCorked = Symbol('kCorked'); +const kCorked = Symbol('corked'); const nop = FunctionPrototype; @@ -98,7 +96,7 @@ function isCookieField(s) { return s.length === 6 && StringPrototypeToLowerCase(s) === 'cookie'; } -function OutgoingMessage(opts) { +function OutgoingMessage() { FunctionPrototypeCall(Stream, this); // Queue that holds all currently pending data, until the response will be @@ -111,6 +109,9 @@ function OutgoingMessage(opts) { // TCP socket and HTTP Parser and thus handle the backpressure. this.outputSize = 0; + this.writable = true; + this.destroyed = false; + this._last = false; this.chunkedEncoding = false; this.shouldKeepAlive = true; @@ -124,9 +125,12 @@ function OutgoingMessage(opts) { this._contentLength = null; this._hasBody = true; this._trailer = ''; + this[kNeedDrain] = false; + this.finished = false; this._headerSent = false; this[kCorked] = 0; + this._closed = false; this.socket = null; this._header = null; @@ -135,58 +139,23 @@ function OutgoingMessage(opts) { this._keepAliveTimeout = 0; this._onPendingData = nop; - - this._writableState = { - objectMode: false, - writable: true, - constructed: false, - corked: 0, - prefinished: false, - destroyed: false, - closed: false, - closeEmitted: false, - errored: null, - errorEmitted: false, - needDrain: false, - autoDestroy: opts?.autoDestroy == null ? true : false, - emitClose: true, - ended: false, - ending: false, - finished: false - }; - - construct(this, () => { - this._flush(); - }); } -ObjectSetPrototypeOf(OutgoingMessage.prototype, Writable.prototype); -ObjectSetPrototypeOf(OutgoingMessage, Writable); - -ObjectDefineProperty(OutgoingMessage.prototype, 'finished', { - get() { - return this._writableState.ended; - }, - set(value) { - this._writableState.ended = value; - } -}); +ObjectSetPrototypeOf(OutgoingMessage.prototype, Stream.prototype); +ObjectSetPrototypeOf(OutgoingMessage, Stream); -ObjectDefineProperty(OutgoingMessage.prototype, '_closed', { +ObjectDefineProperty(OutgoingMessage.prototype, 'writableFinished', { get() { - return this._writableState.closed; - }, - set(value) { - this._writableState.closed = value; + return ( + this.finished && + this.outputSize === 0 && + (!this.socket || this.socket.writableLength === 0) + ); } }); -ObjectDefineProperty(OutgoingMessage.prototype, 'writable', { +ObjectDefineProperty(OutgoingMessage.prototype, 'writableObjectMode', { get() { - // Compat. - return this._writableState.writable; - }, - set(value) { - this._writableState.writable = value; + return false; } }); @@ -202,6 +171,13 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'writableHighWaterMark', { } }); +ObjectDefineProperty(OutgoingMessage.prototype, 'writableCorked', { + get() { + const corked = this.socket ? this.socket.writableCorked : 0; + return corked + this[kCorked]; + } +}); + ObjectDefineProperty(OutgoingMessage.prototype, '_headers', { get: internalUtil.deprecate(function() { return this.getHeaders(); @@ -265,6 +241,7 @@ ObjectDefineProperty(OutgoingMessage.prototype, '_headerNames', { }, 'OutgoingMessage.prototype._headerNames is deprecated', 'DEP0066') }); + OutgoingMessage.prototype._renderHeaders = function _renderHeaders() { if (this._header) { throw new ERR_HTTP_HEADERS_SENT('render'); @@ -286,8 +263,6 @@ OutgoingMessage.prototype._renderHeaders = function _renderHeaders() { }; OutgoingMessage.prototype.cork = function() { - this._writableState.corked++; - if (this.socket) { this.socket.cork(); } else { @@ -296,10 +271,6 @@ OutgoingMessage.prototype.cork = function() { }; OutgoingMessage.prototype.uncork = function() { - if (this._writableState.corked) { - this._writableState.corked--; - } - if (this.socket) { this.socket.uncork(); } else if (this[kCorked]) { @@ -308,6 +279,7 @@ OutgoingMessage.prototype.uncork = function() { }; OutgoingMessage.prototype.setTimeout = function setTimeout(msecs, callback) { + if (callback) { this.on('timeout', callback); } @@ -322,48 +294,22 @@ OutgoingMessage.prototype.setTimeout = function setTimeout(msecs, callback) { return this; }; -OutgoingMessage.prototype._construct = function(callback) { - if (this.socket) { - for (let n = 0; n < this[kCorked]; ++n) { - this.socket.cork(); - } - callback(); - } else { - // TODO(ronag): What if never assigned socket? - this.once('socket', function(socket) { - for (let n = 0; n < this[kCorked]; ++n) { - socket.cork(); - } - callback(); - }); - } -}; - -OutgoingMessage.prototype.destroy = destroy; -OutgoingMessage.prototype._destroy = function(err, callback) { - if (!this.writableEnded) { - this.aborted = true; - this.emit('aborted'); +// It's possible that the socket will be destroyed, and removed from +// any messages, before ever calling this. In that case, just skip +// it, since something else is destroying this connection anyway. +OutgoingMessage.prototype.destroy = function destroy(error) { + if (this.destroyed) { + return this; } - - // TODO(ronag): Why is this needed? - const cb = (err) => { - const triggerAsyncId = this.socket ? - this.socket[async_id_symbol] : undefined; - defaultTriggerAsyncIdScope(triggerAsyncId, callback, err); - }; + this.destroyed = true; if (this.socket) { - Stream.finished(this.socket.destroy(err), (er) => { - if (er && er.code !== 'ERR_STREAM_PREMATURE_CLOSE') { - err = er; - } - - cb(err); - }); + this.socket.destroy(error); } else { - cb(err); + this.once('socket', function socketDestroyOnConnect(socket) { + socket.destroy(error); + }); } return this; @@ -740,6 +686,16 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'headersSent', { get: function() { return !!this._header; } }); +ObjectDefineProperty(OutgoingMessage.prototype, 'writableEnded', { + get: function() { return this.finished; } +}); + +ObjectDefineProperty(OutgoingMessage.prototype, 'writableNeedDrain', { + get: function() { + return !this.destroyed && !this.finished && this[kNeedDrain]; + } +}); + const crlf_buf = Buffer.from(CRLF); OutgoingMessage.prototype.write = function write(chunk, encoding, callback) { if (typeof encoding === 'function') { @@ -747,15 +703,30 @@ OutgoingMessage.prototype.write = function write(chunk, encoding, callback) { encoding = null; } - const ret = write_(this, chunk, encoding, callback, false) === true; + const ret = write_(this, chunk, encoding, callback, false); if (!ret) - this._writableState.needDrain = true; + this[kNeedDrain] = true; return ret; }; -function write_(msg, chunk, encoding, callback, fromEnd) { - const state = msg._writableState; +function onError(msg, err, callback) { + const triggerAsyncId = msg.socket ? msg.socket[async_id_symbol] : undefined; + defaultTriggerAsyncIdScope(triggerAsyncId, + process.nextTick, + emitErrorNt, + msg, + err, + callback); +} + +function emitErrorNt(msg, err, callback) { + callback(err); + if (typeof msg.emit === 'function' && !msg._closed) { + msg.emit('error', err); + } +} +function write_(msg, chunk, encoding, callback, fromEnd) { if (typeof callback !== 'function') callback = nop; @@ -772,16 +743,19 @@ function write_(msg, chunk, encoding, callback, fromEnd) { } let err; - if (state.ending) { + if (msg.finished) { err = new ERR_STREAM_WRITE_AFTER_END(); - } else if (state.destroyed) { + } else if (msg.destroyed) { err = new ERR_STREAM_DESTROYED('write'); } if (err) { - process.nextTick(callback, err); - msg.destroy(err); - return err; + if (!msg.destroyed) { + onError(msg, err, callback); + } else { + process.nextTick(callback, err); + } + return false; } if (!msg._header) { @@ -798,9 +772,9 @@ function write_(msg, chunk, encoding, callback, fromEnd) { return true; } - if (!fromEnd && !state.corked) { - msg.cork(); - process.nextTick(uncorkNT, msg); + if (!fromEnd && msg.socket && !msg.socket.writableCorked) { + msg.socket.cork(); + process.nextTick(connectionCorkNT, msg.socket); } let ret; @@ -818,8 +792,8 @@ function write_(msg, chunk, encoding, callback, fromEnd) { } -function uncorkNT(msg) { - msg.uncork(); +function connectionCorkNT(conn) { + conn.uncork(); } @@ -850,24 +824,12 @@ OutgoingMessage.prototype.addTrailers = function addTrailers(headers) { } }; -function onFinish(err) { - const state = this._writableState; - - if (err || state.errored || state.finished) { - return; - } - - state.finished = true; - this.emit('finish'); - - if (state.autoDestroy) { - this.destroy(); - } +function onFinish(outmsg) { + if (outmsg && outmsg.socket && outmsg.socket._hadError) return; + outmsg.emit('finish'); } OutgoingMessage.prototype.end = function end(chunk, encoding, callback) { - const state = this._writableState; - if (typeof chunk === 'function') { callback = chunk; chunk = null; @@ -877,90 +839,74 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) { encoding = null; } - this.cork(); - - let err; - if (chunk) { - const ret = write_(this, chunk, encoding, null, true); - if (ret instanceof Error) { - err = ret; + if (this.finished) { + onError(this, + new ERR_STREAM_WRITE_AFTER_END(), + typeof callback !== 'function' ? nop : callback); + return this; } - } - - if (err) { - // Do nothing... - } else if (!state.errored && !state.ending) { - state.ending = true; - if (!this._header) { - this._contentLength = 0; - this._implicitHeader(); + if (this.socket) { + this.socket.cork(); } - const finish = FunctionPrototypeBind(onFinish, this); - - state.finalCalled = true; - - if (this._hasBody && this.chunkedEncoding) { - this._send('0' + CRLF + this._trailer + CRLF, 'latin1', finish); - } else { - // Force a flush, HACK. - this._send('', 'latin1', finish); + write_(this, chunk, encoding, null, true); + } else if (this.finished) { + if (typeof callback === 'function') { + if (!this.writableFinished) { + this.on('finish', callback); + } else { + callback(new ERR_STREAM_ALREADY_FINISHED('end')); + } } - - while (state.corked) { - // Fully uncork connection on end(). - this.uncork(); + return this; + } else if (!this._header) { + if (this.socket) { + this.socket.cork(); } - state.ended = true; + this._contentLength = 0; + this._implicitHeader(); + } - // There is the first message on the outgoing queue, and we've sent - // everything to the socket. - debug('outgoing message end.'); - if (this.outputData.length === 0 && this.socket?._httpMessage === this) { - this._finish(); - } - } else if (state.finished) { - err = new ERR_STREAM_ALREADY_FINISHED('end'); - } else if (state.destroyed) { - err = new ERR_STREAM_DESTROYED('end'); + if (typeof callback === 'function') + this.once('finish', callback); + + const finish = FunctionPrototypeBind(onFinish, undefined, this); + + if (this._hasBody && this.chunkedEncoding) { + this._send('0\r\n' + this._trailer + '\r\n', 'latin1', finish); + } else { + // Force a flush, HACK. + this._send('', 'latin1', finish); } - if (typeof callback === 'function') { - if (err || state.finished) { - process.nextTick(callback, err); - } else { - onFinished(this, callback); - } + if (this.socket) { + // Fully uncork connection on end(). + this.socket._writableState.corked = 1; + this.socket.uncork(); } + this[kCorked] = 0; - this.uncork(); + this.finished = true; + + // There is the first message on the outgoing queue, and we've sent + // everything to the socket. + debug('outgoing message end.'); + if (this.outputData.length === 0 && + this.socket && + this.socket._httpMessage === this) { + this._finish(); + } return this; }; -function onFinished(msg, callback) { - const onDone = (err) => { - msg - .off('finish', onDone) - .off(EE.errorMonitor, onDone); - callback(err); - }; - msg - .on('finish', onDone) - .on(EE.errorMonitor, onDone); -} OutgoingMessage.prototype._finish = function _finish() { - const state = this._writableState; - - if (!state.prefinished) { - assert(this.socket); - state.prefinished = true; - this.emit('prefinish'); - } + assert(this.socket); + this.emit('prefinish'); }; @@ -993,14 +939,19 @@ OutgoingMessage.prototype._flush = function _flush() { if (this.finished) { // This is a queue to the server or client to bring in the next this. this._finish(); - } else if (ret && this._writableState.needDrain) { - this._writableState.needDrain = false; + } else if (ret && this[kNeedDrain]) { + this[kNeedDrain] = false; this.emit('drain'); } } }; OutgoingMessage.prototype._flushOutput = function _flushOutput(socket) { + while (this[kCorked]) { + this[kCorked]--; + socket.cork(); + } + const outputLength = this.outputData.length; if (outputLength <= 0) return undefined; diff --git a/lib/_http_server.js b/lib/_http_server.js index ff6ad0b1897deb..dac5fedf433533 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -59,6 +59,7 @@ const { const { OutgoingMessage } = require('_http_outgoing'); const { kOutHeaders, + kNeedDrain, emitStatistics } = require('internal/http'); const { @@ -226,7 +227,11 @@ function onServerResponseClose() { // Ergo, we need to deal with stale 'close' events and handle the case // where the ServerResponse object has already been deconstructed. // Fortunately, that requires only a single if check. :-) - this._httpMessage?.destroy(); + if (this._httpMessage) { + this._httpMessage.destroyed = true; + this._httpMessage._closed = true; + this._httpMessage.emit('close'); + } } ServerResponse.prototype.assignSocket = function assignSocket(socket) { @@ -235,6 +240,7 @@ ServerResponse.prototype.assignSocket = function assignSocket(socket) { socket.on('close', onServerResponseClose); this.socket = socket; this.emit('socket', socket); + this._flush(); }; ServerResponse.prototype.detachSocket = function detachSocket(socket) { @@ -550,8 +556,8 @@ function socketOnDrain(socket, state) { } const msg = socket._httpMessage; - if (msg && !msg.finished && msg._writableState.needDrain) { - msg._writableState.needDrain = false; + if (msg && !msg.finished && msg[kNeedDrain]) { + msg[kNeedDrain] = false; msg.emit('drain'); } } @@ -578,6 +584,7 @@ function abortIncoming(incoming) { const req = ArrayPrototypeShift(incoming); req.destroy(connResetException('aborted')); } + // Abort socket._httpMessage ? } function socketOnEnd(server, socket, parser, state) { @@ -797,6 +804,7 @@ function resOnFinish(req, res, socket, state, server) { res.detachSocket(socket); clearIncoming(req); + process.nextTick(emitCloseNT, res); if (res._last) { if (typeof socket.destroySoon === 'function') { @@ -818,6 +826,12 @@ function resOnFinish(req, res, socket, state, server) { } } +function emitCloseNT(self) { + self.destroyed = true; + self._closed = true; + self.emit('close'); +} + // The following callback is issued after the headers have been read on a // new message. In this callback we setup the response object and pass it // to the user. diff --git a/lib/internal/http.js b/lib/internal/http.js index 28c4b245ab559f..f87dc8aa6cd01b 100644 --- a/lib/internal/http.js +++ b/lib/internal/http.js @@ -44,6 +44,7 @@ function emitStatistics(statistics) { module.exports = { kOutHeaders: Symbol('kOutHeaders'), + kNeedDrain: Symbol('kNeedDrain'), utcDate, emitStatistics }; diff --git a/test/parallel/test-http-agent-keepalive.js b/test/parallel/test-http-agent-keepalive.js index 10c9e2372ddec4..a1f902fab091e1 100644 --- a/test/parallel/test-http-agent-keepalive.js +++ b/test/parallel/test-http-agent-keepalive.js @@ -105,20 +105,17 @@ function remoteClose() { function remoteError() { // Remote server will destroy the socket const req = get('/error', common.mustNotCall()); - req.on('socket', common.mustCall((socket) => { - socket.on('end', common.mustCall(() => { - assert.strictEqual(agent.sockets[name].length, 1); - assert.strictEqual(agent.freeSockets[name], undefined); - })); - })); req.on('error', common.mustCall((err) => { assert(err); assert.strictEqual(err.message, 'socket hang up'); - })); - req.on('close', common.mustCall((err) => { - assert.strictEqual(agent.sockets[name], undefined); + assert.strictEqual(agent.sockets[name].length, 1); assert.strictEqual(agent.freeSockets[name], undefined); - server.close(); + // Wait socket 'close' event emit + setTimeout(common.mustCall(() => { + assert.strictEqual(agent.sockets[name], undefined); + assert.strictEqual(agent.freeSockets[name], undefined); + server.close(); + }), common.platformTimeout(1)); })); } diff --git a/test/parallel/test-http-outgoing-end-multiple.js b/test/parallel/test-http-outgoing-end-multiple.js index 8a3a42aa6ce77e..696443f9390cd0 100644 --- a/test/parallel/test-http-outgoing-end-multiple.js +++ b/test/parallel/test-http-outgoing-end-multiple.js @@ -3,21 +3,23 @@ const common = require('../common'); const assert = require('assert'); const http = require('http'); +const onWriteAfterEndError = common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END'); +}, 2); + const server = http.createServer(common.mustCall(function(req, res) { res.end('testing ended state', common.mustCall()); assert.strictEqual(res.writableCorked, 0); res.end(common.mustCall((err) => { - assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END'); - })); - res.end('end', common.mustCall((err) => { - assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END'); - })); - res.on('error', common.mustCall((err) => { - assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END'); + assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED'); })); - res.on('close', common.mustCall(() => { + assert.strictEqual(res.writableCorked, 0); + res.end('end', onWriteAfterEndError); + assert.strictEqual(res.writableCorked, 0); + res.on('error', onWriteAfterEndError); + res.on('finish', common.mustCall(() => { res.end(common.mustCall((err) => { - assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED'); + assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED'); server.close(); })); })); diff --git a/test/parallel/test-http-outgoing-message-capture-rejection.js b/test/parallel/test-http-outgoing-message-capture-rejection.js index 763f125181e86d..9fe9bdb21331d8 100644 --- a/test/parallel/test-http-outgoing-message-capture-rejection.js +++ b/test/parallel/test-http-outgoing-message-capture-rejection.js @@ -14,7 +14,7 @@ events.captureRejections = true; throw _err; })); - res.on('error', common.mustCall((err) => { + res.socket.on('error', common.mustCall((err) => { assert.strictEqual(err, _err); })); diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index c287b7c1c72cd7..ed5a3d9a0b54f4 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -253,6 +253,40 @@ const net = require('net'); }); } +{ + const server = http.createServer((req, res) => { + pipeline(req, res, common.mustSucceed()); + }); + + server.listen(0, () => { + const req = http.request({ + port: server.address().port + }); + + let sent = 0; + const rs = new Readable({ + read() { + if (sent++ > 10) { + return; + } + rs.push('hello'); + } + }); + + pipeline(rs, req, common.mustCall(() => { + server.close(); + })); + + req.on('response', (res) => { + let cnt = 10; + res.on('data', () => { + cnt--; + if (cnt === 0) rs.destroy(); + }); + }); + }); +} + { const makeTransform = () => { const tr = new Transform({