Skip to content

Commit

Permalink
#197: Implement progress to NanAsyncWorker.
Browse files Browse the repository at this point in the history
  • Loading branch information
brett19 committed Oct 26, 2014
1 parent 9e93ffa commit c73ddeb
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 0 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ LINT_SOURCES = \
examples/async_pi_estimate/sync.h \
nan.h \
test/cpp/asyncworker.cpp \
test/cpp/asyncprogressworker.cpp \
test/cpp/asyncworkererror.cpp \
test/cpp/bufferworkerpersistent.cpp \
test/cpp/gc.cpp \
Expand Down
118 changes: 118 additions & 0 deletions nan.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,27 @@

#undef notset

// uv helpers
#ifdef UV_VERSION_MAJOR
#ifndef UV_VERSION_PATCH
#define UV_VERSION_PATCH 0
#endif
#define NAUV_UVVERSION ((UV_VERSION_MAJOR << 16) | \
(UV_VERSION_MINOR << 8) | \
(UV_VERSION_PATCH))
#else
#define NAUV_UVVERSION 0x000b00
#endif


#if NAUV_UVVERSION < 0x000b00
#define NAUV_WORK_CB(func) \
void func(uv_async_t *async, int)
#else
#define NAUV_WORK_CB(func) \
void func(uv_async_t *async)
#endif

// some generic helpers

template<typename T> NAN_INLINE bool NanSetPointerSafe(
Expand Down Expand Up @@ -2034,6 +2055,103 @@ NAN_INLINE void NanAsyncExecuteComplete (uv_work_t* req) {
delete worker;
}

/* abstract */ class NanAsyncProgressWorker : public NanAsyncWorker {
public:
explicit NanAsyncProgressWorker(NanCallback *callback_)
: NanAsyncWorker(callback_), asyncdata_(NULL), asyncsize_(0) {
async = new uv_async_t;
uv_async_init(
uv_default_loop()
, async
, AsyncProgress_
);
async->data = this;

uv_mutex_init(&async_lock);
}

virtual ~NanAsyncProgressWorker() {
uv_close(reinterpret_cast<uv_handle_t*>(async), AsyncClose_);
uv_mutex_destroy(&async_lock);

if (asyncdata_) {
delete[] asyncdata_;
}
}

void WorkProgress() {
uv_mutex_lock(&async_lock);
char *data = asyncdata_;
int size = asyncsize_;
asyncdata_ = NULL;
uv_mutex_unlock(&async_lock);

HandleProgressCallback(data, size);
delete[] data;
}

class ExecutionProgress {
friend class NanAsyncProgressWorker;
public:
// You could do fancy generics with templates here.
void Send(const char* data, size_t size) const {
that_->SendProgress_(data, size);
}

private:
explicit ExecutionProgress(NanAsyncProgressWorker* that) : that_(that) {}
// Prohibit copying and assignment.
ExecutionProgress(const ExecutionProgress&);
void operator=(const ExecutionProgress&);
#if __cplusplus >= 201103L
// Prohibit C++11 move semantics.
ExecutionProgress(ExecutionProgress&&) = delete;
void operator=(ExecutionProgress&&) = delete;
#endif
NanAsyncProgressWorker* const that_;
};

virtual void Execute(const ExecutionProgress& progress) = 0;
virtual void HandleProgressCallback(const char *data, size_t size) = 0;

private:
void Execute() /*final override*/ {
ExecutionProgress progress(this);
Execute(progress);
}

void SendProgress_(const char *data, size_t size) {
char *new_data = new char[size];
memcpy(new_data, data, size);

uv_mutex_lock(&async_lock);
char *old_data = asyncdata_;
asyncdata_ = new_data;
asyncsize_ = size;
uv_mutex_unlock(&async_lock);

if (old_data) {
delete[] old_data;
}
uv_async_send(async);
}

NAN_INLINE static NAUV_WORK_CB(AsyncProgress_) {
NanAsyncProgressWorker *worker =
static_cast<NanAsyncProgressWorker*>(async->data);
worker->WorkProgress();
}

NAN_INLINE static void AsyncClose_(uv_handle_t* handle) {
delete reinterpret_cast<uv_async_t*>(handle);
}

uv_async_t *async;
uv_mutex_t async_lock;
char *asyncdata_;
size_t asyncsize_;
};

NAN_INLINE void NanAsyncQueueWorker (NanAsyncWorker* worker) {
uv_queue_work(
uv_default_loop()
Expand Down
7 changes: 7 additions & 0 deletions test/binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,11 @@
"<!(node -e \"require('..')\")"
]
}
, {
"target_name" : "asyncprogressworker"
, "sources" : [ "cpp/asyncprogressworker.cpp" ]
, "include_dirs": [
"<!(node -e \"require('..')\")"
]
}
]}
66 changes: 66 additions & 0 deletions test/cpp/asyncprogressworker.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**********************************************************************************
* NAN - Native Abstractions for Node.js
*
* Copyright (c) 2014 NAN contributors
*
* MIT +no-false-attribs License <https://github.com/rvagg/nan/blob/master/LICENSE>
**********************************************************************************/

#ifndef _WIN32
#include <unistd.h>
#define Sleep(x) usleep((x)*1000)
#endif
#include <nan.h>

class ProgressWorker : public NanAsyncProgressWorker {
public:
ProgressWorker(
NanCallback *callback
, NanCallback *progress
, int milliseconds
, int iters)
: NanAsyncProgressWorker(callback), progress(progress)
, milliseconds(milliseconds), iters(iters) {}
~ProgressWorker() {}

void Execute (const NanAsyncProgressWorker::ExecutionProgress& progress) {
for (int i = 0; i < iters; ++i) {
progress.Send(reinterpret_cast<const char*>(&i), sizeof(int));
Sleep(milliseconds);
}
}

void HandleProgressCallback(const char *data, size_t size) {
NanScope();

v8::Local<v8::Value> argv[] = {
NanNew<v8::Integer>(*reinterpret_cast<int*>(const_cast<char*>(data)))
};
progress->Call(1, argv);
}

private:
NanCallback *progress;
int milliseconds;
int iters;
};

NAN_METHOD(DoProgress) {
NanScope();
NanCallback *progress = new NanCallback(args[2].As<v8::Function>());
NanCallback *callback = new NanCallback(args[3].As<v8::Function>());
NanAsyncQueueWorker(new ProgressWorker(
callback
, progress
, args[0]->Uint32Value()
, args[1]->Uint32Value()));
NanReturnUndefined();
}

void Init(v8::Handle<v8::Object> exports) {
exports->Set(
NanNew<v8::String>("a")
, NanNew<v8::FunctionTemplate>(DoProgress)->GetFunction());
}

NODE_MODULE(asyncprogressworker, Init)
15 changes: 15 additions & 0 deletions test/js/asyncprogressworker-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
const test = require('tap').test
, testRoot = require('path').resolve(__dirname, '..')
, bindings = require('bindings')({ module_root: testRoot, bindings: 'asyncprogressworker' });

test('asyncprogressworker', function (t) {
var worker = bindings.a
, progressed = 0
worker(100, 5, function(i) {
t.ok(i === progressed, 'got the progress updates #' + i);
progressed++;
}, function () {
t.ok(progressed === 5, 'got all progress updates')
t.end()
})
})

0 comments on commit c73ddeb

Please sign in to comment.