From 0ab1b1cdbd01e0f02a185f6b5bf6e1630ddbc93d Mon Sep 17 00:00:00 2001 From: Devon Govett Date: Mon, 3 Jul 2023 22:38:51 -0600 Subject: [PATCH 01/12] Migrate to napi threadsafe functions --- src/Watcher.cc | 181 ++++++++++++++++++----------------- src/Watcher.hh | 25 +++-- src/binding.cc | 12 ++- src/macos/FSEventsBackend.cc | 8 +- test/watcher.js | 33 +++++++ 5 files changed, 146 insertions(+), 113 deletions(-) diff --git a/src/Watcher.cc b/src/Watcher.cc index 384c243d..751e638a 100644 --- a/src/Watcher.cc +++ b/src/Watcher.cc @@ -40,10 +40,7 @@ void removeShared(Watcher *watcher) { Watcher::Watcher(std::string dir, std::unordered_set ignorePaths, std::unordered_set ignoreGlobs) : mDir(dir), mIgnorePaths(ignorePaths), - mIgnoreGlobs(ignoreGlobs), - mWatched(false), - mAsync(NULL), - mCallingCallbacks(false) { + mIgnoreGlobs(ignoreGlobs) { mDebounce = Debounce::getShared(); mDebounce->add(this, [this] () { triggerCallbacks(); @@ -68,112 +65,109 @@ void Watcher::notify() { } } -void Watcher::notifyError(std::exception &err) { - std::unique_lock lk(mMutex); - if (mCallingCallbacks) { - mCallbackSignal.wait(); - mCallbackSignal.reset(); - } - - mError = err.what(); - triggerCallbacks(); -} - -void Watcher::triggerCallbacks() { - std::lock_guard l(mCallbackEventsMutex); - if (mCallbacks.size() > 0 && (mEvents.size() > 0 || mError.size() > 0)) { - if (mCallingCallbacks) { - mCallbackSignal.wait(); - mCallbackSignal.reset(); - } - - mCallbackEvents = mEvents.getEvents(); - mEvents.clear(); - - uv_async_send(mAsync); - } -} +struct CallbackData { + std::string error; + std::vector events; + CallbackData(std::string error, std::vector events) : error(error), events(events) {} +}; -Value Watcher::callbackEventsToJS(const Env& env) { - std::lock_guard l(mCallbackEventsMutex); +Value callbackEventsToJS(const Env &env, std::vector &events) { EscapableHandleScope scope(env); - Array arr = Array::New(env, mCallbackEvents.size()); + Array arr = Array::New(env, events.size()); size_t currentEventIndex = 0; - for (auto eventIterator = mCallbackEvents.begin(); eventIterator != mCallbackEvents.end(); eventIterator++) { + for (auto eventIterator = events.begin(); eventIterator != events.end(); eventIterator++) { arr.Set(currentEventIndex++, eventIterator->toJS(env)); } return scope.Escape(arr); } -// TODO: Doesn't this need some kind of locking? -void Watcher::clearCallbacks() { - mCallbacks.clear(); +void callJSFunction(Napi::Env env, Function jsCallback, CallbackData *data) { + HandleScope scope(env); + auto err = data->error.size() > 0 ? Error::New(env, data->error).Value() : env.Null(); + auto events = callbackEventsToJS(env, data->events); + jsCallback.Call({err, events}); + delete data; + + // Throw errors from the callback as fatal exceptions + // If we don't handle these node segfaults... + if (env.IsExceptionPending()) { + Napi::Error err = env.GetAndClearPendingException(); + napi_fatal_exception(env, err.Value()); + } } -void Watcher::fireCallbacks(uv_async_t *handle) { - Watcher *watcher = (Watcher *)handle->data; - watcher->mCallingCallbacks = true; +void Watcher::notifyError(std::exception &err) { + std::unique_lock lk(mMutex); + for (auto it = mCallbacks.begin(); it != mCallbacks.end(); it++) { + CallbackData *data = new CallbackData(err.what(), {}); + it->tsfn.BlockingCall(data, callJSFunction); + } - watcher->mCallbacksIterator = watcher->mCallbacks.begin(); - while (watcher->mCallbacksIterator != watcher->mCallbacks.end()) { - auto it = watcher->mCallbacksIterator; - HandleScope scope(it->Env()); - auto err = watcher->mError.size() > 0 ? Error::New(it->Env(), watcher->mError).Value() : it->Env().Null(); - auto events = watcher->callbackEventsToJS(it->Env()); + clearCallbacks(); +} - it->MakeCallback(it->Env().Global(), std::initializer_list{err, events}); - // Throw errors from the callback as fatal exceptions - // If we don't handle these node segfaults... - if (it->Env().IsExceptionPending()) { - Napi::Error err = it->Env().GetAndClearPendingException(); - napi_fatal_exception(it->Env(), err.Value()); - } +// This function is called from the debounce thread. +void Watcher::triggerCallbacks() { + std::unique_lock lk(mMutex); + if (mCallbacks.size() > 0 && mEvents.size() > 0) { + auto events = mEvents.getEvents(); + mEvents.clear(); - // If the iterator was changed, then the callback trigged an unwatch. - // The iterator will have been set to the next valid callback. - // If it is the same as before, increment it. - if (watcher->mCallbacksIterator == it) { - watcher->mCallbacksIterator++; + for (auto it = mCallbacks.begin(); it != mCallbacks.end(); it++) { + it->tsfn.BlockingCall(new CallbackData("", events), callJSFunction); } } +} - watcher->mCallingCallbacks = false; +// This should be called from the JavaScript thread. +bool Watcher::watch(Function callback) { + std::unique_lock lk(mMutex); - if (watcher->mError.size() > 0) { - watcher->clearCallbacks(); + auto it = findCallback(callback); + if (it != mCallbacks.end()) { + return false; } - if (watcher->mCallbacks.size() == 0) { - watcher->unref(); - } else { - watcher->mCallbackSignal.notify(); - } + auto tsfn = ThreadSafeFunction::New( + callback.Env(), + callback, + "Watcher callback", + 0, // Unlimited queue + 1 // Initial thread count + ); + + mCallbacks.push_back(Callback { + .tsfn = tsfn, + .ref = Napi::Persistent(callback), + .threadId = std::this_thread::get_id() + }); + + return true; } -bool Watcher::watch(FunctionReference callback) { - std::unique_lock lk(mMutex); - auto res = mCallbacks.insert(std::move(callback)); - if (res.second && !mWatched) { - mAsync = new uv_async_t; - mAsync->data = (void *)this; - uv_async_init(uv_default_loop(), mAsync, Watcher::fireCallbacks); - mWatched = true; - return true; +// This should be called from the JavaScript thread. +std::vector::iterator Watcher::findCallback(Function callback) { + for (auto it = mCallbacks.begin(); it != mCallbacks.end(); it++) { + // Only consider callbacks created by the same thread, or V8 will panic. + if (it->threadId == std::this_thread::get_id() && it->ref.Value() == callback) { + return it; + } } - return false; + return mCallbacks.end(); } +// This should be called from the JavaScript thread. bool Watcher::unwatch(Function callback) { std::unique_lock lk(mMutex); bool removed = false; - for (auto it = mCallbacks.begin(); it != mCallbacks.end(); it++) { - if (it->Value() == callback) { - mCallbacksIterator = mCallbacks.erase(it); - removed = true; - break; - } + auto it = findCallback(callback); + if (it != mCallbacks.end()) { + it->tsfn.Release(); + it->ref.Unref(); + mCallbacks.erase(it); + removed = true; } if (removed && mCallbacks.size() == 0) { @@ -185,18 +179,25 @@ bool Watcher::unwatch(Function callback) { } void Watcher::unref() { - if (mCallbacks.size() == 0 && !mCallingCallbacks) { - if (mWatched) { - mWatched = false; - uv_close((uv_handle_t *)mAsync, Watcher::onClose); - } - + if (mCallbacks.size() == 0) { removeShared(this); } } -void Watcher::onClose(uv_handle_t *handle) { - delete (uv_async_t *)handle; +void Watcher::destroy() { + std::unique_lock lk(mMutex); + clearCallbacks(); +} + +// Private because it doesn't lock. +void Watcher::clearCallbacks() { + for (auto it = mCallbacks.begin(); it != mCallbacks.end(); it++) { + it->tsfn.Release(); + it->ref.Unref(); + } + + mCallbacks.clear(); + unref(); } bool Watcher::isIgnored(std::string path) { @@ -208,7 +209,7 @@ bool Watcher::isIgnored(std::string path) { } auto basePath = mDir + DIR_SEP; - + if (path.rfind(basePath, 0) != 0) { return false; } diff --git a/src/Watcher.hh b/src/Watcher.hh index 5aac4f59..e6fee663 100644 --- a/src/Watcher.hh +++ b/src/Watcher.hh @@ -4,8 +4,8 @@ #include #include #include -#include #include +#include #include "Glob.hh" #include "Event.hh" #include "Debounce.hh" @@ -14,13 +14,18 @@ using namespace Napi; +struct Callback { + Napi::ThreadSafeFunction tsfn; + Napi::FunctionReference ref; + std::thread::id threadId; +}; + struct Watcher { std::string mDir; std::unordered_set mIgnorePaths; std::unordered_set mIgnoreGlobs; EventList mEvents; void *state; - bool mWatched; Watcher(std::string dir, std::unordered_set ignorePaths, std::unordered_set ignoreGlobs); ~Watcher(); @@ -32,31 +37,23 @@ struct Watcher { void wait(); void notify(); void notifyError(std::exception &err); - bool watch(FunctionReference callback); + bool watch(Function callback); bool unwatch(Function callback); void unref(); bool isIgnored(std::string path); + void destroy(); static std::shared_ptr getShared(std::string dir, std::unordered_set ignorePaths, std::unordered_set ignoreGlobs); private: std::mutex mMutex; - std::mutex mCallbackEventsMutex; std::condition_variable mCond; - uv_async_t *mAsync; - std::set mCallbacks; - std::set::iterator mCallbacksIterator; - bool mCallingCallbacks; - std::vector mCallbackEvents; + std::vector mCallbacks; std::shared_ptr mDebounce; - Signal mCallbackSignal; - std::string mError; - Value callbackEventsToJS(const Env& env); + std::vector::iterator findCallback(Function callback); void clearCallbacks(); void triggerCallbacks(); - static void fireCallbacks(uv_async_t *handle); - static void onClose(uv_handle_t *handle); }; class WatcherError : public std::runtime_error { diff --git a/src/binding.cc b/src/binding.cc index 70a01a1b..46571949 100644 --- a/src/binding.cc +++ b/src/binding.cc @@ -31,7 +31,7 @@ std::unordered_set getIgnorePaths(Env env, Value opts) { std::unordered_set getIgnoreGlobs(Env env, Value opts) { std::unordered_set result; - + if (opts.IsObject()) { Value v = opts.As().Get(String::New(env, "ignoreGlobs")); if (v.IsArray()) { @@ -165,7 +165,7 @@ class SubscribeRunner : public PromiseRunner { ); backend = getBackend(env, opts); - callback = Persistent(fn.As()); + watcher->watch(fn.As()); } private: @@ -174,8 +174,12 @@ class SubscribeRunner : public PromiseRunner { FunctionReference callback; void execute() override { - backend->watch(*watcher); - watcher->watch(std::move(callback)); + try { + backend->watch(*watcher); + } catch (std::exception &err) { + watcher->destroy(); + throw; + } } }; diff --git a/src/macos/FSEventsBackend.cc b/src/macos/FSEventsBackend.cc index c584f3ce..191bf044 100644 --- a/src/macos/FSEventsBackend.cc +++ b/src/macos/FSEventsBackend.cc @@ -106,7 +106,7 @@ void FSEventsCallback( if (stat(paths[i], &file)) { continue; } - + // Ignore if mtime is the same as the last event. // This prevents duplicate events from being emitted. // If tv_nsec is zero, the file system probably only has second-level @@ -151,7 +151,7 @@ void FSEventsCallback( if (entry && entry->mtime == mtime && file.st_mtimespec.tv_nsec != 0) { continue; } - + // Some mounted file systems report a creation time of 0/unix epoch which we special case. if (isModified && (entry || (ctime <= since && ctime != 0))) { state->tree->update(paths[i], mtime); @@ -163,9 +163,7 @@ void FSEventsCallback( } } - if (watcher->mWatched) { - watcher->notify(); - } + watcher->notify(); // Stop watching if the root directory was deleted. if (deletedRoot) { diff --git a/test/watcher.js b/test/watcher.js index 77371740..3ca28ea2 100644 --- a/test/watcher.js +++ b/test/watcher.js @@ -3,6 +3,7 @@ const assert = require('assert'); const fs = require('fs-extra'); const path = require('path'); const {execSync} = require('child_process'); +const {Worker} = require('worker_threads'); let backends = []; if (process.platform === 'darwin') { @@ -789,6 +790,38 @@ describe('watcher', () => { assert(threw, 'did not throw'); }); }); + + it('should support worker threads', async function () { + let worker = new Worker(` + const {parentPort} = require('worker_threads'); + const {tmpDir, backend, modulePath} = require('worker_threads').workerData; + const watcher = require(modulePath); + async function run() { + let sub = await watcher.subscribe(tmpDir, async (err, events) => { + await sub.unsubscribe(); + parentPort.postMessage('success'); + }, {backend}); + parentPort.postMessage('ready'); + } + + run(); + `, {eval: true, workerData: {tmpDir, backend, modulePath: require.resolve('../')}}); + + await new Promise((resolve, reject) => { + worker.once('message', resolve); + worker.once('error', reject); + }); + + let workerPromise = new Promise((resolve, reject) => { + worker.once('message', resolve); + worker.once('error', reject); + }); + + let f = getFilename(); + fs.writeFile(f, 'hello world'); + let [res] = await Promise.all([nextEvent(), workerPromise]); + assert.deepEqual(res, [{type: 'create', path: f}]); + }); }); }); From ec4e315c5ac0ee7f522db972ac4383f99910e592 Mon Sep 17 00:00:00 2001 From: Devon Govett Date: Mon, 3 Jul 2023 22:54:37 -0600 Subject: [PATCH 02/12] Fix compiler errors --- src/Watcher.cc | 6 +++--- src/linux/InotifyBackend.cc | 1 + src/unix/legacy.cc | 1 + 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/Watcher.cc b/src/Watcher.cc index 751e638a..2bae611a 100644 --- a/src/Watcher.cc +++ b/src/Watcher.cc @@ -137,9 +137,9 @@ bool Watcher::watch(Function callback) { ); mCallbacks.push_back(Callback { - .tsfn = tsfn, - .ref = Napi::Persistent(callback), - .threadId = std::this_thread::get_id() + tsfn, + Napi::Persistent(callback), + std::this_thread::get_id() }); return true; diff --git a/src/linux/InotifyBackend.cc b/src/linux/InotifyBackend.cc index d498ccaf..9ec267c5 100644 --- a/src/linux/InotifyBackend.cc +++ b/src/linux/InotifyBackend.cc @@ -1,6 +1,7 @@ #include #include #include +#include #include "InotifyBackend.hh" #define INOTIFY_MASK \ diff --git a/src/unix/legacy.cc b/src/unix/legacy.cc index 01911282..97963c9f 100644 --- a/src/unix/legacy.cc +++ b/src/unix/legacy.cc @@ -13,6 +13,7 @@ #endif #include #include +#include #include "../DirTree.hh" #include "../shared/BruteForceBackend.hh" From b835a764dbb4b67e1331720b0b906690474bb7e7 Mon Sep 17 00:00:00 2001 From: Devon Govett Date: Mon, 3 Jul 2023 23:09:06 -0600 Subject: [PATCH 03/12] Don't break windows --- src/Watcher.hh | 1 - src/unix/fts.cc | 1 + src/watchman/WatchmanBackend.cc | 7 ++++--- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/Watcher.hh b/src/Watcher.hh index e6fee663..ad39e397 100644 --- a/src/Watcher.hh +++ b/src/Watcher.hh @@ -5,7 +5,6 @@ #include #include #include -#include #include "Glob.hh" #include "Event.hh" #include "Debounce.hh" diff --git a/src/unix/fts.cc b/src/unix/fts.cc index 6b411297..14a538b7 100644 --- a/src/unix/fts.cc +++ b/src/unix/fts.cc @@ -7,6 +7,7 @@ #define __THROW #include +#include #include "../DirTree.hh" #include "../shared/BruteForceBackend.hh" diff --git a/src/watchman/WatchmanBackend.cc b/src/watchman/WatchmanBackend.cc index cba4251b..04640037 100644 --- a/src/watchman/WatchmanBackend.cc +++ b/src/watchman/WatchmanBackend.cc @@ -13,6 +13,7 @@ #define popen _popen #define pclose _pclose #else +#include #define normalizePath(dir) dir #endif @@ -53,7 +54,7 @@ std::string getSockPath() { BSER b = readBSER([fp] (char *buf, size_t len) { return fread(buf, sizeof(char), len, fp); }); - + pclose(fp); auto objValue = b.objectValue(); @@ -113,7 +114,7 @@ void handleFiles(Watcher &watcher, BSER::Object obj) { if (found == obj.end()) { throw WatcherError("Error reading changes from watchman", &watcher); } - + auto files = found->second.arrayValue(); for (auto it = files.begin(); it != files.end(); it++) { auto file = it->objectValue(); @@ -325,7 +326,7 @@ void WatchmanBackend::subscribe(Watcher &watcher) { void WatchmanBackend::unsubscribe(Watcher &watcher) { std::string id = getId(watcher); auto erased = mSubscriptions.erase(id); - + if (erased) { BSER::Array cmd; cmd.push_back("unsubscribe"); From 944b62a12a101cc38ada06c372a0946eb2eb19ef Mon Sep 17 00:00:00 2001 From: Devon Govett Date: Mon, 3 Jul 2023 23:18:25 -0600 Subject: [PATCH 04/12] more fixes --- src/linux/InotifyBackend.cc | 1 + src/watchman/IPC.hh | 33 +++++++++++++++++---------------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/src/linux/InotifyBackend.cc b/src/linux/InotifyBackend.cc index 9ec267c5..b81d0bb3 100644 --- a/src/linux/InotifyBackend.cc +++ b/src/linux/InotifyBackend.cc @@ -2,6 +2,7 @@ #include #include #include +#include #include "InotifyBackend.hh" #define INOTIFY_MASK \ diff --git a/src/watchman/IPC.hh b/src/watchman/IPC.hh index 3641b6d6..6e852c8d 100644 --- a/src/watchman/IPC.hh +++ b/src/watchman/IPC.hh @@ -5,6 +5,7 @@ #include #ifdef _WIN32 +#include #include #else #include @@ -19,12 +20,12 @@ public: #ifdef _WIN32 while (true) { mPipe = CreateFile( - path.data(), // pipe name - GENERIC_READ | GENERIC_WRITE, // read and write access - 0, // no sharing + path.data(), // pipe name + GENERIC_READ | GENERIC_WRITE, // read and write access + 0, // no sharing NULL, // default security attributes - OPEN_EXISTING, // opens existing pipe - FILE_FLAG_OVERLAPPED, // attributes + OPEN_EXISTING, // opens existing pipe + FILE_FLAG_OVERLAPPED, // attributes NULL // no template file ); @@ -74,11 +75,11 @@ public: OVERLAPPED overlapped; overlapped.hEvent = mWriter; bool success = WriteFile( - mPipe, // pipe handle - buf.data(), // message - buf.size(), // message length - NULL, // bytes written - &overlapped // overlapped + mPipe, // pipe handle + buf.data(), // message + buf.size(), // message length + NULL, // bytes written + &overlapped // overlapped ); if (mStopped) { @@ -122,11 +123,11 @@ public: OVERLAPPED overlapped; overlapped.hEvent = mReader; bool success = ReadFile( - mPipe, // pipe handle - buf, // buffer to receive reply - len, // size of buffer - NULL, // number of bytes read - &overlapped // overlapped + mPipe, // pipe handle + buf, // buffer to receive reply + len, // size of buffer + NULL, // number of bytes read + &overlapped // overlapped ); if (!success && !mStopped) { @@ -140,7 +141,7 @@ public: if (!success && !mStopped) { throw std::runtime_error("GetOverlappedResult failed"); } - + return read; #else int r = ::read(mSock, buf, len); From 15309d53d73ea66afba5b5996718834a18d6d5f4 Mon Sep 17 00:00:00 2001 From: Devon Govett Date: Tue, 4 Jul 2023 11:25:09 -0600 Subject: [PATCH 05/12] debug --- test/watcher.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/watcher.js b/test/watcher.js index 3ca28ea2..5a57bd90 100644 --- a/test/watcher.js +++ b/test/watcher.js @@ -79,7 +79,7 @@ describe('watcher', () => { }); describe('files', () => { - it('should emit when a file is created', async () => { + it.only('should emit when a file is created', async () => { let f = getFilename(); fs.writeFile(f, 'hello world'); let res = await nextEvent(); From efb241e7ee03ab612ad373ae2e0573eaef167f13 Mon Sep 17 00:00:00 2001 From: Devon Govett Date: Tue, 4 Jul 2023 11:27:57 -0600 Subject: [PATCH 06/12] test --- test/watcher.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/watcher.js b/test/watcher.js index 5a57bd90..061117d6 100644 --- a/test/watcher.js +++ b/test/watcher.js @@ -79,7 +79,7 @@ describe('watcher', () => { }); describe('files', () => { - it.only('should emit when a file is created', async () => { + it('should emit when a file is created', async () => { let f = getFilename(); fs.writeFile(f, 'hello world'); let res = await nextEvent(); @@ -821,6 +821,8 @@ describe('watcher', () => { fs.writeFile(f, 'hello world'); let [res] = await Promise.all([nextEvent(), workerPromise]); assert.deepEqual(res, [{type: 'create', path: f}]); + + await worker.terminate(); }); }); }); From fbc51d6ad5bb21648f42a92a90f6de27c9bbde63 Mon Sep 17 00:00:00 2001 From: Devon Govett Date: Tue, 4 Jul 2023 23:35:28 -0600 Subject: [PATCH 07/12] Try fixing tests --- test/watcher.js | 58 +++++++++++++++++++++++++------------------------ 1 file changed, 30 insertions(+), 28 deletions(-) diff --git a/test/watcher.js b/test/watcher.js index 061117d6..ad710abd 100644 --- a/test/watcher.js +++ b/test/watcher.js @@ -791,38 +791,40 @@ describe('watcher', () => { }); }); - it('should support worker threads', async function () { - let worker = new Worker(` - const {parentPort} = require('worker_threads'); - const {tmpDir, backend, modulePath} = require('worker_threads').workerData; - const watcher = require(modulePath); - async function run() { - let sub = await watcher.subscribe(tmpDir, async (err, events) => { - await sub.unsubscribe(); - parentPort.postMessage('success'); - }, {backend}); - parentPort.postMessage('ready'); - } + describe('workers', () => { + it('should support worker threads', async function () { + let worker = new Worker(` + const {parentPort} = require('worker_threads'); + const {tmpDir, backend, modulePath} = require('worker_threads').workerData; + const watcher = require(modulePath); + async function run() { + let sub = await watcher.subscribe(tmpDir, async (err, events) => { + await sub.unsubscribe(); + parentPort.postMessage('success'); + }, {backend}); + parentPort.postMessage('ready'); + } + + run(); + `, {eval: true, workerData: {tmpDir, backend, modulePath: require.resolve('../')}}); + + await new Promise((resolve, reject) => { + worker.once('message', resolve); + worker.once('error', reject); + }); - run(); - `, {eval: true, workerData: {tmpDir, backend, modulePath: require.resolve('../')}}); + let workerPromise = new Promise((resolve, reject) => { + worker.once('message', resolve); + worker.once('error', reject); + }); - await new Promise((resolve, reject) => { - worker.once('message', resolve); - worker.once('error', reject); - }); + let f = getFilename(); + fs.writeFile(f, 'hello world'); + let [res] = await Promise.all([nextEvent(), workerPromise]); + assert.deepEqual(res, [{type: 'create', path: f}]); - let workerPromise = new Promise((resolve, reject) => { - worker.once('message', resolve); - worker.once('error', reject); + await worker.terminate(); }); - - let f = getFilename(); - fs.writeFile(f, 'hello world'); - let [res] = await Promise.all([nextEvent(), workerPromise]); - assert.deepEqual(res, [{type: 'create', path: f}]); - - await worker.terminate(); }); }); }); From 39c9bbf423ecc6d490f854e377ba49892b3a7276 Mon Sep 17 00:00:00 2001 From: Devon Govett Date: Wed, 5 Jul 2023 09:26:21 -0600 Subject: [PATCH 08/12] kick --- test/watcher.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/watcher.js b/test/watcher.js index ad710abd..ae6df458 100644 --- a/test/watcher.js +++ b/test/watcher.js @@ -791,7 +791,7 @@ describe('watcher', () => { }); }); - describe('workers', () => { + describe('worker threads', () => { it('should support worker threads', async function () { let worker = new Worker(` const {parentPort} = require('worker_threads'); From 15a02dee1ec8486dd79b23ac4ef54a5e0bd838b2 Mon Sep 17 00:00:00 2001 From: Devon Govett Date: Wed, 5 Jul 2023 09:32:25 -0600 Subject: [PATCH 09/12] alpha version --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 99702aac..f15ecc39 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@parcel/watcher", - "version": "2.2.0", + "version": "2.2.1-alpha.0", "main": "index.js", "types": "index.d.ts", "repository": { From d968e9e1bea39d9f9d19fcf9646dd2b7181c1a4c Mon Sep 17 00:00:00 2001 From: Devon Govett Date: Sat, 29 Jul 2023 14:02:00 -0700 Subject: [PATCH 10/12] Fix includes for kqueue backend --- src/kqueue/KqueueBackend.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/kqueue/KqueueBackend.cc b/src/kqueue/KqueueBackend.cc index f6d97725..6cb9f951 100644 --- a/src/kqueue/KqueueBackend.cc +++ b/src/kqueue/KqueueBackend.cc @@ -2,6 +2,9 @@ #include #include #include +#include +#include +#include #include "KqueueBackend.hh" #if __APPLE__ From 291b7510f97d0e735bd00e6467c436c4e7a2d07a Mon Sep 17 00:00:00 2001 From: Devon Govett Date: Sat, 29 Jul 2023 19:04:40 -0700 Subject: [PATCH 11/12] Fix kqueue --- src/kqueue/KqueueBackend.cc | 71 +++++++++++++++++++++------------ test/watcher.js | 78 ++++++++++++++++++------------------- 2 files changed, 85 insertions(+), 64 deletions(-) diff --git a/src/kqueue/KqueueBackend.cc b/src/kqueue/KqueueBackend.cc index 6cb9f951..f5843024 100644 --- a/src/kqueue/KqueueBackend.cc +++ b/src/kqueue/KqueueBackend.cc @@ -207,6 +207,11 @@ bool KqueueBackend::compareDir(int fd, std::string &path, std::unordered_set subs = findSubscriptions(path); std::string dirStart = path + DIR_SEP; + std::unordered_set> trees; + for (auto it = subs.begin(); it != subs.end(); it++) { + trees.emplace((*it)->tree); + } + std::unordered_set entries; struct dirent *entry; while ((entry = readdir(dir))) { @@ -217,39 +222,55 @@ bool KqueueBackend::compareDir(int fd, std::string &path, std::unordered_setd_name; entries.emplace(fullpath); - for (auto it = subs.begin(); it != subs.end(); it++) { - KqueueSubscription *sub = *it; - if (sub->watcher->isIgnored(fullpath)) { - continue; - } - - if (!sub->tree->find(fullpath)) { + for (auto it = trees.begin(); it != trees.end(); it++) { + std::shared_ptr tree = *it; + if (!tree->find(fullpath)) { struct stat st; fstatat(fd, entry->d_name, &st, AT_SYMLINK_NOFOLLOW); - sub->tree->add(fullpath, CONVERT_TIME(st.st_mtim), S_ISDIR(st.st_mode)); - sub->watcher->mEvents.create(fullpath); - watchers.emplace(sub->watcher); - - bool success = watchDir(*sub->watcher, fullpath, sub->tree); - if (!success) { - sub->tree->remove(fullpath); - return false; + tree->add(fullpath, CONVERT_TIME(st.st_mtim), S_ISDIR(st.st_mode)); + + // Notify all watchers with the same tree. + for (auto i = subs.begin(); i != subs.end(); i++) { + KqueueSubscription *sub = *i; + if (sub->tree == tree) { + if (sub->watcher->isIgnored(fullpath)) { + continue; + } + + sub->watcher->mEvents.create(fullpath); + watchers.emplace(sub->watcher); + + bool success = watchDir(*sub->watcher, fullpath, sub->tree); + if (!success) { + sub->tree->remove(fullpath); + return false; + } + } } } } } - for (auto it = subs.begin(); it != subs.end(); it++) { - KqueueSubscription *sub = *it; - for (auto it = sub->tree->entries.begin(); it != sub->tree->entries.end();) { - if (it->first.rfind(dirStart, 0) == 0 && entries.count(it->first) == 0) { - sub->watcher->mEvents.remove(it->first); - watchers.emplace(sub->watcher); - mFdToEntry.erase((int)(size_t)it->second.state); - mSubscriptions.erase(it->first); - it = sub->tree->entries.erase(it); + for (auto it = trees.begin(); it != trees.end(); it++) { + std::shared_ptr tree = *it; + for (auto entry = tree->entries.begin(); entry != tree->entries.end();) { + if (entry->first.rfind(dirStart, 0) == 0 && entries.count(entry->first) == 0) { + // Notify all watchers with the same tree. + for (auto i = subs.begin(); i != subs.end(); i++) { + if ((*i)->tree == tree) { + KqueueSubscription *sub = *i; + if (!sub->watcher->isIgnored(entry->first)) { + sub->watcher->mEvents.remove(entry->first); + watchers.emplace(sub->watcher); + } + } + } + + mFdToEntry.erase((int)(size_t)entry->second.state); + mSubscriptions.erase(entry->first); + entry = tree->entries.erase(entry); } else { - it++; + entry++; } } } diff --git a/test/watcher.js b/test/watcher.js index 6f7a2774..5cb92c46 100644 --- a/test/watcher.js +++ b/test/watcher.js @@ -556,45 +556,7 @@ describe('watcher', () => { }); }); - describe('ignore', () => { - it('should ignore a directory', async () => { - let f1 = getFilename(); - let f2 = getFilename(path.basename(ignoreDir)); - await fs.mkdir(ignoreDir); - - fs.writeFile(f1, 'hello'); - fs.writeFile(f2, 'sup'); - - let res = await nextEvent(); - assert.deepEqual(res, [{type: 'create', path: f1}]); - }); - - it('should ignore a file', async () => { - let f1 = getFilename(); - - fs.writeFile(f1, 'hello'); - fs.writeFile(ignoreFile, 'sup'); - - let res = await nextEvent(); - assert.deepEqual(res, [{type: 'create', path: f1}]); - }); - - it('should ignore globs', async () => { - fs.writeFile(path.join(ignoreGlobDir, 'test.txt'), 'hello'); - fs.writeFile(path.join(ignoreGlobDir, 'test.ignore'), 'hello'); - fs.writeFile(path.join(ignoreGlobDir, 'ignore', 'test.txt'), 'hello'); - fs.writeFile(path.join(ignoreGlobDir, 'ignore', 'test.ignore'), 'hello'); - fs.writeFile(path.join(ignoreGlobDir, 'erongi', 'test.txt'), 'hello'); - fs.writeFile(path.join(ignoreGlobDir, 'erongi', 'deep', 'test.txt'), 'hello'); - - let res = await nextEvent(); - assert.deepEqual(res, [ - {type: 'create', path: path.join(ignoreGlobDir, 'test.txt')}, - ]); - }); - }); - - describe('multiple', () => { + describe.skip('multiple', () => { it('should support multiple watchers for the same directory', async () => { let dir = path.join( fs.realpathSync(require('os').tmpdir()), @@ -850,6 +812,44 @@ describe('watcher', () => { await worker.terminate(); }); }); + + describe('ignore', () => { + it('should ignore a directory', async () => { + let f1 = getFilename(); + let f2 = getFilename(path.basename(ignoreDir)); + await fs.mkdir(ignoreDir); + + fs.writeFile(f1, 'hello'); + fs.writeFile(f2, 'sup'); + + let res = await nextEvent(); + assert.deepEqual(res, [{type: 'create', path: f1}]); + }); + + it('should ignore a file', async () => { + let f1 = getFilename(); + + fs.writeFile(f1, 'hello'); + fs.writeFile(ignoreFile, 'sup'); + + let res = await nextEvent(); + assert.deepEqual(res, [{type: 'create', path: f1}]); + }); + + it('should ignore globs', async () => { + fs.writeFile(path.join(ignoreGlobDir, 'test.txt'), 'hello'); + fs.writeFile(path.join(ignoreGlobDir, 'test.ignore'), 'hello'); + fs.writeFile(path.join(ignoreGlobDir, 'ignore', 'test.txt'), 'hello'); + fs.writeFile(path.join(ignoreGlobDir, 'ignore', 'test.ignore'), 'hello'); + fs.writeFile(path.join(ignoreGlobDir, 'erongi', 'test.txt'), 'hello'); + fs.writeFile(path.join(ignoreGlobDir, 'erongi', 'deep', 'test.txt'), 'hello'); + + let res = await nextEvent(); + assert.deepEqual(res, [ + {type: 'create', path: path.join(ignoreGlobDir, 'test.txt')}, + ]); + }); + }); }); }); From d491099daf7c6bb30a88f838f69af26cacc25755 Mon Sep 17 00:00:00 2001 From: Devon Govett Date: Sat, 29 Jul 2023 20:09:25 -0700 Subject: [PATCH 12/12] remove skip --- test/watcher.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/watcher.js b/test/watcher.js index 5cb92c46..d574048a 100644 --- a/test/watcher.js +++ b/test/watcher.js @@ -556,7 +556,7 @@ describe('watcher', () => { }); }); - describe.skip('multiple', () => { + describe('multiple', () => { it('should support multiple watchers for the same directory', async () => { let dir = path.join( fs.realpathSync(require('os').tmpdir()),