Skip to content

Commit

Permalink
Make subscribe and unsubscribe asynchronous (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
devongovett authored Apr 5, 2019
1 parent f81cca3 commit 23a7caf
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 156 deletions.
248 changes: 94 additions & 154 deletions src/FSChanges.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@
#include "Event.hh"
#include "Backend.hh"
#include "Watcher.hh"
#include "PromiseRunner.hh"

using namespace Napi;

class FSAsyncRunner;
typedef void (*AsyncFunction)(FSAsyncRunner *);

std::unordered_set<std::string> getIgnore(Env env, Value opts) {
std::unordered_set<std::string> ignore;

Expand Down Expand Up @@ -40,129 +38,66 @@ std::shared_ptr<Backend> getBackend(Env env, Value opts) {
return Backend::getShared(backendName);
}

class FSAsyncRunner {
class WriteSnapshotRunner : public PromiseRunner {
public:
const Env env;

std::shared_ptr<Backend> backend;
std::shared_ptr<Watcher> watcher;
std::string snapshotPath;
bool returnEvents;
bool useSharedWatcher;

FSAsyncRunner(Env env, Value dir, Value snap, Value opts, Promise::Deferred r, AsyncFunction func, bool useSharedWatcher)
: env(env),
snapshotPath(std::string(snap.As<String>().Utf8Value().c_str())),
returnEvents(false),
useSharedWatcher(useSharedWatcher),
func(func), deferred(r) {

napi_status status = napi_create_async_work(env, nullptr, env.Undefined(),
OnExecute, OnWorkComplete, this, &this->work);
if(status != napi_ok) {
work = nullptr;
const napi_extended_error_info *error_info = 0;
napi_get_last_error_info(env, &error_info);
if(error_info->error_message)
Error::New(env, error_info->error_message).ThrowAsJavaScriptException();
else
Error::New(env).ThrowAsJavaScriptException();
}

if (useSharedWatcher) {
watcher = Watcher::getShared(
std::string(dir.As<String>().Utf8Value().c_str()),
getIgnore(env, opts)
);
} else {
watcher = std::make_shared<Watcher>(
std::string(dir.As<String>().Utf8Value().c_str()),
getIgnore(env, opts)
);
}
WriteSnapshotRunner(Env env, Value dir, Value snap, Value opts)
: PromiseRunner(env),
snapshotPath(std::string(snap.As<String>().Utf8Value().c_str())) {
watcher = Watcher::getShared(
std::string(dir.As<String>().Utf8Value().c_str()),
getIgnore(env, opts)
);

backend = getBackend(env, opts);
}

void Queue() {
if(this->work) {
napi_status status = napi_queue_async_work(env, this->work);
NAPI_THROW_IF_FAILED_VOID(env, status);
}
~WriteSnapshotRunner() {
watcher->unref();
backend->unref();
}

private:
napi_async_work work;
AsyncFunction func;
Promise::Deferred deferred;

static void OnExecute(napi_env env, void* this_pointer) {
FSAsyncRunner* self = (FSAsyncRunner*) this_pointer;
self->Execute();
}

static void OnWorkComplete(napi_env env, napi_status status, void* this_pointer) {
FSAsyncRunner* self = (FSAsyncRunner*) this_pointer;
if (status != napi_cancelled) {
HandleScope scope(self->env);
if(status == napi_ok) {
status = napi_delete_async_work(self->env, self->work);
if(status == napi_ok) {
self->OnOK();
delete self;
return;
}
}
}
std::shared_ptr<Backend> backend;
std::shared_ptr<Watcher> watcher;
std::string snapshotPath;

// fallthrough for error handling
const napi_extended_error_info *error_info = 0;
napi_get_last_error_info(env, &error_info);
if(error_info->error_message){
self->OnError(Error::New(env, error_info->error_message));
} else {
self->OnError(Error::New(env));
}
delete self;
void execute() override {
backend->writeSnapshot(*watcher, &snapshotPath);
}
};

class GetEventsSinceRunner : public PromiseRunner {
public:
GetEventsSinceRunner(Env env, Value dir, Value snap, Value opts)
: PromiseRunner(env),
snapshotPath(std::string(snap.As<String>().Utf8Value().c_str())) {
watcher = std::make_shared<Watcher>(
std::string(dir.As<String>().Utf8Value().c_str()),
getIgnore(env, opts)
);

void Execute() {
this->func(this);
backend = getBackend(env, opts);
}

void OnOK() {
HandleScope scope(env);
Value result;

if (this->returnEvents) {
result = watcher->mEvents.toJS(env);
} else {
result = env.Null();
}

~GetEventsSinceRunner() {
watcher->unref();
backend->unref();
this->deferred.Resolve(result);
}
private:
std::shared_ptr<Backend> backend;
std::shared_ptr<Watcher> watcher;
std::string snapshotPath;

void OnError(const Error& e) {
watcher->unref();
backend->unref();
this->deferred.Reject(e.Value());
void execute() override {
backend->getEventsSince(*watcher, &snapshotPath);
}
};

void writeSnapshotAsync(FSAsyncRunner *runner) {
runner->backend->writeSnapshot(*runner->watcher, &runner->snapshotPath);
}

void getEventsSinceAsync(FSAsyncRunner *runner) {
runner->backend->getEventsSince(*runner->watcher, &runner->snapshotPath);
runner->returnEvents = true;
}
Value getResult() override {
return watcher->mEvents.toJS(env);
}
};

Value queueWork(const CallbackInfo& info, AsyncFunction func, bool useSharedWatcher) {
template<class Runner>
Value queueSnapshotWork(const CallbackInfo& info) {
Env env = info.Env();
if (info.Length() < 1 || !info[0].IsString()) {
TypeError::New(env, "Expected a string").ThrowAsJavaScriptException();
Expand All @@ -179,57 +114,68 @@ Value queueWork(const CallbackInfo& info, AsyncFunction func, bool useSharedWatc
return env.Null();
}

Promise::Deferred deferred = Promise::Deferred::New(env);
FSAsyncRunner *runner = new FSAsyncRunner(info.Env(), info[0], info[1], info[2], deferred, func, useSharedWatcher);
runner->Queue();

return deferred.Promise();
Runner *runner = new Runner(info.Env(), info[0], info[1], info[2]);
return runner->queue();
}

Value writeSnapshot(const CallbackInfo& info) {
return queueWork(info, writeSnapshotAsync, true);
return queueSnapshotWork<WriteSnapshotRunner>(info);
}

Value getEventsSince(const CallbackInfo& info) {
return queueWork(info, getEventsSinceAsync, false);
return queueSnapshotWork<GetEventsSinceRunner>(info);
}

Value subscribe(const CallbackInfo& info) {
Env env = info.Env();
if (info.Length() < 1 || !info[0].IsString()) {
TypeError::New(env, "Expected a string").ThrowAsJavaScriptException();
return env.Null();
}
class SubscribeRunner : public PromiseRunner {
public:
SubscribeRunner(Env env, Value dir, Value fn, Value opts) : PromiseRunner(env) {
watcher = Watcher::getShared(
std::string(dir.As<String>().Utf8Value().c_str()),
getIgnore(env, opts)
);

if (info.Length() < 2 || !info[1].IsFunction()) {
TypeError::New(env, "Expected a function").ThrowAsJavaScriptException();
return env.Null();
backend = getBackend(env, opts);
shouldWatch = watcher->watch(fn.As<Function>());
}

if (info.Length() >= 3 && !info[2].IsObject()) {
TypeError::New(env, "Expected an object").ThrowAsJavaScriptException();
return env.Null();
private:
std::shared_ptr<Watcher> watcher;
std::shared_ptr<Backend> backend;
bool shouldWatch;

void execute() override {
if (shouldWatch) {
backend->watch(*watcher);
}
}
};

try {
std::shared_ptr<Watcher> watcher = Watcher::getShared(
std::string(info[0].As<String>().Utf8Value().c_str()),
getIgnore(env, info[2])
class UnsubscribeRunner : public PromiseRunner {
public:
UnsubscribeRunner(Env env, Value dir, Value fn, Value opts) : PromiseRunner(env) {
watcher = Watcher::getShared(
std::string(dir.As<String>().Utf8Value().c_str()),
getIgnore(env, opts)
);

bool added = watcher->watch(info[1].As<Function>());
if (added) {
std::shared_ptr<Backend> b = getBackend(env, info[2]);
b->watch(*watcher);
}
} catch (const char *err) {
Error::New(env, err).ThrowAsJavaScriptException();
backend = getBackend(env, opts);
shouldUnwatch = watcher->unwatch(fn.As<Function>());
}

return env.Null();
}
private:
std::shared_ptr<Watcher> watcher;
std::shared_ptr<Backend> backend;
bool shouldUnwatch;

Value unsubscribe(const CallbackInfo& info) {
void execute() override {
if (shouldUnwatch) {
backend->unwatch(*watcher);
}
}
};

template<class Runner>
Value queueSubscriptionWork(const CallbackInfo& info) {
Env env = info.Env();
if (info.Length() < 1 || !info[0].IsString()) {
TypeError::New(env, "Expected a string").ThrowAsJavaScriptException();
Expand All @@ -246,22 +192,16 @@ Value unsubscribe(const CallbackInfo& info) {
return env.Null();
}

try {
std::shared_ptr<Watcher> watcher = Watcher::getShared(
std::string(info[0].As<String>().Utf8Value().c_str()),
getIgnore(env, info[2])
);
Runner *runner = new Runner(info.Env(), info[0], info[1], info[2]);
return runner->queue();
}

bool removed = watcher->unwatch(info[1].As<Function>());
if (removed) {
std::shared_ptr<Backend> b = getBackend(env, info[2]);;
b->unwatch(*watcher);
}
} catch (const char *err) {
Error::New(env, err).ThrowAsJavaScriptException();
}

return env.Null();
Value subscribe(const CallbackInfo& info) {
return queueSubscriptionWork<SubscribeRunner>(info);
}

Value unsubscribe(const CallbackInfo& info) {
return queueSubscriptionWork<UnsubscribeRunner>(info);
}

Object Init(Env env, Object exports) {
Expand Down
Loading

0 comments on commit 23a7caf

Please sign in to comment.