Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for host process stdio. #5056

Merged
merged 14 commits into from
Feb 28, 2023
2 changes: 1 addition & 1 deletion .daily_canary
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
___ ___
(- -) (o o) | Y & +
(~ ~) (+ +) | Y & +
( V ) z O z O +---'---'
/--x-m- /--m-m---xXx--/--yy------
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
### Added

- New `/node/index/strategies` endpoint, which will list all indexing strategies currently installed alongside a description of how far each has progressed.
- When starting a host subprocess, applications may now pass data to its standard input. Additionally, the process' output is captured and logged by CCF (#5056).

## [4.0.0-dev4]

Expand Down
18 changes: 8 additions & 10 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -729,16 +729,14 @@ if(BUILD_TESTS)
${CMAKE_SOURCE_DIR}/tests
)

if(LONG_TESTS)
add_e2e_test(
NAME launch_host_process_test
PYTHON_SCRIPT
${CMAKE_SOURCE_DIR}/tests/js-launch-host-process/host_process.py
CONSENSUS cft
ADDITIONAL_ARGS --js-app-bundle
${CMAKE_SOURCE_DIR}/tests/js-launch-host-process
)
endif()
add_e2e_test(
NAME launch_host_process_test
PYTHON_SCRIPT
${CMAKE_SOURCE_DIR}/tests/js-launch-host-process/host_process.py
CONSENSUS cft
ADDITIONAL_ARGS --js-app-bundle
${CMAKE_SOURCE_DIR}/tests/js-launch-host-process
)

add_e2e_test(
NAME governance_test
Expand Down
3 changes: 2 additions & 1 deletion include/ccf/node/host_processes_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace ccf
}

virtual void trigger_host_process_launch(
const std::vector<std::string>& args) = 0;
const std::vector<std::string>& args,
const std::vector<uint8_t>& input = {}) = 0;
};
}
8 changes: 4 additions & 4 deletions src/enclave/interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ enum AppMessage : ringbuffer::Message
};

DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(
AppMessage::launch_host_process, std::string);
AppMessage::launch_host_process, std::string, std::vector<uint8_t>);

struct LaunchHostProcessMessage
struct HostProcessArguments
{
std::vector<std::string> args;
};

DECLARE_JSON_TYPE(LaunchHostProcessMessage);
DECLARE_JSON_REQUIRED_FIELDS(LaunchHostProcessMessage, args);
DECLARE_JSON_TYPE(HostProcessArguments);
DECLARE_JSON_REQUIRED_FIELDS(HostProcessArguments, args);
241 changes: 225 additions & 16 deletions src/host/process_launcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,180 @@

