
Commit

Merge #42: Support attaching custom cleanup functions to proxy client and server classes

ce8e8b6 Add ProxyTypeRegister typeid map (Russell Yanofsky)
fbdaaa7 Add cleanup callbacks to ProxyContext (Russell Yanofsky)
34e9b78 refactor: Move connection field to ProxyContext struct (Russell Yanofsky)
39ad0f5 Generate ProxyType traits for interface types (Russell Yanofsky)

Pull request description:

  This avoids the need to add interface base classes with `addCloseHook` methods in bitcoin/bitcoin#19160 and bitcoin/bitcoin#10102
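  As an illustration only (not code from this commit), a rough sketch of what calling code could do in place of an `addCloseHook` interface method: append a callback to a proxy object's new `ProxyContext::cleanup` list so it runs when the proxy object or its connection is destroyed. The `node_client` variable and its type are hypothetical placeholders:

  ```cpp
  // Hypothetical caller-side sketch: `node_client` is assumed to be a generated
  // mp::ProxyClient instance. The callback runs during client or connection teardown.
  node_client.m_context.cleanup.emplace_back([] {
      // e.g. notify the application that the remote object has gone away.
  });
  ```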

Top commit has no ACKs.

Tree-SHA512: 33a6b3e2e9e6111f7d8c031b6010b824e0d630c027fb8a650d24326a017e3f3db11e4dcb6d6467a14a53417e64124301ccd80b9e684d0e5044fa804e0a1f49dc
ryanofsky committed Jan 28, 2021
2 parents 1b4012c + ce8e8b6 commit 35d2091
Showing 5 changed files with 117 additions and 62 deletions.
79 changes: 48 additions & 31 deletions include/mp/proxy-io.h
@@ -46,7 +46,7 @@ struct ServerInvokeContext : InvokeContext
int req;

ServerInvokeContext(ProxyServer& proxy_server, CallContext& call_context, int req)
: InvokeContext{proxy_server.m_connection}, proxy_server{proxy_server}, call_context{call_context}, req{req}
: InvokeContext{*proxy_server.m_context.connection}, proxy_server{proxy_server}, call_context{call_context}, req{req}
{
}
};
@@ -350,29 +350,27 @@ template <typename Interface, typename Impl>
ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client client,
Connection* connection,
bool destroy_connection)
: m_client(std::move(client)), m_connection(connection), m_destroy_connection(destroy_connection)
: m_client(std::move(client)), m_context(connection)

{
{
std::unique_lock<std::mutex> lock(m_connection->m_loop.m_mutex);
m_connection->m_loop.addClient(lock);
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
m_context.connection->m_loop.addClient(lock);
}
m_cleanup = m_connection->addSyncCleanup([this]() {

// Handler for the connection getting destroyed before this client object.
auto cleanup = m_context.connection->addSyncCleanup([this]() {
// Release client capability by move-assigning to temporary.
{
typename Interface::Client(std::move(self().m_client));
}
{
std::unique_lock<std::mutex> lock(m_connection->m_loop.m_mutex);
m_connection->m_loop.removeClient(lock);
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
m_context.connection->m_loop.removeClient(lock);
}
m_connection = nullptr;
m_context.connection = nullptr;
});
self().construct();
}

template <typename Interface, typename Impl>
ProxyClientBase<Interface, Impl>::~ProxyClientBase() noexcept
{
// Two shutdown sequences are supported:
//
// - A normal sequence where client proxy objects are deleted by external
@@ -381,43 +379,54 @@ ProxyClientBase<Interface, Impl>::~ProxyClientBase() noexcept
// - A garbage collection sequence where the connection or event loop shuts
// down while external code is still holding client references.
//
// The first case is handled here in destructor when m_loop is not null. The
// second case is handled by the m_cleanup function, which sets m_connection to
// The first case is handled here when m_context.connection is not null. The
// second case is handled by the cleanup function, which sets m_context.connection to
// null so nothing happens here.
if (m_connection) {
// Remove m_cleanup callback so it doesn't run and try to access
m_context.cleanup.emplace_front([this, destroy_connection, cleanup]{
if (m_context.connection) {
// Remove cleanup callback so it doesn't run and try to access
// this object after it's already destroyed.
m_connection->removeSyncCleanup(m_cleanup);
m_context.connection->removeSyncCleanup(cleanup);

// Destroy remote object, waiting for it to be deleted server side.
self().destroy();

// FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
m_connection->m_loop.sync([&]() {
m_context.connection->m_loop.sync([&]() {
// Release client capability by move-assigning to temporary.
{
typename Interface::Client(std::move(self().m_client));
}
{
std::unique_lock<std::mutex> lock(m_connection->m_loop.m_mutex);
m_connection->m_loop.removeClient(lock);
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
m_context.connection->m_loop.removeClient(lock);
}

if (m_destroy_connection) {
delete m_connection;
m_connection = nullptr;
if (destroy_connection) {
delete m_context.connection;
m_context.connection = nullptr;
}
});
}
});
self().construct();
}

template <typename Interface, typename Impl>
ProxyClientBase<Interface, Impl>::~ProxyClientBase() noexcept
{
for (auto& cleanup : m_context.cleanup) {
cleanup();
}
}

template <typename Interface, typename Impl>
ProxyServerBase<Interface, Impl>::ProxyServerBase(std::shared_ptr<Impl> impl, Connection& connection)
: m_impl(std::move(impl)), m_connection(connection)
: m_impl(std::move(impl)), m_context(&connection)
{
assert(m_impl);
std::unique_lock<std::mutex> lock(m_connection.m_loop.m_mutex);
m_connection.m_loop.addClient(lock);
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
m_context.connection->m_loop.addClient(lock);
}

template <typename Interface, typename Impl>
@@ -429,17 +438,25 @@ ProxyServerBase<Interface, Impl>::~ProxyServerBase()
// destructor on, run asynchronously. Do not run destructor on current
// (event loop) thread since destructors could be making IPC calls or
// doing expensive cleanup.
auto impl = std::move(m_impl);
m_connection.addAsyncCleanup([impl]() mutable { impl.reset(); });
m_context.connection->addAsyncCleanup([impl=std::move(m_impl), c=std::move(m_context.cleanup)]() mutable {
impl.reset();
for (auto& cleanup : c) {
cleanup();
}
});
}
std::unique_lock<std::mutex> lock(m_connection.m_loop.m_mutex);
m_connection.m_loop.removeClient(lock);
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
m_context.connection->m_loop.removeClient(lock);
}

template <typename Interface, typename Impl>
void ProxyServerBase<Interface, Impl>::invokeDestroy()
{
m_impl.reset();
for (auto& cleanup : m_context.cleanup) {
cleanup();
}
m_context.cleanup.clear();
}

struct ThreadContext
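To make the two shutdown sequences described in the comments above more concrete, here is a hedged pseudo-C++ sketch (not compilable as-is: `FooClient` stands in for a generated `ProxyClient` type, and the `client_cap*` and `connection*` variables are assumed to come from existing setup code):

```cpp
// Order 1: proxy client destroyed while the connection is still alive.
auto client = std::make_unique<FooClient>(std::move(client_cap), connection,
                                          /*destroy_connection=*/false);
client.reset();     // ~ProxyClientBase runs m_context.cleanup: it unregisters the
                    // sync-cleanup hook, calls destroy() on the remote object, and
                    // releases the capability on the event loop thread.

// Order 2: connection destroyed first (e.g. the remote process went away).
auto client2 = std::make_unique<FooClient>(std::move(client_cap2), connection2,
                                           /*destroy_connection=*/false);
delete connection2; // Connection teardown runs the registered sync cleanup, which
                    // releases the capability and nulls client2->m_context.connection.
client2.reset();    // The cleanup list still runs, but sees a null connection and
                    // skips the remote destroy.
```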
64 changes: 38 additions & 26 deletions include/mp/proxy-types.h
@@ -10,6 +10,7 @@
#include <exception>
#include <optional>
#include <set>
#include <typeindex>
#include <vector>

namespace mp {
@@ -125,12 +126,12 @@ auto PassField(TypeList<>, ServerContext& server_context, const Fn& fn, const Ar
ServerContext server_context{server, call_context, req};
{
auto& request_threads = g_thread_context.request_threads;
auto request_thread = request_threads.find(&server.m_connection);
auto request_thread = request_threads.find(server.m_context.connection);
if (request_thread == request_threads.end()) {
request_thread =
g_thread_context.request_threads
.emplace(std::piecewise_construct, std::forward_as_tuple(&server.m_connection),
std::forward_as_tuple(context_arg.getCallbackThread(), &server.m_connection,
.emplace(std::piecewise_construct, std::forward_as_tuple(server.m_context.connection),
std::forward_as_tuple(context_arg.getCallbackThread(), server.m_context.connection,
/* destroy_connection= */ false))
.first;
} else {
@@ -142,32 +143,32 @@ auto PassField(TypeList<>, ServerContext& server_context, const Fn& fn, const Ar
fn.invoke(server_context, args...);
}
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
server.m_connection.m_loop.sync([&] {
server.m_context.connection->m_loop.sync([&] {
auto fulfiller_dispose = kj::mv(fulfiller);
fulfiller_dispose->fulfill(kj::mv(call_context));
});
}))
{
server.m_connection.m_loop.sync([&]() {
server.m_context.connection->m_loop.sync([&]() {
auto fulfiller_dispose = kj::mv(fulfiller);
fulfiller_dispose->reject(kj::mv(*exception));
});
}
})));

auto thread_client = context_arg.getThread();
return JoinPromises(server.m_connection.m_threads.getLocalServer(thread_client)
return JoinPromises(server.m_context.connection->m_threads.getLocalServer(thread_client)
.then([&server, invoke, req](kj::Maybe<Thread::Server&> perhaps) {
KJ_IF_MAYBE(thread_server, perhaps)
{
const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
server.m_connection.m_loop.log() << "IPC server post request #" << req << " {"
server.m_context.connection->m_loop.log() << "IPC server post request #" << req << " {"
<< thread.m_thread_context.thread_name << "}";
thread.m_thread_context.waiter->post(std::move(invoke));
}
else
{
server.m_connection.m_loop.log() << "IPC server error request #" << req
server.m_context.connection->m_loop.log() << "IPC server error request #" << req
<< ", missing thread to execute request";
throw std::runtime_error("invalid thread handle");
}
@@ -1014,7 +1015,7 @@ auto PassField(TypeList<LocalType&>, ServerContext& server_context, Fn&& fn, Arg
const auto& params = server_context.call_context.getParams();
const auto& input = Make<StructField, Accessor>(params);
using Interface = typename Decay<decltype(input.get())>::Calls;
auto param = std::make_unique<ProxyClient<Interface>>(input.get(), &server_context.proxy_server.m_connection, false);
auto param = std::make_unique<ProxyClient<Interface>>(input.get(), server_context.proxy_server.m_context.connection, false);
fn.invoke(server_context, std::forward<Args>(args)..., *param);
}

@@ -1345,8 +1346,8 @@ struct CapRequestTraits<::capnp::Request<_Params, _Results>>
template <typename Client>
void clientDestroy(Client& client)
{
if (client.m_connection) {
client.m_connection->m_loop.log() << "IPC client destroy " << typeid(client).name();
if (client.m_context.connection) {
client.m_context.connection->m_loop.log() << "IPC client destroy " << typeid(client).name();
} else {
KJ_LOG(INFO, "IPC interrupted client destroy", typeid(client).name());
}
@@ -1355,18 +1356,18 @@ void clientDestroy(Client& client)
template <typename Server>
void serverDestroy(Server& server)
{
server.m_connection.m_loop.log() << "IPC server destroy" << typeid(server).name();
server.m_context.connection->m_loop.log() << "IPC server destroy" << typeid(server).name();
}

template <typename ProxyClient, typename GetRequest, typename... FieldObjs>
void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, FieldObjs&&... fields)
{
if (!proxy_client.m_connection) {
if (!proxy_client.m_context.connection) {
throw std::logic_error("clientInvoke call made after disconnect");
}
if (!g_thread_context.waiter) {
assert(g_thread_context.thread_name.empty());
g_thread_context.thread_name = ThreadName(proxy_client.m_connection->m_loop.m_exe_name);
g_thread_context.thread_name = ThreadName(proxy_client.m_context.connection->m_loop.m_exe_name);
// If next assert triggers, it means clientInvoke is being called from
// the capnp event loop thread. This can happen when a ProxyServer
// method implementation that runs synchronously on the event loop
@@ -1377,26 +1378,26 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
// declaration so the server method runs in a dedicated thread.
assert(!g_thread_context.loop_thread);
g_thread_context.waiter = std::make_unique<Waiter>();
proxy_client.m_connection->m_loop.logPlain()
proxy_client.m_context.connection->m_loop.logPlain()
<< "{" << g_thread_context.thread_name
<< "} IPC client first request from current thread, constructing waiter";
}
ClientInvokeContext invoke_context{*proxy_client.m_connection, g_thread_context};
ClientInvokeContext invoke_context{*proxy_client.m_context.connection, g_thread_context};
std::exception_ptr exception;
std::string kj_exception;
bool done = false;
proxy_client.m_connection->m_loop.sync([&]() {
proxy_client.m_context.connection->m_loop.sync([&]() {
auto request = (proxy_client.m_client.*get_request)(nullptr);
using Request = CapRequestTraits<decltype(request)>;
using FieldList = typename ProxyClientMethodTraits<typename Request::Params>::Fields;
IterateFields().handleChain(invoke_context, request, FieldList(), typename FieldObjs::BuildParams{&fields}...);
proxy_client.m_connection->m_loop.logPlain()
proxy_client.m_context.connection->m_loop.logPlain()
<< "{" << invoke_context.thread_context.thread_name << "} IPC client send "
<< TypeName<typename Request::Params>() << " " << LogEscape(request.toString());

proxy_client.m_connection->m_loop.m_task_set->add(request.send().then(
proxy_client.m_context.connection->m_loop.m_task_set->add(request.send().then(
[&](::capnp::Response<typename Request::Results>&& response) {
proxy_client.m_connection->m_loop.logPlain()
proxy_client.m_context.connection->m_loop.logPlain()
<< "{" << invoke_context.thread_context.thread_name << "} IPC client recv "
<< TypeName<typename Request::Results>() << " " << LogEscape(response.toString());
try {
@@ -1411,7 +1412,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
},
[&](const ::kj::Exception& e) {
kj_exception = kj::str("kj::Exception: ", e).cStr();
proxy_client.m_connection->m_loop.logPlain()
proxy_client.m_context.connection->m_loop.logPlain()
<< "{" << invoke_context.thread_context.thread_name << "} IPC client exception " << kj_exception;
std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
done = true;
@@ -1422,7 +1423,7 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
invoke_context.thread_context.waiter->wait(lock, [&done]() { return done; });
if (exception) std::rethrow_exception(exception);
if (!kj_exception.empty()) proxy_client.m_connection->m_loop.raise() << kj_exception;
if (!kj_exception.empty()) proxy_client.m_context.connection->m_loop.raise() << kj_exception;
}

//! Invoke callable `fn()` that may return void. If it does return void, replace
@@ -1454,7 +1455,7 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
using Results = typename decltype(call_context.getResults())::Builds;

int req = ++server_reqs;
server.m_connection.m_loop.log() << "IPC server recv request #" << req << " "
server.m_context.connection->m_loop.log() << "IPC server recv request #" << req << " "
<< TypeName<typename Params::Reads>() << " " << LogEscape(params.toString());

try {
@@ -1464,18 +1465,29 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
return ReplaceVoid([&]() { return fn.invoke(server_context, ArgList()); },
[&]() { return kj::Promise<CallContext>(kj::mv(call_context)); })
.then([&server, req](CallContext call_context) {
server.m_connection.m_loop.log() << "IPC server send response #" << req << " " << TypeName<Results>()
server.m_context.connection->m_loop.log() << "IPC server send response #" << req << " " << TypeName<Results>()
<< " " << LogEscape(call_context.getResults().toString());
});
} catch (const std::exception& e) {
server.m_connection.m_loop.log() << "IPC server unhandled exception: " << e.what();
server.m_context.connection->m_loop.log() << "IPC server unhandled exception: " << e.what();
throw;
} catch (...) {
server.m_connection.m_loop.log() << "IPC server unhandled exception";
server.m_context.connection->m_loop.log() << "IPC server unhandled exception";
throw;
}
}

//! Map to convert client interface pointers to ProxyContext struct references
//! at runtime using typeids.
struct ProxyTypeRegister {
template<typename Interface>
ProxyTypeRegister(TypeList<Interface>) {
types().emplace(typeid(Interface), [](void* iface) -> ProxyContext& { return static_cast<typename mp::ProxyType<Interface>::Client&>(*static_cast<Interface*>(iface)).m_context; });
}
using Types = std::map<std::type_index, ProxyContext&(*)(void*)>;
static Types& types() { static Types types; return types; }
};

} // namespace mp

#endif // MP_PROXY_TYPES_H
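A minimal sketch (not part of this commit) of how the new `ProxyTypeRegister` map might be consumed, e.g. by the kind of helper bitcoin/bitcoin#19160 needs: look up the `ProxyContext` for an interface pointer by typeid and append a cleanup callback. The helper name is hypothetical, and it assumes `iface` really is a proxy client object for a registered interface type:

```cpp
#include <mp/proxy.h>        // mp::ProxyContext
#include <mp/proxy-types.h>  // mp::ProxyTypeRegister

#include <functional>
#include <typeindex>
#include <utility>

// Hypothetical helper: if Interface has a registered ProxyType, fetch the
// ProxyContext of the proxy client behind `iface` and attach a cleanup
// callback. Returns false when the interface type is not registered.
template <typename Interface>
bool AddProxyCleanup(Interface& iface, std::function<void()> fn)
{
    const auto& types = mp::ProxyTypeRegister::types();
    auto it = types.find(std::type_index(typeid(Interface))); // keyed by the static interface type
    if (it == types.end()) return false;
    mp::ProxyContext& context = it->second(&iface); // casts the void* back to the proxy client
    context.cleanup.emplace_back(std::move(fn));
    return true;
}
```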
16 changes: 11 additions & 5 deletions include/mp/proxy.h
@@ -39,6 +39,15 @@ struct ProxyType;
using CleanupList = std::list<std::function<void()>>;
using CleanupIt = typename CleanupList::iterator;

//! Context data associated with proxy client and server classes.
struct ProxyContext
{
Connection* connection;
std::list<std::function<void()>> cleanup;

ProxyContext(Connection* connection) : connection(connection) {}
};

//! Base class for generated ProxyClient classes that implement a C++ interface
//! and forward calls to a capnp interface.
template <typename Interface_, typename Impl_>
@@ -59,10 +68,7 @@ class ProxyClientBase : public Impl_
ProxyClient<Interface>& self() { return static_cast<ProxyClient<Interface>&>(*this); }

typename Interface::Client m_client;
Connection* m_connection;
bool m_destroy_connection;
CleanupIt m_cleanup; //!< Pointer to self-cleanup callback registered to handle connection object getting destroyed
//!< before this client object.
ProxyContext m_context;
};

//! Customizable (through template specialization) base class used in generated ProxyClient implementations from
@@ -100,7 +106,7 @@ struct ProxyServerBase : public virtual Interface_::Server
* appropriate times depending on semantics of the particular method being
* wrapped. */
std::shared_ptr<Impl> m_impl;
Connection& m_connection;
ProxyContext m_context;
};

//! Customizable (through template specialization) base class used in generated ProxyServer implementations from
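For the server side, a similarly hedged sketch (placeholder names, not from this commit) of attaching a callback to a `ProxyServer`'s context; per the `proxy-io.h` changes above, it runs after `m_impl` is reset, either synchronously in `invokeDestroy()` or in the asynchronous cleanup scheduled by `~ProxyServerBase()`:

```cpp
// Hypothetical: FooInterface is a placeholder for an interface with generated
// proxy classes; `server` is an existing mp::ProxyServer<FooInterface> instance.
void WatchServerTeardown(mp::ProxyServer<FooInterface>& server)
{
    server.m_context.cleanup.emplace_back([] {
        // Release any state tied to the lifetime of the served object.
    });
}
```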
