Skip to content
This repository has been archived by the owner on Aug 31, 2018. It is now read-only.

src: clean up handles (preparation for workers) #85

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,7 @@
'<(OBJ_PATH)<(OBJ_SEPARATOR)node_debug_options.<(OBJ_SUFFIX)',
'<(OBJ_PATH)<(OBJ_SEPARATOR)async-wrap.<(OBJ_SUFFIX)',
'<(OBJ_PATH)<(OBJ_SEPARATOR)env.<(OBJ_SUFFIX)',
'<(OBJ_PATH)<(OBJ_SEPARATOR)handle_wrap.<(OBJ_SUFFIX)',
'<(OBJ_PATH)<(OBJ_SEPARATOR)node.<(OBJ_SUFFIX)',
'<(OBJ_PATH)<(OBJ_SEPARATOR)node_buffer.<(OBJ_SUFFIX)',
'<(OBJ_PATH)<(OBJ_SEPARATOR)node_i18n.<(OBJ_SUFFIX)',
Expand Down
40 changes: 15 additions & 25 deletions src/cares_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,8 @@ void ares_poll_cb(uv_poll_t* watcher, int status, int events) {
}


void ares_poll_close_cb(uv_handle_t* watcher) {
node_ares_task* task = ContainerOf(&node_ares_task::poll_watcher,
reinterpret_cast<uv_poll_t*>(watcher));
void ares_poll_close_cb(uv_poll_t* watcher) {
node_ares_task* task = ContainerOf(&node_ares_task::poll_watcher, watcher);
free(task);
}

Expand Down Expand Up @@ -353,8 +352,7 @@ void ares_sockstate_cb(void* data,
"When an ares socket is closed we should have a handle for it");

channel->task_list()->erase(it);
uv_close(reinterpret_cast<uv_handle_t*>(&task->poll_watcher),
ares_poll_close_cb);
channel->env()->CloseHandle(&task->poll_watcher, ares_poll_close_cb);

if (channel->task_list()->empty()) {
uv_timer_stop(channel->timer_handle());
Expand Down Expand Up @@ -543,10 +541,7 @@ ChannelWrap::~ChannelWrap() {
void ChannelWrap::CleanupTimer() {
if (timer_handle_ == nullptr) return;

uv_close(reinterpret_cast<uv_handle_t*>(timer_handle_),
[](uv_handle_t* handle) {
delete reinterpret_cast<uv_timer_t*>(handle);
});
env()->CloseHandle(timer_handle_, [](uv_timer_t* handle){ delete handle; });
timer_handle_ = nullptr;
}

Expand Down Expand Up @@ -641,8 +636,7 @@ class QueryWrap : public AsyncWrap {
static_cast<void*>(this));
}

static void CaresAsyncClose(uv_handle_t* handle) {
uv_async_t* async = reinterpret_cast<uv_async_t*>(handle);
static void CaresAsyncClose(uv_async_t* async) {
auto data = static_cast<struct CaresAsyncData*>(async->data);
delete data->wrap;
delete data;
Expand All @@ -667,7 +661,7 @@ class QueryWrap : public AsyncWrap {
free(host);
}

uv_close(reinterpret_cast<uv_handle_t*>(handle), CaresAsyncClose);
wrap->env()->CloseHandle(handle, CaresAsyncClose);
}

static void Callback(void *arg, int status, int timeouts,
Expand Down Expand Up @@ -1975,13 +1969,11 @@ void GetAddrInfo(const FunctionCallbackInfo<Value>& args) {
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = flags;

int err = uv_getaddrinfo(env->event_loop(),
req_wrap->req(),
AfterGetAddrInfo,
*hostname,
nullptr,
&hints);
req_wrap->Dispatched();
int err = req_wrap->Dispatch(uv_getaddrinfo,
AfterGetAddrInfo,
*hostname,
nullptr,
&hints);
if (err)
delete req_wrap;

Expand All @@ -2005,12 +1997,10 @@ void GetNameInfo(const FunctionCallbackInfo<Value>& args) {

GetNameInfoReqWrap* req_wrap = new GetNameInfoReqWrap(env, req_wrap_obj);

int err = uv_getnameinfo(env->event_loop(),
req_wrap->req(),
AfterGetNameInfo,
(struct sockaddr*)&addr,
NI_NAMEREQD);
req_wrap->Dispatched();
int err = req_wrap->Dispatch(uv_getnameinfo,
AfterGetNameInfo,
(struct sockaddr*)&addr,
NI_NAMEREQD);
if (err)
delete req_wrap;

Expand Down
2 changes: 1 addition & 1 deletion src/connection_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class ConnectionWrap : public StreamWrap {
ConnectionWrap(Environment* env,
v8::Local<v8::Object> object,
ProviderType provider);
~ConnectionWrap() {
virtual ~ConnectionWrap() {
}

UVType handle_;
Expand Down
33 changes: 30 additions & 3 deletions src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,6 @@ inline Environment::Environment(IsolateData* isolate_data,
#if HAVE_INSPECTOR
inspector_agent_(this),
#endif
handle_cleanup_waiting_(0),
http_parser_buffer_(nullptr),
fs_stats_field_array_(nullptr),
context_(context->GetIsolate(), context) {
Expand Down Expand Up @@ -374,8 +373,36 @@ inline void Environment::RegisterHandleCleanup(uv_handle_t* handle,
handle_cleanup_queue_.PushBack(new HandleCleanup(handle, cb, arg));
}

inline void Environment::FinishHandleCleanup(uv_handle_t* handle) {
handle_cleanup_waiting_--;
template <typename T, typename OnCloseCallback>
inline void Environment::CloseHandle(T* handle, OnCloseCallback callback) {
handle_cleanup_waiting_++;
static_assert(sizeof(T) >= sizeof(uv_handle_t), "T is a libuv handle");
static_assert(offsetof(T, data) == offsetof(uv_handle_t, data),
"T is a libuv handle");
static_assert(offsetof(T, close_cb) == offsetof(uv_handle_t, close_cb),
"T is a libuv handle");
struct CloseData {
Environment* env;
OnCloseCallback callback;
void* original_data;
};
handle->data = new CloseData { this, callback, handle->data };
uv_close(reinterpret_cast<uv_handle_t*>(handle), [](uv_handle_t* handle) {
CloseData* data = static_cast<CloseData*>(handle->data);
data->env->handle_cleanup_waiting_--;
handle->data = data->original_data;
data->callback(reinterpret_cast<T*>(handle));
delete data;
});
}

void Environment::IncreaseWaitingRequestCounter() {
request_waiting_++;
}

void Environment::DecreaseWaitingRequestCounter() {
request_waiting_--;
CHECK_GE(request_waiting_, 0);
}

inline uv_loop_t* Environment::event_loop() const {
Expand Down
22 changes: 15 additions & 7 deletions src/env.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "node_internals.h"
#include "async-wrap.h"
#include "v8-profiler.h"
#include "req-wrap-inl.h"

#if defined(_MSC_VER)
#define getpid GetCurrentProcessId
Expand Down Expand Up @@ -51,11 +52,7 @@ void Environment::Start(int argc,
uv_timer_init(event_loop(), destroy_ids_timer_handle());

auto close_and_finish = [](Environment* env, uv_handle_t* handle, void* arg) {
handle->data = env;

uv_close(handle, [](uv_handle_t* handle) {
static_cast<Environment*>(handle->data)->FinishHandleCleanup(handle);
});
env->CloseHandle(handle, [](uv_handle_t* handle) {});
};

RegisterHandleCleanup(
Expand Down Expand Up @@ -95,14 +92,22 @@ void Environment::Start(int argc,
}

void Environment::CleanupHandles() {
for (auto r : req_wrap_queue_)
r->Cancel();

for (auto w : handle_wrap_queue_)
w->Close();

while (HandleCleanup* hc = handle_cleanup_queue_.PopFront()) {
handle_cleanup_waiting_++;
hc->cb_(this, hc->handle_, hc->arg_);
delete hc;
}

while (handle_cleanup_waiting_ != 0)
while (handle_cleanup_waiting_ != 0 ||
request_waiting_ != 0 ||
!handle_wrap_queue_.IsEmpty()) {
uv_run(event_loop(), UV_RUN_ONCE);
}
}

void Environment::StartProfilerIdleNotifier() {
Expand Down Expand Up @@ -167,6 +172,8 @@ void Environment::PrintSyncTrace() const {
}

void Environment::RunCleanup() {
CleanupHandles();

while (!cleanup_hooks_.empty()) {
std::vector<CleanupHookCallback> callbacks;
// Concatenate all vectors in cleanup_hooks_
Expand All @@ -182,6 +189,7 @@ void Environment::RunCleanup() {

for (const CleanupHookCallback& cb : callbacks) {
cb.fun_(cb.arg_);
CleanupHandles();
}
}
}
Expand Down
10 changes: 8 additions & 2 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,12 @@ class Environment {
inline void RegisterHandleCleanup(uv_handle_t* handle,
HandleCleanupCb cb,
void *arg);
inline void FinishHandleCleanup(uv_handle_t* handle);

template <typename T, typename OnCloseCallback>
inline void CloseHandle(T* handle, OnCloseCallback callback);

inline void IncreaseWaitingRequestCounter();
inline void DecreaseWaitingRequestCounter();

inline AsyncHooks* async_hooks();
inline DomainFlag* domain_flag();
Expand Down Expand Up @@ -722,7 +727,8 @@ class Environment {
ReqWrapQueue req_wrap_queue_;
ListHead<HandleCleanup,
&HandleCleanup::handle_cleanup_queue_> handle_cleanup_queue_;
int handle_cleanup_waiting_;
int handle_cleanup_waiting_ = 0;
int request_waiting_ = 0;

double* heap_statistics_buffer_ = nullptr;
double* heap_space_statistics_buffer_ = nullptr;
Expand Down
6 changes: 4 additions & 2 deletions src/fs_event_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,12 @@ FSEventWrap::FSEventWrap(Environment* env, Local<Object> object)
: HandleWrap(env,
object,
reinterpret_cast<uv_handle_t*>(&handle_),
AsyncWrap::PROVIDER_FSEVENTWRAP) {}
AsyncWrap::PROVIDER_FSEVENTWRAP) {
MarkAsUninitialized();
}


FSEventWrap::~FSEventWrap() {
CHECK_EQ(initialized_, false);
}


Expand Down Expand Up @@ -132,6 +133,7 @@ void FSEventWrap::Start(const FunctionCallbackInfo<Value>& args) {

int err = uv_fs_event_init(wrap->env()->event_loop(), &wrap->handle_);
if (err == 0) {
wrap->MarkAsInitialized();
wrap->initialized_ = true;

err = uv_fs_event_start(&wrap->handle_, OnEvent, *path, flags);
Expand Down
38 changes: 25 additions & 13 deletions src/handle_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,29 +64,40 @@ void HandleWrap::HasRef(const FunctionCallbackInfo<Value>& args) {


void HandleWrap::Close(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);

HandleWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());

// Guard against uninitialized handle or double close.
if (!IsAlive(wrap))
return;
wrap->Close(args[0]);
}

if (wrap->state_ != kInitialized)
void HandleWrap::Close(v8::Local<v8::Value> close_callback) {
if (state_ != kInitialized)
return;

CHECK_EQ(false, wrap->persistent().IsEmpty());
uv_close(wrap->handle_, OnClose);
wrap->state_ = kClosing;
CHECK_EQ(false, persistent().IsEmpty());
uv_close(handle_, OnClose);
state_ = kClosing;

if (args[0]->IsFunction()) {
wrap->object()->Set(env->onclose_string(), args[0]);
wrap->state_ = kClosingWithCallback;
if (!close_callback.IsEmpty() && close_callback->IsFunction()) {
object()->Set(env()->context(), env()->onclose_string(), close_callback)
.FromJust();
state_ = kClosingWithCallback;
}
}


void HandleWrap::MarkAsInitialized() {
env()->handle_wrap_queue()->PushBack(this);
state_ = kInitialized;
}


void HandleWrap::MarkAsUninitialized() {
handle_wrap_queue_.Remove();
state_ = kClosed;
}


HandleWrap::HandleWrap(Environment* env,
Local<Object> object,
uv_handle_t* handle,
Expand All @@ -102,7 +113,6 @@ HandleWrap::HandleWrap(Environment* env,


HandleWrap::~HandleWrap() {
CHECK(persistent().IsEmpty());
}


Expand All @@ -119,6 +129,8 @@ void HandleWrap::OnClose(uv_handle_t* handle) {
const bool have_close_callback = (wrap->state_ == kClosingWithCallback);
wrap->state_ = kClosed;

wrap->OnClose();

if (have_close_callback)
wrap->MakeCallback(env->onclose_string(), 0, nullptr);

Expand Down
8 changes: 7 additions & 1 deletion src/handle_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,18 @@ class HandleWrap : public AsyncWrap {

inline uv_handle_t* GetHandle() const { return handle_; }

void Close(v8::Local<v8::Value> close_callback = v8::Local<v8::Value>());

protected:
HandleWrap(Environment* env,
v8::Local<v8::Object> object,
uv_handle_t* handle,
AsyncWrap::ProviderType provider);
~HandleWrap() override;
virtual ~HandleWrap();
virtual void OnClose() {}

void MarkAsInitialized();
void MarkAsUninitialized();

private:
friend class Environment;
Expand Down
10 changes: 6 additions & 4 deletions src/node_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3382,6 +3382,9 @@ class Work : public node::AsyncResource {
// Establish a handle scope here so that every callback doesn't have to.
// Also it is needed for the exception-handling below.
v8::HandleScope scope(env->isolate);
auto env_ = node::Environment::GetCurrent(env->isolate);
env_->DecreaseWaitingRequestCounter();

CallbackScope callback_scope(work);

work->_complete(env, ConvertUVErrorCode(status), work->_data);
Expand Down Expand Up @@ -3470,13 +3473,12 @@ napi_status napi_queue_async_work(napi_env env, napi_async_work work) {
CHECK_ARG(env, work);

// Consider: Encapsulate the uv_loop_t into an opaque pointer parameter.
// Currently the environment event loop is the same as the UV default loop.
// Someday (if node ever supports multiple isolates), it may be better to get
// the loop from node::Environment::GetCurrent(env->isolate)->event_loop();
uv_loop_t* event_loop = uv_default_loop();
auto env_ = node::Environment::GetCurrent(env->isolate);
uv_loop_t* event_loop = env_->event_loop();

uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work);

env_->IncreaseWaitingRequestCounter();
CALL_UV(env, uv_queue_work(event_loop,
w->Request(),
uvimpl::Work::ExecuteCallback,
Expand Down
Loading