Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async_wrap,lib: Firing async-wrap callbacks for next-tick callbacks #6082

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 170 additions & 0 deletions lib/async_wrap.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
'use strict';

const async_wrap = process.binding('async_wrap');

const nextIdArray = async_wrap.getNextAsyncIdArray();
const currentIdArray = async_wrap.getCurrentAsyncIdArray();
const fieldsArray = async_wrap.getAsyncHookFields();
const asyncCallbacks = async_wrap.getAsyncCallbacks();

function getLittleEndian(a) {
return a[0] + a[1] * 0x100000000;
}

function getBigEndian(a) {
return a[1] + a[0] * 0x100000000;
}

function setLittleEndian(a, val) {

if (val < 0) {
throw new Error('Negative value not supported');
}

var lword = val & 0xffffffff;
var hword = 0;
if (val > 0xffffffff) {
// effectively we're doing shift-right by 32 bits. Javascript bit
// operators convert operands to 32 bits, so we lose the
// high-order bits if we try to use >>> or >>.
hword = Math.floor((val / 0x100000000));
}
a[0] = lword;
a[1] = hword;
}

function setBigEndian(a, val) {

if (val < 0) {
throw new Error('Negative value not supported');
}

var lword = val & 0xffffffff;
var hword = 0;
if (val > 0xffffffff) {
// effectively we're doing shift-right by 32 bits. Javascript bit
// operators convert operands to 32 bits, so we lose the
// high-order bits if we try to use >>> or >>.
hword = Math.floor((val / 0x100000000));
}
a[1] = lword;
a[0] = hword;
}

function incrementLittleEndian(a) {
// carry-over if lsb is maxed out
if (a[0] === 0xffffffff) {
a[0] = 0;
a[1]++;
}
a[0]++;
return a[0] + a[1] * 0x100000000;
}

function incrementBigEndian(a) {
// carry-over if lsb is maxed out
if (a[1] === 0xffffffff) {
a[1] = 0;
a[0]++;
}
a[1]++;
return a[1] + a[0] * 0x100000000;
}

function getCurrentIdLittleEndian() {
return getLittleEndian(currentIdArray);
}

function getCurrentIdBigEndian() {
return getBigEndian(currentIdArray);
}

function setCurrentIdLittleEndian(val) {
return setLittleEndian(currentIdArray, val);
}

function setCurrentIdBigEndian(val) {
return setBigEndian(currentIdArray, val);
}

function incrementNextIdLittleEndian() {
return incrementLittleEndian(nextIdArray);
}

function incrementNextIdBigEndian() {
return incrementBigEndian(nextIdArray);
}

// must match enum definitions in AsyncHook class in env.h
const kEnableCallbacks = 0;

function callbacksEnabled() {
return (fieldsArray[kEnableCallbacks] !== 0 ? true : false);
}

const getCurrentAsyncId =
process.binding('os').isBigEndian ?
getCurrentIdBigEndian : getCurrentIdLittleEndian;

const setCurrentAsyncId =
process.binding('os').isBigEndian ?
setCurrentIdBigEndian : setCurrentIdLittleEndian;

const incrementNextAsyncId =
process.binding('os').isBigEndian ?
incrementNextIdBigEndian : incrementNextIdLittleEndian;

function notifyAsyncEnqueue(asyncId) {
if (callbacksEnabled()) {
const asyncState = {};
for (let i = 0; i < asyncCallbacks.length; i++) {
if (asyncCallbacks[i].init) {
/* init(asyncId, provider, parentId, parentObject) */
asyncCallbacks[i].init.call(asyncState, asyncId,
async_wrap.Providers.NEXTTICK, undefined, undefined);
}
}
return asyncState;
}
return undefined;
}

function notifyAsyncStart(asyncId, asyncState) {
setCurrentAsyncId(asyncId);
if (asyncState) {
for (let i = 0; i < asyncCallbacks.length; i++) {
if (asyncCallbacks[i].pre) {
/* pre(asyncId); */
asyncCallbacks[i].pre.call(asyncState, asyncId);
}
}
}
}

function notifyAsyncEnd(asyncId, asyncState, callbackThrew) {
if (asyncState) {
for (let i = 0; i < asyncCallbacks.length; i++) {
if (asyncCallbacks[i].post) {
/* post(asyncId, didUserCodeThrow); */
asyncCallbacks[i].post.call(asyncState, asyncId, callbackThrew);
}
}

setCurrentAsyncId(0);

for (let i = 0; i < asyncCallbacks.length; i++) {
if (asyncCallbacks[i].destroy) {
/* destroy(asyncId); */
asyncCallbacks[i].destroy.call(undefined, asyncId);
}
}
}
}

module.exports.incrementNextAsyncId = incrementNextAsyncId;
module.exports.getCurrentAsyncId = getCurrentAsyncId;
module.exports.setCurrentAsyncId = setCurrentAsyncId;
module.exports.callbacksEnabled = callbacksEnabled;
module.exports.notifyAsyncEnqueue = notifyAsyncEnqueue;
module.exports.notifyAsyncStart = notifyAsyncStart;
module.exports.notifyAsyncEnd = notifyAsyncEnd;
63 changes: 55 additions & 8 deletions lib/internal/process/next_tick.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
exports.setup = setupNextTick;

