diff --git a/lib/_http_outgoing.js b/lib/_http_outgoing.js index f58dc77f53278e..e26884e4896aad 100644 --- a/lib/_http_outgoing.js +++ b/lib/_http_outgoing.js @@ -49,6 +49,7 @@ function OutgoingMessage() { this.output = []; this.outputEncodings = []; this.outputCallbacks = []; + this.outputSize = 0; this.writable = true; @@ -71,6 +72,8 @@ function OutgoingMessage() { this._header = null; this._headers = null; this._headerNames = {}; + + this._onPendingData = null; } util.inherits(OutgoingMessage, Stream); @@ -120,6 +123,9 @@ OutgoingMessage.prototype._send = function(data, encoding, callback) { this.output.unshift(this._header); this.outputEncodings.unshift('binary'); this.outputCallbacks.unshift(null); + this.outputSize += this._header.length; + if (this._onPendingData !== null) + this._onPendingData(this._header.length); } this._headerSent = true; } @@ -152,6 +158,9 @@ OutgoingMessage.prototype._writeRaw = function(data, encoding, callback) { this.output = []; this.outputEncodings = []; this.outputCallbacks = []; + if (this._onPendingData !== null) + this._onPendingData(-this.outputSize); + this.outputSize = 0; } else if (data.length === 0) { if (typeof callback === 'function') process.nextTick(callback); @@ -175,6 +184,9 @@ OutgoingMessage.prototype._buffer = function(data, encoding, callback) { this.output.push(data); this.outputEncodings.push(encoding); this.outputCallbacks.push(callback); + this.outputSize += data.length; + if (this._onPendingData !== null) + this._onPendingData(data.length); return false; }; diff --git a/lib/_http_server.js b/lib/_http_server.js index f2e2dcf001b03d..2d6cb53176423b 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -241,6 +241,8 @@ function Server(requestListener) { }); this.timeout = 2 * 60 * 1000; + + this._pendingResponseData = 0; } util.inherits(Server, net.Server); @@ -260,6 +262,13 @@ function connectionListener(socket) { var self = this; var outgoing = []; var incoming = []; + var outgoingData = 0; + + function updateOutgoingData(delta) { + outgoingData += delta; + if (socket._paused && outgoingData < socket._writableState.highWaterMark) + return socketOnDrain(); + } function abortIncoming() { while (incoming.length) { @@ -425,8 +434,10 @@ function connectionListener(socket) { socket._paused = false; function socketOnDrain() { + var needPause = outgoingData > socket._writableState.highWaterMark; + // If we previously paused, then start reading again. - if (socket._paused) { + if (socket._paused && !needPause) { socket._paused = false; socket.parser.resume(); socket.resume(); @@ -440,7 +451,8 @@ function connectionListener(socket) { // so that we don't become overwhelmed by a flood of // pipelined requests that may never be resolved. if (!socket._paused) { - var needPause = socket._writableState.needDrain; + var needPause = socket._writableState.needDrain || + outgoingData >= socket._writableState.highWaterMark; if (needPause) { socket._paused = true; // We also need to pause the parser, but don't do that until after @@ -451,6 +463,7 @@ function connectionListener(socket) { } var res = new ServerResponse(req); + res._onPendingData = updateOutgoingData; res.shouldKeepAlive = shouldKeepAlive; DTRACE_HTTP_SERVER_REQUEST(req, socket);