From ab03635fb1fd9d380d214116d2ba5bd96b2b9311 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Tue, 13 Oct 2015 02:16:39 -0400 Subject: [PATCH] http: fix stalled pipeline bug This is a two-part fix: - Fix pending data notification in `OutgoingMessage` to notify server about flushed data too - Fix pause/resume behavior for the consumed socket. `resume` event is emitted on a next tick, and `socket._paused` can already be `true` at this time. Pause the socket again to avoid PAUSED error on parser. Fix: https://github.com/nodejs/node/issues/3332 PR-URL: https://github.com/nodejs/node/pull/3342 Reviewed-By: James M Snell Reviewed-By: Trevor Norris --- lib/_http_common.js | 4 ++ lib/_http_outgoing.js | 66 +++++++++---------- lib/_http_server.js | 39 +++++++++-- src/node_http_parser.cc | 17 +++-- test/parallel/test-http-pipeline-regr-3332.js | 41 ++++++++++++ 5 files changed, 120 insertions(+), 47 deletions(-) create mode 100644 test/parallel/test-http-pipeline-regr-3332.js diff --git a/lib/_http_common.js b/lib/_http_common.js index addf9a0fa4f5d0..5140d366661cb4 100644 --- a/lib/_http_common.js +++ b/lib/_http_common.js @@ -140,6 +140,7 @@ var parsers = new FreeList('parsers', 1000, function() { parser._headers = []; parser._url = ''; + parser._consumed = false; // Only called in the slow case where slow means // that the request headers were either fragmented @@ -167,6 +168,9 @@ function freeParser(parser, req, socket) { if (parser) { parser._headers = []; parser.onIncoming = null; + if (parser._consumed) + parser.unconsume(); + parser._consumed = false; if (parser.socket) parser.socket.parser = null; parser.socket = null; diff --git a/lib/_http_outgoing.js b/lib/_http_outgoing.js index 6650c06c3305fe..e28609f8869a11 100644 --- a/lib/_http_outgoing.js +++ b/lib/_http_outgoing.js @@ -135,7 +135,7 @@ OutgoingMessage.prototype._send = function(data, encoding, callback) { this.outputEncodings.unshift('binary'); this.outputCallbacks.unshift(null); this.outputSize += this._header.length; - if (this._onPendingData !== null) + if (typeof this._onPendingData === 'function') this._onPendingData(this._header.length); } this._headerSent = true; @@ -158,22 +158,7 @@ OutgoingMessage.prototype._writeRaw = function(data, encoding, callback) { // There might be pending data in the this.output buffer. var outputLength = this.output.length; if (outputLength > 0) { - var output = this.output; - var outputEncodings = this.outputEncodings; - var outputCallbacks = this.outputCallbacks; - connection.cork(); - for (var i = 0; i < outputLength; i++) { - connection.write(output[i], outputEncodings[i], - outputCallbacks[i]); - } - connection.uncork(); - - this.output = []; - this.outputEncodings = []; - this.outputCallbacks = []; - if (this._onPendingData !== null) - this._onPendingData(-this.outputSize); - this.outputSize = 0; + this._flushOutput(connection); } else if (data.length === 0) { if (typeof callback === 'function') process.nextTick(callback); @@ -198,7 +183,7 @@ OutgoingMessage.prototype._buffer = function(data, encoding, callback) { this.outputEncodings.push(encoding); this.outputCallbacks.push(callback); this.outputSize += data.length; - if (this._onPendingData !== null) + if (typeof this._onPendingData === 'function') this._onPendingData(data.length); return false; }; @@ -644,26 +629,11 @@ OutgoingMessage.prototype._finish = function() { // to attempt to flush any pending messages out to the socket. OutgoingMessage.prototype._flush = function() { var socket = this.socket; - var outputLength, ret; + var ret; if (socket && socket.writable) { // There might be remaining data in this.output; write it out - outputLength = this.output.length; - if (outputLength > 0) { - var output = this.output; - var outputEncodings = this.outputEncodings; - var outputCallbacks = this.outputCallbacks; - socket.cork(); - for (var i = 0; i < outputLength; i++) { - ret = socket.write(output[i], outputEncodings[i], - outputCallbacks[i]); - } - socket.uncork(); - - this.output = []; - this.outputEncodings = []; - this.outputCallbacks = []; - } + ret = this._flushOutput(socket); if (this.finished) { // This is a queue to the server or client to bring in the next this. @@ -675,6 +645,32 @@ OutgoingMessage.prototype._flush = function() { } }; +OutgoingMessage.prototype._flushOutput = function _flushOutput(socket) { + var ret; + var outputLength = this.output.length; + if (outputLength <= 0) + return ret; + + var output = this.output; + var outputEncodings = this.outputEncodings; + var outputCallbacks = this.outputCallbacks; + socket.cork(); + for (var i = 0; i < outputLength; i++) { + ret = socket.write(output[i], outputEncodings[i], + outputCallbacks[i]); + } + socket.uncork(); + + this.output = []; + this.outputEncodings = []; + this.outputCallbacks = []; + if (typeof this._onPendingData === 'function') + this._onPendingData(-this.outputSize); + this.outputSize = 0; + + return ret; +}; + OutgoingMessage.prototype.flushHeaders = function() { if (!this._header) { diff --git a/lib/_http_server.js b/lib/_http_server.js index c11d36912e27e7..dc7276d0ae729d 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -343,8 +343,10 @@ function connectionListener(socket) { socket.on = socketOnWrap; var external = socket._handle._externalStream; - if (external) + if (external) { + parser._consumed = true; parser.consume(external); + } external = null; parser[kOnExecute] = onParserExecute; @@ -382,7 +384,7 @@ function connectionListener(socket) { socket.removeListener('data', socketOnData); socket.removeListener('end', socketOnEnd); socket.removeListener('close', serverSocketCloseListener); - parser.unconsume(socket._handle._externalStream); + unconsume(parser, socket); parser.finish(); freeParser(parser, req, null); parser = null; @@ -530,13 +532,38 @@ function connectionListener(socket) { exports._connectionListener = connectionListener; function onSocketResume() { - if (this._handle) + // It may seem that the socket is resumed, but this is an enemy's trick to + // deceive us! `resume` is emitted asynchronously, and may be called from + // `incoming.readStart()`. Stop the socket again here, just to preserve the + // state. + // + // We don't care about stream semantics for the consumed socket anyway. + if (this._paused) { + this.pause(); + return; + } + + if (this._handle && !this._handle.reading) { + this._handle.reading = true; this._handle.readStart(); + } } function onSocketPause() { - if (this._handle) + if (this._handle && this._handle.reading) { + this._handle.reading = false; this._handle.readStop(); + } +} + +function unconsume(parser, socket) { + if (socket._handle) { + if (parser._consumed) + parser.unconsume(socket._handle._externalStream); + parser._consumed = false; + socket.removeListener('pause', onSocketPause); + socket.removeListener('resume', onSocketResume); + } } function socketOnWrap(ev, fn) { @@ -546,8 +573,8 @@ function socketOnWrap(ev, fn) { return res; } - if (this._handle && (ev === 'data' || ev === 'readable')) - this.parser.unconsume(this._handle._externalStream); + if (ev === 'data' || ev === 'readable') + unconsume(this.parser, this); return res; } diff --git a/src/node_http_parser.cc b/src/node_http_parser.cc index 5f831ec50696aa..ff3dfb26e529af 100644 --- a/src/node_http_parser.cc +++ b/src/node_http_parser.cc @@ -484,13 +484,18 @@ class Parser : public BaseObject { if (parser->prev_alloc_cb_.is_empty()) return; - CHECK(args[0]->IsExternal()); - Local stream_obj = args[0].As(); - StreamBase* stream = static_cast(stream_obj->Value()); - CHECK_NE(stream, nullptr); + // Restore stream's callbacks + if (args.Length() == 1 && args[0]->IsExternal()) { + Local stream_obj = args[0].As(); + StreamBase* stream = static_cast(stream_obj->Value()); + CHECK_NE(stream, nullptr); + + stream->set_alloc_cb(parser->prev_alloc_cb_); + stream->set_read_cb(parser->prev_read_cb_); + } - stream->set_alloc_cb(parser->prev_alloc_cb_); - stream->set_read_cb(parser->prev_read_cb_); + parser->prev_alloc_cb_.clear(); + parser->prev_read_cb_.clear(); } diff --git a/test/parallel/test-http-pipeline-regr-3332.js b/test/parallel/test-http-pipeline-regr-3332.js new file mode 100644 index 00000000000000..061e202d975f32 --- /dev/null +++ b/test/parallel/test-http-pipeline-regr-3332.js @@ -0,0 +1,41 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const http = require('http'); +const net = require('net'); + +const big = new Buffer(16 * 1024); +big.fill('A'); + +const COUNT = 1e4; + +var received = 0; + +var client; +const server = http.createServer(function(req, res) { + res.end(big, function() { + if (++received === COUNT) { + server.close(); + client.end(); + } + }); +}).listen(common.PORT, function() { + var req = new Array(COUNT + 1).join('GET / HTTP/1.1\r\n\r\n'); + client = net.connect(common.PORT, function() { + client.write(req); + }); + + // Just let the test terminate instead of hanging + client.on('close', function() { + if (received !== COUNT) + server.close(); + }); + client.resume(); +}); + +process.on('exit', function() { + // The server should pause connection on pipeline flood, but it shoul still + // resume it and finish processing the requests, when its output queue will + // be empty again. + assert.equal(received, COUNT); +});