Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions include/mp/proxy-io.h
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,41 @@ struct ThreadContext
bool loop_thread = false;
};

//! Given stream file descriptor, make a new ProxyClient object to send requests
//! over the stream. Also create a new Connection object embedded in the
//! client that is freed when the client is closed.
template <typename InitInterface>
std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int fd, bool add_client)
{
typename InitInterface::Client init_client(nullptr);
std::unique_ptr<Connection> connection;
loop.sync([&] {
auto stream =
loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
connection = std::make_unique<Connection>(loop, kj::mv(stream), add_client);
init_client = connection->m_rpc_system.bootstrap(ServerVatId().vat_id).castAs<InitInterface>();
Connection* connection_ptr = connection.get();
connection->onDisconnect([&loop, connection_ptr] {
loop.log() << "IPC client: unexpected network disconnect.";
delete connection_ptr;
});
});
return std::make_unique<ProxyClient<InitInterface>>(
kj::mv(init_client), connection.release(), /* destroy_connection= */ true);
}

//! Given stream and a callback to construct a new ProxyServer object that
//! handles requests from the stream, create a new Connection callback, pass it
//! to the callback, use the returned ProxyServer to handle requests, and delete
//! the proxyserver if the connection is disconnected.
//! This should be called from the event loop thread.
void ServeStream(EventLoop& loop,
kj::Own<kj::AsyncIoStream>&& stream,
std::function<capnp::Capability::Client(Connection&)> make_server);

//! Same as above but accept file descriptor rather than stream object.
void ServeStream(EventLoop& loop, int fd, std::function<capnp::Capability::Client(Connection&)> make_server);

extern thread_local ThreadContext g_thread_context;

} // namespace mp
Expand Down
18 changes: 18 additions & 0 deletions include/mp/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>

namespace mp {

Expand Down Expand Up @@ -360,6 +361,23 @@ std::string ThreadName(const char* exe_name);
//! errors in python unit tests.
std::string LogEscape(const kj::StringTree& string);

//! Callback type used by SpawnProcess below.
using FdToArgsFn = std::function<std::vector<std::string>(int fd)>;

//! Spawn a new process that communicates with the current process over a socket
//! pair. Returns pid through an output argument, and file descriptor for the
//! local side of the socket. Invokes fd_to_args callback with the remote file
//! descriptor number which returns the command line arguments that should be
//! used to execute the process, and which should have the remote file
//! descriptor embedded in whatever format the child process expects.
int SpawnProcess(int& pid, FdToArgsFn&& fd_to_args);

//! Call execvp with vector args.
void ExecProcess(const std::vector<std::string>& args);

//! Wait for a process to exit and return its exit code.
int WaitProcess(int pid);

inline char* CharCast(char* c) { return c; }
inline char* CharCast(unsigned char* c) { return (char*)c; }
inline const char* CharCast(const char* c) { return c; }
Expand Down
20 changes: 19 additions & 1 deletion src/mp/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,28 @@ kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)

std::atomic<int> server_reqs{0};


std::string LongThreadName(const char* exe_name)
{
return g_thread_context.thread_name.empty() ? ThreadName(exe_name) : g_thread_context.thread_name;
}

void ServeStream(EventLoop& loop,
kj::Own<kj::AsyncIoStream>&& stream,
std::function<capnp::Capability::Client(Connection&)> make_server)
{
loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), make_server);
auto it = loop.m_incoming_connections.begin();
it->onDisconnect([&loop, it] {
loop.log() << "IPC server: socket disconnected.";
loop.m_incoming_connections.erase(it);
});
}

void ServeStream(EventLoop& loop, int fd, std::function<capnp::Capability::Client(Connection&)> make_server)
{
ServeStream(loop,
loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
std::move(make_server));
}

} // namespace mp
65 changes: 64 additions & 1 deletion src/mp/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,32 @@
#include <mp/util.h>

#include <kj/array.h>
#include <mp/proxy.h>
#include <pthread.h>
#include <sstream>
#include <stdio.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <sys/wait.h>
#include <syscall.h>
#include <unistd.h>

namespace mp {
namespace {

//! Return highest possible file descriptor.
size_t MaxFd()
{
struct rlimit nofile;
if (getrlimit(RLIMIT_NOFILE, &nofile) == 0) {
return nofile.rlim_cur - 1;
} else {
return 1023;
}
}

} // namespace

std::string ThreadName(const char* exe_name)
{
Expand Down Expand Up @@ -54,4 +72,49 @@ std::string LogEscape(const kj::StringTree& string)
return result;
}

int SpawnProcess(int& pid, FdToArgsFn&& fd_to_args)
{
int fds[2];
if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) {
throw std::system_error(errno, std::system_category());
}

pid = fork();
if (close(fds[pid ? 0 : 1]) != 0) {
throw std::system_error(errno, std::system_category());
}
if (!pid) {
int maxFd = MaxFd();
for (int fd = 3; fd < maxFd; ++fd) {
if (fd != fds[0]) {
close(fd);
}
}
ExecProcess(fd_to_args(fds[0]));
}
return fds[1];
}

void ExecProcess(const std::vector<std::string>& args)
{
std::vector<char*> argv;
for (const auto& arg : args) {
argv.push_back(const_cast<char*>(arg.c_str()));
}
argv.push_back(nullptr);
if (execvp(argv[0], argv.data()) != 0) {
perror("execlp failed");
_exit(1);
}
}

int WaitProcess(int pid)
{
int status;
if (::waitpid(pid, &status, 0 /* options */) != pid) {
throw std::system_error(errno, std::system_category());
}
return status;
}

} // namespace mp