Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 52 additions & 2 deletions include/mp/proxy-types.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,23 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
Context::Reader context_arg = Accessor::get(params);
ServerContext server_context{server, call_context, req};
{
// Before invoking the function, store a reference to the
// callbackThread provided by the client in the
// thread_local.request_threads map. This way, if this
// server thread needs to execute any RPCs that call back to
// the client, they will happen on the same client thread
// that is waiting for this function, just like what would
// happen if this were a normal function call made on the
// local stack.
//
// If the request_threads map already has an entry for this
// connection, it will be left unchanged, and it indicates
// that the current thread is an RPC client thread which is
// in the middle of an RPC call, and the current RPC call is
// a nested call from the remote thread handling that RPC
// call. In this case, the callbackThread value should point
// to the same thread already in the map, so there is no
// need to update the map.
auto& request_threads = g_thread_context.request_threads;
auto request_thread = request_threads.find(server.m_context.connection);
if (request_thread == request_threads.end()) {
Expand All @@ -136,8 +153,11 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
/* destroy_connection= */ false))
.first;
} else {
// If recursive call, avoid remove request_threads map
// entry in KJ_DEFER below.
// The requests_threads map already has an entry for
// this connection, so this must be a recursive call.
// Avoid modifying the map in this case by resetting the
// request_thread iterator, so the KJ_DEFER statement
// below doesn't do anything.
request_thread = request_threads.end();
}
KJ_DEFER(if (request_thread != request_threads.end()) request_threads.erase(request_thread));
Expand All @@ -157,9 +177,15 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
}
});

// Lookup Thread object specified by the client. The specified thread should
// be a local Thread::Server object, but it needs to be looked up
// asynchronously with getLocalServer().
auto thread_client = context_arg.getThread();
return JoinPromises(server.m_context.connection->m_threads.getLocalServer(thread_client)
.then([&server, invoke, req](const kj::Maybe<Thread::Server&>& perhaps) {
// Assuming the thread object is found, pass it a pointer to the
// `invoke` lambda above which will invoke the function on that
// thread.
KJ_IF_MAYBE(thread_server, perhaps)
{
const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
Expand All @@ -174,6 +200,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
throw std::runtime_error("invalid thread handle");
}
}),
// Wait for the invocation to finish before returning to the caller.
kj::mv(future.promise));
}

Expand Down Expand Up @@ -1423,6 +1450,15 @@ void serverDestroy(Server& server)
server.m_context.connection->m_loop.log() << "IPC server destroy" << typeid(server).name();
}