function setupNextTick() {
const async_wrap = require('async_wrap');

const promises = require('internal/process/promises');
const emitPendingUnhandledRejections = promises.setup(scheduleMicrotasks);
var nextTickQueue = [];
Expand Down Expand Up @@ -85,19 +87,40 @@ function setupNextTick() {
// Run callbacks that have no domain.
// Using domains will cause this to be overridden.
function _tickCallback() {

var callback, args, tock;

do {
while (tickInfo[kIndex] < tickInfo[kLength]) {
tock = nextTickQueue[tickInfo[kIndex]++];
callback = tock.callback;

args = tock.args;
// Using separate callback execution functions allows direct
// callback invocation with small numbers of arguments to avoid the
// performance hit associated with using `fn.apply()`
_combinedTickCallback(args, callback);
if (!tock.asyncState) {
async_wrap.setCurrentAsyncId(tock.asyncId);
_combinedTickCallback(args, callback);
async_wrap.setCurrentAsyncId(0);
}
else {
var callbackThrew = true;
try {
async_wrap.notifyAsyncStart(tock.asyncId, tock.asyncState);

// Using separate callback execution functions allows direct
// callback invocation with small numbers of arguments to avoid the
// performance hit associated with using `fn.apply()`
_combinedTickCallback(args, callback);
callbackThrew = false;
}
finally {
async_wrap.notifyAsyncEnd(
tock.asyncId, tock.asyncState, callbackThrew);
}
}

if (1e4 < tickInfo[kIndex])
tickDone();

}
tickDone();
_runMicrotasks();
Expand All @@ -116,10 +139,29 @@ function setupNextTick() {
args = tock.args;
if (domain)
domain.enter();
// Using separate callback execution functions allows direct
// callback invocation with small numbers of arguments to avoid the
// performance hit associated with using `fn.apply()`
_combinedTickCallback(args, callback);

if (!tock.asyncState) {
async_wrap.setCurrentAsyncId(tock.asyncId);
_combinedTickCallback(args, callback);
async_wrap.setCurrentAsyncId(0);
}
else {
var callbackThrew = true;
try {
async_wrap.notifyAsyncStart(tock.asyncId, tock.asyncState);

// Using separate callback execution functions allows direct
// callback invocation with small numbers of arguments to avoid the
// performance hit associated with using `fn.apply()`
_combinedTickCallback(args, callback);
callbackThrew = false;
}
finally {
async_wrap.notifyAsyncEnd(
tock.asyncId, tock.asyncState, callbackThrew);
}
}

if (1e4 < tickInfo[kIndex])
tickDone();
if (domain)
Expand All @@ -135,6 +177,10 @@ function setupNextTick() {
this.callback = c;
this.domain = process.domain || null;
this.args = args;
this.asyncId = async_wrap.incrementNextAsyncId();
if (async_wrap.callbacksEnabled()) {
this.asyncState = async_wrap.notifyAsyncEnqueue(this.asyncId);
}
}

function nextTick(callback) {
Expand All @@ -154,4 +200,5 @@ function setupNextTick() {
nextTickQueue.push(new TickObject(callback, args));
tickInfo[kLength]++;
}

}
1 change: 1 addition & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
'lib/_debug_agent.js',
'lib/_debugger.js',
'lib/assert.js',
'lib/async_wrap.js',
'lib/buffer.js',
'lib/child_process.js',
'lib/console.js',
Expand Down
57 changes: 5 additions & 52 deletions src/async-wrap-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,69 +18,22 @@ inline AsyncWrap::AsyncWrap(Environment* env,
ProviderType provider,
AsyncWrap* parent)
: BaseObject(env, object), bits_(static_cast<uint32_t>(provider) << 1),
uid_(env->get_async_wrap_uid()) {
uid_(env->async_hooks()->get_next_async_wrap_uid()) {
CHECK_NE(provider, PROVIDER_NONE);
CHECK_GE(object->InternalFieldCount(), 1);

// Shift provider value over to prevent id collision.
persistent().SetWrapperClassId(NODE_ASYNC_ID_OFFSET + provider);

v8::Local<v8::Function> init_fn = env->async_hooks_init_function();

// No init callback exists, no reason to go on.
if (init_fn.IsEmpty())
return;

// If async wrap callbacks are disabled and no parent was passed that has
// run the init callback then return.
if (!env->async_wrap_callbacks_enabled() &&
(parent == nullptr || !parent->ran_init_callback()))
return;

v8::HandleScope scope(env->isolate());

v8::Local<v8::Value> argv[] = {
v8::Integer::New(env->isolate(), get_uid()),
v8::Int32::New(env->isolate(), provider),
Null(env->isolate()),
Null(env->isolate())
};

if (parent != nullptr) {
argv[2] = v8::Integer::New(env->isolate(), parent->get_uid());
argv[3] = parent->object();
if (AsyncWrap::FireAsyncInitCallbacks(env, get_uid(), object, provider, parent)) {
bits_ |= 1; // ran_init_callback() is true now.
}

v8::TryCatch try_catch(env->isolate());

v8::MaybeLocal<v8::Value> ret =
init_fn->Call(env->context(), object, arraysize(argv), argv);

if (ret.IsEmpty()) {
ClearFatalExceptionHandlers(env);
FatalException(env->isolate(), try_catch);
}

bits_ |= 1; // ran_init_callback() is true now.
}


inline AsyncWrap::~AsyncWrap() {
if (!ran_init_callback())
return;

v8::Local<v8::Function> fn = env()->async_hooks_destroy_function();
if (!fn.IsEmpty()) {
v8::HandleScope scope(env()->isolate());
v8::Local<v8::Value> uid = v8::Integer::New(env()->isolate(), get_uid());
v8::TryCatch try_catch(env()->isolate());
v8::MaybeLocal<v8::Value> ret =
fn->Call(env()->context(), v8::Null(env()->isolate()), 1, &uid);
if (ret.IsEmpty()) {
ClearFatalExceptionHandlers(env());
FatalException(env()->isolate(), try_catch);
}
}
v8::HandleScope scope(env()->isolate());
FireAsyncDestroyCallbacks(env(), ran_init_callback(), v8::Integer::New(env()->isolate(), get_uid()));
}


Expand Down
Loading