@@ -80,7 +80,7 @@ class RpcDecoder {
8080 } else { // RPC returned an error
8181 if (!unpacker.deserialize (error, nil)) continue ;
8282 }
83- pop_packet (bytes_checked);
83+ consume (bytes_checked);
8484 return true ;
8585 }
8686 return false ;
@@ -162,15 +162,15 @@ class RpcDecoder {
162162 if (msg_type == CALL_MSG){
163163 send (reinterpret_cast <const uint8_t *>(packer.data ()), packer.size ());
164164 }
165- pop_packet (bytes_checked);
165+ consume (bytes_checked);
166166 break ;
167167 }
168168
169169 }
170170
171171 }
172172
173- void process (){
173+ void decode (){
174174 advance ();
175175 parse_packet ();
176176 }
@@ -215,42 +215,32 @@ class RpcDecoder {
215215
216216 int packet_type () const {return _packet_type;}
217217
218- // Get the oldest packet (returns false if no packet available)
219- bool get_next_packet (MsgPack::Unpacker& unpacker, size_t size) {
220- if (!packet_incoming ()) return false ;
221- unpacker.clear ();
222- return unpacker.feed (_raw_buffer, size);
223- }
218+ // Get the size of the next packet in the buffer (must be array contained, no other requirements)
219+ size_t get_packet_size () {
224220
225- // Try to recover buffer error condition
226- void recover () {
227- // ensure parsing was attempted
228- parse_packet ();
229- if (buffer_full () && !packet_incoming ()){
230- flush_buffer ();
231- }
232- }
233-
234- // Discard the oldest packet. Returns the number of freed_bytes
235- size_t pop_packet (size_t size) {
221+ size_t bytes_checked = 0 ;
222+ size_t container_size;
223+ MsgPack::Unpacker unpacker;
236224
237- if (size > _bytes_stored) return 0 ;
225+ while (bytes_checked < _bytes_stored){
226+ bytes_checked++;
227+ unpacker.clear ();
228+ if (!unpacker.feed (_raw_buffer, bytes_checked)) continue ;
238229
239- const size_t remaining_bytes = _bytes_stored - size;
230+ if (unpackArray (unpacker, container_size)) {
231+ return bytes_checked;
232+ } else {
233+ continue ;
234+ }
240235
241- // Shift remaining data forward (manual memmove for compatibility)
242- for (size_t i = 0 ; i < remaining_bytes; i++) {
243- _raw_buffer[i] = _raw_buffer[size + i];
244236 }
245237
246- _bytes_stored = remaining_bytes;
247- _packet_type = NO_MSG;
248-
249- return size;
238+ return 0 ;
250239 }
251240
241+ // Discard the next (array) packet in the buffer, returns the number of bytes consumed.
252242 size_t discard_packet () {
253- return pop_packet (get_packet_size ());
243+ return consume (get_packet_size ());
254244 }
255245
256246 inline size_t size () const {return _bytes_stored;}
@@ -263,36 +253,29 @@ class RpcDecoder {
263253 int _msg_id = 0 ;
264254
265255 inline bool buffer_full () const { return _bytes_stored == BufferSize; }
256+
266257 inline bool buffer_empty () const { return _bytes_stored == 0 ;}
267- inline void flush_buffer () {
268- uint8_t discard_buf[CHUNK_SIZE];
269- while (_transport.read (discard_buf, CHUNK_SIZE) > 0 );
270- _bytes_stored = 0 ;
271- }
258+
272259 inline size_t send (const uint8_t * data, const size_t size) {
273260 return _transport.write (data, size);
274261 }
275262
276- size_t get_packet_size (){
277-
278- size_t bytes_checked = 0 ;
279- size_t container_size;
280- MsgPack::Unpacker unpacker;
263+ // Consume the first 'size' bytes of the buffer, shifting remaining data forward
264+ size_t consume (size_t size) {
281265
282- while (bytes_checked < _bytes_stored){
283- bytes_checked++;
284- unpacker.clear ();
285- if (!unpacker.feed (_raw_buffer, bytes_checked)) continue ;
266+ if (size > _bytes_stored) return 0 ;
286267
287- if (unpackArray (unpacker, container_size)) {
288- return bytes_checked;
289- } else {
290- continue ;
291- }
268+ const size_t remaining_bytes = _bytes_stored - size;
292269
270+ // Shift remaining data forward (manual memmove for compatibility)
271+ for (size_t i = 0 ; i < remaining_bytes; i++) {
272+ _raw_buffer[i] = _raw_buffer[size + i];
293273 }
294274
295- return 0 ;
275+ _bytes_stored = remaining_bytes;
276+ _packet_type = NO_MSG;
277+
278+ return size;
296279 }
297280
298281};
0 commit comments