//! Entry point called by generated client code that looks like:
//!
//! ProxyClient<ClassName>::M0::Result ProxyClient<ClassName>::methodName(M0::Param<0> arg0, M0::Param<1> arg1) {
//! typename M0::Result result;
//! clientInvoke(*this, &InterfaceName::Client::methodNameRequest, MakeClientParam<...>(arg0), MakeClientParam<...>(arg1), MakeClientParam<...>(result));
//! return result;
//! }
//!
//! Ellipses above are where generated Accessor<> type declarations are inserted.
template <typename ProxyClient, typename GetRequest, typename... FieldObjs>
void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, FieldObjs&&... fields)
{
Expand Down Expand Up @@ -1511,6 +1547,13 @@ auto ReplaceVoid(Fn&& fn, Ret&& ret) ->

extern std::atomic<int> server_reqs;

//! Entry point called by generated server code that looks like:
//!
//! kj::Promise<void> ProxyServer<InterfaceName>::methodName(CallContext call_context) {
//! return serverInvoke(*this, call_context, MakeServerField<0, ...>(MakeServerField<1, ...>(Make<ServerRet, ...>(ServerCall()))));
//! }
//!
//! Ellipses above are where generated Accessor<> type declarations are inserted.
template <typename Server, typename CallContext, typename Fn>
kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
{
Expand All @@ -1526,6 +1569,13 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
using ServerContext = ServerInvokeContext<Server, CallContext>;
using ArgList = typename ProxyClientMethodTraits<typename Params::Reads>::Params;
ServerContext server_context{server, call_context, req};
// ReplaceVoid is used to support fn.invoke implementations that
// execute asynchronously and return promises, as well as
// implementations that execute synchronously and return void. The
// invoke function will be synchronous by default, but asynchronous if
// an mp.Context argument is passed, and the mp.Context PassField
// overload returns a promise executing the request in a worker thread
// and waiting for it to complete.
return ReplaceVoid([&]() { return fn.invoke(server_context, ArgList()); },
[&]() { return kj::Promise<CallContext>(kj::mv(call_context)); })
.then([&server, req](CallContext call_context) {
Expand Down
106 changes: 90 additions & 16 deletions include/mp/proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,35 @@ class ProxyClientBase : public Impl_
ProxyClientBase(typename Interface::Client client, Connection* connection, bool destroy_connection);
~ProxyClientBase() noexcept;

// Methods called during client construction/destruction that can optionally
// be defined in capnp interface to trigger the server.
// construct/destroy methods called during client construction/destruction
// that can optionally be defined in capnp interfaces to invoke code on the
// server when proxy client objects are created and destroyed.
//
// The construct() method is not generally very useful, but can be used to
// run custom code on the server automatically when a ProxyClient client is
// constructed. The only current use is adding a construct method to Init
// interfaces that is called automatically on construction, so client and
// server exchange ThreadMap references and set Connection::m_thread_map
// values as soon as the Init client is created.
//
// construct @0 (threadMap: Proxy.ThreadMap) -> (threadMap: Proxy.ThreadMap);
//
// But construct() is not necessary for this, thread maps could be passed
// through a normal method that is just called explicitly rather than
// implicitly.
//
// The destroy() method is more generally useful than construct(), because
// it ensures that the server object will be destroyed synchronously before
// the client destructor returns, instead of asynchronously at some
// unpredictable time after the client object is already destroyed and
// client code has moved on. If the destroy method accepts a Context
// parameter like:
//
// destroy @0 (context: Proxy.Context) -> ();
//
// then it will also ensure that the destructor runs on the same thread the
// client used to make other RPC calls, instead of running on the server
// EventLoop thread and possibly blocking it.
void construct() {}
void destroy() {}

Expand Down Expand Up @@ -109,20 +136,52 @@ struct ProxyServerBase : public virtual Interface_::Server
ProxyContext m_context;
};

//! Customizable (through template specialization) base class used in generated ProxyServer implementations from
//! proxy-codegen.cpp.
//! Customizable (through template specialization) base class which ProxyServer
//! classes produced by generated code will inherit from. The default
//! specialization of this class just inherits from ProxyServerBase, but custom
//! specializations can be defined to control ProxyServer behavior.
//!
//! Specifically, it can be useful to specialize this class to add additional
//! state to ProxyServer classes, for example to cache state between IPC calls.
//! If this is done, however, care should be taken to ensure that the extra
//! state can be destroyed without blocking, because ProxyServer destructors are
//! called from the EventLoop thread, and if they block, it could deadlock the
//! program. One way to do avoid blocking is to clean up the state by pushing
//! cleanup callbacks to the m_context.cleanup list, which run after the server
//! m_impl object is destroyed on the same thread destroying it (which will
//! either be an IPC worker thread if the ProxyServer is being explicitly
//! destroyed by a client calling a destroy() method with a Context argument and
//! Context.thread value set, or the temporary EventLoop::m_async_thread used to
//! run destructors without blocking the event loop when no-longer used server
//! objects are garbage collected by Cap'n Proto.) Alternately, if cleanup needs
//! to run before m_impl is destroyed, the specialization can override
//! invokeDestroy and destructor methods to do that.
template <typename Interface, typename Impl>
struct ProxyServerCustom : public ProxyServerBase<Interface, Impl>
{
using ProxyServerBase<Interface, Impl>::ProxyServerBase;
};

//! Function traits class used to get method parameter and result types in generated ProxyClient implementations from
//! proxy-codegen.cpp.
//! Function traits class used to get method parameter and result types, used in
//! generated ProxyClient and ProxyServer classes produced by gen.cpp to get C++
//! method type information. The generated code accesses these traits via
//! intermediate ProxyClientMethodTraits and ProxyServerMethodTraits classes,
//! which it is possible to specialize to change the way method arguments and
//! return values are handled.
//!
//! Fields of the trait class are:
//!
//! Params - TypeList of C++ ClassName::methodName parameter types
//! Result - Return type of ClassName::method
//! Param<N> - helper to access individual parameters by index number.
//! Fields - helper alias that appends Result type to the Params typelist if
//! it not void.
template <class Fn>
struct FunctionTraits;

//! Specialization of above to extract result and params types.
//! Specialization of above extracting result and params types assuming the
//! template argument is a pointer-to-method type,
//! decltype(&ClassName::methodName)
template <class _Class, class _Result, class... _Params>
struct FunctionTraits<_Result (_Class::*const)(_Params...)>
{
Expand All @@ -134,16 +193,20 @@ struct FunctionTraits<_Result (_Class::*const)(_Params...)>
typename std::conditional<std::is_same<void, Result>::value, Params, TypeList<_Params..., _Result>>::type;
};

//! Traits class for a method specialized by method parameters.
//!
//! Param and Result typedefs can be customized to adjust parameter and return types on client side.
//! Traits class for a proxy method, providing the same
//! Params/Result/Param/Fields described in the FunctionTraits class above, plus
//! an additional invoke() method that calls the C++ method which is being
//! proxied, forwarding any arguments.
//!
//! Invoke method customized to adjust parameter and return types on server side.
//! The template argument should be the InterfaceName::MethodNameParams class
//! (generated by Cap'n Proto) associated with the method.
//!
//! Normal method calls go through the ProxyMethodTraits struct specialization
//! below, not this default struct, which is only used if there is no
//! ProxyMethod::impl method pointer, which is only true for construct/destroy
//! methods.
//! Note: The class definition here is just the fallback definition used when
//! the other specialization below doesn't match. The fallback is only used for
//! capnp methods which do not have corresponding C++ methods, which in practice
//! is just the two special construct() and destroy() methods described in \ref
//! ProxyClientBase. These methods don't have any C++ parameters or return
//! types, so the trait information below reflects that.
template <typename MethodParams, typename Enable = void>
struct ProxyMethodTraits
{
Expand All @@ -157,7 +220,18 @@ struct ProxyMethodTraits
}
};

//! Specialization of above.
//! Specialization of above for proxy methods that have a
//! ProxyMethod<InterfaceName::MethodNameParams>::impl pointer-to-method
//! constant defined by generated code. This includes all functions defined in
//! the capnp interface except any construct() or destroy() methods, that are
//! assumed not to correspond to real member functions in the C++ class, and
//! will use the fallback traits definition above. The generated code this
//! specialization relies on looks like:
//!
//! struct ProxyMethod<InterfaceName::MethodNameParams>
//! {
//! static constexpr auto impl = &ClassName::methodName;
//! };
template <typename MethodParams>
struct ProxyMethodTraits<MethodParams, Require<decltype(ProxyMethod<MethodParams>::impl)>>
: public FunctionTraits<decltype(ProxyMethod<MethodParams>::impl)>
Expand Down