Skip to content

Commit

Permalink
Use vectored writes when possible
Browse files Browse the repository at this point in the history
  • Loading branch information
paolobarbolini authored and Jarema committed Sep 19, 2023
1 parent 9d0eb80 commit 215c18f
Showing 1 changed file with 64 additions and 0 deletions.
64 changes: 64 additions & 0 deletions async-nats/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use std::collections::VecDeque;
use std::fmt::{self, Display, Write as _};
use std::future::{self, Future};
use std::io::IoSlice;
use std::pin::Pin;
use std::str::{self, FromStr};
use std::task::{Context, Poll};
Expand All @@ -33,6 +34,8 @@ const SOFT_WRITE_BUF_LIMIT: usize = 65535;
/// How big a single buffer must be before it's written separately
/// instead of being flattened.
const WRITE_FLATTEN_THRESHOLD: usize = 4096;
/// How many buffers to write in a single vectored write call.
const WRITE_VECTORED_CHUNKS: usize = 64;

/// Supertrait enabling trait object for containing both TLS and non TLS `TcpStream` connection.
pub(crate) trait AsyncReadWrite: AsyncWrite + AsyncRead + Send + Unpin {}
Expand Down Expand Up @@ -501,6 +504,17 @@ impl Connection {
/// Compared to [`AsyncWrite::poll_write`], this implementation
/// may do a partial write before failing.
pub(crate) fn poll_write(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
if !self.stream.is_write_vectored() {
self.poll_write_sequential(cx)
} else {
self.poll_write_vectored(cx)
}
}

/// Write the internal buffers into the write stream using sequential write operations
///
/// Writes one chunk at a time. Less efficient.
fn poll_write_sequential(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
loop {
let buf = match self.write_buf.front() {
Some(buf) => &**buf,
Expand Down Expand Up @@ -534,6 +548,56 @@ impl Connection {
}
}

/// Write the internal buffers into the write stream using vectored write operations
///
/// Writes [`WRITE_VECTORED_CHUNKS`] at a time. More efficient _if_
/// the underlying writer supports it.
fn poll_write_vectored(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
'outer: loop {
let mut writes = [IoSlice::new(b""); WRITE_VECTORED_CHUNKS];
let mut writes_len = 0;

self.write_buf
.iter()
.take(WRITE_VECTORED_CHUNKS)
.enumerate()
.for_each(|(i, buf)| {
writes[i] = IoSlice::new(buf);
writes_len += 1;
});

if writes_len < WRITE_VECTORED_CHUNKS && !self.flattened_writes.is_empty() {
writes[writes_len] = IoSlice::new(&self.flattened_writes);
writes_len += 1;
}

if writes_len == 0 {
return Poll::Ready(Ok(()));
}

match Pin::new(&mut self.stream).poll_write_vectored(cx, &writes[..writes_len]) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(mut n)) => {
self.write_buf_len -= n;
self.can_flush = true;

while let Some(buf) = self.write_buf.front_mut() {
if n < buf.len() {
buf.advance(n);
continue 'outer;
}

n -= buf.len();
self.write_buf.pop_front();
}

self.flattened_writes.advance(n);
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
}
}
}

/// Write `buf` into the writes buffer
///
/// If `buf` is smaller than [`WRITE_FLATTEN_THRESHOLD`]
Expand Down

0 comments on commit 215c18f

Please sign in to comment.