Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http: fix stalled pipeline bug (v3.x backport) #3558

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 31 additions & 31 deletions lib/_http_outgoing.js
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ OutgoingMessage.prototype._send = function(data, encoding, callback) {
this.outputEncodings.unshift('binary');
this.outputCallbacks.unshift(null);
this.outputSize += this._header.length;
if (this._onPendingData !== null)
if (typeof this._onPendingData === 'function')
this._onPendingData(this._header.length);
}
this._headerSent = true;
Expand All @@ -147,20 +147,7 @@ OutgoingMessage.prototype._writeRaw = function(data, encoding, callback) {
// There might be pending data in the this.output buffer.
var outputLength = this.output.length;
if (outputLength > 0) {
var output = this.output;
var outputEncodings = this.outputEncodings;
var outputCallbacks = this.outputCallbacks;
for (var i = 0; i < outputLength; i++) {
connection.write(output[i], outputEncodings[i],
outputCallbacks[i]);
}

this.output = [];
this.outputEncodings = [];
this.outputCallbacks = [];
if (this._onPendingData !== null)
this._onPendingData(-this.outputSize);
this.outputSize = 0;
this._flushOutput(connection);
} else if (data.length === 0) {
if (typeof callback === 'function')
process.nextTick(callback);
Expand All @@ -185,7 +172,7 @@ OutgoingMessage.prototype._buffer = function(data, encoding, callback) {
this.outputEncodings.push(encoding);
this.outputCallbacks.push(callback);
this.outputSize += data.length;
if (this._onPendingData !== null)
if (typeof this._onPendingData === 'function')
this._onPendingData(data.length);
return false;
};
Expand Down Expand Up @@ -618,24 +605,11 @@ OutgoingMessage.prototype._finish = function() {
// to attempt to flush any pending messages out to the socket.
OutgoingMessage.prototype._flush = function() {
var socket = this.socket;
var outputLength, ret;
var ret;

if (socket && socket.writable) {
// There might be remaining data in this.output; write it out
outputLength = this.output.length;
if (outputLength > 0) {
var output = this.output;
var outputEncodings = this.outputEncodings;
var outputCallbacks = this.outputCallbacks;
for (var i = 0; i < outputLength; i++) {
ret = socket.write(output[i], outputEncodings[i],
outputCallbacks[i]);
}

this.output = [];
this.outputEncodings = [];
this.outputCallbacks = [];
}
ret = this._flushOutput(socket);

if (this.finished) {
// This is a queue to the server or client to bring in the next this.
Expand All @@ -647,6 +621,32 @@ OutgoingMessage.prototype._flush = function() {
}
};

OutgoingMessage.prototype._flushOutput = function _flushOutput(socket) {
var ret;
var outputLength = this.output.length;
if (outputLength <= 0)
return ret;

var output = this.output;
var outputEncodings = this.outputEncodings;
var outputCallbacks = this.outputCallbacks;
socket.cork();
for (var i = 0; i < outputLength; i++) {
ret = socket.write(output[i], outputEncodings[i],
outputCallbacks[i]);
}
socket.uncork();

this.output = [];
this.outputEncodings = [];
this.outputCallbacks = [];
if (typeof this._onPendingData === 'function')
this._onPendingData(-this.outputSize);
this.outputSize = 0;

return ret;
};


OutgoingMessage.prototype.flushHeaders = function() {
if (!this._header) {
Expand Down
41 changes: 41 additions & 0 deletions test/parallel/test-http-pipeline-regr-3332.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const http = require('http');
const net = require('net');

const big = new Buffer(16 * 1024);
big.fill('A');

const COUNT = 1e4;

var received = 0;

var client;
const server = http.createServer(function(req, res) {
res.end(big, function() {
if (++received === COUNT) {
server.close();
client.end();
}
});
}).listen(common.PORT, function() {
var req = new Array(COUNT + 1).join('GET / HTTP/1.1\r\n\r\n');
client = net.connect(common.PORT, function() {
client.write(req);
});

// Just let the test terminate instead of hanging
client.on('close', function() {
if (received !== COUNT)
server.close();
});
client.resume();
});

process.on('exit', function() {
// The server should pause connection on pipeline flood, but it shoul still
// resume it and finish processing the requests, when its output queue will
// be empty again.
assert.equal(received, COUNT);
});