Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate to napi threadsafe functions #146

Merged
merged 13 commits into from
Jul 30, 2023
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@parcel/watcher",
"version": "2.2.0",
"version": "2.2.1-alpha.0",
"main": "index.js",
"types": "index.d.ts",
"repository": {
Expand Down
181 changes: 91 additions & 90 deletions src/Watcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ void removeShared(Watcher *watcher) {
Watcher::Watcher(std::string dir, std::unordered_set<std::string> ignorePaths, std::unordered_set<Glob> ignoreGlobs)
: mDir(dir),
mIgnorePaths(ignorePaths),
mIgnoreGlobs(ignoreGlobs),
mWatched(false),
mAsync(NULL),
mCallingCallbacks(false) {
mIgnoreGlobs(ignoreGlobs) {
mDebounce = Debounce::getShared();
mDebounce->add(this, [this] () {
triggerCallbacks();
Expand All @@ -68,112 +65,109 @@ void Watcher::notify() {
}
}

void Watcher::notifyError(std::exception &err) {
std::unique_lock<std::mutex> lk(mMutex);
if (mCallingCallbacks) {
mCallbackSignal.wait();
mCallbackSignal.reset();
}

mError = err.what();
triggerCallbacks();
}

void Watcher::triggerCallbacks() {
std::lock_guard<std::mutex> l(mCallbackEventsMutex);
if (mCallbacks.size() > 0 && (mEvents.size() > 0 || mError.size() > 0)) {
if (mCallingCallbacks) {
mCallbackSignal.wait();
mCallbackSignal.reset();
}

mCallbackEvents = mEvents.getEvents();
mEvents.clear();

uv_async_send(mAsync);
}
}
struct CallbackData {
std::string error;
std::vector<Event> events;
CallbackData(std::string error, std::vector<Event> events) : error(error), events(events) {}
};

Value Watcher::callbackEventsToJS(const Env& env) {
std::lock_guard<std::mutex> l(mCallbackEventsMutex);
Value callbackEventsToJS(const Env &env, std::vector<Event> &events) {
EscapableHandleScope scope(env);
Array arr = Array::New(env, mCallbackEvents.size());
Array arr = Array::New(env, events.size());
size_t currentEventIndex = 0;
for (auto eventIterator = mCallbackEvents.begin(); eventIterator != mCallbackEvents.end(); eventIterator++) {
for (auto eventIterator = events.begin(); eventIterator != events.end(); eventIterator++) {
arr.Set(currentEventIndex++, eventIterator->toJS(env));
}
return scope.Escape(arr);
}

// TODO: Doesn't this need some kind of locking?
void Watcher::clearCallbacks() {
mCallbacks.clear();
void callJSFunction(Napi::Env env, Function jsCallback, CallbackData *data) {
HandleScope scope(env);
auto err = data->error.size() > 0 ? Error::New(env, data->error).Value() : env.Null();
auto events = callbackEventsToJS(env, data->events);
jsCallback.Call({err, events});
delete data;

// Throw errors from the callback as fatal exceptions
// If we don't handle these node segfaults...
if (env.IsExceptionPending()) {
Napi::Error err = env.GetAndClearPendingException();
napi_fatal_exception(env, err.Value());
}
}

void Watcher::fireCallbacks(uv_async_t *handle) {
Watcher *watcher = (Watcher *)handle->data;
watcher->mCallingCallbacks = true;
void Watcher::notifyError(std::exception &err) {
std::unique_lock<std::mutex> lk(mMutex);
for (auto it = mCallbacks.begin(); it != mCallbacks.end(); it++) {
CallbackData *data = new CallbackData(err.what(), {});
it->tsfn.BlockingCall(data, callJSFunction);
}

watcher->mCallbacksIterator = watcher->mCallbacks.begin();
while (watcher->mCallbacksIterator != watcher->mCallbacks.end()) {
auto it = watcher->mCallbacksIterator;
HandleScope scope(it->Env());
auto err = watcher->mError.size() > 0 ? Error::New(it->Env(), watcher->mError).Value() : it->Env().Null();
auto events = watcher->callbackEventsToJS(it->Env());
clearCallbacks();
}

it->MakeCallback(it->Env().Global(), std::initializer_list<napi_value>{err, events});
// Throw errors from the callback as fatal exceptions
// If we don't handle these node segfaults...
if (it->Env().IsExceptionPending()) {
Napi::Error err = it->Env().GetAndClearPendingException();
napi_fatal_exception(it->Env(), err.Value());
}
// This function is called from the debounce thread.
void Watcher::triggerCallbacks() {
std::unique_lock<std::mutex> lk(mMutex);
if (mCallbacks.size() > 0 && mEvents.size() > 0) {
auto events = mEvents.getEvents();
mEvents.clear();

// If the iterator was changed, then the callback trigged an unwatch.
// The iterator will have been set to the next valid callback.
// If it is the same as before, increment it.
if (watcher->mCallbacksIterator == it) {
watcher->mCallbacksIterator++;
for (auto it = mCallbacks.begin(); it != mCallbacks.end(); it++) {
it->tsfn.BlockingCall(new CallbackData("", events), callJSFunction);
}
}
}

watcher->mCallingCallbacks = false;
// This should be called from the JavaScript thread.
bool Watcher::watch(Function callback) {
std::unique_lock<std::mutex> lk(mMutex);

if (watcher->mError.size() > 0) {
watcher->clearCallbacks();
auto it = findCallback(callback);
if (it != mCallbacks.end()) {
return false;
}

if (watcher->mCallbacks.size() == 0) {
watcher->unref();
} else {
watcher->mCallbackSignal.notify();
}
auto tsfn = ThreadSafeFunction::New(
callback.Env(),
callback,
"Watcher callback",
0, // Unlimited queue
1 // Initial thread count
);

mCallbacks.push_back(Callback {
tsfn,
Napi::Persistent(callback),
std::this_thread::get_id()
});

return true;
}

bool Watcher::watch(FunctionReference callback) {
std::unique_lock<std::mutex> lk(mMutex);
auto res = mCallbacks.insert(std::move(callback));
if (res.second && !mWatched) {
mAsync = new uv_async_t;
mAsync->data = (void *)this;
uv_async_init(uv_default_loop(), mAsync, Watcher::fireCallbacks);
mWatched = true;
return true;
// This should be called from the JavaScript thread.
std::vector<Callback>::iterator Watcher::findCallback(Function callback) {
for (auto it = mCallbacks.begin(); it != mCallbacks.end(); it++) {
// Only consider callbacks created by the same thread, or V8 will panic.
if (it->threadId == std::this_thread::get_id() && it->ref.Value() == callback) {
return it;
}
}

return false;
return mCallbacks.end();
}

// This should be called from the JavaScript thread.
bool Watcher::unwatch(Function callback) {
std::unique_lock<std::mutex> lk(mMutex);

bool removed = false;
for (auto it = mCallbacks.begin(); it != mCallbacks.end(); it++) {
if (it->Value() == callback) {
mCallbacksIterator = mCallbacks.erase(it);
removed = true;
break;
}
auto it = findCallback(callback);
if (it != mCallbacks.end()) {
it->tsfn.Release();
it->ref.Unref();
mCallbacks.erase(it);
removed = true;
}

if (removed && mCallbacks.size() == 0) {
Expand All @@ -185,18 +179,25 @@ bool Watcher::unwatch(Function callback) {
}

void Watcher::unref() {
if (mCallbacks.size() == 0 && !mCallingCallbacks) {
if (mWatched) {
mWatched = false;
uv_close((uv_handle_t *)mAsync, Watcher::onClose);
}

if (mCallbacks.size() == 0) {
removeShared(this);
}
}

void Watcher::onClose(uv_handle_t *handle) {
delete (uv_async_t *)handle;
void Watcher::destroy() {
std::unique_lock<std::mutex> lk(mMutex);
clearCallbacks();
}

// Private because it doesn't lock.
void Watcher::clearCallbacks() {
for (auto it = mCallbacks.begin(); it != mCallbacks.end(); it++) {
it->tsfn.Release();
it->ref.Unref();
}

mCallbacks.clear();
unref();
}

bool Watcher::isIgnored(std::string path) {
Expand All @@ -208,7 +209,7 @@ bool Watcher::isIgnored(std::string path) {
}

auto basePath = mDir + DIR_SEP;

if (path.rfind(basePath, 0) != 0) {
return false;
}
Expand Down
24 changes: 10 additions & 14 deletions src/Watcher.hh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include <condition_variable>
#include <unordered_set>
#include <set>
#include <uv.h>
#include <node_api.h>
#include "Glob.hh"
#include "Event.hh"
Expand All @@ -14,13 +13,18 @@

using namespace Napi;

struct Callback {
Napi::ThreadSafeFunction tsfn;
Napi::FunctionReference ref;
std::thread::id threadId;
};

struct Watcher {
std::string mDir;
std::unordered_set<std::string> mIgnorePaths;
std::unordered_set<Glob> mIgnoreGlobs;
EventList mEvents;
void *state;
bool mWatched;

Watcher(std::string dir, std::unordered_set<std::string> ignorePaths, std::unordered_set<Glob> ignoreGlobs);
~Watcher();
Expand All @@ -32,31 +36,23 @@ struct Watcher {
void wait();
void notify();
void notifyError(std::exception &err);
bool watch(FunctionReference callback);
bool watch(Function callback);
bool unwatch(Function callback);
void unref();
bool isIgnored(std::string path);
void destroy();

static std::shared_ptr<Watcher> getShared(std::string dir, std::unordered_set<std::string> ignorePaths, std::unordered_set<Glob> ignoreGlobs);

private:
std::mutex mMutex;
std::mutex mCallbackEventsMutex;
std::condition_variable mCond;
uv_async_t *mAsync;
std::set<FunctionReference> mCallbacks;
std::set<FunctionReference>::iterator mCallbacksIterator;
bool mCallingCallbacks;
std::vector<Event> mCallbackEvents;
std::vector<Callback> mCallbacks;
std::shared_ptr<Debounce> mDebounce;
Signal mCallbackSignal;
std::string mError;

Value callbackEventsToJS(const Env& env);
std::vector<Callback>::iterator findCallback(Function callback);
void clearCallbacks();
void triggerCallbacks();
static void fireCallbacks(uv_async_t *handle);
static void onClose(uv_handle_t *handle);
};

class WatcherError : public std::runtime_error {
Expand Down
12 changes: 8 additions & 4 deletions src/binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ std::unordered_set<std::string> getIgnorePaths(Env env, Value opts) {

std::unordered_set<Glob> getIgnoreGlobs(Env env, Value opts) {
std::unordered_set<Glob> result;

if (opts.IsObject()) {
Value v = opts.As<Object>().Get(String::New(env, "ignoreGlobs"));
if (v.IsArray()) {
Expand Down Expand Up @@ -165,7 +165,7 @@ class SubscribeRunner : public PromiseRunner {
);

backend = getBackend(env, opts);
callback = Persistent(fn.As<Function>());
watcher->watch(fn.As<Function>());
}

private:
Expand All @@ -174,8 +174,12 @@ class SubscribeRunner : public PromiseRunner {
FunctionReference callback;

void execute() override {
backend->watch(*watcher);
watcher->watch(std::move(callback));
try {
backend->watch(*watcher);
} catch (std::exception &err) {
watcher->destroy();
throw;
}
}
};

Expand Down
Loading