Skip to content

Commit

Permalink
src: fix implementation of Signal
Browse files Browse the repository at this point in the history
PR-URL: #1216
Reviewed-By: Michael Dawson <midawson@redhat.com
Reviewed-By: Chengzhong Wu <legendecas@gmail.com>
  • Loading branch information
KevinEady authored and mhdawson committed Dec 2, 2022
1 parent de5a502 commit edf630c
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 25 deletions.
25 changes: 23 additions & 2 deletions doc/async_worker_variants.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,11 @@ virtual void Napi::AsyncProgressWorker::OnOK();

### OnProgress

This method is invoked when the computation in the `Napi::AsyncProgressWorker::ExecutionProcess::Send`
method was called during worker thread execution.
This method is invoked when the computation in the
`Napi::AsyncProgressWorker::ExecutionProcess::Send` method was called during
worker thread execution. This method can also be triggered via a call to
`Napi::AsyncProgress[Queue]Worker::ExecutionProcess::Signal`, in which case the
`data` parameter will be `nullptr`.

```cpp
virtual void Napi::AsyncProgressWorker::OnProgress(const T* data, size_t count)
Expand Down Expand Up @@ -251,6 +254,15 @@ class instead which is documented further down this page.
void Napi::AsyncProgressWorker::ExecutionProcess::Send(const T* data, size_t count) const;
```
### Signal
`Napi::AsyncProgressWorker::ExecutionProcess::Signal` triggers an invocation of
`Napi::AsyncProgressWorker::OnProgress` with `nullptr` as the `data` parameter.
```cpp
void Napi::AsyncProgressWorker::ExecutionProcess::Signal();
```

## Example

