Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v8.x backport] async_hooks: add missing async_hooks destroys in AsyncReset #23410

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmark/http/bench-parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ function processHeader(header, n) {
bench.start();
for (var i = 0; i < n; i++) {
parser.execute(header, 0, header.length);
parser.reinitialize(REQUEST);
parser.reinitialize(REQUEST, i > 0);
}
bench.end(n);
}
Expand Down
2 changes: 1 addition & 1 deletion benchmark/misc/freelist.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const bench = common.createBenchmark(main, {
});

function main(conf) {
const FreeList = require('internal/freelist');
const { FreeList } = require('internal/freelist');
const n = conf.n;
const poolSize = 1000;
const list = new FreeList('test', poolSize, Object);
Expand Down
2 changes: 1 addition & 1 deletion lib/_http_agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ Agent.prototype.addRequest = function addRequest(req, options, port/*legacy*/,
var socket = this.freeSockets[name].shift();
// Guard against an uninitialized or user supplied Socket.
if (socket._handle && typeof socket._handle.asyncReset === 'function') {
// Assign the handle a new asyncId and run any init() hooks.
// Assign the handle a new asyncId and run any destroy()/init() hooks.
socket._handle.asyncReset();
socket[async_id_symbol] = socket._handle.getAsyncId();
}
Expand Down
3 changes: 2 additions & 1 deletion lib/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const { Buffer } = require('buffer');
const { urlToOptions, searchParamsSymbol } = require('internal/url');
const { outHeadersKey, ondrain } = require('internal/http');
const { nextTick } = require('internal/process/next_tick');
const is_reused_symbol = require('internal/freelist').symbols.is_reused_symbol;

// The actual list of disallowed characters in regexp form is more like:
// /[^A-Za-z0-9\-._~!$&'()*+,;=/:@]/
Expand Down Expand Up @@ -618,7 +619,7 @@ function tickOnSocket(req, socket) {
var parser = parsers.alloc();
req.socket = socket;
req.connection = socket;
parser.reinitialize(HTTPParser.RESPONSE);
parser.reinitialize(HTTPParser.RESPONSE, parser[is_reused_symbol]);
parser.socket = socket;
parser.incoming = null;
parser.outgoing = req;
Expand Down
2 changes: 1 addition & 1 deletion lib/_http_common.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

const { methods, HTTPParser } = process.binding('http_parser');

const FreeList = require('internal/freelist');
const { FreeList } = require('internal/freelist');
const { ondrain } = require('internal/http');
const incoming = require('_http_incoming');
const {
Expand Down
3 changes: 2 additions & 1 deletion lib/_http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const {
defaultTriggerAsyncIdScope,
getOrSetAsyncId
} = require('internal/async_hooks');
const is_reused_symbol = require('internal/freelist').symbols.is_reused_symbol;
const { IncomingMessage } = require('_http_incoming');

const kServerResponse = Symbol('ServerResponse');
Expand Down Expand Up @@ -331,7 +332,7 @@ function connectionListenerInternal(server, socket) {
socket.on('timeout', socketOnTimeout);

var parser = parsers.alloc();
parser.reinitialize(HTTPParser.REQUEST);
parser.reinitialize(HTTPParser.REQUEST, parser[is_reused_symbol]);
parser.socket = socket;
socket.parser = parser;
parser.incoming = null;
Expand Down
21 changes: 17 additions & 4 deletions lib/internal/freelist.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
'use strict';

const is_reused_symbol = Symbol('isReused');

class FreeList {
constructor(name, max, ctor) {
this.name = name;
Expand All @@ -9,9 +11,15 @@ class FreeList {
}

alloc() {
return this.list.length ?
this.list.pop() :
this.ctor.apply(this, arguments);
let item;
if (this.list.length > 0) {
item = this.list.pop();
item[is_reused_symbol] = true;
} else {
item = this.ctor.apply(this, arguments);
item[is_reused_symbol] = false;
}
return item;
}

free(obj) {
Expand All @@ -23,4 +31,9 @@ class FreeList {
}
}

module.exports = FreeList;
module.exports = {
FreeList,
symbols: {
is_reused_symbol
}
};
9 changes: 9 additions & 0 deletions src/async_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,7 @@ AsyncWrap::AsyncWrap(Environment* env,
// Shift provider value over to prevent id collision.
persistent().SetWrapperClassId(NODE_ASYNC_ID_OFFSET + provider_type_);

async_id_ = -1;
// Use AsyncReset() call to execute the init() callbacks.
AsyncReset(execution_async_id, silent);
}
Expand Down Expand Up @@ -681,6 +682,14 @@ void AsyncWrap::EmitDestroy(Environment* env, double async_id) {
// and reused over their lifetime. This way a new uid can be assigned when
// the resource is pulled out of the pool and put back into use.
void AsyncWrap::AsyncReset(double execution_async_id, bool silent) {
if (async_id_ != -1) {
// This instance was in use before, we have already emitted an init with
// its previous async_id and need to emit a matching destroy for that
// before generating a new async_id.
EmitDestroy(env(), async_id_);
}

// Now we can assign a new async_id_ to this instance.
async_id_ =
execution_async_id == -1 ? env()->new_async_id() : execution_async_id;
trigger_async_id_ = env()->get_default_trigger_async_id();
Expand Down
11 changes: 9 additions & 2 deletions src/node_http_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,9 @@ class Parser : public AsyncWrap {
static void Reinitialize(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);

CHECK(args[0]->IsInt32());
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is actually new for the v8.x branch but since I was adding a new param here anyway (args[1]/isReused) I figured it wouldn't hurt to copy the existing check from master here.

CHECK(args[1]->IsBoolean());
bool isReused = args[1]->IsTrue();
http_parser_type type =
static_cast<http_parser_type>(args[0]->Int32Value());

Expand All @@ -472,8 +475,12 @@ class Parser : public AsyncWrap {
ASSIGN_OR_RETURN_UNWRAP(&parser, args.Holder());
// Should always be called from the same context.
CHECK_EQ(env, parser->env());
// The parser is being reused. Reset the async id and call init() callbacks.
parser->AsyncReset();
// This parser has either just been created or it is being reused.
// We must only call AsyncReset for the latter case, because AsyncReset has
// already been called via the constructor for the former case.
if (isReused) {
parser->AsyncReset();
}
parser->Init(type);
}

Expand Down
13 changes: 2 additions & 11 deletions test/async-hooks/test-graph.http.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,15 @@ process.on('exit', function() {
{ type: 'HTTPPARSER',
id: 'httpparser:1',
triggerAsyncId: 'tcpserver:1' },
{ type: 'HTTPPARSER',
id: 'httpparser:2',
triggerAsyncId: 'tcpserver:1' },
{ type: 'TCPWRAP', id: 'tcp:2', triggerAsyncId: 'tcpserver:1' },
{ type: 'Timeout', id: 'timeout:1', triggerAsyncId: 'tcp:2' },
{ type: 'TIMERWRAP', id: 'timer:1', triggerAsyncId: 'tcp:2' },
{ type: 'HTTPPARSER',
id: 'httpparser:3',
triggerAsyncId: 'tcp:2' },
{ type: 'HTTPPARSER',
id: 'httpparser:4',
id: 'httpparser:2',
triggerAsyncId: 'tcp:2' },
{ type: 'Timeout',
id: 'timeout:2',
triggerAsyncId: 'httpparser:4' },
{ type: 'TIMERWRAP',
id: 'timer:2',
triggerAsyncId: 'httpparser:4' },
triggerAsyncId: 'httpparser:2' },
{ type: 'SHUTDOWNWRAP',
id: 'shutdown:1',
triggerAsyncId: 'tcp:2' } ]
Expand Down
83 changes: 83 additions & 0 deletions test/parallel/test-async-hooks-http-agent-destroy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const async_id_symbol = process.binding('async_wrap').async_id_symbol;
const async_hooks = require('async_hooks');
const http = require('http');

// Regression test for https://github.com/nodejs/node/issues/19859
// Checks that an http.Agent emits a destroy for the old asyncId before calling
// asyncReset()s when reusing a socket handle. The setup is nearly identical to
// parallel/test-async-hooks-http-agent (which focuses on the assertion that
// a fresh asyncId is assigned to the net.Socket instance).

const destroyedIds = new Set();
async_hooks.createHook({
destroy: common.mustCallAtLeast((asyncId) => {
destroyedIds.add(asyncId);
}, 1)
}).enable();

// Make sure a single socket is transparently reused for 2 requests.
const agent = new http.Agent({
keepAlive: true,
keepAliveMsecs: Infinity,
maxSockets: 1
});

const server = http.createServer(common.mustCall((req, res) => {
req.once('data', common.mustCallAtLeast(() => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.write('foo');
}));
req.on('end', common.mustCall(() => {
res.end('bar');
}));
}, 2)).listen(0, common.mustCall(() => {
const port = server.address().port;
const payload = 'hello world';

// First request. This is useless except for adding a socket to the
// agent’s pool for reuse.
const r1 = http.request({
agent, port, method: 'POST'
}, common.mustCall((res) => {
// Remember which socket we used.
const socket = res.socket;
const asyncIdAtFirstRequest = socket[async_id_symbol];
assert.ok(asyncIdAtFirstRequest > 0, `${asyncIdAtFirstRequest} > 0`);
// Check that request and response share their socket.
assert.strictEqual(r1.socket, socket);

res.on('data', common.mustCallAtLeast(() => {}));
res.on('end', common.mustCall(() => {
// setImmediate() to give the agent time to register the freed socket.
setImmediate(common.mustCall(() => {
// The socket is free for reuse now.
assert.strictEqual(socket[async_id_symbol], -1);

// second request:
const r2 = http.request({
agent, port, method: 'POST'
}, common.mustCall((res) => {
assert.ok(destroyedIds.has(asyncIdAtFirstRequest));

// Empty payload, to hit the “right” code path.
r2.end('');

res.on('data', common.mustCallAtLeast(() => {}));
res.on('end', common.mustCall(() => {
// Clean up to let the event loop stop.
server.close();
agent.destroy();
}));
}));

// Schedule a payload to be written immediately, but do not end the
// request just yet.
r2.write(payload);
}));
}));
}));
r1.end(payload);
}));
61 changes: 61 additions & 0 deletions test/parallel/test-async-hooks-http-parser-destroy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
'use strict';
const common = require('../common');
const Countdown = require('../common/countdown');
const assert = require('assert');
const async_hooks = require('async_hooks');
const http = require('http');

// Regression test for https://github.com/nodejs/node/issues/19859.
// Checks that matching destroys are emitted when creating new/reusing old http
// parser instances.

const N = 50;
const KEEP_ALIVE = 100;

const createdIds = [];
const destroyedIds = [];
async_hooks.createHook({
init: common.mustCallAtLeast((asyncId, type) => {
if (type === 'HTTPPARSER') {
createdIds.push(asyncId);
}
}, N),
destroy: (asyncId) => {
destroyedIds.push(asyncId);
}
}).enable();

const server = http.createServer(function(req, res) {
res.end('Hello');
});

const keepAliveAgent = new http.Agent({
keepAlive: true,
keepAliveMsecs: KEEP_ALIVE,
});

const countdown = new Countdown(N, () => {
server.close(() => {
// give the server sockets time to close (which will also free their
// associated parser objects) after the server has been closed.
setTimeout(() => {
createdIds.forEach((createdAsyncId) => {
assert.ok(destroyedIds.indexOf(createdAsyncId) >= 0);
});
}, KEEP_ALIVE * 2);
});
});

server.listen(0, function() {
for (let i = 0; i < N; ++i) {
(function makeRequest() {
http.get({
port: server.address().port,
agent: keepAliveAgent
}, function(res) {
countdown.dec();
res.resume();
});
})();
}
});
25 changes: 12 additions & 13 deletions test/parallel/test-freelist.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,27 @@

require('../common');
const assert = require('assert');
const FreeList = require('internal/freelist');
const { FreeList } = require('internal/freelist');

assert.strictEqual(typeof FreeList, 'function');

const flist1 = new FreeList('flist1', 3, String);
const flist1 = new FreeList('flist1', 3, Object);

// Allocating when empty, should not change the list size
const result = flist1.alloc('test');
assert.strictEqual(typeof result, 'string');
assert.strictEqual(result, 'test');
const result = flist1.alloc();
assert.strictEqual(typeof result, 'object');
assert.strictEqual(flist1.list.length, 0);

// Exhaust the free list
assert(flist1.free('test1'));
assert(flist1.free('test2'));
assert(flist1.free('test3'));
assert(flist1.free({ id: 'test1' }));
assert(flist1.free({ id: 'test2' }));
assert(flist1.free({ id: 'test3' }));

// Now it should not return 'true', as max length is exceeded
assert.strictEqual(flist1.free('test4'), false);
assert.strictEqual(flist1.free('test5'), false);
assert.strictEqual(flist1.free({ id: 'test4' }), false);
assert.strictEqual(flist1.free({ id: 'test5' }), false);

// At this point 'alloc' should just return the stored values
assert.strictEqual(flist1.alloc(), 'test3');
assert.strictEqual(flist1.alloc(), 'test2');
assert.strictEqual(flist1.alloc(), 'test1');
assert.strictEqual(flist1.alloc().id, 'test3');
assert.strictEqual(flist1.alloc().id, 'test2');
assert.strictEqual(flist1.alloc().id, 'test1');
4 changes: 2 additions & 2 deletions test/parallel/test-http-parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ function expectBody(expected) {
throw new Error('hello world');
};

parser.reinitialize(HTTPParser.REQUEST);
parser.reinitialize(HTTPParser.REQUEST, true);

assert.throws(function() {
parser.execute(request, 0, request.length);
Expand Down Expand Up @@ -554,7 +554,7 @@ function expectBody(expected) {
parser[kOnBody] = expectBody('ping');
parser.execute(req1, 0, req1.length);

parser.reinitialize(REQUEST);
parser.reinitialize(REQUEST, true);
parser[kOnBody] = expectBody('pong');
parser[kOnHeadersComplete] = onHeadersComplete2;
parser.execute(req2, 0, req2.length);
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-internal-modules-expose.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ const config = process.binding('config');

console.log(config, process.argv);

assert.strictEqual(typeof require('internal/freelist'), 'function');
assert.strictEqual(typeof require('internal/freelist').FreeList, 'function');
assert.strictEqual(config.exposeInternals, true);
Loading