diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index cc8706a7..25a63354 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -475,6 +475,41 @@ struct ThreadContext bool loop_thread = false; }; +//! Given stream file descriptor, make a new ProxyClient object to send requests +//! over the stream. Also create a new Connection object embedded in the +//! client that is freed when the client is closed. +template +std::unique_ptr> ConnectStream(EventLoop& loop, int fd, bool add_client) +{ + typename InitInterface::Client init_client(nullptr); + std::unique_ptr connection; + loop.sync([&] { + auto stream = + loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP); + connection = std::make_unique(loop, kj::mv(stream), add_client); + init_client = connection->m_rpc_system.bootstrap(ServerVatId().vat_id).castAs(); + Connection* connection_ptr = connection.get(); + connection->onDisconnect([&loop, connection_ptr] { + loop.log() << "IPC client: unexpected network disconnect."; + delete connection_ptr; + }); + }); + return std::make_unique>( + kj::mv(init_client), connection.release(), /* destroy_connection= */ true); +} + +//! Given stream and a callback to construct a new ProxyServer object that +//! handles requests from the stream, create a new Connection callback, pass it +//! to the callback, use the returned ProxyServer to handle requests, and delete +//! the proxyserver if the connection is disconnected. +//! This should be called from the event loop thread. +void ServeStream(EventLoop& loop, + kj::Own&& stream, + std::function make_server); + +//! Same as above but accept file descriptor rather than stream object. +void ServeStream(EventLoop& loop, int fd, std::function make_server); + extern thread_local ThreadContext g_thread_context; } // namespace mp diff --git a/include/mp/util.h b/include/mp/util.h index 5f672776..2f781c49 100644 --- a/include/mp/util.h +++ b/include/mp/util.h @@ -17,6 +17,7 @@ #include #include #include +#include namespace mp { @@ -360,6 +361,23 @@ std::string ThreadName(const char* exe_name); //! errors in python unit tests. std::string LogEscape(const kj::StringTree& string); +//! Callback type used by SpawnProcess below. +using FdToArgsFn = std::function(int fd)>; + +//! Spawn a new process that communicates with the current process over a socket +//! pair. Returns pid through an output argument, and file descriptor for the +//! local side of the socket. Invokes fd_to_args callback with the remote file +//! descriptor number which returns the command line arguments that should be +//! used to execute the process, and which should have the remote file +//! descriptor embedded in whatever format the child process expects. +int SpawnProcess(int& pid, FdToArgsFn&& fd_to_args); + +//! Call execvp with vector args. +void ExecProcess(const std::vector& args); + +//! Wait for a process to exit and return its exit code. +int WaitProcess(int pid); + inline char* CharCast(char* c) { return c; } inline char* CharCast(unsigned char* c) { return (char*)c; } inline const char* CharCast(const char* c) { return c; } diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index 75a5c4a4..94a71a34 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -311,10 +311,28 @@ kj::Promise ProxyServer::makeThread(MakeThreadContext context) std::atomic server_reqs{0}; - std::string LongThreadName(const char* exe_name) { return g_thread_context.thread_name.empty() ? ThreadName(exe_name) : g_thread_context.thread_name; } +void ServeStream(EventLoop& loop, + kj::Own&& stream, + std::function make_server) +{ + loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), make_server); + auto it = loop.m_incoming_connections.begin(); + it->onDisconnect([&loop, it] { + loop.log() << "IPC server: socket disconnected."; + loop.m_incoming_connections.erase(it); + }); +} + +void ServeStream(EventLoop& loop, int fd, std::function make_server) +{ + ServeStream(loop, + loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), + std::move(make_server)); +} + } // namespace mp diff --git a/src/mp/util.cpp b/src/mp/util.cpp index fa42525b..33e08c59 100644 --- a/src/mp/util.cpp +++ b/src/mp/util.cpp @@ -5,14 +5,32 @@ #include #include -#include #include #include #include +#include +#include +#include +#include +#include #include #include namespace mp { +namespace { + +//! Return highest possible file descriptor. +size_t MaxFd() +{ + struct rlimit nofile; + if (getrlimit(RLIMIT_NOFILE, &nofile) == 0) { + return nofile.rlim_cur - 1; + } else { + return 1023; + } +} + +} // namespace std::string ThreadName(const char* exe_name) { @@ -54,4 +72,49 @@ std::string LogEscape(const kj::StringTree& string) return result; } +int SpawnProcess(int& pid, FdToArgsFn&& fd_to_args) +{ + int fds[2]; + if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) { + throw std::system_error(errno, std::system_category()); + } + + pid = fork(); + if (close(fds[pid ? 0 : 1]) != 0) { + throw std::system_error(errno, std::system_category()); + } + if (!pid) { + int maxFd = MaxFd(); + for (int fd = 3; fd < maxFd; ++fd) { + if (fd != fds[0]) { + close(fd); + } + } + ExecProcess(fd_to_args(fds[0])); + } + return fds[1]; +} + +void ExecProcess(const std::vector& args) +{ + std::vector argv; + for (const auto& arg : args) { + argv.push_back(const_cast(arg.c_str())); + } + argv.push_back(nullptr); + if (execvp(argv[0], argv.data()) != 0) { + perror("execlp failed"); + _exit(1); + } +} + +int WaitProcess(int pid) +{ + int status; + if (::waitpid(pid, &status, 0 /* options */) != pid) { + throw std::system_error(errno, std::system_category()); + } + return status; +} + } // namespace mp