Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/mp/proxy-io.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ class Connection
// to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
// error in cases where f deletes this Connection object.
m_on_disconnect.add(m_network.onDisconnect().then(
[this, f = std::move(f)]() mutable { m_loop.m_task_set->add(kj::evalLater(kj::mv(f))); }));
[f = std::move(f), this]() mutable { m_loop.m_task_set->add(kj::evalLater(kj::mv(f))); }));
}

EventLoop& m_loop;
Expand Down
49 changes: 19 additions & 30 deletions include/mp/proxy-types.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,6 @@ void CustomBuildField(TypeList<>,
context.setCallbackThread(callback_thread->second.m_client);
}

// Invoke promise1, then promise2, and return result of promise2.
template <typename T, typename U>
kj::Promise<U> JoinPromises(kj::Promise<T>&& prom1, kj::Promise<U>&& prom2)
{
return prom1.then([prom2 = kj::mv(prom2)]() mutable { return kj::mv(prom2); });
}

//! PassField override for mp.Context arguments. Return asynchronously and call
//! function on other thread found in context.
template <typename Accessor, typename ServerContext, typename Fn, typename... Args>
Expand All @@ -119,9 +112,8 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
auto& server = server_context.proxy_server;
int req = server_context.req;
auto invoke = MakeAsyncCallable(
[&server, req, fn, args...,
fulfiller = kj::mv(future.fulfiller),
call_context = kj::mv(server_context.call_context)]() mutable {
[fulfiller = kj::mv(future.fulfiller),
call_context = kj::mv(server_context.call_context), &server, req, fn, args...]() mutable {
const auto& params = call_context.getParams();
Context::Reader context_arg = Accessor::get(params);
ServerContext server_context{server, call_context, req};
Expand Down Expand Up @@ -181,27 +173,24 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
// be a local Thread::Server object, but it needs to be looked up
// asynchronously with getLocalServer().
auto thread_client = context_arg.getThread();
return JoinPromises(server.m_context.connection->m_threads.getLocalServer(thread_client)
.then([&server, invoke, req](const kj::Maybe<Thread::Server&>& perhaps) {
// Assuming the thread object is found, pass it a pointer to the
// `invoke` lambda above which will invoke the function on that
// thread.
KJ_IF_MAYBE(thread_server, perhaps)
{
const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
server.m_context.connection->m_loop.log() << "IPC server post request #" << req << " {"
<< thread.m_thread_context.thread_name << "}";
thread.m_thread_context.waiter->post(std::move(invoke));
}
else
{
server.m_context.connection->m_loop.log() << "IPC server error request #" << req
<< ", missing thread to execute request";
throw std::runtime_error("invalid thread handle");
}
}),
return server.m_context.connection->m_threads.getLocalServer(thread_client)
.then([&server, invoke, req](const kj::Maybe<Thread::Server&>& perhaps) {
// Assuming the thread object is found, pass it a pointer to the
// `invoke` lambda above which will invoke the function on that
// thread.
KJ_IF_MAYBE (thread_server, perhaps) {
const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
server.m_context.connection->m_loop.log()
<< "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
thread.m_thread_context.waiter->post(std::move(invoke));
} else {
server.m_context.connection->m_loop.log()
<< "IPC server error request #" << req << ", missing thread to execute request";
throw std::runtime_error("invalid thread handle");
}
})
// Wait for the invocation to finish before returning to the caller.
kj::mv(future.promise));
.then([invoke_wait = kj::mv(future.promise)]() mutable { return kj::mv(invoke_wait); });
}

// Destination parameter type that can be passed to ReadField function as an
Expand Down
11 changes: 0 additions & 11 deletions include/mp/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,6 @@ struct Priority<0>
{
};

//! Function parameter type for discarded argument. Useful to be able to write a
//! function overload that discards arguments as a normal function instead of a
//! template function.
struct Discard
{
template <typename... Args>
Discard(Args&&...)
{
}
};

//! Return capnp type name with filename prefix removed.
template <typename T>
const char* TypeName()
Expand Down