diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index f71eeac92e..62320e379d 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -215,7 +215,7 @@ impl StageIn { let mut c_guard = self.mutex.current(); macro_rules! zgetbatch_rets { - ($restore_sn:expr) => { + ($($restore_sn:stmt)?) => { loop { match c_guard.take() { Some(batch) => break batch, @@ -234,7 +234,7 @@ impl StageIn { if !deadline.wait(&self.s_ref) { // Still no available batch. // Restore the sequence number and drop the message - $restore_sn; + $($restore_sn)? return false; } c_guard = self.mutex.current(); @@ -262,7 +262,7 @@ impl StageIn { } // Get the current serialization batch. - let mut batch = zgetbatch_rets!({}); + let mut batch = zgetbatch_rets!(); // Attempt the serialization on the current batch let e = match batch.encode(&*msg) { Ok(_) => zretok!(batch, msg), @@ -322,9 +322,9 @@ impl StageIn { let mut reader = self.fragbuf.reader(); while reader.can_read() { // Get the current serialization batch - // If deadline is reached, sequence number is incremented in order to break the - // fragment chain already sent. - batch = zgetbatch_rets!(tch.sn.set(fragment.sn + 1).unwrap()); + // If deadline is reached, sequence number is incremented with `SeqNumGenerator::get` + // in order to break the fragment chain already sent. + batch = zgetbatch_rets!(let _ = tch.sn.get()); // Serialize the message fragment match batch.encode((&mut reader, &mut fragment)) {