Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

process: refactor promise rejection handling #25200

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 24 additions & 18 deletions lib/internal/process/next_tick.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ const {
tickInfo,
// Used to run V8's micro task queue.
runMicrotasks,
setTickCallback,
initializePromiseRejectCallback
setTickCallback
} = internalBinding('task_queue');

const {
promiseRejectHandler,
emitPromiseRejectionWarnings
setHasRejectionToWarn,
hasRejectionToWarn,
listenForRejections,
processPromiseRejections
} = require('internal/process/promises');

const {
Expand All @@ -30,21 +31,27 @@ const { ERR_INVALID_CALLBACK } = require('internal/errors').codes;
const FixedQueue = require('internal/fixed_queue');

// *Must* match Environment::TickInfo::Fields in src/env.h.
const kHasScheduled = 0;
const kHasPromiseRejections = 1;
const kHasTickScheduled = 0;

function hasTickScheduled() {
return tickInfo[kHasTickScheduled] === 1;
}
function setHasTickScheduled(value) {
tickInfo[kHasTickScheduled] = value ? 1 : 0;
}

const queue = new FixedQueue();

function runNextTicks() {
if (tickInfo[kHasScheduled] === 0 && tickInfo[kHasPromiseRejections] === 0)
if (!hasTickScheduled() && !hasRejectionToWarn())
runMicrotasks();
if (tickInfo[kHasScheduled] === 0 && tickInfo[kHasPromiseRejections] === 0)
if (!hasTickScheduled() && !hasRejectionToWarn())
return;

internalTickCallback();
processTicksAndRejections();
}

function internalTickCallback() {
function processTicksAndRejections() {
let tock;
do {
while (tock = queue.shift()) {
Expand All @@ -70,10 +77,10 @@ function internalTickCallback() {

emitAfter(asyncId);
}
tickInfo[kHasScheduled] = 0;
setHasTickScheduled(false);
runMicrotasks();
} while (!queue.isEmpty() || emitPromiseRejectionWarnings());
tickInfo[kHasPromiseRejections] = 0;
} while (!queue.isEmpty() || processPromiseRejections());
setHasRejectionToWarn(false);
}

class TickObject {
Expand Down Expand Up @@ -119,18 +126,17 @@ function nextTick(callback) {
}

if (queue.isEmpty())
tickInfo[kHasScheduled] = 1;
setHasTickScheduled(true);
queue.push(new TickObject(callback, args, getDefaultTriggerAsyncId()));
}

// TODO(joyeecheung): make this a factory class so that node.js can
// control the side effects caused by the initializers.
exports.setup = function() {
// Initializes the per-isolate promise rejection callback which
// will call the handler being passed into this function.
initializePromiseRejectCallback(promiseRejectHandler);
// Sets the per-isolate promise rejection callback
listenForRejections();
// Sets the callback to be run in every tick.
setTickCallback(internalTickCallback);
setTickCallback(processTicksAndRejections);
return {
nextTick,
runNextTicks
Expand Down
61 changes: 46 additions & 15 deletions lib/internal/process/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,46 @@

const { safeToString } = internalBinding('util');
const {
promiseRejectEvents
tickInfo,
promiseRejectEvents: {
kPromiseRejectWithNoHandler,
kPromiseHandlerAddedAfterReject,
kPromiseResolveAfterResolved,
kPromiseRejectAfterResolved
},
setPromiseRejectCallback
} = internalBinding('task_queue');

// *Must* match Environment::TickInfo::Fields in src/env.h.
const kHasRejectionToWarn = 1;

const maybeUnhandledPromises = new WeakMap();
const pendingUnhandledRejections = [];
const asyncHandledRejections = [];
let lastPromiseId = 0;

function setHasRejectionToWarn(value) {
tickInfo[kHasRejectionToWarn] = value ? 1 : 0;
}

function hasRejectionToWarn() {
return tickInfo[kHasRejectionToWarn] === 1;
}

function promiseRejectHandler(type, promise, reason) {
switch (type) {
case promiseRejectEvents.kPromiseRejectWithNoHandler:
return unhandledRejection(promise, reason);
case promiseRejectEvents.kPromiseHandlerAddedAfterReject:
return handledRejection(promise);
case promiseRejectEvents.kPromiseResolveAfterResolved:
return resolveError('resolve', promise, reason);
case promiseRejectEvents.kPromiseRejectAfterResolved:
return resolveError('reject', promise, reason);
case kPromiseRejectWithNoHandler:
unhandledRejection(promise, reason);
break;
case kPromiseHandlerAddedAfterReject:
handledRejection(promise);
break;
case kPromiseResolveAfterResolved:
resolveError('resolve', promise, reason);
break;
case kPromiseRejectAfterResolved:
resolveError('reject', promise, reason);
break;
joyeecheung marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -38,7 +60,7 @@ function unhandledRejection(promise, reason) {
warned: false
});
pendingUnhandledRejections.push(promise);
return true;
setHasRejectionToWarn(true);
}

function handledRejection(promise) {
Expand All @@ -54,10 +76,11 @@ function handledRejection(promise) {
warning.name = 'PromiseRejectionHandledWarning';
warning.id = uid;
asyncHandledRejections.push({ promise, warning });
return true;
setHasRejectionToWarn(true);
return;
}
}
joyeecheung marked this conversation as resolved.
Show resolved Hide resolved
return false;
setHasRejectionToWarn(false);
}

const unhandledRejectionErrName = 'UnhandledPromiseRejectionWarning';
Expand Down Expand Up @@ -95,7 +118,9 @@ function emitDeprecationWarning() {
}
}

function emitPromiseRejectionWarnings() {
// If this method returns true, at least one more tick need to be
// scheduled to process any potential pending rejections
function processPromiseRejections() {
while (asyncHandledRejections.length > 0) {
const { promise, warning } = asyncHandledRejections.shift();
if (!process.emit('rejectionHandled', promise)) {
Expand All @@ -120,7 +145,13 @@ function emitPromiseRejectionWarnings() {
return maybeScheduledTicks || pendingUnhandledRejections.length !== 0;
}

function listenForRejections() {
setPromiseRejectCallback(promiseRejectHandler);
joyeecheung marked this conversation as resolved.
Show resolved Hide resolved
}

module.exports = {
promiseRejectHandler,
emitPromiseRejectionWarnings
hasRejectionToWarn,
setHasRejectionToWarn,
listenForRejections,
processPromiseRejections
};
14 changes: 10 additions & 4 deletions src/callback_scope.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

namespace node {

using v8::Function;
using v8::HandleScope;
using v8::Isolate;
using v8::Local;
Expand Down Expand Up @@ -95,7 +96,7 @@ void InternalCallbackScope::Close() {
Environment::TickInfo* tick_info = env_->tick_info();

if (!env_->can_call_into_js()) return;
if (!tick_info->has_scheduled()) {
if (!tick_info->has_tick_scheduled()) {
env_->isolate()->RunMicrotasks();
}

Expand All @@ -106,16 +107,21 @@ void InternalCallbackScope::Close() {
CHECK_EQ(env_->trigger_async_id(), 0);
}

if (!tick_info->has_scheduled() && !tick_info->has_promise_rejections()) {
if (!tick_info->has_tick_scheduled() && !tick_info->has_rejection_to_warn()) {
return;
}

Local<Object> process = env_->process_object();

if (!env_->can_call_into_js()) return;

if (env_->tick_callback_function()
->Call(env_->context(), process, 0, nullptr).IsEmpty()) {
Local<Function> tick_callback = env_->tick_callback_function();

// The tick is triggered before JS land calls SetTickCallback
// to initializes the tick callback during bootstrap.
CHECK(!tick_callback.IsEmpty());

if (tick_callback->Call(env_->context(), process, 0, nullptr).IsEmpty()) {
failed_ = true;
}
}
Expand Down
12 changes: 4 additions & 8 deletions src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,16 +265,12 @@ inline AliasedBuffer<uint8_t, v8::Uint8Array>& Environment::TickInfo::fields() {
return fields_;
}

inline bool Environment::TickInfo::has_scheduled() const {
return fields_[kHasScheduled] == 1;
inline bool Environment::TickInfo::has_tick_scheduled() const {
return fields_[kHasTickScheduled] == 1;
}

inline bool Environment::TickInfo::has_promise_rejections() const {
return fields_[kHasPromiseRejections] == 1;
}

inline void Environment::TickInfo::promise_rejections_toggle_on() {
fields_[kHasPromiseRejections] = 1;
inline bool Environment::TickInfo::has_rejection_to_warn() const {
return fields_[kHasRejectionToWarn] == 1;
}

inline void Environment::AssignToContext(v8::Local<v8::Context> context,
Expand Down
4 changes: 4 additions & 0 deletions src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ Environment::Environment(IsolateData* isolate_data,
if (options_->no_force_async_hooks_checks) {
async_hooks_.no_force_checks();
}

// TODO(addaleax): the per-isolate state should not be controlled by
// a single Environment.
isolate()->SetPromiseRejectCallback(task_queue::PromiseRejectCallback);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you leave a TODO comment for me? We’re letting a single Environment take control of per-Isolate state here – I know it’s been that way before, but it’s not what we should be doing…

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity - when would multiple Environments correspond to an isolate?

}

Environment::~Environment() {
Expand Down
10 changes: 4 additions & 6 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -574,18 +574,16 @@ class Environment {
class TickInfo {
public:
inline AliasedBuffer<uint8_t, v8::Uint8Array>& fields();
inline bool has_scheduled() const;
inline bool has_promise_rejections() const;

inline void promise_rejections_toggle_on();
inline bool has_tick_scheduled() const;
inline bool has_rejection_to_warn() const;

private:
friend class Environment; // So we can call the constructor.
inline explicit TickInfo(v8::Isolate* isolate);

enum Fields {
kHasScheduled,
kHasPromiseRejections,
kHasTickScheduled = 0,
kHasRejectionToWarn,
kFieldsCount
};

Expand Down
4 changes: 4 additions & 0 deletions src/node_internals.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ SlicedArguments::SlicedArguments(
size_ = size;
}

namespace task_queue {
void PromiseRejectCallback(v8::PromiseRejectMessage message);
} // namespace task_queue

v8::Maybe<bool> ProcessEmitWarning(Environment* env, const char* fmt, ...);
v8::Maybe<bool> ProcessEmitDeprecationWarning(Environment* env,
const char* warning,
Expand Down
27 changes: 10 additions & 17 deletions src/node_task_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ using v8::kPromiseRejectAfterResolved;
using v8::kPromiseRejectWithNoHandler;
using v8::kPromiseResolveAfterResolved;
using v8::Local;
using v8::MaybeLocal;
using v8::Number;
using v8::Object;
using v8::Promise;
Expand All @@ -37,7 +36,7 @@ static void SetTickCallback(const FunctionCallbackInfo<Value>& args) {
env->set_tick_callback_function(args[0].As<Function>());
}

static void PromiseRejectCallback(PromiseRejectMessage message) {
void PromiseRejectCallback(PromiseRejectMessage message) {
static std::atomic<uint64_t> unhandledRejections{0};
static std::atomic<uint64_t> rejectionsHandledAfter{0};

Expand All @@ -50,6 +49,10 @@ static void PromiseRejectCallback(PromiseRejectMessage message) {
if (env == nullptr) return;

Local<Function> callback = env->promise_reject_callback();
// The promise is rejected before JS land calls SetPromiseRejectCallback
// to initializes the promise reject callback during bootstrap.
CHECK(!callback.IsEmpty());

Local<Value> value;
Local<Value> type = Number::New(env->isolate(), event);

Expand Down Expand Up @@ -80,26 +83,16 @@ static void PromiseRejectCallback(PromiseRejectMessage message) {
}

Local<Value> args[] = { type, promise, value };
MaybeLocal<Value> ret = callback->Call(env->context(),
Undefined(isolate),
arraysize(args),
args);

if (!ret.IsEmpty() && ret.ToLocalChecked()->IsTrue())
env->tick_info()->promise_rejections_toggle_on();
USE(callback->Call(
env->context(), Undefined(isolate), arraysize(args), args));
}

static void InitializePromiseRejectCallback(
static void SetPromiseRejectCallback(
const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Isolate* isolate = env->isolate();

CHECK(args[0]->IsFunction());

// TODO(joyeecheung): this may be moved to somewhere earlier in the bootstrap
// to make sure it's only called once
isolate->SetPromiseRejectCallback(PromiseRejectCallback);

env->set_promise_reject_callback(args[0].As<Function>());
}

Expand All @@ -126,8 +119,8 @@ static void Initialize(Local<Object> target,
FIXED_ONE_BYTE_STRING(isolate, "promiseRejectEvents"),
events).FromJust();
env->SetMethod(target,
"initializePromiseRejectCallback",
InitializePromiseRejectCallback);
"setPromiseRejectCallback",
SetPromiseRejectCallback);
}

} // namespace task_queue
Expand Down
2 changes: 1 addition & 1 deletion test/message/events_unhandled_error_nexttick.out
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Error
at startup (internal/bootstrap/node.js:*:*)
Emitted 'error' event at:
at process.nextTick (*events_unhandled_error_nexttick.js:*:*)
at internalTickCallback (internal/process/next_tick.js:*:*)
at processTicksAndRejections (internal/process/next_tick.js:*:*)
at process.runNextTicks [as _tickCallback] (internal/process/next_tick.js:*:*)
at Function.Module.runMain (internal/modules/cjs/loader.js:*:*)
at executeUserCode (internal/bootstrap/node.js:*:*)
Expand Down
2 changes: 1 addition & 1 deletion test/message/nexttick_throw.out
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
^
ReferenceError: undefined_reference_error_maker is not defined
at *test*message*nexttick_throw.js:*:*
at internalTickCallback (internal/process/next_tick.js:*:*)
at processTicksAndRejections (internal/process/next_tick.js:*:*)
at process.runNextTicks [as _tickCallback] (internal/process/next_tick.js:*:*)
at Function.Module.runMain (internal/modules/cjs/loader.js:*:*)
at executeUserCode (internal/bootstrap/node.js:*:*)
Expand Down
Loading