Skip to content

Commit 80689ff

Browse files
committed
feat: RPCRequest obj. Server uses run()-local copy of the RPC for thread safety
1 parent effb162 commit 80689ff

File tree

3 files changed

+104
-33
lines changed

3 files changed

+104
-33
lines changed

src/decoder.h

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,54 @@ class RpcDecoder {
8282
return send(reinterpret_cast<const uint8_t*>(packer.data()), packer.size()) == packer.size();
8383
}
8484

85+
MsgPack::str_t fetch_method(){
86+
87+
if (_packet_type != CALL_MSG && _packet_type != NOTIFY_MSG) {
88+
return ""; // No RPC
89+
}
90+
91+
MsgPack::Unpacker unpacker;
92+
93+
unpacker.clear();
94+
if (!unpacker.feed(_raw_buffer, _packet_size)) { // feed should not fail at this point
95+
consume(_packet_size);
96+
reset_packet();
97+
return "";
98+
};
99+
100+
int msg_type;
101+
int msg_id;
102+
MsgPack::str_t method;
103+
MsgPack::arr_size_t req_size;
104+
105+
if (!unpacker.deserialize(req_size, msg_type)) {
106+
consume(_packet_size);
107+
reset_packet();
108+
return ""; // Header not unpackable
109+
}
110+
111+
if (msg_type == CALL_MSG && req_size.size() == REQUEST_SIZE) {
112+
if (!unpacker.deserialize(msg_id, method)) {
113+
consume(_packet_size);
114+
reset_packet();
115+
return ""; // Method not unpackable
116+
}
117+
} else if (msg_type == NOTIFY_MSG && req_size.size() == NOTIFY_SIZE) {
118+
if (!unpacker.deserialize(method)) {
119+
consume(_packet_size);
120+
reset_packet();
121+
return ""; // Method not unpackable
122+
}
123+
} else {
124+
consume(_packet_size);
125+
reset_packet();
126+
return ""; // Invalid request size/type
127+
}
128+
129+
return method;
130+
131+
}
132+
85133
size_t get_request(uint8_t* buffer, size_t buffer_size) {
86134

87135
if (_packet_type != CALL_MSG && _packet_type != NOTIFY_MSG) {

src/request.h

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#ifndef RPCLITE_REQUEST_H
2+
#define RPCLITE_REQUEST_H
3+
4+
#define RPC_BUFFER_SIZE 512
5+
6+
7+
#include "rpclite_utils.h"
8+
9+
class RPCRequest {
10+
11+
public:
12+
uint8_t buffer[RPC_BUFFER_SIZE];
13+
size_t size = 0;
14+
int type = NO_MSG;
15+
MsgPack::Packer res_packer;
16+
17+
void reset(){
18+
size = 0;
19+
type = NO_MSG;
20+
res_packer.clear();
21+
}
22+
23+
};
24+
25+
#endif RPCLITE_REQUEST_H

src/server.h

Lines changed: 31 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#ifndef RPCLITE_SERVER_H
22
#define RPCLITE_SERVER_H
33

4+
#include "request.h"
45
#include "error.h"
56
#include "wrapper.h"
67
#include "dispatcher.h"
@@ -9,7 +10,6 @@
910
#include "SerialTransport.h"
1011

1112
#define MAX_CALLBACKS 100
12-
#define RPC_BUFFER_SIZE 1024
1313

1414
class RPCServer {
1515

@@ -32,30 +32,33 @@ class RPCServer {
3232
}
3333

3434
void run() {
35-
get_rpc();
36-
process_request();
37-
send_response();
38-
//delay(1);
35+
36+
RPCRequest req;
37+
if (get_rpc(req)) { // Populate local request
38+
process_request(req); // Process local data
39+
send_response(req); // Send from local data
40+
}
41+
3942
}
4043

41-
bool get_rpc() {
44+
bool get_rpc(RPCRequest& req, MsgPack::str_t tag="") {
4245
decoder->decode();
43-
if (_rpc_size > 0) return true; // Already have a request
44-
// TODO USE A QUEUE
45-
_rpc_size = decoder->get_request(_rpc_buffer, RPC_BUFFER_SIZE);
46-
return _rpc_size > 0;
46+
47+
MsgPack::str_t method = decoder->fetch_method();
48+
49+
if (method == "" || !hasTag(method, tag)) return false;
50+
51+
req.size = decoder->get_request(req.buffer, RPC_BUFFER_SIZE);
52+
return req.size > 0;
4753
}
4854

49-
void process_request(MsgPack::str_t tag="") {
50-
if (_rpc_size == 0) return;
55+
void process_request(RPCRequest& req) {
56+
if (req.size == 0) return;
5157

5258
MsgPack::Unpacker unpacker;
5359

5460
unpacker.clear();
55-
if (!unpacker.feed(_rpc_buffer, _rpc_size)) {
56-
_rpc_size = 0; // Reset size on error
57-
return; // Error in unpacking
58-
}
61+
if (!unpacker.feed(req.buffer, req.size)) return;
5962

6063
int msg_type;
6164
int msg_id;
@@ -69,43 +72,38 @@ class RPCServer {
6972

7073
if (msg_type == CALL_MSG && req_size.size() == REQUEST_SIZE) {
7174
if (!unpacker.deserialize(msg_id, method)) {
72-
reset_rpc();
75+
req.reset();
7376
return; // Method not unpackable
7477
}
7578
} else if (msg_type == NOTIFY_MSG && req_size.size() == NOTIFY_SIZE) {
7679
if (!unpacker.deserialize(method)) {
77-
reset_rpc();
80+
req.reset();
7881
return; // Method not unpackable
7982
}
8083
} else {
81-
reset_rpc();
84+
req.reset();
8285
return; // Invalid request size/type
8386
}
8487

85-
if (!hasTag(method, tag)) return;
86-
87-
_rpc_type = msg_type;
88+
req.type = msg_type;
8889

8990
MsgPack::arr_size_t resp_size(RESPONSE_SIZE);
90-
res_packer.clear();
91-
if (msg_type == CALL_MSG) res_packer.serialize(resp_size, RESP_MSG, msg_id);
91+
req.res_packer.clear();
92+
if (msg_type == CALL_MSG) req.res_packer.serialize(resp_size, RESP_MSG, msg_id);
9293

93-
dispatcher.call(method, unpacker, res_packer);
94+
dispatcher.call(method, unpacker, req.res_packer);
9495

9596
}
9697

97-
bool send_response() {
98-
if (_rpc_type == NO_MSG || res_packer.size() == 0) {
98+
bool send_response(RPCRequest& req) {
99+
100+
if (req.type == NO_MSG || req.res_packer.size() == 0) {
99101
return true; // No response to send
100102
}
101103

102-
if (_rpc_type == NOTIFY_MSG) {
103-
reset_rpc();
104-
return true;
105-
}
104+
if (req.type == NOTIFY_MSG) return true;
106105

107-
reset_rpc();
108-
return decoder->send_response(res_packer);
106+
return decoder->send_response(req.res_packer);
109107

110108
}
111109

0 commit comments

Comments
 (0)