Skip to content

Commit b66acbb

Browse files
committed
mod: server refactoring read-process-send
1 parent 664e821 commit b66acbb

File tree

1 file changed

+75
-3
lines changed

1 file changed

+75
-3
lines changed

src/server.h

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,9 @@
1313
#include "SerialTransport.h"
1414

1515
#define MAX_CALLBACKS 100
16+
#define RPC_BUFFER_SIZE 1024
1617

1718
class RPCServer {
18-
RpcDecoder<>* decoder = nullptr;
19-
RpcFunctionDispatcher<MAX_CALLBACKS> dispatcher;
2019

2120
public:
2221
RPCServer(ITransport& t) : decoder(&RpcDecoderManager<>::getDecoder(t)) {}
@@ -33,10 +32,83 @@ class RPCServer {
3332

3433
void run() {
3534
decoder->decode();
36-
decoder->process_requests(dispatcher);
35+
read_rpc();
36+
process_request();
37+
send_response();
3738
//delay(1);
3839
}
3940

41+
protected:
42+
void read_rpc() {
43+
_rpc_size = decoder->read_rpc(_rpc_buffer, RPC_BUFFER_SIZE);
44+
}
45+
46+
void process_request() {
47+
if (_rpc_size == 0) return;
48+
49+
MsgPack::Unpacker unpacker;
50+
51+
unpacker.clear();
52+
if (!unpacker.feed(_rpc_buffer, _rpc_size)) {
53+
_rpc_size = 0; // Reset size on error
54+
return; // Error in unpacking
55+
}
56+
57+
int msg_type;
58+
int msg_id;
59+
MsgPack::str_t method;
60+
MsgPack::arr_size_t req_size;
61+
62+
if (!unpacker.deserialize(req_size, msg_type)) {
63+
reset_rpc();
64+
return; // Header not unpackable
65+
}
66+
67+
if (msg_type == CALL_MSG && req_size.size() == REQUEST_SIZE) {
68+
if (!unpacker.deserialize(msg_id, method)) {
69+
reset_rpc();
70+
return; // Method not unpackable
71+
}
72+
} else if (msg_type == NOTIFY_MSG && req_size.size() == NOTIFY_SIZE) {
73+
if (!unpacker.deserialize(method)) {
74+
reset_rpc();
75+
return; // Method not unpackable
76+
}
77+
} else {
78+
reset_rpc();
79+
return; // Invalid request size/type
80+
}
81+
82+
_rpc_type = msg_type;
83+
84+
MsgPack::arr_size_t resp_size(RESPONSE_SIZE);
85+
res_packer.clear();
86+
if (msg_type == CALL_MSG) res_packer.serialize(resp_size, RESP_MSG, msg_id);
87+
88+
dispatcher.call(method, unpacker, res_packer);
89+
reset_rpc();
90+
91+
}
92+
93+
void send_response() {
94+
if (res_packer.size() > 0) {
95+
decoder->send(reinterpret_cast<const uint8_t*>(res_packer.data()), res_packer.size());
96+
}
97+
}
98+
99+
private:
100+
RpcDecoder<>* decoder = nullptr;
101+
RpcFunctionDispatcher<MAX_CALLBACKS> dispatcher;
102+
uint8_t _rpc_buffer[RPC_BUFFER_SIZE];
103+
size_t _rpc_size = 0;
104+
uint8_t _rpc_type = NO_MSG;
105+
MsgPack::Packer res_packer;
106+
107+
void reset_rpc() {
108+
_rpc_size = 0;
109+
_rpc_type = NO_MSG;
110+
}
111+
40112
};
41113

42114
#endif //RPCLITE_SERVER_H

0 commit comments

Comments
 (0)