diff --git a/async-nats/src/connection.rs b/async-nats/src/connection.rs index bed7a5940..de6a092fb 100644 --- a/async-nats/src/connection.rs +++ b/async-nats/src/connection.rs @@ -16,6 +16,7 @@ use std::collections::VecDeque; use std::fmt::{self, Display, Write as _}; use std::future::{self, Future}; +use std::io::IoSlice; use std::pin::Pin; use std::str::{self, FromStr}; use std::task::{Context, Poll}; @@ -33,6 +34,8 @@ const SOFT_WRITE_BUF_LIMIT: usize = 65535; /// How big a single buffer must be before it's written separately /// instead of being flattened. const WRITE_FLATTEN_THRESHOLD: usize = 4096; +/// How many buffers to write in a single vectored write call. +const WRITE_VECTORED_CHUNKS: usize = 64; /// Supertrait enabling trait object for containing both TLS and non TLS `TcpStream` connection. pub(crate) trait AsyncReadWrite: AsyncWrite + AsyncRead + Send + Unpin {} @@ -501,6 +504,17 @@ impl Connection { /// Compared to [`AsyncWrite::poll_write`], this implementation /// may do a partial write before failing. pub(crate) fn poll_write(&mut self, cx: &mut Context<'_>) -> Poll> { + if !self.stream.is_write_vectored() { + self.poll_write_sequential(cx) + } else { + self.poll_write_vectored(cx) + } + } + + /// Write the internal buffers into the write stream using sequential write operations + /// + /// Writes one chunk at a time. Less efficient. + fn poll_write_sequential(&mut self, cx: &mut Context<'_>) -> Poll> { loop { let buf = match self.write_buf.front() { Some(buf) => &**buf, @@ -534,6 +548,56 @@ impl Connection { } } + /// Write the internal buffers into the write stream using vectored write operations + /// + /// Writes [`WRITE_VECTORED_CHUNKS`] at a time. More efficient _if_ + /// the underlying writer supports it. + fn poll_write_vectored(&mut self, cx: &mut Context<'_>) -> Poll> { + 'outer: loop { + let mut writes = [IoSlice::new(b""); WRITE_VECTORED_CHUNKS]; + let mut writes_len = 0; + + self.write_buf + .iter() + .take(WRITE_VECTORED_CHUNKS) + .enumerate() + .for_each(|(i, buf)| { + writes[i] = IoSlice::new(buf); + writes_len += 1; + }); + + if writes_len < WRITE_VECTORED_CHUNKS && !self.flattened_writes.is_empty() { + writes[writes_len] = IoSlice::new(&self.flattened_writes); + writes_len += 1; + } + + if writes_len == 0 { + return Poll::Ready(Ok(())); + } + + match Pin::new(&mut self.stream).poll_write_vectored(cx, &writes[..writes_len]) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Ok(mut n)) => { + self.write_buf_len -= n; + self.can_flush = true; + + while let Some(buf) = self.write_buf.front_mut() { + if n < buf.len() { + buf.advance(n); + continue 'outer; + } + + n -= buf.len(); + self.write_buf.pop_front(); + } + + self.flattened_writes.advance(n); + } + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + } + } + } + /// Write `buf` into the writes buffer /// /// If `buf` is smaller than [`WRITE_FLATTEN_THRESHOLD`]