From 4a532ef7c5fd927546eb2f2a7faac95905ba06d1 Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Fri, 5 Apr 2024 20:07:05 +0200 Subject: [PATCH] Don't force flush if write buffer isn't empty --- async-nats/src/connection.rs | 21 +++++++++++++++++++-- async-nats/src/lib.rs | 12 ++++++------ 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/async-nats/src/connection.rs b/async-nats/src/connection.rs index 756140481..4d7352266 100644 --- a/async-nats/src/connection.rs +++ b/async-nats/src/connection.rs @@ -52,6 +52,16 @@ pub enum State { Disconnected, } +#[derive(Debug, Eq, PartialEq, Clone)] +pub enum ShouldFlush { + /// Write buffers are empty, but the connection hasn't been flushed yet + Yes, + /// The connection hasn't been flushed yet, but write buffers aren't empty + May, + /// Flushing would just be a no-op + No, +} + impl Display for State { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -92,8 +102,15 @@ impl Connection { } /// Returns `true` if [`Self::poll_flush`] should be polled. - pub(crate) fn should_flush(&self) -> bool { - self.can_flush && self.write_buf.is_empty() && self.flattened_writes.is_empty() + pub(crate) fn should_flush(&self) -> ShouldFlush { + match ( + self.can_flush, + self.write_buf.is_empty() && self.flattened_writes.is_empty(), + ) { + (true, true) => ShouldFlush::Yes, + (true, false) => ShouldFlush::May, + (false, _) => ShouldFlush::No, + } } /// Attempts to read a server operation from the read buffer. diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index 46765ee4d..ee4640d9f 100755 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -416,7 +416,6 @@ pub(crate) struct ConnectionHandler { pending_pings: usize, info_sender: tokio::sync::watch::Sender, ping_interval: Interval, - is_flushing: bool, should_reconnect: bool, flush_observers: Vec>, } @@ -439,7 +438,6 @@ impl ConnectionHandler { pending_pings: 0, info_sender, ping_interval, - is_flushing: false, should_reconnect: false, flush_observers: Vec::new(), } @@ -579,12 +577,13 @@ impl ConnectionHandler { } } - if self.handler.is_flushing || self.handler.connection.should_flush() { + if let (ShouldFlush::Yes, _) | (ShouldFlush::No, false) = ( + self.handler.connection.should_flush(), + self.handler.flush_observers.is_empty(), + ) { match self.handler.connection.poll_flush(cx) { Poll::Pending => {} Poll::Ready(Ok(())) => { - self.handler.is_flushing = false; - for observer in self.handler.flush_observers.drain(..) { let _ = observer.send(()); } @@ -754,7 +753,6 @@ impl ConnectionHandler { } } Command::Flush { observer } => { - self.is_flushing = true; self.flush_observers.push(observer); } Command::Subscribe { @@ -1569,6 +1567,8 @@ macro_rules! from_with_timeout { } pub(crate) use from_with_timeout; +use crate::connection::ShouldFlush; + #[cfg(test)] mod tests { use super::*;