The first step to use the `Napi::AsyncProgressWorker` class is to create a new class that
Expand Down Expand Up @@ -415,6 +427,15 @@ with each data item.
void Napi::AsyncProgressQueueWorker::ExecutionProcess::Send(const T* data, size_t count) const;
```
### Signal
`Napi::AsyncProgressQueueWorker::ExecutionProcess::Signal` triggers an invocation of
`Napi::AsyncProgressQueueWorker::OnProgress` with `nullptr` as the `data` parameter.
```cpp
void Napi::AsyncProgressQueueWorker::ExecutionProcess::Signal() const;
```

## Example

The code below shows an example of the `Napi::AsyncProgressQueueWorker` implementation, but
Expand Down
21 changes: 15 additions & 6 deletions napi-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -5940,7 +5940,8 @@ inline AsyncProgressWorker<T>::AsyncProgressWorker(const Object& receiver,
const Object& resource)
: AsyncProgressWorkerBase(receiver, callback, resource_name, resource),
_asyncdata(nullptr),
_asyncsize(0) {}
_asyncsize(0),
_signaled(false) {}

#if NAPI_VERSION > 4
template <class T>
Expand Down Expand Up @@ -5980,12 +5981,15 @@ template <class T>
inline void AsyncProgressWorker<T>::OnWorkProgress(void*) {
T* data;
size_t size;
bool signaled;
{
std::lock_guard<std::mutex> lock(this->_mutex);
data = this->_asyncdata;
size = this->_asyncsize;
signaled = this->_signaled;
this->_asyncdata = nullptr;
this->_asyncsize = 0;
this->_signaled = false;
}

/**
Expand All @@ -5995,7 +5999,7 @@ inline void AsyncProgressWorker<T>::OnWorkProgress(void*) {
* the deferring the signal of uv_async_t is been sent again, i.e. potential
* not coalesced two calls of the TSFN callback.
*/
if (data == nullptr) {
if (data == nullptr && !signaled) {
return;
}

Expand All @@ -6014,20 +6018,25 @@ inline void AsyncProgressWorker<T>::SendProgress_(const T* data, size_t count) {
old_data = _asyncdata;
_asyncdata = new_data;
_asyncsize = count;
_signaled = false;
}
this->NonBlockingCall(nullptr);

delete[] old_data;
}

template <class T>
inline void AsyncProgressWorker<T>::Signal() const {
inline void AsyncProgressWorker<T>::Signal() {
{
std::lock_guard<std::mutex> lock(this->_mutex);
_signaled = true;
}
this->NonBlockingCall(static_cast<T*>(nullptr));
}

template <class T>
inline void AsyncProgressWorker<T>::ExecutionProgress::Signal() const {
_worker->Signal();
this->_worker->Signal();
}

template <class T>
Expand Down Expand Up @@ -6130,7 +6139,7 @@ inline void AsyncProgressQueueWorker<T>::SendProgress_(const T* data,

template <class T>
inline void AsyncProgressQueueWorker<T>::Signal() const {
this->NonBlockingCall(nullptr);
this->SendProgress_(static_cast<T*>(nullptr), 0);
}

template <class T>
Expand All @@ -6142,7 +6151,7 @@ inline void AsyncProgressQueueWorker<T>::OnWorkComplete(Napi::Env env,

template <class T>
inline void AsyncProgressQueueWorker<T>::ExecutionProgress::Signal() const {
_worker->Signal();
_worker->SendProgress_(static_cast<T*>(nullptr), 0);
}

template <class T>
Expand Down
3 changes: 2 additions & 1 deletion napi.h
Original file line number Diff line number Diff line change
Expand Up @@ -3004,12 +3004,13 @@ class AsyncProgressWorker : public AsyncProgressWorkerBase<void> {

private:
void Execute() override;
void Signal() const;
void Signal();
void SendProgress_(const T* data, size_t count);

std::mutex _mutex;
T* _asyncdata;
size_t _asyncsize;
bool _signaled;
};

template <class T>
Expand Down
16 changes: 13 additions & 3 deletions test/async_progress_queue_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class TestWorker : public AsyncProgressQueueWorker<ProgressData> {

if (_times < 0) {
SetError("test error");
} else {
progress.Signal();
}
ProgressData data{0};
for (int32_t idx = 0; idx < _times; idx++) {
Expand All @@ -49,11 +51,18 @@ class TestWorker : public AsyncProgressQueueWorker<ProgressData> {
}
}

void OnProgress(const ProgressData* data, size_t /* count */) override {
void OnProgress(const ProgressData* data, size_t count) override {
Napi::Env env = Env();
_test_case_count++;
if (!_js_progress_cb.IsEmpty()) {
Number progress = Number::New(env, data->progress);
_js_progress_cb.Call(Receiver().Value(), {progress});
if (_test_case_count == 1) {
if (count != 0) {
SetError("expect 0 count of data on 1st call");
}
} else {
Number progress = Number::New(env, data->progress);
_js_progress_cb.Call(Receiver().Value(), {progress});
}
}
}

Expand All @@ -68,6 +77,7 @@ class TestWorker : public AsyncProgressQueueWorker<ProgressData> {
}

int32_t _times;
size_t _test_case_count = 0;
FunctionReference _js_progress_cb;
};

Expand Down
74 changes: 65 additions & 9 deletions test/async_progress_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,17 @@ class MalignWorker : public AsyncProgressWorker<ProgressData> {

protected:
void Execute(const ExecutionProgress& progress) override {
std::unique_lock<std::mutex> lock(_cvm);
// Testing a nullptr send is acceptable.
progress.Send(nullptr, 0);
_cv.wait(lock);
{
std::unique_lock<std::mutex> lock(_cvm);
// Testing a nullptr send is acceptable.
progress.Send(nullptr, 0);
_cv.wait(lock, [this] { return _test_case_count == 1; });
}
{
std::unique_lock<std::mutex> lock(_cvm);
progress.Signal();
_cv.wait(lock, [this] { return _test_case_count == 2; });
}
// Testing busy looping on send doesn't trigger unexpected empty data
// OnProgress call.
for (size_t i = 0; i < 1000000; i++) {
Expand All @@ -92,16 +99,21 @@ class MalignWorker : public AsyncProgressWorker<ProgressData> {

void OnProgress(const ProgressData* /* data */, size_t count) override {
Napi::Env env = Env();
_test_case_count++;
{
std::lock_guard<std::mutex> lock(_cvm);
_test_case_count++;
}
bool error = false;
Napi::String reason = Napi::String::New(env, "No error");
if (_test_case_count == 1 && count != 0) {
if (_test_case_count <= 2 && count != 0) {
error = true;
reason = Napi::String::New(env, "expect 0 count of data on 1st call");
reason =
Napi::String::New(env, "expect 0 count of data on 1st and 2nd call");
}
if (_test_case_count > 1 && count != 1) {
if (_test_case_count > 2 && count != 1) {
error = true;
reason = Napi::String::New(env, "expect 1 count of data on non-1st call");
reason = Napi::String::New(
env, "expect 1 count of data on non-1st and non-2nd call");
}
_progress.MakeCallback(Receiver().Value(),
{Napi::Boolean::New(env, error), reason});
Expand All @@ -122,12 +134,56 @@ class MalignWorker : public AsyncProgressWorker<ProgressData> {
std::mutex _cvm;
FunctionReference _progress;
};

// Calling a Signal after a SendProgress should not clear progress data
class SignalAfterProgressTestWorker : public AsyncProgressWorker<ProgressData> {
public:
static void DoWork(const CallbackInfo& info) {
Function cb = info[0].As<Function>();
Function progress = info[1].As<Function>();

SignalAfterProgressTestWorker* worker = new SignalAfterProgressTestWorker(
cb, progress, "TestResource", Object::New(info.Env()));
worker->Queue();
}

protected:
void Execute(const ExecutionProgress& progress) override {
ProgressData data{0};
progress.Send(&data, 1);
progress.Signal();
}

void OnProgress(const ProgressData* /* data */, size_t count) override {
Napi::Env env = Env();
bool error = false;
Napi::String reason = Napi::String::New(env, "No error");
if (count != 1) {
error = true;
reason = Napi::String::New(env, "expect 1 count of data");
}
_progress.MakeCallback(Receiver().Value(),
{Napi::Boolean::New(env, error), reason});
}

private:
SignalAfterProgressTestWorker(Function cb,
Function progress,
const char* resource_name,
const Object& resource)
: AsyncProgressWorker(cb, resource_name, resource) {
_progress.Reset(progress, 1);
}
FunctionReference _progress;
};
} // namespace

Object InitAsyncProgressWorker(Env env) {
Object exports = Object::New(env);
exports["doWork"] = Function::New(env, TestWorker::DoWork);
exports["doMalignTest"] = Function::New(env, MalignWorker::DoWork);
exports["doSignalAfterProgressTest"] =
Function::New(env, SignalAfterProgressTestWorker::DoWork);
return exports;
}

Expand Down
13 changes: 9 additions & 4 deletions test/async_progress_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ module.exports = common.runTest(test);
async function test ({ asyncprogressworker }) {
await success(asyncprogressworker);
await fail(asyncprogressworker);
await malignTest(asyncprogressworker);
await signalTest(asyncprogressworker.doMalignTest);
await signalTest(asyncprogressworker.doSignalAfterProgressTest);
}

function success (binding) {
Expand Down Expand Up @@ -44,17 +45,21 @@ function fail (binding) {
});
}

function malignTest (binding) {
function signalTest (bindingFunction) {
return new Promise((resolve, reject) => {
binding.doMalignTest(
bindingFunction(
common.mustCall((err) => {
if (err) {
return reject(err);
}
resolve();
}),
common.mustCallAtLeast((error, reason) => {
assert(!error, reason);
try {
assert(!error, reason);
} catch (e) {
reject(e);
}
}, 1)
);
});
Expand Down

0 comments on commit edf630c

Please sign in to comment.