[core] Clean up FileSource #3033
Changes from all commits: 6f88197, 2eb0203, 36581f3, 1caf89c, 7137239, 43c6e2f
@@ -10,6 +10,7 @@
 #include <utility>
 #include <queue>
 #include <mutex>
+#include <atomic>

 namespace mbgl {
 namespace util {

@@ -45,7 +46,7 @@ class RunLoop : private util::noncopyable {
     template <class Fn, class... Args>
     std::unique_ptr<WorkRequest>
     invokeCancellable(Fn&& fn, Args&&... args) {
-        auto flag = std::make_shared<bool>();
+        auto flag = std::make_shared<std::atomic<bool>>();
         *flag = false;

         auto tuple = std::make_tuple(std::move(args)...);
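Aside: the switch from `bool` to `std::atomic<bool>` matters because the cancellation flag is written from one thread and read from another; with a plain `bool` that concurrent access is a data race. A minimal, self-contained sketch of the situation (the names here are mine, not the mbgl API):

```cpp
#include <atomic>
#include <memory>
#include <thread>

int main() {
    // Shared cancellation flag; both sides hold a reference so it outlives either one.
    auto cancelled = std::make_shared<std::atomic<bool>>(false);

    std::thread worker([cancelled] {
        // Reader side: skip the work if the request was cancelled in the meantime.
        if (!*cancelled) {
            // ... perform the work ...
        }
    });

    // Writer side: cancelling from another thread. With a plain bool this
    // concurrent write/read would be undefined behavior.
    *cancelled = true;

    worker.join();
}
```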
@@ -64,14 +65,23 @@ class RunLoop : private util::noncopyable {
     template <class Fn, class Cb, class... Args>
     std::unique_ptr<WorkRequest>
     invokeWithCallback(Fn&& fn, Cb&& callback, Args&&... args) {
-        auto flag = std::make_shared<bool>();
+        auto flag = std::make_shared<std::atomic<bool>>();

Review discussion on this line:

> If it is just a flag, I would use […]

> I would but I don't think it's possible to portably use […]

> Ugh, indeed.
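The suggestion in that exchange is truncated in the page capture; if it refers to `std::atomic_flag`, the portability objection makes sense: before C++20, `std::atomic_flag` offers only `test_and_set()` and `clear()`, so there is no way to read it without also setting it, while this code needs a plain read. A short illustration (mine, not from the PR):

```cpp
#include <atomic>

std::atomic_flag cancelled_flag = ATOMIC_FLAG_INIT;

bool wasCancelledFlag() {
    // Pre-C++20 there is no cancelled_flag.test(); test_and_set() returns the
    // previous value but also sets the flag, which is not the semantics wanted here.
    return cancelled_flag.test_and_set();
}

std::atomic<bool> cancelled{false};

bool wasCancelledBool() {
    // std::atomic<bool> supports a plain load, which is why it is used instead.
    return cancelled.load();
}
```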
         *flag = false;

-        auto after = RunLoop::current.get()->bind([flag, callback] (auto&&... results) {
-            if (!*flag) {
-                callback(std::move(results)...);
-            }
-        });
+        // Create a lambda L1 that invokes another lambda L2 on the current RunLoop R, that calls
+        // the callback C. Both lambdas check the flag before proceeding. L1 needs to check the flag
+        // because if the request was cancelled, then R might have been destroyed. L2 needs to check
+        // the flag because the request may have been cancelled after L2 was invoked but before it
+        // began executing.
+        auto after = [flag, current = RunLoop::current.get(), callback1 = std::move(callback)] (auto&&... results1) {
+            if (!*flag) {
+                current->invoke([flag, callback2 = std::move(callback1)] (auto&&... results2) {
+                    if (!*flag) {
+                        callback2(std::move(results2)...);
+                    }
+                }, std::move(results1)...);
+            }
+        };

Review comment on the new lambda:

> Good catch.

         auto tuple = std::make_tuple(std::move(args)..., after);
         auto task = std::make_shared<Invoker<Fn, decltype(tuple)>>(
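The comment in the new code describes a double-checked cancellation pattern: L1 runs where the work finished and re-posts the result to the destination loop; both hops check the shared flag. Below is a stripped-down sketch of the same idea with a toy event loop; `EventLoop`, `post`, and the other names are mine, not the mbgl API:

```cpp
#include <atomic>
#include <deque>
#include <functional>
#include <memory>

// Toy single-threaded "run loop": queued tasks run when run() is called.
struct EventLoop {
    std::deque<std::function<void()>> tasks;
    void post(std::function<void()> fn) { tasks.push_back(std::move(fn)); }
    void run() {
        while (!tasks.empty()) {
            auto fn = std::move(tasks.front());
            tasks.pop_front();
            fn();
        }
    }
};

int main() {
    EventLoop loop;
    auto flag = std::make_shared<std::atomic<bool>>(false);
    auto callback = [] (int result) { (void)result; /* consume the result */ };

    // L1: runs where the work finished; checks the flag because, if the request
    // was cancelled, the destination loop may already be gone.
    auto after = [flag, &loop, callback] (int result) {
        if (!*flag) {
            // L2: runs later on the destination loop; checks the flag again because
            // the request may have been cancelled between posting and executing.
            loop.post([flag, callback, result] {
                if (!*flag) {
                    callback(result);
                }
            });
        }
    };

    after(42);     // work completed, result handed to L1
    *flag = true;  // request cancelled before the loop drained its queue
    loop.run();    // L2 runs, sees the flag, and skips the callback
}
```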
@@ -85,22 +95,13 @@ class RunLoop : private util::noncopyable {
         return std::make_unique<WorkRequest>(task);
     }

-    // Return a function that invokes the given function on this RunLoop.
-    template <class Fn>
-    auto bind(Fn&& fn) {
-        return [this, fn = std::move(fn)] (auto&&... args) {
-            // `this->` is a workaround for https://gcc.gnu.org/bugzilla/show_bug.cgi?id=61636
-            this->invoke(std::move(fn), std::move(args)...);
-        };
-    }
-
     uv_loop_t* get() { return async.get()->loop; }

 private:
     template <class F, class P>
     class Invoker : public WorkTask {
     public:
-        Invoker(F&& f, P&& p, std::shared_ptr<bool> canceled_ = nullptr)
+        Invoker(F&& f, P&& p, std::shared_ptr<std::atomic<bool>> canceled_ = nullptr)
             : canceled(canceled_),
               func(std::move(f)),
               params(std::move(p)) {

@@ -134,7 +135,7 @@ class RunLoop : private util::noncopyable {
         }

         std::recursive_mutex mutex;
-        std::shared_ptr<bool> canceled;
+        std::shared_ptr<std::atomic<bool>> canceled;

         F func;
         P params;
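Reading the two hunks together, the likely shape of the mechanism is: the work handle and the queued task share the flag, cancellation flips it, and the task checks it before running. The following is a guess at that shape with invented names (`WorkItem`, `WorkHandle`), not the actual `Invoker`/`WorkRequest` code:

```cpp
#include <atomic>
#include <functional>
#include <memory>

// Sketch only: invented stand-ins for the queued task and its cancellation handle.
struct WorkItem {
    std::shared_ptr<std::atomic<bool>> canceled;
    std::function<void()> func;

    void operator()() {
        // Skip the work entirely if the owning request was already cancelled.
        if (canceled && *canceled) {
            return;
        }
        if (func) {
            func();
        }
    }
};

struct WorkHandle {
    std::shared_ptr<std::atomic<bool>> canceled;

    // Cancelling only flips the flag; the queued WorkItem stays in the queue
    // but becomes a no-op when it eventually runs.
    void cancel() {
        if (canceled) {
            *canceled = true;
        }
    }
    ~WorkHandle() { cancel(); }
};
```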
@@ -1,138 +1,38 @@
 #include "node_file_source.hpp"
 #include "node_request.hpp"
-#include "util/async_queue.hpp"
-
-#include <mbgl/storage/request.hpp>
+#include "node_mapbox_gl_native.hpp"

 namespace node_mbgl {

-struct NodeFileSource::Action {
-    const enum : bool { Add, Cancel } type;
-    mbgl::Resource const resource;
-};
+class NodeFileSourceRequest : public mbgl::FileRequest {
+public:
+    std::unique_ptr<mbgl::WorkRequest> workRequest;
+};

-NodeFileSource::NodeFileSource(v8::Local<v8::Object> options_) :
-    queue(new Queue(uv_default_loop(), [this](Action &action) {
-        if (action.type == Action::Add) {
-            processAdd(action.resource);
-        } else if (action.type == Action::Cancel) {
-            processCancel(action.resource);
-        }
-    }))
-{
+NodeFileSource::NodeFileSource(v8::Local<v8::Object> options_) {
     options.Reset(options_);
-
-    // Make sure that the queue doesn't block the loop from exiting.
-    queue->unref();
 }

 NodeFileSource::~NodeFileSource() {
-    queue->stop();
-    queue = nullptr;
-
     options.Reset();
 }

-mbgl::Request* NodeFileSource::request(const mbgl::Resource& resource, uv_loop_t* loop, Callback callback) {
-    auto req = new mbgl::Request(resource, loop, std::move(callback));
-
-    std::lock_guard<std::mutex> lock(observersMutex);
-
-    assert(observers.find(resource) == observers.end());
-    observers[resource] = req;
-
-    // This function can be called from any thread. Make sure we're executing the actual call in the
-    // file source loop by sending it over the queue. It will be processed in processAction().
-    queue->send(Action{ Action::Add, resource });
-
-    return req;
-}
-
-void NodeFileSource::cancel(mbgl::Request* req) {
-    req->cancel();
-
-    std::lock_guard<std::mutex> lock(observersMutex);
-
-    auto it = observers.find(req->resource);
-    if (it == observers.end()) {
-        return;
-    }
-
-    observers.erase(it);
-
-    // This function can be called from any thread. Make sure we're executing the actual call in the
-    // file source loop by sending it over the queue. It will be processed in processAction().
-    queue->send(Action{ Action::Cancel, req->resource });
-
-    req->destruct();
-}
-
-void NodeFileSource::processAdd(const mbgl::Resource& resource) {
-    Nan::HandleScope scope;
-
-    // Make sure the loop stays alive as long as request is pending.
-    if (pending.empty()) {
-        queue->ref();
-    }
-
-    auto requestHandle = NodeRequest::Create(this, resource)->ToObject();
-    pending.emplace(resource, requestHandle);
-
-    auto callback = Nan::GetFunction(Nan::New<v8::FunctionTemplate>(NodeRequest::Respond, requestHandle)).ToLocalChecked();
-    callback->SetName(Nan::New("respond").ToLocalChecked());
-
-    v8::Local<v8::Value> argv[] = { requestHandle, callback };
-    Nan::MakeCallback(Nan::New(options), "request", 2, argv);
-}
-
-void NodeFileSource::processCancel(const mbgl::Resource& resource) {
-    Nan::HandleScope scope;
-
-    auto it = pending.find(resource);
-    if (it == pending.end()) {
-        // The response callback was already fired. There is no point in calling the cancelation
-        // callback because the request is already completed.
-    } else {
-        v8::Local<v8::Object> requestHandle = Nan::New(it->second);
-        it->second.Reset();
-        pending.erase(it);
-
-        // Make sure the the loop can exit when there are no pending requests.
-        if (pending.empty()) {
-            queue->unref();
-        }
-
-        if (Nan::Has(Nan::New(options), Nan::New("cancel").ToLocalChecked()).FromJust()) {
-            v8::Local<v8::Value> argv[] = { requestHandle };
-            Nan::MakeCallback(Nan::New(options), "cancel", 1, argv);
-        }
-
-        // Set the request handle in the request wrapper handle to null
-        Nan::ObjectWrap::Unwrap<NodeRequest>(requestHandle)->cancel();
-    }
-}
-
-void NodeFileSource::notify(const mbgl::Resource& resource, const std::shared_ptr<const mbgl::Response>& response) {
-    // First, remove the request, since it might be destructed at any point now.
-    auto it = pending.find(resource);
-    if (it != pending.end()) {
-        it->second.Reset();
-        pending.erase(it);
-
-        // Make sure the the loop can exit when there are no pending requests.
-        if (pending.empty()) {
-            queue->unref();
-        }
-    }
-
-    std::lock_guard<std::mutex> lock(observersMutex);
-
-    auto observersIt = observers.find(resource);
-    if (observersIt == observers.end()) {
-        return;
-    }
-
-    observersIt->second->notify(response);
-}
+std::unique_ptr<mbgl::FileRequest> NodeFileSource::request(const mbgl::Resource& resource, Callback callback) {
+    auto req = std::make_unique<NodeFileSourceRequest>();
+
+    // This function can be called from any thread. Make sure we're executing the
+    // JS implementation in the node event loop.
+    req->workRequest = NodeRunLoop().invokeWithCallback([this] (mbgl::Resource res, Callback cb) {
+        Nan::HandleScope scope;
+
+        auto requestHandle = NodeRequest::Create(res, cb)->ToObject();
+        auto callbackHandle = Nan::GetFunction(Nan::New<v8::FunctionTemplate>(NodeRequest::Respond, requestHandle)).ToLocalChecked();
+
+        v8::Local<v8::Value> argv[] = { requestHandle, callbackHandle };
+        Nan::MakeCallback(Nan::New(options), "request", 2, argv);
+    }, callback, resource);
+
+    return std::move(req);
+}

 }
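To make the shape of the new interface concrete, here is a rough caller-side sketch: `request()` now returns an owning handle, and dropping that handle is what cancels the in-flight work via the `WorkRequest` it holds. The types below are toy stand-ins mirroring the diff, not the real mbgl headers:

```cpp
#include <functional>
#include <memory>
#include <string>

// Toy stand-ins mirroring the shapes in the diff; not the real mbgl types.
struct Response { std::string data; };
struct Resource { std::string url; };

struct FileRequest {
    virtual ~FileRequest() = default;   // destroying the handle abandons/cancels the request
};

struct FileSource {
    using Callback = std::function<void (Response)>;
    virtual ~FileSource() = default;
    // The new interface: no raw Request*, no explicit cancel(); ownership is the cancellation.
    virtual std::unique_ptr<FileRequest> request(const Resource&, Callback) = 0;
};

void example(FileSource& fs) {
    auto handle = fs.request(Resource{"mapbox://tile"}, [] (Response res) {
        (void)res; // consume res.data
    });

    // Dropping the handle is how a caller cancels: for NodeFileSource the handle
    // owns a WorkRequest, whose destruction sets the shared cancellation flag.
    handle.reset();
}
```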
Review discussion:

> This class should be `final` and implemented at `file_source.cpp` to enforce correctness on everyone implementing the `FileSource` interface. I would also name it `FileSourceWork` to avoid the confusion like `DefaultFileRequest` and `DefaultFileRequestImpl`.

> I considered an approach like this but ultimately decided to leave it up to the `FileSource` implementation what the derived destructor does. The default implementation cancels the request, but the node implementation does not.

> Node request could have a no-op cancel maybe.
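For clarity, a sketch of what the reviewer seems to be proposing: a single `final` request-handle class owned by the core, with cancellation behavior each implementation can opt out of. This is my reading of the comment, with invented names, not code from the PR:

```cpp
#include <functional>
#include <memory>

// Invented stand-in for the purpose of this sketch.
class FileRequest {
public:
    virtual ~FileRequest() = default;
};

// The reviewer's suggestion, as I read it: one final handle type defined next to
// FileSource itself, so every implementation gets the same destructor behavior.
class FileSourceWork final : public FileRequest {
public:
    explicit FileSourceWork(std::function<void()> cancelFn_)
        : cancelFn(std::move(cancelFn_)) {}

    // Destroying the handle runs the cancel hook; an implementation that does not
    // need cancellation (the node case) passes a no-op, per the follow-up comment.
    ~FileSourceWork() override {
        if (cancelFn) {
            cancelFn();
        }
    }

private:
    std::function<void()> cancelFn;
};
```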