From 84b05d3918ff68138a3e0b7155e17fdad0571380 Mon Sep 17 00:00:00 2001 From: Robin Christ Date: Sun, 29 Dec 2019 16:56:38 +0100 Subject: [PATCH] Add improved C++17 Async Workers --- nan.h | 404 ++-------------------------------- nan_worker_cpp17.tcc | 497 ++++++++++++++++++++++++++++++++++++++++++ nan_worker_legacy.tcc | 408 ++++++++++++++++++++++++++++++++++ 3 files changed, 918 insertions(+), 391 deletions(-) create mode 100644 nan_worker_cpp17.tcc create mode 100644 nan_worker_legacy.tcc diff --git a/nan.h b/nan.h index c616ff5f..8e9c7187 100644 --- a/nan.h +++ b/nan.h @@ -57,6 +57,10 @@ # define NAN_ENABLE_STRING_VIEW NAN_HAS_CPLUSPLUS_17 #endif +#ifndef NAN_ENABLE_CPP17_WORKERS +# define NAN_ENABLE_CPP17_WORKERS NAN_HAS_CPLUSPLUS_17 +#endif + #include #include #include @@ -81,6 +85,10 @@ #if NAN_ENABLE_STRING_VIEW # include #endif +#if NAN_ENABLE_CPP17_WORKERS +# include +#endif + // uv helpers #ifdef UV_VERSION_MAJOR @@ -1910,398 +1918,12 @@ inline MaybeLocal Call( Call(method_string, recv, argc, argv).FromMaybe(v8::Local())); } -/* abstract */ class AsyncWorker { - public: - explicit AsyncWorker(Callback *callback_, - const char* resource_name = "nan:AsyncWorker") - : callback(callback_), errmsg_(NULL) { - request.data = this; - - HandleScope scope; - v8::Local obj = New(); - persistentHandle.Reset(obj); - async_resource = new AsyncResource(resource_name, obj); - } - - virtual ~AsyncWorker() { - HandleScope scope; - - if (!persistentHandle.IsEmpty()) - persistentHandle.Reset(); - delete callback; - delete[] errmsg_; - delete async_resource; - } - - virtual void WorkComplete() { - HandleScope scope; - - if (errmsg_ == NULL) - HandleOKCallback(); - else - HandleErrorCallback(); - delete callback; - callback = NULL; - } - - inline void SaveToPersistent( - const char *key, const v8::Local &value) { - HandleScope scope; - Set(New(persistentHandle), New(key).ToLocalChecked(), value).FromJust(); - } - - inline void SaveToPersistent( - const v8::Local &key, const v8::Local &value) { - HandleScope scope; - Set(New(persistentHandle), key, value).FromJust(); - } - - inline void SaveToPersistent( - uint32_t index, const v8::Local &value) { - HandleScope scope; - Set(New(persistentHandle), index, value).FromJust(); - } - - inline v8::Local GetFromPersistent(const char *key) const { - EscapableHandleScope scope; - return scope.Escape( - Get(New(persistentHandle), New(key).ToLocalChecked()) - .FromMaybe(v8::Local())); - } - - inline v8::Local - GetFromPersistent(const v8::Local &key) const { - EscapableHandleScope scope; - return scope.Escape( - Get(New(persistentHandle), key) - .FromMaybe(v8::Local())); - } - - inline v8::Local GetFromPersistent(uint32_t index) const { - EscapableHandleScope scope; - return scope.Escape( - Get(New(persistentHandle), index) - .FromMaybe(v8::Local())); - } - - virtual void Execute() = 0; - - uv_work_t request; - - virtual void Destroy() { - delete this; - } - - protected: - Persistent persistentHandle; - Callback *callback; - AsyncResource *async_resource; - - virtual void HandleOKCallback() { - HandleScope scope; - - callback->Call(0, NULL, async_resource); - } - - virtual void HandleErrorCallback() { - HandleScope scope; - - v8::Local argv[] = { - v8::Exception::Error(New(ErrorMessage()).ToLocalChecked()) - }; - callback->Call(1, argv, async_resource); - } - - void SetErrorMessage(const char *msg) { - delete[] errmsg_; - - size_t size = strlen(msg) + 1; - errmsg_ = new char[size]; - memcpy(errmsg_, msg, size); - } - - const char* ErrorMessage() const { - return errmsg_; - } - - private: - NAN_DISALLOW_ASSIGN_COPY_MOVE(AsyncWorker) - char *errmsg_; -}; - -/* abstract */ class AsyncBareProgressWorkerBase : public AsyncWorker { - public: - explicit AsyncBareProgressWorkerBase( - Callback *callback_, - const char* resource_name = "nan:AsyncBareProgressWorkerBase") - : AsyncWorker(callback_, resource_name) { - uv_async_init( - GetCurrentEventLoop() - , &async - , AsyncProgress_ - ); - async.data = this; - } - - virtual ~AsyncBareProgressWorkerBase() { - } - - virtual void WorkProgress() = 0; - - virtual void Destroy() { - uv_close(reinterpret_cast(&async), AsyncClose_); - } - - private: - inline static NAUV_WORK_CB(AsyncProgress_) { - AsyncBareProgressWorkerBase *worker = - static_cast(async->data); - worker->WorkProgress(); - } - - inline static void AsyncClose_(uv_handle_t* handle) { - AsyncBareProgressWorkerBase *worker = - static_cast(handle->data); - delete worker; - } - - protected: - uv_async_t async; -}; - -template -/* abstract */ -class AsyncBareProgressWorker : public AsyncBareProgressWorkerBase { - public: - explicit AsyncBareProgressWorker( - Callback *callback_, - const char* resource_name = "nan:AsyncBareProgressWorker") - : AsyncBareProgressWorkerBase(callback_, resource_name) { - uv_mutex_init(&async_lock); - } - - virtual ~AsyncBareProgressWorker() { - uv_mutex_destroy(&async_lock); - } - - class ExecutionProgress { - friend class AsyncBareProgressWorker; - public: - void Signal() const { - uv_mutex_lock(&that_->async_lock); - uv_async_send(&that_->async); - uv_mutex_unlock(&that_->async_lock); - } - - void Send(const T* data, size_t count) const { - that_->SendProgress_(data, count); - } - - private: - explicit ExecutionProgress(AsyncBareProgressWorker *that) : that_(that) {} - NAN_DISALLOW_ASSIGN_COPY_MOVE(ExecutionProgress) - AsyncBareProgressWorker* const that_; - }; - - virtual void Execute(const ExecutionProgress& progress) = 0; - virtual void HandleProgressCallback(const T *data, size_t size) = 0; - - protected: - uv_mutex_t async_lock; - - private: - void Execute() /*final override*/ { - ExecutionProgress progress(this); - Execute(progress); - } - - virtual void SendProgress_(const T *data, size_t count) = 0; -}; - -template -/* abstract */ -class AsyncProgressWorkerBase : public AsyncBareProgressWorker { - public: - explicit AsyncProgressWorkerBase( - Callback *callback_, - const char* resource_name = "nan:AsyncProgressWorkerBase") - : AsyncBareProgressWorker(callback_, resource_name), asyncdata_(NULL), - asyncsize_(0) { - } - - virtual ~AsyncProgressWorkerBase() { - delete[] asyncdata_; - } - - void WorkProgress() { - uv_mutex_lock(&this->async_lock); - T *data = asyncdata_; - size_t size = asyncsize_; - asyncdata_ = NULL; - asyncsize_ = 0; - uv_mutex_unlock(&this->async_lock); - - // Don't send progress events after we've already completed. - if (this->callback) { - this->HandleProgressCallback(data, size); - } - delete[] data; - } - - private: - void SendProgress_(const T *data, size_t count) { - T *new_data = new T[count]; - std::copy(data, data + count, new_data); - - uv_mutex_lock(&this->async_lock); - T *old_data = asyncdata_; - asyncdata_ = new_data; - asyncsize_ = count; - uv_async_send(&this->async); - uv_mutex_unlock(&this->async_lock); - - delete[] old_data; - } - - T *asyncdata_; - size_t asyncsize_; -}; - -// This ensures compatibility to the previous un-templated AsyncProgressWorker -// class definition. -typedef AsyncProgressWorkerBase AsyncProgressWorker; - -template -/* abstract */ -class AsyncBareProgressQueueWorker : public AsyncBareProgressWorkerBase { - public: - explicit AsyncBareProgressQueueWorker( - Callback *callback_, - const char* resource_name = "nan:AsyncBareProgressQueueWorker") - : AsyncBareProgressWorkerBase(callback_, resource_name) { - } - - virtual ~AsyncBareProgressQueueWorker() { - } - - class ExecutionProgress { - friend class AsyncBareProgressQueueWorker; - public: - void Send(const T* data, size_t count) const { - that_->SendProgress_(data, count); - } - - private: - explicit ExecutionProgress(AsyncBareProgressQueueWorker *that) - : that_(that) {} - NAN_DISALLOW_ASSIGN_COPY_MOVE(ExecutionProgress) - AsyncBareProgressQueueWorker* const that_; - }; - - virtual void Execute(const ExecutionProgress& progress) = 0; - virtual void HandleProgressCallback(const T *data, size_t size) = 0; - - private: - void Execute() /*final override*/ { - ExecutionProgress progress(this); - Execute(progress); - } - - virtual void SendProgress_(const T *data, size_t count) = 0; -}; - -template -/* abstract */ -class AsyncProgressQueueWorker : public AsyncBareProgressQueueWorker { - public: - explicit AsyncProgressQueueWorker( - Callback *callback_, - const char* resource_name = "nan:AsyncProgressQueueWorker") - : AsyncBareProgressQueueWorker(callback_) { - uv_mutex_init(&async_lock); - } - - virtual ~AsyncProgressQueueWorker() { - uv_mutex_lock(&async_lock); - - while (!asyncdata_.empty()) { - std::pair &datapair = asyncdata_.front(); - T *data = datapair.first; - - asyncdata_.pop(); - - delete[] data; - } - - uv_mutex_unlock(&async_lock); - uv_mutex_destroy(&async_lock); - } - - void WorkComplete() { - WorkProgress(); - AsyncWorker::WorkComplete(); - } - - void WorkProgress() { - uv_mutex_lock(&async_lock); - - while (!asyncdata_.empty()) { - std::pair &datapair = asyncdata_.front(); - - T *data = datapair.first; - size_t size = datapair.second; - - asyncdata_.pop(); - uv_mutex_unlock(&async_lock); - - // Don't send progress events after we've already completed. - if (this->callback) { - this->HandleProgressCallback(data, size); - } - - delete[] data; - - uv_mutex_lock(&async_lock); - } - - uv_mutex_unlock(&async_lock); - } - - private: - void SendProgress_(const T *data, size_t count) { - T *new_data = new T[count]; - std::copy(data, data + count, new_data); - - uv_mutex_lock(&async_lock); - asyncdata_.push(std::pair(new_data, count)); - uv_mutex_unlock(&async_lock); - - uv_async_send(&this->async); - } - - uv_mutex_t async_lock; - std::queue > asyncdata_; -}; - -inline void AsyncExecute (uv_work_t* req) { - AsyncWorker *worker = static_cast(req->data); - worker->Execute(); -} - -inline void AsyncExecuteComplete (uv_work_t* req) { - AsyncWorker* worker = static_cast(req->data); - worker->WorkComplete(); - worker->Destroy(); -} +#if NAN_ENABLE_CPP17_WORKERS +# include "nan_worker_cpp17.tcc" +#else +# include "nan_worker_legacy.tcc" +#endif -inline void AsyncQueueWorker (AsyncWorker* worker) { - uv_queue_work( - GetCurrentEventLoop() - , &worker->request - , AsyncExecute - , reinterpret_cast(AsyncExecuteComplete) - ); -} namespace imp { diff --git a/nan_worker_cpp17.tcc b/nan_worker_cpp17.tcc new file mode 100644 index 00000000..27b2e625 --- /dev/null +++ b/nan_worker_cpp17.tcc @@ -0,0 +1,497 @@ +/********************************************************************* + * NAN - Native Abstractions for Node.js + * + * Copyright (c) 2018 NAN contributors + * + * MIT License + ********************************************************************/ + +#ifndef NAN_H_ +# error You cannot include nan_worker_cpp17.tcc outside of nan.h! +#endif + +#if !NAN_ENABLE_CPP17_WORKERS +# error You need to include nan_worker_legacy.tcc if you want to use legacy workers! +#endif + +/* abstract */ +class AsyncWorker { +public: + explicit AsyncWorker( + Callback *callback_, + const char* resource_name = "nan::AsyncWorker" + ) : + callback(callback_), + errmsg_(nullptr) + { + request.data = this; + + HandleScope scope; + v8::Local obj = New(); + persistentHandle.Reset(obj); + async_resource = new AsyncResource(resource_name, obj); + } + + virtual ~AsyncWorker() { + HandleScope scope; + + if (!persistentHandle.IsEmpty()) { + persistentHandle.Reset(); + } + delete callback; + delete[] errmsg_; + delete async_resource; + } + + virtual void WorkComplete() { + HandleScope scope; + + if (errmsg_ == nullptr) { + HandleOKCallback(); + } else { + HandleErrorCallback(); + } + + delete callback; + callback = nullptr; + } + + inline void SaveToPersistent( + const char *key, + const v8::Local& value + ) { + HandleScope scope; + Set(New(persistentHandle), New(key).ToLocalChecked(), value).FromJust(); + } + + inline void SaveToPersistent( + const v8::Local& key, + const v8::Local& value + ) { + HandleScope scope; + Set(New(persistentHandle), key, value).FromJust(); + } + + inline void SaveToPersistent( + uint32_t index, + const v8::Local& value + ) { + HandleScope scope; + Set(New(persistentHandle), index, value).FromJust(); + } + + inline v8::Local GetFromPersistent(const char *key) const { + EscapableHandleScope scope; + return scope.Escape( + Get(New(persistentHandle), New(key).ToLocalChecked()) + .FromMaybe(v8::Local())); + } + + inline v8::Local + GetFromPersistent(const v8::Local &key) const { + EscapableHandleScope scope; + return scope.Escape( + Get(New(persistentHandle), key) + .FromMaybe(v8::Local())); + } + + inline v8::Local GetFromPersistent(uint32_t index) const { + EscapableHandleScope scope; + return scope.Escape( + Get(New(persistentHandle), index) + .FromMaybe(v8::Local())); + } + + virtual void Execute() = 0; + + uv_work_t request; + + virtual void Destroy() { + delete this; + } + +protected: + Persistent persistentHandle; + Callback *callback; + AsyncResource *async_resource; + + virtual void HandleOKCallback() { + HandleScope scope; + + callback->Call(0, nullptr, async_resource); + } + + virtual void HandleErrorCallback() { + HandleScope scope; + + v8::Local argv[] = { + v8::Exception::Error(New(ErrorMessage()).ToLocalChecked()) + }; + callback->Call(1, argv, async_resource); + } + + void SetErrorMessage(const char *msg) { + delete[] errmsg_; + + size_t size = strlen(msg) + 1; + errmsg_ = new char[size]; + memcpy(errmsg_, msg, size); + } + + const char* ErrorMessage() const { + return errmsg_; + } + +private: + NAN_DISALLOW_ASSIGN_COPY_MOVE(AsyncWorker) + char *errmsg_; +}; + +/* abstract */ +class AsyncBareProgressWorkerBase : public AsyncWorker { +public: + explicit AsyncBareProgressWorkerBase( + Callback *callback_, + const char* resource_name = "nan::AsyncBareProgressWorkerBase" + ) : + AsyncWorker(callback_, resource_name) + { + uv_async_init( + GetCurrentEventLoop(), + &async, + AsyncProgress_ + ); + async.data = this; + } + + virtual ~AsyncBareProgressWorkerBase() = default; + + virtual void WorkProgress() = 0; + + virtual void Destroy() { + uv_close(reinterpret_cast(&async), AsyncClose_); + } + + private: + inline static NAUV_WORK_CB(AsyncProgress_) { + AsyncBareProgressWorkerBase *worker = + static_cast(async->data); + worker->WorkProgress(); + } + + inline static void AsyncClose_(uv_handle_t* handle) { + AsyncBareProgressWorkerBase *worker = + static_cast(handle->data); + delete worker; + } + +protected: + uv_async_t async; +}; + +template +/* abstract */ +class AsyncBareProgressWorker : public AsyncBareProgressWorkerBase { +public: + explicit AsyncBareProgressWorker( + Callback *callback_, + const char* resource_name = "nan::AsyncBareProgressWorker" + ) : + AsyncBareProgressWorkerBase(callback_, resource_name) + { + uv_mutex_init(&async_lock); + } + + virtual ~AsyncBareProgressWorker() { + uv_mutex_destroy(&async_lock); + } + + class ExecutionProgress { + friend class AsyncBareProgressWorker; + public: + void Signal() const { + uv_mutex_lock(&that_->async_lock); + uv_async_send(&that_->async); + uv_mutex_unlock(&that_->async_lock); + } + + void Send(const T* data, size_t count) const { + that_->SendProgress_(data, count); + } + + private: + explicit ExecutionProgress(AsyncBareProgressWorker *that) : that_(that) {} + NAN_DISALLOW_ASSIGN_COPY_MOVE(ExecutionProgress) + AsyncBareProgressWorker* const that_; + }; + + virtual void Execute(const ExecutionProgress& progress) = 0; + virtual void HandleProgressCallback(const T *data, size_t size) = 0; + + protected: + uv_mutex_t async_lock; + + private: + void Execute() /*final override*/ { + ExecutionProgress progress(this); + Execute(progress); + } + + virtual void SendProgress_(const T *data, size_t count) = 0; +}; + +template +/* abstract */ +class AsyncProgressWorkerBase : public AsyncBareProgressWorker { +public: + explicit AsyncProgressWorkerBase( + Callback *callback_, + const char* resource_name = "nan::AsyncProgressWorkerBase" + ) : + AsyncBareProgressWorker(callback_, resource_name), + asyncdata_(nullptr), + asyncsize_(0) + { + } + + virtual ~AsyncProgressWorkerBase() { + delete[] asyncdata_; + } + + void WorkProgress() { + uv_mutex_lock(&this->async_lock); + T *data = asyncdata_; + size_t size = asyncsize_; + asyncdata_ = nullptr; + asyncsize_ = 0; + uv_mutex_unlock(&this->async_lock); + + // Don't send progress events after we've already completed. + if (this->callback) { + this->HandleProgressCallback(data, size); + } + delete[] data; + } + + private: + void SendProgress_(const T *data, size_t count) { + T *new_data = new T[count]; + std::copy(data, data + count, new_data); + + uv_mutex_lock(&this->async_lock); + T *old_data = asyncdata_; + asyncdata_ = new_data; + asyncsize_ = count; + uv_async_send(&this->async); + uv_mutex_unlock(&this->async_lock); + + delete[] old_data; + } + + T *asyncdata_; + size_t asyncsize_; +}; + +// This ensures compatibility to the previous un-templated AsyncProgressWorker +// class definition. +typedef AsyncProgressWorkerBase AsyncProgressWorker; + +template +/* abstract */ +class AsyncBareProgressQueueWorker : public AsyncBareProgressWorkerBase { +public: + explicit AsyncBareProgressQueueWorker( + Callback *callback_, + const char* resource_name = "nan::AsyncBareProgressQueueWorker" + ) : + AsyncBareProgressWorkerBase(callback_, resource_name) + { + } + + virtual ~AsyncBareProgressQueueWorker() = default; + + class ExecutionProgress { + friend class AsyncBareProgressQueueWorker; + + public: + void Send(std::unique_ptr messages, size_t nMessages) const { + that_->SendProgress_(std::move(messages), nMessages); + } + + NAN_DEPRECATED void Send(const T* messages, size_t nMessages) const { + that_->SendProgress_(messages, nMessages); + } + + private: + explicit ExecutionProgress(AsyncBareProgressQueueWorker *that) + : that_(that) {} + + NAN_DISALLOW_ASSIGN_COPY_MOVE(ExecutionProgress) + AsyncBareProgressQueueWorker* const that_; + }; + + virtual void Execute(const ExecutionProgress& progress) = 0; + virtual void HandleProgressCallback(const T *data, size_t size) = 0; + + private: + void Execute() final { + ExecutionProgress progress(this); + Execute(progress); + } + + virtual void SendProgress_(std::unique_ptr messages, size_t nMessages) = 0; + + NAN_DEPRECATED virtual void SendProgress_(const T *messages, size_t nMessages) = 0; +}; + +template +/* abstract */ +class AsyncProgressQueueWorker : public AsyncBareProgressQueueWorker { +public: + using ProgressMessageQueueType = + std::queue< + std::pair< + std::unique_ptr>, + size_t + > + >; + + explicit AsyncProgressQueueWorker( + Callback *callback_, + const char* resource_name = "nan::AsyncProgressQueueWorker" + ) : + AsyncBareProgressQueueWorker(callback_) + { + + } + + virtual ~AsyncProgressQueueWorker() { + //Just to be sure + std::lock_guard lockGuard(queueLock_); + + //Copy and swap, the held data is automatically deleted by the smart pointers + ProgressMessageQueueType().swap(progressMessageQueue_); + } + + virtual void WorkComplete() override { + WorkProgress(); + AsyncWorker::WorkComplete(); + } + + virtual void WorkProgress() override { + std::unique_lock lock(queueLock_); + + while (!progressMessageQueue_.empty()) { + auto& datapair = progressMessageQueue_.front(); + + auto buf = std::move(datapair.first); + auto nMessages = datapair.second; + + progressMessageQueue_.pop(); + lock.unlock(); + + // Don't send progress events after we've already completed. + if (this->callback) { + this->HandleProgressCallback(buf.get(), nMessages); + } + + //Delete the array held by the smtart pointer before we reacquire the lock in + //order to keep the time the queue is locked as low as possibles + buf.reset(); + + lock.lock(); + } + + //Lock will be automatically released + } + +private: + virtual void SendProgress_(std::unique_ptr messages, size_t nMessages) override { + { + std::lock_guard lockGuard(queueLock_); + progressMessageQueue_.emplace(std::move(messages), nMessages); + } + + uv_async_send(&this->async); + } + + NAN_DEPRECATED virtual void SendProgress_(const T * const messages, const size_t nMessages) override { + + auto buffer = [&]() -> std::unique_ptr> { + if(messages != nullptr) { + //Allocate memory + const size_t bufferSize = sizeof(T) * nMessages; + void* rawBuf = std::aligned_alloc(alignof(T), bufferSize); + if(!rawBuf) { + //No need to deallocate anything + throw std::bad_alloc(); + } + + auto messageBuf = static_cast(rawBuf); + //Wrap the buffer into a unique_ptr to prevent memory leaks + //We do not need to worry about std::unique_ptr's constructor to throw + //and produce a memory leak, because it is noexcept! + std::unique_ptr> messageBufferPtr( + messageBuf, + //Custom deleter! + [nMessages](T* ptr) { + //No need to wrap destroy_n in a try catch block + //If something goes wrong, std::terminate is called anyways + std::destroy_n(ptr, nMessages); + std::free(ptr); + } + ); + + //Copy the data to the uninitialised memory + if constexpr(std::is_trivially_copyable_v) { + //Use fast memcpy path + std::memcpy(rawBuf, messages, bufferSize); + } else { + //Use the generic way which does not require T to be default constructible + std::uninitialized_copy_n(messages, nMessages, messageBuf); + } + + return messageBufferPtr; + } else { + //Allow the user to send nullptrs if he desires to do so... + //We need to take special care because of memcpy and stuff + return std::unique_ptr(nullptr); + } + }(); + + { + std::lock_guard lockGuard(queueLock_); + progressMessageQueue_.emplace(std::move(buffer), nMessages); + } + + //Wake up the event loop! + uv_async_send(&this->async); + } + + std::mutex queueLock_; + ProgressMessageQueueType progressMessageQueue_; +}; + +inline void AsyncExecute(uv_work_t* req) { + AsyncWorker *worker = static_cast(req->data); + worker->Execute(); +} + +inline void AsyncExecuteComplete(uv_work_t* req) { + AsyncWorker* worker = static_cast(req->data); + worker->WorkComplete(); + worker->Destroy(); +} + +inline void AsyncQueueWorker(AsyncWorker* worker) { + uv_queue_work( + GetCurrentEventLoop(), + &worker->request, + AsyncExecute, + reinterpret_cast(AsyncExecuteComplete) + ); +} + +inline void LaunchAsyncWorker(AsyncWorker* worker) { + AsyncQueueWorker(worker); +} \ No newline at end of file diff --git a/nan_worker_legacy.tcc b/nan_worker_legacy.tcc new file mode 100644 index 00000000..a34e79a9 --- /dev/null +++ b/nan_worker_legacy.tcc @@ -0,0 +1,408 @@ +/********************************************************************* + * NAN - Native Abstractions for Node.js + * + * Copyright (c) 2018 NAN contributors + * + * MIT License + ********************************************************************/ + +#ifndef NAN_H_ +# error You cannot include nan_worker_legacy.tcc outside of nan.h! +#endif + +#if NAN_ENABLE_CPP17_WORKERS +# error You need to include nan_worker_cpp17.tcc if you want to use C++17 workers! +#endif + +/* abstract */ class AsyncWorker { + public: + explicit AsyncWorker(Callback *callback_, + const char* resource_name = "nan:AsyncWorker") + : callback(callback_), errmsg_(NULL) { + request.data = this; + + HandleScope scope; + v8::Local obj = New(); + persistentHandle.Reset(obj); + async_resource = new AsyncResource(resource_name, obj); + } + + virtual ~AsyncWorker() { + HandleScope scope; + + if (!persistentHandle.IsEmpty()) + persistentHandle.Reset(); + delete callback; + delete[] errmsg_; + delete async_resource; + } + + virtual void WorkComplete() { + HandleScope scope; + + if (errmsg_ == NULL) + HandleOKCallback(); + else + HandleErrorCallback(); + delete callback; + callback = NULL; + } + + inline void SaveToPersistent( + const char *key, const v8::Local &value) { + HandleScope scope; + Set(New(persistentHandle), New(key).ToLocalChecked(), value).FromJust(); + } + + inline void SaveToPersistent( + const v8::Local &key, const v8::Local &value) { + HandleScope scope; + Set(New(persistentHandle), key, value).FromJust(); + } + + inline void SaveToPersistent( + uint32_t index, const v8::Local &value) { + HandleScope scope; + Set(New(persistentHandle), index, value).FromJust(); + } + + inline v8::Local GetFromPersistent(const char *key) const { + EscapableHandleScope scope; + return scope.Escape( + Get(New(persistentHandle), New(key).ToLocalChecked()) + .FromMaybe(v8::Local())); + } + + inline v8::Local + GetFromPersistent(const v8::Local &key) const { + EscapableHandleScope scope; + return scope.Escape( + Get(New(persistentHandle), key) + .FromMaybe(v8::Local())); + } + + inline v8::Local GetFromPersistent(uint32_t index) const { + EscapableHandleScope scope; + return scope.Escape( + Get(New(persistentHandle), index) + .FromMaybe(v8::Local())); + } + + virtual void Execute() = 0; + + uv_work_t request; + + virtual void Destroy() { + delete this; + } + + protected: + Persistent persistentHandle; + Callback *callback; + AsyncResource *async_resource; + + virtual void HandleOKCallback() { + HandleScope scope; + + callback->Call(0, NULL, async_resource); + } + + virtual void HandleErrorCallback() { + HandleScope scope; + + v8::Local argv[] = { + v8::Exception::Error(New(ErrorMessage()).ToLocalChecked()) + }; + callback->Call(1, argv, async_resource); + } + + void SetErrorMessage(const char *msg) { + delete[] errmsg_; + + size_t size = strlen(msg) + 1; + errmsg_ = new char[size]; + memcpy(errmsg_, msg, size); + } + + const char* ErrorMessage() const { + return errmsg_; + } + + private: + NAN_DISALLOW_ASSIGN_COPY_MOVE(AsyncWorker) + char *errmsg_; +}; + +/* abstract */ class AsyncBareProgressWorkerBase : public AsyncWorker { + public: + explicit AsyncBareProgressWorkerBase( + Callback *callback_, + const char* resource_name = "nan:AsyncBareProgressWorkerBase") + : AsyncWorker(callback_, resource_name) { + uv_async_init( + GetCurrentEventLoop() + , &async + , AsyncProgress_ + ); + async.data = this; + } + + virtual ~AsyncBareProgressWorkerBase() { + } + + virtual void WorkProgress() = 0; + + virtual void Destroy() { + uv_close(reinterpret_cast(&async), AsyncClose_); + } + + private: + inline static NAUV_WORK_CB(AsyncProgress_) { + AsyncBareProgressWorkerBase *worker = + static_cast(async->data); + worker->WorkProgress(); + } + + inline static void AsyncClose_(uv_handle_t* handle) { + AsyncBareProgressWorkerBase *worker = + static_cast(handle->data); + delete worker; + } + + protected: + uv_async_t async; +}; + +template +/* abstract */ +class AsyncBareProgressWorker : public AsyncBareProgressWorkerBase { + public: + explicit AsyncBareProgressWorker( + Callback *callback_, + const char* resource_name = "nan:AsyncBareProgressWorker") + : AsyncBareProgressWorkerBase(callback_, resource_name) { + uv_mutex_init(&async_lock); + } + + virtual ~AsyncBareProgressWorker() { + uv_mutex_destroy(&async_lock); + } + + class ExecutionProgress { + friend class AsyncBareProgressWorker; + public: + void Signal() const { + uv_mutex_lock(&that_->async_lock); + uv_async_send(&that_->async); + uv_mutex_unlock(&that_->async_lock); + } + + void Send(const T* data, size_t count) const { + that_->SendProgress_(data, count); + } + + private: + explicit ExecutionProgress(AsyncBareProgressWorker *that) : that_(that) {} + NAN_DISALLOW_ASSIGN_COPY_MOVE(ExecutionProgress) + AsyncBareProgressWorker* const that_; + }; + + virtual void Execute(const ExecutionProgress& progress) = 0; + virtual void HandleProgressCallback(const T *data, size_t size) = 0; + + protected: + uv_mutex_t async_lock; + + private: + void Execute() /*final override*/ { + ExecutionProgress progress(this); + Execute(progress); + } + + virtual void SendProgress_(const T *data, size_t count) = 0; +}; + +template +/* abstract */ +class AsyncProgressWorkerBase : public AsyncBareProgressWorker { + public: + explicit AsyncProgressWorkerBase( + Callback *callback_, + const char* resource_name = "nan:AsyncProgressWorkerBase") + : AsyncBareProgressWorker(callback_, resource_name), asyncdata_(NULL), + asyncsize_(0) { + } + + virtual ~AsyncProgressWorkerBase() { + delete[] asyncdata_; + } + + void WorkProgress() { + uv_mutex_lock(&this->async_lock); + T *data = asyncdata_; + size_t size = asyncsize_; + asyncdata_ = NULL; + asyncsize_ = 0; + uv_mutex_unlock(&this->async_lock); + + // Don't send progress events after we've already completed. + if (this->callback) { + this->HandleProgressCallback(data, size); + } + delete[] data; + } + + private: + void SendProgress_(const T *data, size_t count) { + T *new_data = new T[count]; + std::copy(data, data + count, new_data); + + uv_mutex_lock(&this->async_lock); + T *old_data = asyncdata_; + asyncdata_ = new_data; + asyncsize_ = count; + uv_async_send(&this->async); + uv_mutex_unlock(&this->async_lock); + + delete[] old_data; + } + + T *asyncdata_; + size_t asyncsize_; +}; + +// This ensures compatibility to the previous un-templated AsyncProgressWorker +// class definition. +typedef AsyncProgressWorkerBase AsyncProgressWorker; + +template +/* abstract */ +class AsyncBareProgressQueueWorker : public AsyncBareProgressWorkerBase { + public: + explicit AsyncBareProgressQueueWorker( + Callback *callback_, + const char* resource_name = "nan:AsyncBareProgressQueueWorker") + : AsyncBareProgressWorkerBase(callback_, resource_name) { + } + + virtual ~AsyncBareProgressQueueWorker() { + } + + class ExecutionProgress { + friend class AsyncBareProgressQueueWorker; + public: + void Send(const T* data, size_t count) const { + that_->SendProgress_(data, count); + } + + private: + explicit ExecutionProgress(AsyncBareProgressQueueWorker *that) + : that_(that) {} + NAN_DISALLOW_ASSIGN_COPY_MOVE(ExecutionProgress) + AsyncBareProgressQueueWorker* const that_; + }; + + virtual void Execute(const ExecutionProgress& progress) = 0; + virtual void HandleProgressCallback(const T *data, size_t size) = 0; + + private: + void Execute() /*final override*/ { + ExecutionProgress progress(this); + Execute(progress); + } + + virtual void SendProgress_(const T *data, size_t count) = 0; +}; + +template +/* abstract */ +class AsyncProgressQueueWorker : public AsyncBareProgressQueueWorker { + public: + explicit AsyncProgressQueueWorker( + Callback *callback_, + const char* resource_name = "nan:AsyncProgressQueueWorker") + : AsyncBareProgressQueueWorker(callback_) { + uv_mutex_init(&async_lock); + } + + virtual ~AsyncProgressQueueWorker() { + uv_mutex_lock(&async_lock); + + while (!asyncdata_.empty()) { + std::pair &datapair = asyncdata_.front(); + T *data = datapair.first; + + asyncdata_.pop(); + + delete[] data; + } + + uv_mutex_unlock(&async_lock); + uv_mutex_destroy(&async_lock); + } + + void WorkComplete() { + WorkProgress(); + AsyncWorker::WorkComplete(); + } + + void WorkProgress() { + uv_mutex_lock(&async_lock); + + while (!asyncdata_.empty()) { + std::pair &datapair = asyncdata_.front(); + + T *data = datapair.first; + size_t size = datapair.second; + + asyncdata_.pop(); + uv_mutex_unlock(&async_lock); + + // Don't send progress events after we've already completed. + if (this->callback) { + this->HandleProgressCallback(data, size); + } + + delete[] data; + + uv_mutex_lock(&async_lock); + } + + uv_mutex_unlock(&async_lock); + } + + private: + void SendProgress_(const T *data, size_t count) { + T *new_data = new T[count]; + std::copy(data, data + count, new_data); + + uv_mutex_lock(&async_lock); + asyncdata_.push(std::pair(new_data, count)); + uv_mutex_unlock(&async_lock); + + uv_async_send(&this->async); + } + + uv_mutex_t async_lock; + std::queue > asyncdata_; +}; + +inline void AsyncExecute (uv_work_t* req) { + AsyncWorker *worker = static_cast(req->data); + worker->Execute(); +} + +inline void AsyncExecuteComplete (uv_work_t* req) { + AsyncWorker* worker = static_cast(req->data); + worker->WorkComplete(); + worker->Destroy(); +} + +inline void AsyncQueueWorker (AsyncWorker* worker) { + uv_queue_work( + GetCurrentEventLoop() + , &worker->request + , AsyncExecute + , reinterpret_cast(AsyncExecuteComplete) + ); +} \ No newline at end of file