Skip to content

Commit

Permalink
fix: use close event on response instead of socket
Browse files Browse the repository at this point in the history
In #1880, we switched from using the close event on `req` to close on
`req.socket`. This addressed the intended issue but can trigger frequent
warnings when keep-alive is used due to a listener being added for each
request on the same socket.

By using the close event on `res` instead, we address both the issue of
event ordering in Node.js >= 16 that the original change was targeting
and the event emitter warning leak.
  • Loading branch information
josephharrington committed Jan 31, 2022
1 parent 4d404d4 commit 243580d
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 15 deletions.
2 changes: 1 addition & 1 deletion lib/plugins/bodyReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ function bodyReader(options) {
// add 'close and 'aborted' event handlers so that requests (and their
// corresponding memory) don't leak if client stops sending data half
// way through a POST request
req.socket.once('close', next);
res.once('close', next);
req.once('aborted', next);
req.resume();
}
Expand Down
136 changes: 122 additions & 14 deletions test/plugins/bodyReader.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,25 @@ var CLIENT;
var PORT;

describe('body reader', function() {
describe('gzip content encoding', function() {
beforeEach(function(done) {
SERVER = restify.createServer({
dtrace: helper.dtrace,
log: helper.getLog('server')
});
beforeEach(function(done) {
SERVER = restify.createServer({
dtrace: helper.dtrace,
log: helper.getLog('server')
});

SERVER.listen(0, '127.0.0.1', function() {
PORT = SERVER.address().port;
SERVER.listen(0, '127.0.0.1', function() {
PORT = SERVER.address().port;

done();
});
done();
});
});

afterEach(function(done) {
CLIENT.close();
SERVER.close(done);
});
afterEach(function(done) {
CLIENT.close();
SERVER.close(done);
});

describe('gzip content encoding', function() {
it('should parse gzip encoded content', function(done) {
SERVER.use(restify.plugins.bodyParser());

Expand Down Expand Up @@ -187,4 +187,112 @@ describe('body reader', function() {
req.write(postData);
});
});

it('should not add a listener for each call on same socket', done => {
SERVER.use(restify.plugins.bodyParser());

let serverReq, serverRes, serverReqSocket;
SERVER.post('/meals', function(req, res, next) {
serverReq = req;
serverRes = res;
serverReqSocket = req.socket;
res.send();
next();
});

CLIENT = restifyClients.createJsonClient({
url: 'http://127.0.0.1:' + PORT,
retry: false,
agent: new http.Agent({ keepAlive: true })
});

CLIENT.post('/meals', { breakfast: 'pancakes' }, (err, _, res) => {
assert.ifError(err);
assert.equal(res.statusCode, 200);

const firstReqSocket = serverReqSocket;
const numReqListeners = listenerCount(serverReq);
const numResListeners = listenerCount(serverRes);
const numReqSocketListeners = listenerCount(serverReq.socket);

// Without setImmediate, the second request will not reuse the socket.
setImmediate(() => {
CLIENT.post('/meals', { lunch: 'salad' }, (err2, __, res2) => {
assert.ifError(err2);
assert.equal(res2.statusCode, 200);
assert.equal(
serverReqSocket,
firstReqSocket,
'This test should issue two requests that share the ' +
'same socket.'
);
// The number of listeners on each emitter should not have
// increased since the first request.
assert.equal(listenerCount(serverReq), numReqListeners);
assert.equal(listenerCount(serverRes), numResListeners);
assert.equal(
listenerCount(serverReq.socket),
numReqSocketListeners
);
done();
});
});
});
});

it('should call next for each successful request on same socket', done => {
let nextCallCount = 0;
SERVER.use(restify.plugins.bodyParser());
SERVER.use((req, res, next) => {
nextCallCount += 1;
next();
});

let serverReqSocket;
SERVER.post('/meals', function(req, res, next) {
res.send();
next();
});

CLIENT = restifyClients.createJsonClient({
url: 'http://127.0.0.1:' + PORT,
retry: false,
agent: new http.Agent({ keepAlive: true })
});

CLIENT.post('/meals', { breakfast: 'waffles' }, (err, _, res) => {
assert.ifError(err);
assert.equal(res.statusCode, 200);
const firstReqSocket = serverReqSocket;
assert.equal(nextCallCount, 1);

// Without setImmediate, the second request will not reuse the socket.
setImmediate(() => {
CLIENT.post('/meals', { lunch: 'candy' }, (err2, __, res2) => {
assert.ifError(err2);
assert.equal(res2.statusCode, 200);
assert.equal(
serverReqSocket,
firstReqSocket,
'This test should issue two requests that share the ' +
'same socket.'
);
assert.equal(nextCallCount, 2);
done();
});
});
});
});
});

/**
* @param {EventEmitter} emitter - An emitter
* @returns {number} - The total number of listeners across all events
*/
function listenerCount(emitter) {
let numListeners = 0;
for (const eventName of emitter.eventNames()) {
numListeners += emitter.listenerCount(eventName);
}
return numListeners;
}

0 comments on commit 243580d

Please sign in to comment.