Skip to content

Commit ef0056d

Browse files
committed
feat: recode with agnostic packet parsing
1 parent b66acbb commit ef0056d

File tree

5 files changed

+152
-148
lines changed

5 files changed

+152
-148
lines changed

src/client.h

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
class RPCClient {
1313
RpcDecoder<>* decoder = nullptr;
14+
int _waiting_msg_id;
1415

1516
public:
1617
RpcError lastError;
@@ -31,23 +32,46 @@ class RPCClient {
3132
template<typename RType, typename... Args>
3233
bool call(const MsgPack::str_t method, RType& result, Args&&... args) {
3334

34-
int msg_id;
35-
if (!decoder->send_call(CALL_MSG, method, msg_id, std::forward<Args>(args)...)){
35+
if(!send_rpc(method, std::forward<Args>(args)...)) {
36+
lastError.code = GENERIC_ERR;
37+
lastError.traceback = "Failed to send RPC call";
38+
return false;
3639
}
3740

38-
RpcError error;
3941
// blocking call
40-
while (!decoder->get_response(msg_id, result, error)){
42+
while (!get_response(result)){
4143
decoder->decode();
4244
//delay(1);
4345
}
4446

45-
lastError.code = error.code;
46-
lastError.traceback = error.traceback;
47+
return (lastError.code == NO_ERR);
48+
49+
}
50+
51+
protected:
52+
template<typename... Args>
53+
bool send_rpc(const MsgPack::str_t method, Args&&... args) {
54+
int msg_id;
55+
if (decoder->send_call(CALL_MSG, method, msg_id, std::forward<Args>(args)...)) {
56+
_waiting_msg_id = msg_id;
57+
return true;
58+
}
59+
return false;
60+
}
4761

48-
return (error.code == NO_ERR);
62+
template<typename RType>
63+
bool get_response(RType& result) {
64+
RpcError tmp_error;
65+
decoder->decode();
4966

67+
if (decoder->get_response(_waiting_msg_id, result, tmp_error)) {
68+
lastError.code = tmp_error.code;
69+
lastError.traceback = tmp_error.traceback;
70+
return true;
71+
}
72+
return false;
5073
}
74+
5175
};
5276

5377
#endif //RPCLITE_CLIENT_H

src/decoder.h

Lines changed: 69 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,9 @@
88

99
using namespace RpcUtils::detail;
1010

11-
#define NO_MSG -1
12-
#define CALL_MSG 0
13-
#define RESP_MSG 1
14-
#define NOTIFY_MSG 2
1511

16-
#define REQUEST_SIZE 4
17-
#define RESPONSE_SIZE 4
18-
#define NOTIFY_SIZE 3
12+
13+
#define MIN_RPC_BYTES 4
1914

2015
#define MAX_BUFFER_SIZE 1024
2116
#define CHUNK_SIZE 32
@@ -56,118 +51,45 @@ class RpcDecoder {
5651
template<typename RType>
5752
bool get_response(const int msg_id, RType& result, RpcError& error) {
5853

59-
if (packet_type()!=RESP_MSG) return false;
54+
if (!packet_incoming() || packet_type()!=RESP_MSG) return false;
6055

6156
MsgPack::Unpacker unpacker;
57+
unpacker.clear();
6258

63-
size_t bytes_checked = 0;
59+
if (!unpacker.feed(_raw_buffer, get_packet_size())) return false;
6460

65-
while (bytes_checked < _bytes_stored) {
66-
bytes_checked++;
67-
unpacker.clear();
68-
if (!unpacker.feed(_raw_buffer, bytes_checked)) continue;
69-
MsgPack::arr_size_t resp_size;
70-
int resp_type;
71-
int resp_id;
72-
if (!unpacker.deserialize(resp_size, resp_type, resp_id)) continue;
73-
if (resp_size.size() != RESPONSE_SIZE) continue;
74-
if (resp_type != RESP_MSG) continue;
75-
if (resp_id != msg_id) continue;
76-
77-
MsgPack::object::nil_t nil;
78-
if (unpacker.unpackable(nil)){ // No error
79-
if (!unpacker.deserialize(nil, result)) continue;
80-
} else { // RPC returned an error
81-
if (!unpacker.deserialize(error, nil)) continue;
82-
}
83-
consume(bytes_checked);
84-
return true;
85-
}
86-
return false;
87-
}
88-
89-
template<typename RType>
90-
bool send_response(const int msg_id, const RpcError& error, const RType& result) {
91-
MsgPack::Packer packer;
92-
MsgPack::arr_size_t resp_size(RESPONSE_SIZE);
93-
MsgPack::object::nil_t nil;
61+
MsgPack::arr_size_t resp_size;
62+
int resp_type;
63+
int resp_id;
9464

95-
packer.clear();
96-
packer.serialize(resp_size, RESP_MSG, msg_id);
65+
if (!unpacker.deserialize(resp_size, resp_type, resp_id)) return false;
66+
if (resp_size.size() != RESPONSE_SIZE) return false;
67+
if (resp_type != RESP_MSG) return false;
68+
if (resp_id != msg_id) return false;
9769

98-
if (error.code == NO_ERR){
99-
packer.serialize(nil, result);
100-
} else {
101-
packer.serialize(error, nil);
70+
MsgPack::object::nil_t nil;
71+
if (unpacker.unpackable(nil)){ // No error
72+
if (!unpacker.deserialize(nil, result)) return false;
73+
} else { // RPC returned an error
74+
if (!unpacker.deserialize(error, nil)) return false;
10275
}
10376

104-
return send(reinterpret_cast<const uint8_t*>(packer.data()), packer.size()) == packer.size();
77+
consume(get_packet_size());
78+
return true;
10579

10680
}
10781

108-
template<size_t N>
109-
void process_requests(RpcFunctionDispatcher<N>& dispatcher) {
110-
if (_packet_type!=CALL_MSG && _packet_type!=NOTIFY_MSG) return;
111-
112-
MsgPack::Unpacker unpacker;
113-
MsgPack::Packer packer;
114-
115-
size_t bytes_checked = 0;
116-
117-
while (bytes_checked < _bytes_stored) {
118-
bytes_checked++;
119-
unpacker.clear();
120-
if (!unpacker.feed(_raw_buffer, bytes_checked)) continue;
82+
bool send_response(const MsgPack::Packer& packer) {
83+
return send(reinterpret_cast<const uint8_t*>(packer.data()), packer.size()) == packer.size();
84+
}
12185

122-
int msg_type;
123-
int msg_id;
124-
MsgPack::str_t method;
125-
MsgPack::arr_size_t req_size;
126-
127-
if (!unpacker.deserialize(req_size, msg_type)) continue;
128-
// todo HANDLE MALFORMED CLIENT REQ ERRORS
129-
if ((req_size.size() == REQUEST_SIZE) && (msg_type == CALL_MSG)){
130-
if (!unpacker.deserialize(msg_id, method)) continue;
131-
if (unpacker.size() < REQUEST_SIZE + 1) continue; // there must be at least 5 indices
132-
} else if ((req_size.size() == NOTIFY_SIZE) && (msg_type == NOTIFY_MSG)) {
133-
if (!unpacker.deserialize(method)) continue;
134-
if (unpacker.size() < NOTIFY_SIZE + 1) continue; // there must be at least 4 indices
135-
} else if ((req_size.size() == RESPONSE_SIZE) && (msg_type == RESP_MSG)) { // this should never happen but it's addressed to a client
136-
break;
137-
} else {
138-
discard_packet();
139-
break;
140-
}
141-
// Headers unpacked
142-
143-
MsgPack::arr_size_t resp_size(RESPONSE_SIZE);
144-
packer.clear();
145-
if (msg_type == CALL_MSG) packer.serialize(resp_size, RESP_MSG, msg_id);
146-
size_t headers_size = packer.size();
147-
148-
if (!dispatcher.call(method, unpacker, packer)) {
149-
if (packer.size()==headers_size) {
150-
// Call didn't go through bc parameters are not ready yet
151-
continue;
152-
} else {
153-
// something went wrong the call raised an error or the client issued a malformed request
154-
if (msg_type == CALL_MSG) {
155-
send(reinterpret_cast<const uint8_t*>(packer.data()), packer.size());
156-
} // if notification client will never know something went wrong
157-
discard_packet(); // agnostic pop
158-
break;
159-
}
160-
} else {
161-
// all is well we can respond and pop the deserialized packet
162-
if (msg_type == CALL_MSG){
163-
send(reinterpret_cast<const uint8_t*>(packer.data()), packer.size());
164-
}
165-
consume(bytes_checked);
166-
break;
167-
}
86+
size_t get_request(uint8_t* buffer, size_t buffer_size) {
16887

88+
if (packet_type() != CALL_MSG && packet_type() != NOTIFY_MSG) {
89+
return 0; // No RPC
16990
}
17091

92+
return pop_packet(buffer, buffer_size);
17193
}
17294

17395
void decode(){
@@ -187,52 +109,45 @@ class RpcDecoder {
187109

188110
void parse_packet(){
189111

190-
if (packet_incoming() || _bytes_stored < 2){return;}
191-
192-
MsgPack::Unpacker unpacker;
193-
unpacker.clear();
194-
unpacker.feed(_raw_buffer, 2);
195-
196-
MsgPack::arr_size_t elem_size;
197-
int type;
198-
if (unpacker.deserialize(elem_size, type)){
199-
_packet_type = type;
200-
}
201-
202-
}
203-
204-
// Check if a packet is available
205-
inline bool packet_incoming() const { return _packet_type >= CALL_MSG; }
206-
207-
int packet_type() const {return _packet_type;}
208-
209-
// Get the size of the next packet in the buffer (must be array contained, no other requirements)
210-
size_t get_packet_size() {
112+
if (packet_incoming()){return;}
211113

212114
size_t bytes_checked = 0;
213115
size_t container_size;
116+
int type;
214117
MsgPack::Unpacker unpacker;
215118

216119
while (bytes_checked < _bytes_stored){
217120
bytes_checked++;
218121
unpacker.clear();
219122
if (!unpacker.feed(_raw_buffer, bytes_checked)) continue;
220123

221-
if (unpackArray(unpacker, container_size)) {
222-
return bytes_checked;
124+
if (unpackTypedArray(unpacker, container_size, type)) {
125+
126+
if (type != CALL_MSG && type != RESP_MSG && type != NOTIFY_MSG) {
127+
consume(bytes_checked);
128+
break; // Not a valid RPC type (could be type=WRONG_MSG)
129+
}
130+
131+
if ((type == CALL_MSG && container_size != REQUEST_SIZE) || (type == RESP_MSG && container_size != RESPONSE_SIZE) || (type == NOTIFY_MSG && container_size != NOTIFY_SIZE)) {
132+
consume(bytes_checked);
133+
break; // Not a valid RPC format
134+
}
135+
136+
_packet_type = type;
137+
_packet_size = bytes_checked;
223138
} else {
224139
continue;
225140
}
226141

227142
}
228143

229-
return 0;
230144
}
231145

232-
// Discard the next (array) packet in the buffer, returns the number of bytes consumed.
233-
size_t discard_packet() {
234-
return consume(get_packet_size());
235-
}
146+
inline bool packet_incoming() const { return _packet_size >= MIN_RPC_BYTES; }
147+
148+
inline int packet_type() const { return _packet_type; }
149+
150+
size_t get_packet_size() const { return _packet_size;}
236151

237152
inline size_t size() const {return _bytes_stored;}
238153

@@ -241,6 +156,7 @@ class RpcDecoder {
241156
uint8_t _raw_buffer[BufferSize];
242157
size_t _bytes_stored = 0;
243158
int _packet_type = NO_MSG;
159+
size_t _packet_size = 0;
244160
int _msg_id = 0;
245161

246162
inline bool buffer_full() const { return _bytes_stored == BufferSize; }
@@ -251,7 +167,27 @@ class RpcDecoder {
251167
return _transport.write(data, size);
252168
}
253169

254-
// Consume the first 'size' bytes of the buffer, shifting remaining data forward
170+
size_t pop_packet(uint8_t* buffer, size_t buffer_size) {
171+
172+
if (!packet_incoming()) return 0;
173+
174+
size_t packet_size = get_packet_size();
175+
if (packet_size > buffer_size) return 0;
176+
177+
for (size_t i = 0; i < packet_size; i++) {
178+
buffer[i] = _raw_buffer[i];
179+
}
180+
181+
reset_packet();
182+
return consume(packet_size);
183+
}
184+
185+
186+
void reset_packet() {
187+
_packet_type = NO_MSG;
188+
_packet_size = 0;
189+
}
190+
255191
size_t consume(size_t size) {
256192

257193
if (size > _bytes_stored) return 0;
@@ -264,7 +200,6 @@ class RpcDecoder {
264200
}
265201

266202
_bytes_stored = remaining_bytes;
267-
_packet_type = NO_MSG;
268203

269204
return size;
270205
}

src/dispatcher.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class RpcFunctionDispatcher {
4040
}
4141
}
4242

43-
// handle not found
43+
// handler not found
4444
MsgPack::object::nil_t nil;
4545
packer.serialize(RpcError(FUNCTION_NOT_FOUND_ERR, name), nil);
4646
return false;

0 commit comments

Comments
 (0)