Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 11 additions & 18 deletions include/mp/proxy-io.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,16 @@ struct ProxyClient<Thread> : public ProxyClientBase<Thread, ::capnp::Void>
ProxyClient(const ProxyClient&) = delete;
~ProxyClient();

void setCleanup(std::function<void()> cleanup);
void setCleanup(std::function<void()> fn);

//! Cleanup function to run when the connection is closed. If the Connection
//! gets destroyed before this ProxyClient<Thread> object, this cleanup
//! callback lets it destroy this object and remove its entry in the
//! thread's request_threads or callback_threads map (after resetting
//! m_cleanup so the destructor does not try to access it). But if this
//! m_cleanup_it so the destructor does not try to access it). But if this
//! object gets destroyed before the Connection, there's no need to run the
//! cleanup function and the destructor will unregister it.
std::optional<CleanupIt> m_cleanup;
std::optional<CleanupIt> m_cleanup_it;
};

template <>
Expand Down Expand Up @@ -379,7 +379,7 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
}

// Handler for the connection getting destroyed before this client object.
auto cleanup = m_context.connection->addSyncCleanup([this]() {
auto cleanup_it = m_context.connection->addSyncCleanup([this]() {
// Release client capability by move-assigning to temporary.
{
typename Interface::Client(std::move(self().m_client));
Expand All @@ -402,11 +402,11 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
// The first case is handled here when m_context.connection is not null. The
// second case is handled by the cleanup function, which sets m_context.connection to
// null so nothing happens here.
m_context.cleanup.emplace_front([this, destroy_connection, cleanup]{
m_context.cleanup_fns.emplace_front([this, destroy_connection, cleanup_it]{
if (m_context.connection) {
// Remove cleanup callback so it doesn't run and try to access
// this object after it's already destroyed.
m_context.connection->removeSyncCleanup(cleanup);
m_context.connection->removeSyncCleanup(cleanup_it);

// If the capnp interface defines a destroy method, call it to destroy
// the remote object, waiting for it to be deleted server side. If the
Expand Down Expand Up @@ -437,9 +437,7 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
template <typename Interface, typename Impl>
ProxyClientBase<Interface, Impl>::~ProxyClientBase() noexcept
{
for (auto& cleanup : m_context.cleanup) {
cleanup();
}
CleanupRun(m_context.cleanup_fns);
}

template <typename Interface, typename Impl>
Expand Down Expand Up @@ -476,14 +474,12 @@ ProxyServerBase<Interface, Impl>::~ProxyServerBase()
// connection is broken). Probably some refactoring of the destructor
// and invokeDestroy function is possible to make this cleaner and more
// consistent.
m_context.connection->addAsyncCleanup([impl=std::move(m_impl), c=std::move(m_context.cleanup)]() mutable {
m_context.connection->addAsyncCleanup([impl=std::move(m_impl), fns=std::move(m_context.cleanup_fns)]() mutable {
impl.reset();
for (auto& cleanup : c) {
cleanup();
}
CleanupRun(fns);
});
}
assert(m_context.cleanup.size() == 0);
assert(m_context.cleanup_fns.size() == 0);
std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
m_context.connection->m_loop.removeClient(lock);
}
Expand All @@ -509,10 +505,7 @@ template <typename Interface, typename Impl>
void ProxyServerBase<Interface, Impl>::invokeDestroy()
{
m_impl.reset();
for (auto& cleanup : m_context.cleanup) {
cleanup();
}
m_context.cleanup.clear();
CleanupRun(m_context.cleanup_fns);
}

using ConnThreads = std::map<Connection*, ProxyClient<Thread>>;
Expand Down
12 changes: 10 additions & 2 deletions include/mp/proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,19 @@ struct ProxyType;
using CleanupList = std::list<std::function<void()>>;
using CleanupIt = typename CleanupList::iterator;

inline void CleanupRun(CleanupList& fns) {
while (!fns.empty()) {
auto fn = std::move(fns.front());
fns.pop_front();
fn();
}
}

//! Context data associated with proxy client and server classes.
struct ProxyContext
{
Connection* connection;
std::list<std::function<void()>> cleanup;
CleanupList cleanup_fns;

ProxyContext(Connection* connection) : connection(connection) {}
};
Expand Down Expand Up @@ -147,7 +155,7 @@ struct ProxyServerBase : public virtual Interface_::Server
//! state can be destroyed without blocking, because ProxyServer destructors are
//! called from the EventLoop thread, and if they block, it could deadlock the
//! program. One way to do avoid blocking is to clean up the state by pushing
//! cleanup callbacks to the m_context.cleanup list, which run after the server
//! cleanup callbacks to the m_context.cleanup_fns list, which run after the server
//! m_impl object is destroyed on the same thread destroying it (which will
//! either be an IPC worker thread if the ProxyServer is being explicitly
//! destroyed by a client calling a destroy() method with a Context argument and
Expand Down
19 changes: 11 additions & 8 deletions src/mp/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,9 @@ std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex,
// destructor unregisters the cleanup.

// Connection is being destroyed before thread client is, so reset
// thread client m_cleanup member so thread client destructor does not
// thread client m_cleanup_it member so thread client destructor does not
// try unregister this callback after connection is destroyed.
thread->second.m_cleanup.reset();
thread->second.m_cleanup_it.reset();
// Remove connection pointer about to be destroyed from the map
std::unique_lock<std::mutex> lock(mutex);
threads.erase(thread);
Expand All @@ -295,16 +295,19 @@ std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex,

ProxyClient<Thread>::~ProxyClient()
{
if (m_cleanup) {
m_context.connection->removeSyncCleanup(*m_cleanup);
// If thread is being destroyed before connection is destroyed, remove the
// cleanup callback that was registered to handle the connection being
// destroyed before the thread being destroyed.
if (m_cleanup_it) {
m_context.connection->removeSyncCleanup(*m_cleanup_it);
}
}

void ProxyClient<Thread>::setCleanup(std::function<void()> cleanup)
void ProxyClient<Thread>::setCleanup(std::function<void()> fn)
{
assert(cleanup);
assert(!m_cleanup);
m_cleanup = m_context.connection->addSyncCleanup(cleanup);
assert(fn);
assert(!m_cleanup_it);
m_cleanup_it = m_context.connection->addSyncCleanup(fn);
}

ProxyServer<Thread>::ProxyServer(ThreadContext& thread_context, std::thread&& thread)
Expand Down
2 changes: 1 addition & 1 deletion test/mp/test/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ KJ_TEST("Call FooInterface methods")
thread.join();

bool destroyed = false;
foo->m_context.cleanup.emplace_front([&destroyed]{ destroyed = true; });
foo->m_context.cleanup_fns.emplace_front([&destroyed]{ destroyed = true; });
foo.reset();
KJ_EXPECT(destroyed);
}
Expand Down