Skip to content

Commit

Permalink
Merge pull request #24
Browse files Browse the repository at this point in the history
Run only one libuv event loop for I/O and move processing to worker threads
  • Loading branch information
uatuko authored Feb 2, 2024
2 parents 9a519af + 82b6e30 commit 30e3811
Show file tree
Hide file tree
Showing 18 changed files with 562 additions and 314 deletions.
4 changes: 2 additions & 2 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ A gRPC server capable of serving multiple clients.

Constructs a new server instance.

1. Constructs a server with `n` workers in the worker pool. If `n` is `0`, the worker pool will have one worker.
1. Constructs a server with `n` worker threads (in _addition_ to the I/O thread). If `n` is `0`, all requests will be processed in the I/O thread.

> 💡 Number of workers in the pool can impact throughput (i.e. more workers will not always increase throughput).
> 💡 Number of worker threads can impact throughput and should be tuned for your use-case (i.e. more workers will not always increase throughput).
#### add

Expand Down
11 changes: 7 additions & 4 deletions lib/grpcxx/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,32 @@ target_sources(grpcxx
conn.cpp
context.cpp
message.cpp
pool.cpp
reader.cpp
request.cpp
scheduler.cpp
server.cpp
worker.cpp
writer.cpp
PUBLIC
FILE_SET headers TYPE HEADERS
FILES
context.h
fixed_string.h
pool.h
rpc.h
scheduler.h
server.h
service.h
status.h
worker.h
task.h
PRIVATE
FILE_SET private_headers TYPE HEADERS
FILES
conn.h
coroutine.h
message.h
reader.h
request.h
response.h
writer.h
)

# h2 sources
Expand Down
115 changes: 29 additions & 86 deletions lib/grpcxx/conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,57 +2,33 @@

