diff --git a/async-nats/src/connection.rs b/async-nats/src/connection.rs index a59c7db6a..fa50c2cce 100644 --- a/async-nats/src/connection.rs +++ b/async-nats/src/connection.rs @@ -16,6 +16,7 @@ use std::collections::VecDeque; use std::fmt::{self, Display, Write as _}; use std::future::{self, Future}; +use std::io::IoSlice; use std::pin::Pin; use std::str::{self, FromStr}; use std::task::{Context, Poll}; @@ -29,6 +30,7 @@ use crate::{ClientOp, ServerError, ServerOp}; const SOFT_WRITE_BUF_LIMIT: usize = 65535; const WRITE_FLATTEN_THRESHOLD: usize = 4096; +const WRITE_VECTORED_CHUNKS: usize = 64; /// Supertrait enabling trait object for containing both TLS and non TLS `TcpStream` connection. pub(crate) trait AsyncReadWrite: AsyncWrite + AsyncRead + Send + Unpin {} @@ -483,6 +485,14 @@ impl Connection { } pub(crate) fn poll_write(&mut self, cx: &mut Context<'_>) -> Poll> { + if !self.stream.is_write_vectored() { + self.poll_write_sequential(cx) + } else { + self.poll_write_vectored(cx) + } + } + + fn poll_write_sequential(&mut self, cx: &mut Context<'_>) -> Poll> { loop { let buf = match self.write_buf.front_mut() { Some(buf) => &**buf, @@ -516,6 +526,52 @@ impl Connection { } } + fn poll_write_vectored(&mut self, cx: &mut Context<'_>) -> Poll> { + 'outer: loop { + let mut writes = [IoSlice::new(b""); WRITE_VECTORED_CHUNKS]; + let mut writes_len = 0; + + self.write_buf + .iter() + .take(WRITE_VECTORED_CHUNKS) + .enumerate() + .for_each(|(i, buf)| { + writes[i] = IoSlice::new(buf); + writes_len += 1; + }); + + if writes_len < WRITE_VECTORED_CHUNKS && !self.flattened_writes.is_empty() { + writes[writes_len] = IoSlice::new(&self.flattened_writes); + writes_len += 1; + } + + if writes_len == 0 { + return Poll::Ready(Ok(())); + } + + match Pin::new(&mut self.stream).poll_write_vectored(cx, &writes[..writes_len]) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Ok(mut n)) => { + self.write_buf_len -= n; + self.can_flush = true; + + while let Some(buf) = self.write_buf.front_mut() { + if buf.len() <= n { + n -= buf.len(); + self.write_buf.pop_front(); + } else { + buf.advance(n); + continue 'outer; + } + } + + self.flattened_writes.advance(n); + } + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + } + } + } + fn write(&mut self, buf: impl Into) { let buf = buf.into(); if buf.is_empty() {