Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve server publish #126

Merged
merged 1 commit into from
Jan 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 33 additions & 7 deletions include/rest_rpc/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,45 @@ struct route_result_t {
std::string result;
};

template <typename Tuple, bool is_pub> class helper_t {
public:
helper_t(Tuple &tp) : tp_(tp) {}

void operator()() {}

private:
Tuple &tp_;
};

template <typename Tuple> class helper_t<Tuple, true> {
public:
helper_t(Tuple &tp) : tp_(tp) {}

void operator()() {
auto &arg = std::get<std::tuple_size<Tuple>::value - 1>(tp_);
msgpack_codec codec;
arg = codec.unpack<std::string>(arg.data(), arg.size());
}

private:
Tuple &tp_;
};

class router : asio::noncopyable {
public:
template <typename Function>
void register_handler(std::string const &name, Function f) {
template <bool is_pub = false, typename Function>
void register_handler(std::string const &name, Function f, bool pub = false) {
uint32_t key = MD5::MD5Hash32(name.data());
key2func_name_.emplace(key, name);
return register_nonmember_func(key, std::move(f));
return register_nonmember_func<is_pub>(key, std::move(f));
}

template <typename Function, typename Self>
template <bool is_pub = false, typename Function, typename Self>
void register_handler(std::string const &name, const Function &f,
Self *self) {
uint32_t key = MD5::MD5Hash32(name.data());
key2func_name_.emplace(key, name);
return register_member_func(key, f, self);
return register_member_func<is_pub>(key, f, self);
}

void remove_handler(std::string const &name) {
Expand Down Expand Up @@ -154,7 +178,7 @@ class router : asio::noncopyable {
result = msgpack_codec::pack_args_str(result_code::OK, r);
}

template <typename Function>
template <bool is_pub, typename Function>
void register_nonmember_func(uint32_t key, Function f) {
this->map_invokers_[key] = [f](std::weak_ptr<connection> conn,
nonstd::string_view str,
Expand All @@ -163,6 +187,7 @@ class router : asio::noncopyable {
msgpack_codec codec;
try {
auto tp = codec.unpack<args_tuple>(str.data(), str.size());
helper_t<args_tuple, is_pub>{tp}();
call(f, conn, result, std::move(tp));
} catch (std::invalid_argument &e) {
result = codec.pack_args_str(result_code::FAIL, e.what());
Expand All @@ -172,7 +197,7 @@ class router : asio::noncopyable {
};
}

template <typename Function, typename Self>
template <bool is_pub, typename Function, typename Self>
void register_member_func(uint32_t key, const Function &f, Self *self) {
this->map_invokers_[key] = [f, self](std::weak_ptr<connection> conn,
nonstd::string_view str,
Expand All @@ -181,6 +206,7 @@ class router : asio::noncopyable {
msgpack_codec codec;
try {
auto tp = codec.unpack<args_tuple>(str.data(), str.size());
helper_t<args_tuple, is_pub>{tp}();
call_member(f, self, conn, result, std::move(tp));
} catch (std::invalid_argument &e) {
result = codec.pack_args_str(result_code::FAIL, e.what());
Expand Down
8 changes: 4 additions & 4 deletions include/rest_rpc/rpc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ class rpc_server : private asio::noncopyable {

void run() { io_service_pool_.run(); }

template <typename Function>
template <bool is_pub = false, typename Function>
void register_handler(std::string const &name, const Function &f) {
router_.register_handler(name, f);
router_.register_handler<is_pub>(name, f);
}

template <typename Function, typename Self>
template <bool is_pub = false, typename Function, typename Self>
void register_handler(std::string const &name, const Function &f,
Self *self) {
router_.register_handler(name, f, self);
router_.register_handler<is_pub>(name, f, self);
}

void set_conn_timeout_callback(std::function<void(int64_t)> callback) {
Expand Down
13 changes: 8 additions & 5 deletions tests/test_rest_rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,12 @@ TEST_CASE("test_client_async_call_with_timeout") {

TEST_CASE("test_client_subscribe") {
rpc_server server(9000, std::thread::hardware_concurrency());
server.register_handler("publish",
[&server](rpc_conn conn, std::string key,
std::string token, std::string val) {
server.publish(std::move(key), std::move(val));
});
server.register_handler<true>(
"publish", [&server](rpc_conn conn, std::string key, std::string token,
std::string val) {
CHECK(val == "hello subscriber");
server.publish(std::move(key), std::move(val));
});
bool stop = false;
std::thread thd([&server, &stop] {
while (!stop) {
Expand All @@ -221,6 +222,8 @@ TEST_CASE("test_client_subscribe") {
rpc_client client;
bool r = client.connect("127.0.0.1", 9000);
CHECK(r);
client.publish("key", "hello subscriber");

client.subscribe("key", [&stop](string_view data) {
std::cout << data << "\n";
CHECK_EQ(data, "hello subscriber");
Expand Down
Loading