Skip to content

Commit d1493b4

Browse files
committed
Update rpclib to version 2.3.0
Signed-off-by: iabdalkader <i.abdalkader@gmail.com>
1 parent fcc63d9 commit d1493b4

21 files changed

+430
-44
lines changed

Diff for: libraries/rpclib/src/rpc/client.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ class client {
8888
//!
8989
//! \param func_name The name of the notification to call.
9090
//! \param args The arguments to pass to the function.
91-
//! \tparam Args THe types of the arguments.
91+
//! \tparam Args The types of the arguments.
9292
//!
9393
//! \note This function returns immediately (possibly before the
9494
//! notification is written to the socket).
@@ -144,4 +144,4 @@ class client {
144144
};
145145
}
146146

147-
//#include "rpc/client.inl"
147+
#include "rpc/client.inl"

Diff for: libraries/rpclib/src/rpc/client.inl

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ client::async_call(std::string const &func_name, Args... args) {
4747
//! \param args The arguments to pass to the function.
4848
//! \note This function returns when the notification is written to the
4949
//! socket.
50-
//! \tparam Args THe types of the arguments.
50+
//! \tparam Args The types of the arguments.
5151
template <typename... Args>
5252
void client::send(std::string const &func_name, Args... args) {
5353
RPCLIB_CREATE_LOG_CHANNEL(client)

Diff for: libraries/rpclib/src/rpc/detail/async_writer.h

+29-21
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,27 @@ class async_writer : public std::enable_shared_from_this<async_writer> {
2323
RPCLIB_ASIO::ip::tcp::socket socket)
2424
: socket_(std::move(socket)), write_strand_(*io), exit_(false) {}
2525

26+
void close() {
27+
exit_ = true;
28+
29+
auto self = shared_from_this();
30+
write_strand_.post([this, self]() {
31+
LOG_INFO("Closing socket");
32+
std::error_code e;
33+
socket_.shutdown(
34+
RPCLIB_ASIO::ip::tcp::socket::shutdown_both, e);
35+
if (e) {
36+
LOG_WARN("std::system_error during socket shutdown. "
37+
"Code: {}. Message: {}", e.value(), e.message());
38+
}
39+
socket_.close();
40+
});
41+
}
42+
43+
bool is_closed() const {
44+
return exit_.load();
45+
}
46+
2647
void do_write() {
2748
if (exit_) {
2849
return;
@@ -46,20 +67,6 @@ class async_writer : public std::enable_shared_from_this<async_writer> {
4667
} else {
4768
LOG_ERROR("Error while writing to socket: {}", ec);
4869
}
49-
50-
if (exit_) {
51-
LOG_INFO("Closing socket");
52-
try {
53-
socket_.shutdown(
54-
RPCLIB_ASIO::ip::tcp::socket::shutdown_both);
55-
}
56-
catch (std::system_error &e) {
57-
(void)e;
58-
LOG_WARN("std::system_error during socket shutdown. "
59-
"Code: {}. Message: {}", e.code(), e.what());
60-
}
61-
socket_.close();
62-
}
6370
}));
6471
}
6572

@@ -72,23 +79,24 @@ class async_writer : public std::enable_shared_from_this<async_writer> {
7279
do_write();
7380
}
7481

75-
friend class rpc::client;
82+
RPCLIB_ASIO::ip::tcp::socket& socket() {
83+
return socket_;
84+
}
7685

7786
protected:
7887
template <typename Derived>
7988
std::shared_ptr<Derived> shared_from_base() {
8089
return std::static_pointer_cast<Derived>(shared_from_this());
8190
}
8291

83-
protected:
92+
RPCLIB_ASIO::strand& write_strand() {
93+
return write_strand_;
94+
}
95+
96+
private:
8497
RPCLIB_ASIO::ip::tcp::socket socket_;
8598
RPCLIB_ASIO::strand write_strand_;
8699
std::atomic_bool exit_{false};
87-
bool exited_ = false;
88-
std::mutex m_exit_;
89-
std::condition_variable cv_exit_;
90-
91-
private:
92100
std::deque<RPCLIB_MSGPACK::sbuffer> write_queue_;
93101
RPCLIB_CREATE_LOG_CHANNEL(async_writer)
94102
};

