Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for host process stdio. #5056

Merged
merged 14 commits into from
Feb 28, 2023
3 changes: 2 additions & 1 deletion include/ccf/node/host_processes_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace ccf
}

virtual void trigger_host_process_launch(
const std::vector<std::string>& args) = 0;
const std::vector<std::string>& args,
const std::vector<uint8_t>& input = {}) = 0;
};
}
8 changes: 4 additions & 4 deletions src/enclave/interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ enum AppMessage : ringbuffer::Message
};

DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(
AppMessage::launch_host_process, std::string);
AppMessage::launch_host_process, std::string, std::vector<uint8_t>);

struct LaunchHostProcessMessage
struct HostProcessArguments
{
std::vector<std::string> args;
};

DECLARE_JSON_TYPE(LaunchHostProcessMessage);
DECLARE_JSON_REQUIRED_FIELDS(LaunchHostProcessMessage, args);
DECLARE_JSON_TYPE(HostProcessArguments);
DECLARE_JSON_REQUIRED_FIELDS(HostProcessArguments, args);
217 changes: 206 additions & 11 deletions src/host/process_launcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,178 @@

namespace asynchost
{
struct ProcessPipe : public with_uv_handle<uv_pipe_t>
{
public:
ProcessPipe()
{
uv_handle.data = this;
uv_pipe_init(uv_default_loop(), &uv_handle, 0);
}
virtual ~ProcessPipe() = default;

uv_stream_t* stream()
{
return (uv_stream_t*)&uv_handle;
}

protected:
int pid = 0;
plietar marked this conversation as resolved.
Show resolved Hide resolved
};

/**
* Read the output of a process line by line and print each one to our logs.
*/
class ProcessReader : public ProcessPipe
{
public:
ProcessReader(std::string name) : name(name) {}

void start(pid_t pid)
{
this->pid = pid;

int rc = uv_read_start((uv_stream_t*)&uv_handle, on_alloc_cb, on_read_cb);
if (rc < 0)
{
LOG_FAIL_FMT("uv_read_start failed: {}", uv_strerror(rc));
close();
}
}

private:
static void on_alloc_cb(
uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
{
static_cast<ProcessReader*>(handle->data)->on_alloc(suggested_size, buf);
}

static void on_read_cb(
uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf)
{
static_cast<ProcessReader*>(handle->data)->on_read(nread, buf);
}

void on_alloc(size_t suggested_size, uv_buf_t* buf)
{
auto alloc_size = std::min<size_t>(suggested_size, 1024);
LOG_TRACE_FMT(
"Allocating {} bytes for reading from host process pid={}",
alloc_size,
pid);

buf->base = new char[suggested_size];
achamayou marked this conversation as resolved.
Show resolved Hide resolved
buf->len = alloc_size;
}

void on_read(ssize_t nread, const uv_buf_t* buf)
{
if (nread < 0)
{
LOG_DEBUG_FMT(
"ProcessReader on_read: status={} pid={} file={}",
uv_strerror(nread),
pid,
name);
// Print any trailing text which didn't have a newline
if (!buffer.empty())
{
LOG_INFO_FMT("{} from process {}: {}", name, pid, buffer);
}
close();
}
else if (nread > 0)
{
buffer.insert(buffer.end(), buf->base, buf->base + nread);
LOG_DEBUG_FMT(
"Read {} bytes from host process, total={} file={}",
nread,
buffer.size(),
name);
print_lines();
}
on_free(buf);
}

void on_free(const uv_buf_t* buf)
{
delete[] buf->base;
}

/**
* Take each line out of the buffer and print it to the logs.
*/
void print_lines()
{
auto start = buffer.begin();
while (true)
{
auto newline = std::find(start, buffer.end(), '\n');
if (newline == buffer.end())
{
break;
}

size_t count = newline - start;
std::string_view line(&*start, count);
LOG_INFO_FMT("{} from process {}: {}", name, pid, line);

// Move past the newline character so we can look for the next one.
start = newline + 1;
}
buffer.erase(buffer.begin(), start);
}

std::string name;
std::string buffer;
};

/**
* Write a byte buffer to a process' standard input.
*/
class ProcessWriter : public ProcessPipe
{
public:
ProcessWriter(std::vector<uint8_t>&& data) : buffer(std::move(data))
{
request.data = this;
}

void start(pid_t pid)
{
this->pid = pid;

LOG_DEBUG_FMT(
"Writing {} bytes to host process pid={}", buffer.size(), pid);

uv_buf_t buf = {(char*)buffer.data(), buffer.size()};
int rc =
uv_write(&request, (uv_stream_t*)&uv_handle, &buf, 1, on_write_done_cb);

if (rc < 0)
{
LOG_FAIL_FMT("uv_write failed: {}", uv_strerror(rc));
close();
}
}

private:
static void on_write_done_cb(uv_write_t* req, int status)
{
static_cast<ProcessWriter*>(req->data)->on_write_done(req, status);
}

void on_write_done(uv_write_t* req, int status)
{
LOG_DEBUG_FMT(
"Write to host process completed: status={} pid={}", status, pid);
close();
}

uv_write_t request;
std::vector<uint8_t> buffer;
};

class ProcessLauncher
{
static constexpr size_t max_processes = 8;
Expand All @@ -20,15 +192,16 @@ namespace asynchost

struct QueueEntry
{
LaunchHostProcessMessage msg;
std::vector<std::string> args;
std::vector<uint8_t> input;
std::chrono::steady_clock::time_point queued_at;
};

std::queue<QueueEntry> queued;

struct ProcessEntry
{
LaunchHostProcessMessage msg;
std::vector<std::string> args;
std::chrono::steady_clock::time_point started_at;
};

Expand All @@ -53,8 +226,7 @@ namespace asynchost
now - entry.queued_at)
.count();

auto& msg = entry.msg;
auto& args = msg.args;
const auto& args = entry.args;

std::vector<const char*> argv;
for (size_t i = 0; i < args.size(); i++)
Expand All @@ -63,30 +235,49 @@ namespace asynchost
}
argv.push_back(nullptr);

close_ptr<ProcessReader> stdout_reader("stdout");
close_ptr<ProcessReader> stderr_reader("stderr");
close_ptr<ProcessWriter> stdin_writer(std::move(entry.input));

auto handle = new uv_process_t;
handle->data = this;

uv_stdio_container_t stdio[3];
stdio[0].flags = (uv_stdio_flags)(UV_CREATE_PIPE | UV_READABLE_PIPE);
stdio[0].data.stream = stdin_writer->stream();

stdio[1].flags = (uv_stdio_flags)(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
stdio[1].data.stream = stdout_reader->stream();

stdio[2].flags = (uv_stdio_flags)(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
stdio[2].data.stream = stderr_reader->stream();

uv_process_options_t options = {};
options.file = argv.at(0);
options.args = const_cast<char**>(argv.data());
options.exit_cb = ProcessLauncher::on_process_exit;
options.stdio = stdio;
options.stdio_count = 3;

auto rc = uv_spawn(uv_default_loop(), handle, &options);

if (rc != 0)
{
LOG_FAIL_FMT("Error starting host process: {}", uv_strerror(rc));
return;
}

LOG_DEBUG_FMT(
LOG_INFO_FMT(
plietar marked this conversation as resolved.
Show resolved Hide resolved
"Launching host process: pid={} queuetime={}ms cmd={}",
handle->pid,
queue_time_ms,
fmt::join(args, " "));

stdin_writer.release()->start(handle->pid);
stdout_reader.release()->start(handle->pid);
stderr_reader.release()->start(handle->pid);

auto started_at = std::chrono::steady_clock::now();
ProcessEntry process_entry{std::move(entry.msg), started_at};
ProcessEntry process_entry{std::move(entry.args), started_at};
running.insert({handle->pid, std::move(process_entry)});
}

Expand All @@ -111,7 +302,7 @@ namespace asynchost
handle->pid,
exit_status,
runtime_ms,
fmt::join(process.msg.args, " "));
fmt::join(process.args, " "));

running.erase(handle->pid);

Expand All @@ -133,15 +324,19 @@ namespace asynchost
disp,
AppMessage::launch_host_process,
[this](const uint8_t* data, size_t size) {
auto [json] =
auto [json, input] =
ringbuffer::read_message<AppMessage::launch_host_process>(
data, size);

auto obj = nlohmann::json::parse(json);
auto msg = obj.get<LaunchHostProcessMessage>();
auto msg = obj.get<HostProcessArguments>();

auto queued_at = std::chrono::steady_clock::now();
QueueEntry entry{msg, queued_at};
QueueEntry entry{
std::move(msg.args),
std::move(input),
queued_at,
};

LOG_DEBUG_FMT("Queueing host process launch: {}", json);

Expand Down
18 changes: 16 additions & 2 deletions src/host/proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,20 @@ namespace asynchost

~close_ptr()
{
raw->close();
if (raw != nullptr)
{
raw->close();
}
}

T* operator->()
{
return raw;
}

T* release()
{
return std::exchange(raw, nullptr);
}
};

Expand Down Expand Up @@ -74,7 +87,7 @@ namespace asynchost

virtual ~with_uv_handle() = default;

private:
protected:
template <typename T>
friend class close_ptr;

Expand All @@ -86,6 +99,7 @@ namespace asynchost
}
}

private:
static void on_close(uv_handle_t* handle)
{
static_cast<with_uv_handle<handle_type>*>(handle->data)->on_close();
Expand Down
Loading