namespace asynchost
{
struct ProcessPipe : public with_uv_handle<uv_pipe_t>
{
public:
ProcessPipe()
{
uv_handle.data = this;
uv_pipe_init(uv_default_loop(), &uv_handle, 0);
}
virtual ~ProcessPipe() = default;

uv_stream_t* stream()
{
return (uv_stream_t*)&uv_handle;
}

protected:
pid_t pid = 0;
};

/**
* Read the output of a process line by line and print each one to our logs.
*/
class ProcessReader : public ProcessPipe
{
static constexpr size_t max_read_size = 16384;

public:
ProcessReader(std::string name) : name(name) {}

void start(pid_t pid)
{
this->pid = pid;

int rc = uv_read_start((uv_stream_t*)&uv_handle, on_alloc_cb, on_read_cb);
if (rc < 0)
{
LOG_FAIL_FMT("uv_read_start failed: {}", uv_strerror(rc));
close();
}
}

private:
static void on_alloc_cb(
uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
{
static_cast<ProcessReader*>(handle->data)->on_alloc(suggested_size, buf);
}

static void on_read_cb(
uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf)
{
static_cast<ProcessReader*>(handle->data)->on_read(nread, buf);
}

void on_alloc(size_t suggested_size, uv_buf_t* buf)
{
auto alloc_size = std::min<size_t>(suggested_size, max_read_size);
LOG_TRACE_FMT(
"Allocating {} bytes for reading from host process pid={}",
alloc_size,
pid);

buf->base = new char[alloc_size];
buf->len = alloc_size;
}

void on_read(ssize_t nread, const uv_buf_t* buf)
{
if (nread < 0)
{
LOG_DEBUG_FMT(
"ProcessReader on_read: status={} pid={} file={}",
uv_strerror(nread),
pid,
name);
// Print any trailing text which didn't have a newline
if (!buffer.empty())
{
LOG_INFO_FMT("{} from process {}: {}", name, pid, buffer);
}
close();
}
else if (nread > 0)
{
buffer.insert(buffer.end(), buf->base, buf->base + nread);
LOG_DEBUG_FMT(
"Read {} bytes from host process, total={} file={}",
nread,
buffer.size(),
name);
print_lines();
}
on_free(buf);
}

void on_free(const uv_buf_t* buf)
{
delete[] buf->base;
}

/**
* Take each line out of the buffer and print it to the logs.
*/
void print_lines()
{
auto start = buffer.begin();
while (true)
{
auto newline = std::find(start, buffer.end(), '\n');
if (newline == buffer.end())
{
break;
}

size_t count = newline - start;
std::string_view line(&*start, count);
LOG_INFO_FMT("{} from process {}: {}", name, pid, line);

// Move past the newline character so we can look for the next one.
start = newline + 1;
}
buffer.erase(buffer.begin(), start);
}

std::string name;
std::string buffer;
};

/**
* Write a byte buffer to a process' standard input.
*/
class ProcessWriter : public ProcessPipe
{
public:
ProcessWriter(std::vector<uint8_t>&& data) : buffer(std::move(data))
{
request.data = this;
}

void start(pid_t pid)
{
this->pid = pid;

LOG_DEBUG_FMT(
"Writing {} bytes to host process pid={}", buffer.size(), pid);

uv_buf_t buf = {(char*)buffer.data(), buffer.size()};
int rc =
uv_write(&request, (uv_stream_t*)&uv_handle, &buf, 1, on_write_done_cb);

if (rc < 0)
{
LOG_FAIL_FMT("uv_write failed: {}", uv_strerror(rc));
close();
}
}

private:
static void on_write_done_cb(uv_write_t* req, int status)
{
static_cast<ProcessWriter*>(req->data)->on_write_done(req, status);
}

void on_write_done(uv_write_t* req, int status)
{
LOG_DEBUG_FMT(
"Write to host process completed: status={} pid={}", status, pid);
close();
}

uv_write_t request;
std::vector<uint8_t> buffer;
};

class ProcessLauncher
{
static constexpr size_t max_processes = 8;
Expand All @@ -20,15 +194,16 @@ namespace asynchost

struct QueueEntry
{
LaunchHostProcessMessage msg;
std::vector<std::string> args;
std::vector<uint8_t> input;
std::chrono::steady_clock::time_point queued_at;
};

std::queue<QueueEntry> queued;

struct ProcessEntry
{
LaunchHostProcessMessage msg;
std::vector<std::string> args;
std::chrono::steady_clock::time_point started_at;
};

Expand All @@ -53,8 +228,7 @@ namespace asynchost
now - entry.queued_at)
.count();

auto& msg = entry.msg;
auto& args = msg.args;
const auto& args = entry.args;

std::vector<const char*> argv;
for (size_t i = 0; i < args.size(); i++)
Expand All @@ -63,30 +237,49 @@ namespace asynchost
}
argv.push_back(nullptr);

close_ptr<ProcessReader> stdout_reader("stdout");
close_ptr<ProcessReader> stderr_reader("stderr");
close_ptr<ProcessWriter> stdin_writer(std::move(entry.input));

auto handle = new uv_process_t;
handle->data = this;

uv_stdio_container_t stdio[3];
stdio[0].flags = (uv_stdio_flags)(UV_CREATE_PIPE | UV_READABLE_PIPE);
stdio[0].data.stream = stdin_writer->stream();

stdio[1].flags = (uv_stdio_flags)(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
stdio[1].data.stream = stdout_reader->stream();

stdio[2].flags = (uv_stdio_flags)(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
stdio[2].data.stream = stderr_reader->stream();

uv_process_options_t options = {};
options.file = argv.at(0);
options.args = const_cast<char**>(argv.data());
options.exit_cb = ProcessLauncher::on_process_exit;
options.stdio = stdio;
options.stdio_count = 3;

auto rc = uv_spawn(uv_default_loop(), handle, &options);

if (rc != 0)
{
LOG_FAIL_FMT("Error starting host process: {}", uv_strerror(rc));
return;
}

LOG_DEBUG_FMT(
LOG_INFO_FMT(
plietar marked this conversation as resolved.
Show resolved Hide resolved
"Launching host process: pid={} queuetime={}ms cmd={}",
handle->pid,
queue_time_ms,
fmt::join(args, " "));

stdin_writer.release()->start(handle->pid);
stdout_reader.release()->start(handle->pid);
stderr_reader.release()->start(handle->pid);

auto started_at = std::chrono::steady_clock::now();
ProcessEntry process_entry{std::move(entry.msg), started_at};
ProcessEntry process_entry{std::move(entry.args), started_at};
running.insert({handle->pid, std::move(process_entry)});
}

Expand All @@ -106,12 +299,24 @@ namespace asynchost
t_end - process.started_at)
.count();

LOG_DEBUG_FMT(
"Host process exited: pid={} status={} runtime={}ms cmd={}",
handle->pid,
exit_status,
runtime_ms,
fmt::join(process.msg.args, " "));
if (exit_status == 0)
{
LOG_INFO_FMT(
"Host process exited: pid={} status={} runtime={}ms cmd={}",
handle->pid,
exit_status,
runtime_ms,
fmt::join(process.args, " "));
}
else
{
LOG_FAIL_FMT(
"Host process exited: pid={} status={} runtime={}ms cmd={}",
handle->pid,
exit_status,
runtime_ms,
fmt::join(process.args, " "));
}

running.erase(handle->pid);

Expand All @@ -133,15 +338,19 @@ namespace asynchost
disp,
AppMessage::launch_host_process,
[this](const uint8_t* data, size_t size) {
auto [json] =
auto [json, input] =
ringbuffer::read_message<AppMessage::launch_host_process>(
data, size);

auto obj = nlohmann::json::parse(json);
auto msg = obj.get<LaunchHostProcessMessage>();
auto msg = obj.get<HostProcessArguments>();

auto queued_at = std::chrono::steady_clock::now();
QueueEntry entry{msg, queued_at};
QueueEntry entry{
std::move(msg.args),
std::move(input),
queued_at,
};

LOG_DEBUG_FMT("Queueing host process launch: {}", json);

Expand Down
Loading