diff --git a/lib/_http_client.js b/lib/_http_client.js index 68eb125e0e10e9..4dc1f60cd7284c 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -186,13 +186,13 @@ function ClientRequest(options, cb) { } self.onSocket(socket); self._deferToConnect(null, null, function() { - self._flush(); + self._sendHeaders(); self = null; }); } self._deferToConnect(null, null, function() { - self._flush(); + self._sendHeaders(); self = null; }); } @@ -510,8 +510,6 @@ function emitFreeNT(socket) { function tickOnSocket(req, socket) { var parser = parsers.alloc(); - req.socket = socket; - req.connection = socket; parser.reinitialize(HTTPParser.RESPONSE); parser.socket = socket; parser.incoming = null; @@ -538,7 +536,8 @@ function tickOnSocket(req, socket) { socket.on('data', socketOnData); socket.on('end', socketOnEnd); socket.on('close', socketCloseListener); - req.emit('socket', socket); + + req._assignSocket(socket); } ClientRequest.prototype.onSocket = function(socket) { diff --git a/lib/_http_outgoing.js b/lib/_http_outgoing.js index 77bfe4ba77d416..3d317470c95d67 100644 --- a/lib/_http_outgoing.js +++ b/lib/_http_outgoing.js @@ -1,7 +1,7 @@ 'use strict'; const assert = require('assert').ok; -const Stream = require('stream'); +const Writable = require('stream').Writable; const timers = require('timers'); const util = require('util'); const internalUtil = require('internal/util'); @@ -44,21 +44,12 @@ utcDate._onTimeout = function() { function OutgoingMessage() { - Stream.call(this); + Writable.call(this, { + decodeStrings: false + }); - // Queue that holds all currently pending data, until the response will be - // assigned to the socket (until it will its turn in the HTTP pipeline). - this.output = []; - this.outputEncodings = []; - this.outputCallbacks = []; - - // `outputSize` is an approximate measure of how much data is queued on this - // response. `_onPendingData` will be invoked to update similar global - // per-connection counter. That counter will be used to pause/unpause the - // TCP socket and HTTP Parser and thus handle the backpressure. - this.outputSize = 0; - - this.writable = true; + // Start corked, will be uncorked on `_assignSocket` + this.cork(); this._last = false; this.chunkedEncoding = false; @@ -71,7 +62,6 @@ function OutgoingMessage() { this._hasBody = true; this._trailer = ''; - this.finished = false; this._headerSent = false; this.socket = null; @@ -81,13 +71,25 @@ function OutgoingMessage() { this._headerNames = {}; this._onPendingData = null; + this._onPendingLast = 0; + + this.on('prefinish', this._onPrefinish); } -util.inherits(OutgoingMessage, Stream); +util.inherits(OutgoingMessage, Writable); exports.OutgoingMessage = OutgoingMessage; +OutgoingMessage.prototype._assignSocket = function(socket) { + this.socket = socket; + this.connection = socket; + this.uncork(); + + this.emit('socket', socket); +}; + + OutgoingMessage.prototype.setTimeout = function(msecs, callback) { if (callback) { @@ -109,81 +111,187 @@ OutgoingMessage.prototype.setTimeout = function(msecs, callback) { // any messages, before ever calling this. In that case, just skip // it, since something else is destroying this connection anyway. OutgoingMessage.prototype.destroy = function(error) { - if (this.socket) - this.socket.destroy(error); - else - this.once('socket', function(socket) { + const socket = this.socket; + + if (socket) { + // NOTE: we may have some nextTick-corked data pending to write. There is + // no point in nextTick optimization here anymore, so just write everything + // to the socket + socket.uncork(); + socket.destroy(error); + } else { + this.once('socket', (socket) => { + socket.uncork(); socket.destroy(error); }); + } +}; + + +OutgoingMessage.prototype._sendHeaders = function(data) { + if (this._headerSent) + return; + + if (!this.socket) { + this.once('socket', () => { + this._sendHeaders(); + }); + return; + } + + if (!this._header) + this._implicitHeader(); + + this._headerSent = true; + if (this.socket.writable && !this.socket.destroyed) + this.socket.write(this._header); }; -// This abstract either writing directly to the socket or buffering it. -OutgoingMessage.prototype._send = function(data, encoding, callback) { - // This is a shameful hack to get the headers and first body chunk onto - // the same packet. Future versions of Node are going to take care of - // this at a lower level and in a more general way. +OutgoingMessage.prototype._countPendingData = function _countPendingData() { + if (!this._onPendingData) + return; + + const before = this._onPendingLast; + const now = this._writableState.length; + this._onPendingLast = now; + + this._onPendingData(now - before); +}; + + +OutgoingMessage.prototype.write = function write(data, encoding, callback) { + const res = Writable.prototype.write.call(this, data, encoding, callback); + + this._countPendingData(); + debug('write ret = %j', res); + + return res; +}; + + +OutgoingMessage.prototype._corkUncork = function _corkUncork(socket) { + if (!socket.corked) { + socket.cork(); + process.nextTick(socketUncorkNT, socket); + } +}; + + +OutgoingMessage.prototype._write = function _write(data, encoding, callback) { + const socket = this.socket; + + assert(socket); + + this._countPendingData(); + + // Ignore writes after end + if (!socket.writable || socket.destroyed) + return; + + // NOTE: Logic is duplicated to concat with `data` + // (Fast case) if (!this._headerSent) { + if (!this._header) + this._implicitHeader(); + + this._headerSent = true; + this._corkUncork(socket); + socket.write(this._header); + } + + if (!this._hasBody) { + debug('This type of response MUST NOT have a body. ' + + 'Ignoring write() calls.'); + return callback(null); + } + + // Empty writes don't play well with chunked encoding + if (data.length === 0) + return process.nextTick(callback); + + if (this.chunkedEncoding) { if (typeof data === 'string' && encoding !== 'hex' && - encoding !== 'base64') { - data = this._header + data; + encoding !== 'base64' && + encoding !== 'binary') { + const len = Buffer.byteLength(data, encoding); + const chunk = len.toString(16) + CRLF + data + CRLF; + socket.write(chunk, encoding, callback); } else { - this.output.unshift(this._header); - this.outputEncodings.unshift('binary'); - this.outputCallbacks.unshift(null); - this.outputSize += this._header.length; - if (typeof this._onPendingData === 'function') - this._onPendingData(this._header.length); + let len; + + // buffer, or a non-toString-friendly encoding + if (typeof data === 'string') + len = Buffer.byteLength(data, encoding); + else + len = data.length; + + this._corkUncork(socket); + socket.write(len.toString(16), 'binary', null); + socket.write(crlf_buf, null, null); + socket.write(data, encoding, null); + socket.write(crlf_buf, null, callback); } - this._headerSent = true; + } else { + socket.write(data, encoding, callback); } - return this._writeRaw(data, encoding, callback); }; -OutgoingMessage.prototype._writeRaw = function(data, encoding, callback) { - if (typeof encoding === 'function') { - callback = encoding; - encoding = null; - } +// +// XXX(streams): I wish there was a way to override some of this behavior in +// Writable +// +OutgoingMessage.prototype.end = function end(chunk, encoding, cb) { + // This is odd legacy behavior, required for compatibility + // TODO(indutny): perhaps remove it at some point? + if (this._writableState.ended) + return false; - var connection = this.connection; - if (connection && - connection._httpMessage === this && - connection.writable && - !connection.destroyed) { - // There might be pending data in the this.output buffer. - var outputLength = this.output.length; - if (outputLength > 0) { - this._flushOutput(connection); - } else if (data.length === 0) { - if (typeof callback === 'function') - process.nextTick(callback); - return true; + // Fast case `Content-Length`, immediate `res.end(...)` + if (!this._header && this._writableState.length === 0) { + if (chunk) { + if (typeof chunk === 'string') + this._contentLength = Buffer.byteLength(chunk, encoding); + else + this._contentLength = chunk.length; + } else { + this._contentLength = 0; } + } - // Directly write to socket. - return connection.write(data, encoding, callback); - } else if (connection && connection.destroyed) { - // The socket was destroyed. If we're still trying to write to it, - // then we haven't gotten the 'close' event yet. - return false; - } else { - // buffer, as long as we're not destroyed. - return this._buffer(data, encoding, callback); + if (!this._header) + this._implicitHeader(); + + if (this.socket) { + const ret = Writable.prototype.end.call(this, chunk, encoding, cb); + this._countPendingData(); + debug('outgoing message end.'); + return ret; } -}; + if (typeof chunk === 'function') { + cb = chunk; + chunk = null; + encoding = null; + } else if (typeof encoding === 'function') { + cb = encoding; + encoding = null; + } + + if (chunk !== null && chunk !== undefined) + this.write(chunk, encoding); -OutgoingMessage.prototype._buffer = function(data, encoding, callback) { - this.output.push(data); - this.outputEncodings.push(encoding); - this.outputCallbacks.push(callback); - this.outputSize += data.length; - if (typeof this._onPendingData === 'function') - this._onPendingData(data.length); - return false; + // No more `.write()` calls + // NOTE: .ending is intentionally `false` + this._writableState.ended = true; + + this.once('socket', () => { + debug('outgoing message end.'); + Writable.prototype.end.call(this, cb); + this._countPendingData(); + }); }; @@ -297,7 +405,8 @@ OutgoingMessage.prototype._storeHeader = function(firstLine, headers) { // wait until the first body chunk, or close(), is sent to flush, // UNLESS we're sending Expect: 100-continue. - if (state.sentExpect) this._send(''); + if (state.sentExpect) + this._sendHeaders(); }; function storeHeader(self, state, field, value) { @@ -421,75 +530,9 @@ Object.defineProperty(OutgoingMessage.prototype, 'headersSent', { }); -OutgoingMessage.prototype.write = function(chunk, encoding, callback) { - if (this.finished) { - var err = new Error('write after end'); - process.nextTick(writeAfterEndNT, this, err, callback); - - return true; - } - - if (!this._header) { - this._implicitHeader(); - } - - if (!this._hasBody) { - debug('This type of response MUST NOT have a body. ' + - 'Ignoring write() calls.'); - return true; - } - - if (typeof chunk !== 'string' && !(chunk instanceof Buffer)) { - throw new TypeError('First argument must be a string or Buffer'); - } - - - // If we get an empty string or buffer, then just do nothing, and - // signal the user to keep writing. - if (chunk.length === 0) return true; - - var len, ret; - if (this.chunkedEncoding) { - if (typeof chunk === 'string' && - encoding !== 'hex' && - encoding !== 'base64' && - encoding !== 'binary') { - len = Buffer.byteLength(chunk, encoding); - chunk = len.toString(16) + CRLF + chunk + CRLF; - ret = this._send(chunk, encoding, callback); - } else { - // buffer, or a non-toString-friendly encoding - if (typeof chunk === 'string') - len = Buffer.byteLength(chunk, encoding); - else - len = chunk.length; - - if (this.connection && !this.connection.corked) { - this.connection.cork(); - process.nextTick(connectionCorkNT, this.connection); - } - this._send(len.toString(16), 'binary', null); - this._send(crlf_buf, null, null); - this._send(chunk, encoding, null); - ret = this._send(crlf_buf, null, callback); - } - } else { - ret = this._send(chunk, encoding, callback); - } - - debug('write ret = ' + ret); - return ret; -}; - - -function writeAfterEndNT(self, err, callback) { - self.emit('error', err); - if (callback) callback(err); -} - - -function connectionCorkNT(conn) { - conn.uncork(); +function socketUncorkNT(conn) { + if (conn.writable && !conn.destroyed) + conn.uncork(); } @@ -529,161 +572,24 @@ OutgoingMessage.prototype.addTrailers = function(headers) { const crlf_buf = Buffer.from('\r\n'); -OutgoingMessage.prototype.end = function(data, encoding, callback) { - if (typeof data === 'function') { - callback = data; - data = null; - } else if (typeof encoding === 'function') { - callback = encoding; - encoding = null; - } +OutgoingMessage.prototype._onPrefinish = function() { + const socket = this.socket; - if (data && typeof data !== 'string' && !(data instanceof Buffer)) { - throw new TypeError('First argument must be a string or Buffer'); - } + this._countPendingData(); + if (!socket || !socket.writable || socket.destroyed) + return; - if (this.finished) { - return false; - } - - var self = this; - function finish() { - self.emit('finish'); - } - - if (typeof callback === 'function') - this.once('finish', callback); - - if (!this._header) { - if (data) { - if (typeof data === 'string') - this._contentLength = Buffer.byteLength(data, encoding); - else - this._contentLength = data.length; - } else { - this._contentLength = 0; - } - this._implicitHeader(); - } - - if (data && !this._hasBody) { - debug('This type of response MUST NOT have a body. ' + - 'Ignoring data passed to end().'); - data = null; - } - - if (this.connection && data) - this.connection.cork(); - - var ret; - if (data) { - // Normal body write. - this.write(data, encoding); - } - - if (this._hasBody && this.chunkedEncoding) { - ret = this._send('0\r\n' + this._trailer + '\r\n', 'binary', finish); - } else { - // Force a flush, HACK. - ret = this._send('', 'binary', finish); - } - - if (this.connection && data) - this.connection.uncork(); - - this.finished = true; - - // There is the first message on the outgoing queue, and we've sent - // everything to the socket. - debug('outgoing message end.'); - if (this.output.length === 0 && - this.connection && - this.connection._httpMessage === this) { - this._finish(); - } - - return ret; -}; - - -OutgoingMessage.prototype._finish = function() { - assert(this.connection); - this.emit('prefinish'); -}; - - -// This logic is probably a bit confusing. Let me explain a bit: -// -// In both HTTP servers and clients it is possible to queue up several -// outgoing messages. This is easiest to imagine in the case of a client. -// Take the following situation: -// -// req1 = client.request('GET', '/'); -// req2 = client.request('POST', '/'); -// -// When the user does -// -// req2.write('hello world\n'); -// -// it's possible that the first request has not been completely flushed to -// the socket yet. Thus the outgoing messages need to be prepared to queue -// up data internally before sending it on further to the socket's queue. -// -// This function, outgoingFlush(), is called by both the Server and Client -// to attempt to flush any pending messages out to the socket. -OutgoingMessage.prototype._flush = function() { - var socket = this.socket; - var ret; - - if (socket && socket.writable) { - // There might be remaining data in this.output; write it out - ret = this._flushOutput(socket); - - if (this.finished) { - // This is a queue to the server or client to bring in the next this. - this._finish(); - } else if (ret) { - // This is necessary to prevent https from breaking - this.emit('drain'); - } - } -}; - -OutgoingMessage.prototype._flushOutput = function _flushOutput(socket) { - var ret; - var outputLength = this.output.length; - if (outputLength <= 0) - return ret; - - var output = this.output; - var outputEncodings = this.outputEncodings; - var outputCallbacks = this.outputCallbacks; socket.cork(); - for (var i = 0; i < outputLength; i++) { - ret = socket.write(output[i], outputEncodings[i], - outputCallbacks[i]); - } - socket.uncork(); + this._sendHeaders(); - this.output = []; - this.outputEncodings = []; - this.outputCallbacks = []; - if (typeof this._onPendingData === 'function') - this._onPendingData(-this.outputSize); - this.outputSize = 0; - - return ret; + if (this._hasBody && this.chunkedEncoding) + socket.write('0\r\n' + this._trailer + '\r\n', 'binary'); + socket.uncork(); }; -OutgoingMessage.prototype.flushHeaders = function() { - if (!this._header) { - this._implicitHeader(); - } +OutgoingMessage.prototype.flushHeaders = OutgoingMessage.prototype._sendHeaders; - // Force-flush the headers. - this._send(''); -}; OutgoingMessage.prototype.flush = internalUtil.deprecate(function() { this.flushHeaders(); diff --git a/lib/_http_server.js b/lib/_http_server.js index 117202fd3e0e4f..7ef2a2a89ce1c1 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -135,10 +135,7 @@ ServerResponse.prototype.assignSocket = function(socket) { assert(!socket._httpMessage); socket._httpMessage = this; socket.on('close', onServerResponseClose); - this.socket = socket; - this.connection = socket; - this.emit('socket', socket); - this._flush(); + this._assignSocket(socket); }; ServerResponse.prototype.detachSocket = function(socket) { @@ -149,7 +146,8 @@ ServerResponse.prototype.detachSocket = function(socket) { }; ServerResponse.prototype.writeContinue = function(cb) { - this._writeRaw('HTTP/1.1 100 Continue' + CRLF + CRLF, 'ascii', cb); + if (this.socket && this.socket.writable) + this.socket.write('HTTP/1.1 100 Continue' + CRLF + CRLF, 'ascii', cb); this._sent100 = true; }; diff --git a/test/parallel/test-http-1.0-keep-alive.js b/test/parallel/test-http-1.0-keep-alive.js index fa49707c24a8d5..ce75e8b46cca90 100644 --- a/test/parallel/test-http-1.0-keep-alive.js +++ b/test/parallel/test-http-1.0-keep-alive.js @@ -121,8 +121,8 @@ function check(tests) { console.error(' > CLIENT ONDATA %j %j', s.length, s.toString()); current++; if (ctx.expectClose) return; - conn.removeListener('close', onclose); conn.removeListener('data', ondata); + conn.removeListener('close', onclose); connected(); } conn.on('data', ondata); diff --git a/test/parallel/test-http-1.0.js b/test/parallel/test-http-1.0.js index facf5c98eacb83..34e7026c5cc16b 100644 --- a/test/parallel/test-http-1.0.js +++ b/test/parallel/test-http-1.0.js @@ -73,9 +73,11 @@ function test(handler, request_generator, response_validator) { assert.equal(0, req.httpVersionMinor); res.sendDate = false; res.writeHead(200, {'Content-Type': 'text/plain'}); - res.write('Hello, '); res._send(''); - res.write('world!'); res._send(''); - res.end(); + res.write('Hello, ', () => { + res.write('world!', () => { + res.end(); + }); + }); } function request_generator() { @@ -109,9 +111,11 @@ function test(handler, request_generator, response_validator) { assert.equal(1, req.httpVersionMinor); res.sendDate = false; res.writeHead(200, {'Content-Type': 'text/plain'}); - res.write('Hello, '); res._send(''); - res.write('world!'); res._send(''); - res.end(); + res.write('Hello, ', () => { + res.write('world!', () => { + res.end(); + }); + }); } function request_generator() { diff --git a/test/parallel/test-http-destroyed-socket-write2.js b/test/parallel/test-http-destroyed-socket-write2.js index ac6ab92c17c072..9171072a82968c 100644 --- a/test/parallel/test-http-destroyed-socket-write2.js +++ b/test/parallel/test-http-destroyed-socket-write2.js @@ -42,8 +42,7 @@ server.listen(common.PORT, function() { break; } - assert.equal(req.output.length, 0); - assert.equal(req.outputEncodings.length, 0); + assert.equal(req._writableState.length, 0); server.close(); })); diff --git a/test/parallel/test-http-pipeline-flood.js b/test/parallel/test-http-pipeline-flood.js index 47acb821068f01..1334f537165c1d 100644 --- a/test/parallel/test-http-pipeline-flood.js +++ b/test/parallel/test-http-pipeline-flood.js @@ -47,6 +47,9 @@ function parent() { } backloggedReqs++; } + + // Ignore write errors, they happen when we kill child + res.on('error', () => {}); res.end(); }); diff --git a/test/parallel/test-http-res-write-after-end.js b/test/parallel/test-http-res-write-after-end.js index 206f4273ec70c7..007362cd855836 100644 --- a/test/parallel/test-http-res-write-after-end.js +++ b/test/parallel/test-http-res-write-after-end.js @@ -14,7 +14,7 @@ var server = http.Server(function(req, res) { res.end(); var r = res.write('This should raise an error.'); - assert.equal(r, true, 'write after end should return true'); + assert.equal(r, false, 'write after end should return false'); }); server.listen(common.PORT, function() {