Diff for: libraries/rpclib/src/rpc/detail/client_error.cc

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
#include "format.h"
2+
3+
#include "rpc/detail/client_error.h"
4+
5+
namespace rpc {
6+
namespace detail {
7+
8+
client_error::client_error(code c, const std::string &msg)
9+
: what_(RPCLIB_FMT::format("client error C{0:04x}: {1}",
10+
static_cast<uint16_t>(c), msg)) {}
11+
12+
const char *client_error::what() const noexcept { return what_.c_str(); }
13+
}
14+
}
15+

Diff for: libraries/rpclib/src/rpc/detail/log.h

+9-1
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,19 @@ class logger {
8383
std::stringstream ss;
8484
timespec now_t = {};
8585
clock_gettime(CLOCK_REALTIME, &now_t);
86+
#if __GNUC__ >= 5
8687
ss << std::put_time(
8788
std::localtime(reinterpret_cast<time_t *>(&now_t.tv_sec)),
8889
"%F %T")
89-
<< RPCLIB_FMT::format(
90+
#else
91+
char mltime[128];
92+
strftime(mltime, sizeof(mltime), "%c %Z",
93+
std::localtime(reinterpret_cast<time_t *>(&now_t.tv_sec)));
94+
ss << mltime
95+
#endif
96+
<< RPCLIB_FMT::format(
9097
".{:03}", round(static_cast<double>(now_t.tv_nsec) / 1.0e6));
98+
9199
return ss.str();
92100
}
93101
#endif

Diff for: libraries/rpclib/src/rpc/detail/response.cc

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
#include "rpc/detail/response.h"
2+
#include "rpc/detail/log.h"
3+
#include "rpc/detail/util.h"
4+
5+
#include <assert.h>
6+
7+
namespace rpc {
8+
namespace detail {
9+
10+
response::response() : id_(0), error_(), result_(), empty_(false) {}
11+
12+
response::response(RPCLIB_MSGPACK::object_handle o) : response() {
13+
response_type r;
14+
o.get().convert(r);
15+
// TODO: check protocol [t.szelei 2015-12-30]
16+
id_ = std::get<1>(r);
17+
auto &&error_obj = std::get<2>(r);
18+
if (!error_obj.is_nil()) {
19+
error_ = std::make_shared<RPCLIB_MSGPACK::object_handle>();
20+
*error_ = RPCLIB_MSGPACK::clone(error_obj);
21+
}
22+
result_ = std::make_shared<RPCLIB_MSGPACK::object_handle>(
23+
std::get<3>(r), std::move(o.zone()));
24+
}
25+
26+
RPCLIB_MSGPACK::sbuffer response::get_data() const {
27+
RPCLIB_MSGPACK::sbuffer data;
28+
response_type r(1, id_, error_ ? error_->get() : RPCLIB_MSGPACK::object(),
29+
result_ ? result_->get() : RPCLIB_MSGPACK::object());
30+
RPCLIB_MSGPACK::pack(data, r);
31+
return data;
32+
}
33+
34+
uint32_t response::get_id() const { return id_; }
35+
36+
std::shared_ptr<RPCLIB_MSGPACK::object_handle> response::get_error() const { return error_; }
37+
38+
std::shared_ptr<RPCLIB_MSGPACK::object_handle> response::get_result() const {
39+
return result_;
40+
}
41+
42+
response response::empty() {
43+
response r;
44+
r.empty_ = true;
45+
return r;
46+
}
47+
48+
bool response::is_empty() const { return empty_; }
49+
50+
void response::capture_result(RPCLIB_MSGPACK::object_handle &r) {
51+
if (!result_) {
52+
result_ = std::make_shared<RPCLIB_MSGPACK::object_handle>();
53+
}
54+
result_->set(std::move(r).get());
55+
}
56+
57+
void response::capture_error(RPCLIB_MSGPACK::object_handle &e) {
58+
if (!error_) {
59+
error_ = std::shared_ptr<RPCLIB_MSGPACK::object_handle>();
60+
}
61+
error_->set(std::move(e).get());
62+
}
63+
64+
} /* detail */
65+
} /* rpc */

Diff for: libraries/rpclib/src/rpc/detail/server_session.h

+6-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#ifndef SESSION_H_5KG6ZMAB
44
#define SESSION_H_5KG6ZMAB
55

6+
#include "asio.hpp"
67
#include <memory>
78
#include <vector>
89

@@ -21,7 +22,9 @@ namespace detail {
2122

2223
class server_session : public async_writer {
2324
public:
24-
server_session(server *srv, std::shared_ptr<dispatcher> disp, bool suppress_exceptions);
25+
server_session(server *srv, RPCLIB_ASIO::io_service *io,
26+
RPCLIB_ASIO::ip::tcp::socket socket,
27+
std::shared_ptr<dispatcher> disp, bool suppress_exceptions);
2528
void start();
2629

2730
void close();
@@ -31,6 +34,8 @@ class server_session : public async_writer {
3134

3235
private:
3336
server* parent_;
37+
RPCLIB_ASIO::io_service *io_;
38+
RPCLIB_ASIO::strand read_strand_;
3439
std::shared_ptr<dispatcher> disp_;
3540
RPCLIB_MSGPACK::unpacker pac_;
3641
RPCLIB_MSGPACK::sbuffer output_buf_;

Diff for: libraries/rpclib/src/rpc/dispatcher.cc

+143
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
#include "rpc/dispatcher.h"
2+
#include "format.h"
3+
#include "rpc/detail/client_error.h"
4+
#include "rpc/this_handler.h"
5+
6+
namespace rpc {
7+
namespace detail {
8+
9+
using detail::response;
10+
11+
void dispatcher::dispatch(RPCLIB_MSGPACK::sbuffer const &msg) {
12+
auto unpacked = RPCLIB_MSGPACK::unpack(msg.data(), msg.size());
13+
dispatch(unpacked.get());
14+
}
15+
16+
response dispatcher::dispatch(RPCLIB_MSGPACK::object const &msg,
17+
bool suppress_exceptions) {
18+
switch (msg.via.array.size) {
19+
case 3:
20+
return dispatch_notification(msg, suppress_exceptions);
21+
case 4:
22+
return dispatch_call(msg, suppress_exceptions);
23+
default:
24+
return response::empty();
25+
}
26+
}
27+
28+
response dispatcher::dispatch_call(RPCLIB_MSGPACK::object const &msg,
29+
bool suppress_exceptions) {
30+
call_t the_call;
31+
msg.convert(the_call);
32+
33+
// TODO: proper validation of protocol (and responding to it)
34+
// auto &&type = std::get<0>(the_call);
35+
// assert(type == 0);
36+
37+
auto &&id = std::get<1>(the_call);
38+
auto &&name = std::get<2>(the_call);
39+
auto &&args = std::get<3>(the_call);
40+
41+
auto it_func = funcs_.find(name);
42+
43+
if (it_func != end(funcs_)) {
44+
LOG_DEBUG("Dispatching call to '{}'", name);
45+
try {
46+
auto result = (it_func->second)(args);
47+
return response::make_result(id, std::move(result));
48+
} catch (rpc::detail::client_error &e) {
49+
return response::make_error(
50+
id, RPCLIB_FMT::format("rpclib: {}", e.what()));
51+
} catch (std::exception &e) {
52+
if (!suppress_exceptions) {
53+
throw;
54+
}
55+
return response::make_error(
56+
id,
57+
RPCLIB_FMT::format("rpclib: function '{0}' (called with {1} "
58+
"arg(s)) "
59+
"threw an exception. The exception "
60+
"contained this information: {2}.",
61+
name, args.via.array.size, e.what()));
62+
} catch (rpc::detail::handler_error &) {
63+
// doing nothing, the exception was only thrown to
64+
// return immediately
65+
} catch (rpc::detail::handler_spec_response &) {
66+
// doing nothing, the exception was only thrown to
67+
// return immediately
68+
} catch (...) {
69+
if (!suppress_exceptions) {
70+
throw;
71+
}
72+
return response::make_error(
73+
id,
74+
RPCLIB_FMT::format("rpclib: function '{0}' (called with {1} "
75+
"arg(s)) threw an exception. The exception "
76+
"is not derived from std::exception. No "
77+
"further information available.",
78+
name, args.via.array.size));
79+
}
80+
}
81+
return response::make_error(
82+
id, RPCLIB_FMT::format("rpclib: server could not find "
83+
"function '{0}' with argument count {1}.",
84+
name, args.via.array.size));
85+
}
86+
87+
response dispatcher::dispatch_notification(RPCLIB_MSGPACK::object const &msg,
88+
bool suppress_exceptions) {
89+
notification_t the_call;
90+
msg.convert(the_call);
91+
92+
// TODO: proper validation of protocol (and responding to it)
93+
// auto &&type = std::get<0>(the_call);
94+
// assert(type == static_cast<uint8_t>(request_type::notification));
95+
96+
auto &&name = std::get<1>(the_call);
97+
auto &&args = std::get<2>(the_call);
98+
99+
auto it_func = funcs_.find(name);
100+
101+
if (it_func != end(funcs_)) {
102+
LOG_DEBUG("Dispatching call to '{}'", name);
103+
try {
104+
auto result = (it_func->second)(args);
105+
} catch (rpc::detail::handler_error &) {
106+
// doing nothing, the exception was only thrown to
107+
// return immediately
108+
} catch (rpc::detail::handler_spec_response &) {
109+
// doing nothing, the exception was only thrown to
110+
// return immediately
111+
} catch (...) {
112+
if (!suppress_exceptions) {
113+
throw;
114+
}
115+
}
116+
}
117+
return response::empty();
118+
}
119+
120+
void dispatcher::enforce_arg_count(std::string const &func, std::size_t found,
121+
std::size_t expected) {
122+
using detail::client_error;
123+
if (found != expected) {
124+
throw client_error(
125+
client_error::code::wrong_arity,
126+
RPCLIB_FMT::format(
127+
"Function '{0}' was called with an invalid number of "
128+
"arguments. Expected: {1}, got: {2}",
129+
func, expected, found));
130+
}
131+
}
132+
133+
void dispatcher::enforce_unique_name(std::string const &func) {
134+
auto pos = funcs_.find(func);
135+
if (pos != end(funcs_)) {
136+
throw std::logic_error(
137+
RPCLIB_FMT::format("Function name already bound: '{}'. "
138+
"Please use unique function names", func));
139+
}
140+
}
141+
142+
}
143+
} /* rpc */

Diff for: libraries/rpclib/src/rpc/dispatcher.h

+13
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,19 @@ class dispatcher {
6060
detail::tags::nonvoid_result const &,
6161
detail::tags::nonzero_arg const &);
6262

63+
//! \brief Unbind a functor with a given name from callable functors.
64+
void unbind(std::string const &name) {
65+
funcs_.erase(name);
66+
}
67+
68+
//! \brief returns a list of all names which functors are binded to
69+
std::vector<std::string> names() const {
70+
std::vector<std::string> names;
71+
for(auto it = funcs_.begin(); it != funcs_.end(); ++it)
72+
names.push_back(it->first);
73+
return names;
74+
}
75+
6376
//! @}
6477

6578
//! \brief Processes a message that contains a call according to

0 commit comments

Comments
 (0)