Skip to content

Commit

Permalink
fix: fix sequence number increment
Browse files Browse the repository at this point in the history
  • Loading branch information
wyfo committed Nov 4, 2024
1 parent 41e568e commit cb37c29
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ impl StageIn {
let mut c_guard = self.mutex.current();

macro_rules! zgetbatch_rets {
($restore_sn:expr) => {
($($restore_sn:stmt)?) => {
loop {
match c_guard.take() {
Some(batch) => break batch,
Expand All @@ -234,7 +234,7 @@ impl StageIn {
if !deadline.wait(&self.s_ref) {
// Still no available batch.
// Restore the sequence number and drop the message
$restore_sn;
$($restore_sn)?
return false;
}
c_guard = self.mutex.current();
Expand Down Expand Up @@ -262,7 +262,7 @@ impl StageIn {
}

// Get the current serialization batch.
let mut batch = zgetbatch_rets!({});
let mut batch = zgetbatch_rets!();
// Attempt the serialization on the current batch
let e = match batch.encode(&*msg) {
Ok(_) => zretok!(batch, msg),
Expand Down Expand Up @@ -322,9 +322,9 @@ impl StageIn {
let mut reader = self.fragbuf.reader();
while reader.can_read() {
// Get the current serialization batch
// If deadline is reached, sequence number is incremented in order to break the
// fragment chain already sent.
batch = zgetbatch_rets!(tch.sn.set(fragment.sn + 1).unwrap());
// If deadline is reached, sequence number is incremented with `SeqNumGenerator::get`
// in order to break the fragment chain already sent.
batch = zgetbatch_rets!(let _ = tch.sn.get());

// Serialize the message fragment
match batch.encode((&mut reader, &mut fragment)) {
Expand Down

0 comments on commit cb37c29

Please sign in to comment.