namespace grpcxx {
namespace detail {
conn::conn(uv_tcp_t *handle) noexcept :
_buf(), _eos(false), _h(nullptr), _handle(handle), _reqs(), _session(), _streams() {
_handle->data = this;

uv_read_start(
reinterpret_cast<uv_stream_t *>(_handle),
[](uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
auto *c = static_cast<conn *>(handle->data);
*buf = uv_buf_init(c->_buf.data(), c->_buf.capacity());
},
[](uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
if (nread <= 0) {
if (nread < 0) {
uv_close(reinterpret_cast<uv_handle_t *>(stream), close_cb);
}

return;
}

auto *c = static_cast<conn *>(stream->data);

try {
c->read(nread);
} catch (...) {
uv_close(reinterpret_cast<uv_handle_t *>(stream), close_cb);
return;
}
});
}
conn::conn(uv_stream_t *stream) : _handle(new uv_tcp_t{}, deleter{}) {
_buffer.reserve(1024); // FIXME: make size configurable
uv_tcp_init(stream->loop, _handle.get());

conn::requests_t conn::await_resume() noexcept {
_h = nullptr;

if (_reqs.empty()) {
return {};
if (auto r = uv_accept(stream, reinterpret_cast<uv_stream_t *>(_handle.get())); r != 0) {
throw std::runtime_error(std::string("Failed to accept connection: ") + uv_strerror(r));
}
}

auto reqs = std::move(_reqs);
_reqs = requests_t();

return reqs;
void conn::buffer() noexcept {
for (auto chunk = _session.pending(); chunk.size() > 0; chunk = _session.pending()) {
_buffer.append(chunk);
}
}

void conn::close_cb(uv_handle_t *handle) {
auto *c = static_cast<conn *>(handle->data);
c->_eos = true;
c->resume();
task conn::flush() {
if (_buffer.empty()) {
co_return;
}

co_await write(_buffer);
_buffer.clear();
}

void conn::read(std::size_t n) {
for (auto &ev : _session.read({_buf.data(), n})) {
conn::requests_t conn::read(std::string_view bytes) {
requests_t reqs;
for (auto &ev : _session.read(bytes)) {
if (ev.stream_id <= 0) {
continue;
}
Expand All @@ -72,7 +48,7 @@ void conn::read(std::size_t n) {
}

case h2::event::type_t::stream_end: {
_reqs.push_front(std::move(req));
reqs.push_front(std::move(req));
_streams.erase(ev.stream_id);
break;
}
Expand All @@ -87,39 +63,13 @@ void conn::read(std::size_t n) {
}
}

write();

if (!_reqs.empty()) {
resume();
}
}
buffer();

void conn::resume() noexcept {
if (_h) {
_h.resume();
}
return reqs;
}

void conn::write() noexcept {
auto *bytes = new std::string();
for (auto chunk = _session.pending(); chunk.size() > 0; chunk = _session.pending()) {
bytes->append(chunk);
}

if (bytes->empty()) {
return;
}

auto buf = uv_buf_init(const_cast<char *>(bytes->data()), bytes->size());
auto *req = new uv_write_t();
req->data = bytes;

if (auto r = uv_write(req, reinterpret_cast<uv_stream_t *>(_handle), &buf, 1, write_cb);
r != 0) {
// FIXME: error handling
delete bytes;
delete req;
}
reader conn::reader() const noexcept {
return {std::reinterpret_pointer_cast<uv_stream_t>(_handle)};
}

void conn::write(response resp) noexcept {
Expand All @@ -131,7 +81,7 @@ void conn::write(response resp) noexcept {
});

_session.data(resp.id(), resp.bytes());
write();
buffer();

const auto &status = resp.status();
_session.trailers(
Expand All @@ -141,18 +91,11 @@ void conn::write(response resp) noexcept {
{"grpc-status-details-bin", status.details()},
});

write();
buffer();
}

void conn::write_cb(uv_write_t *req, int status) {
if (status != 0) {
// TODO: error handling
}

auto *str = static_cast<std::string *>(req->data);
delete str;

delete req;
writer conn::write(std::string_view bytes) const noexcept {
return {std::reinterpret_pointer_cast<uv_stream_t>(_handle), bytes};
}
} // namespace detail
} // namespace grpcxx
52 changes: 24 additions & 28 deletions lib/grpcxx/conn.h
Original file line number Diff line number Diff line change
@@ -1,60 +1,56 @@
#pragma once

#include <coroutine>
#include <forward_list>
#include <memory>
#include <string>
#include <string_view>
#include <unordered_map>

#include <uv.h>

#include "h2/session.h"

#include "reader.h"
#include "request.h"
#include "response.h"
#include "task.h"
#include "writer.h"

namespace grpcxx {
namespace detail {
class conn {
public:
using buffer_t = std::string;
using handle_t = std::shared_ptr<uv_tcp_t>;
using requests_t = std::forward_list<request>;
using streams_t = std::unordered_map<int32_t, request>;

conn(const conn &) = delete;
conn(uv_tcp_t *handle) noexcept;
conn(uv_stream_t *stream);

operator bool() const noexcept { return !_eos; }
task flush();

bool await_ready() const noexcept { return _eos || !_reqs.empty(); }
void await_suspend(std::coroutine_handle<> h) noexcept { _h = h; }
requests_t await_resume() noexcept;
requests_t read(std::string_view bytes);
class reader reader() const noexcept;

void read(std::size_t n);
void write(response resp) noexcept;

private:
template <std::size_t N> class buffer_t {
public:
constexpr char *data() noexcept { return &_data[0]; }
constexpr std::size_t capacity() const noexcept { return N; }

private:
char _data[N];
struct deleter {
void operator()(void *handle) const noexcept {
uv_close(static_cast<uv_handle_t *>(handle), [](uv_handle_t *handle) {
delete reinterpret_cast<uv_tcp_t *>(handle);
});
}
};

static void close_cb(uv_handle_t *handle);
static void write_cb(uv_write_t *req, int status);

void resume() noexcept;
void write() noexcept;

std::coroutine_handle<> _h;

buffer_t<1024> _buf; // FIXME: make size configurable
bool _eos;
requests_t _reqs;
h2::session _session;
streams_t _streams;
void buffer() noexcept;
writer write(std::string_view bytes) const noexcept;

uv_tcp_t *_handle;
buffer_t _buffer;
handle_t _handle;
h2::session _session;
streams_t _streams;
};
} // namespace detail
} // namespace grpcxx
4 changes: 2 additions & 2 deletions lib/grpcxx/coroutine.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ struct coroutine {
try {
std::rethrow_exception(std::current_exception());
} catch (const std::exception &e) {
std::fprintf(stderr, "Exception: %s\n", e.what());
std::fprintf(stderr, "[warn] %s\n", e.what());
} catch (...) {
std::fprintf(stderr, "Unknown exception\n");
std::fprintf(stderr, "[warn] Unknown exception\n");
}
}
};
Expand Down
25 changes: 0 additions & 25 deletions lib/grpcxx/pool.cpp

This file was deleted.

50 changes: 0 additions & 50 deletions lib/grpcxx/pool.h

This file was deleted.

Loading

0 comments on commit 30e3811

Please sign in to comment.