Skip to content

Commit

Permalink
connection: implement vectored writes
Browse files Browse the repository at this point in the history
  • Loading branch information
paolobarbolini committed Aug 8, 2023
1 parent a7acfaf commit 9884831
Showing 1 changed file with 56 additions and 0 deletions.
56 changes: 56 additions & 0 deletions async-nats/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use std::collections::VecDeque;
use std::fmt::{self, Display, Write as _};
use std::future::{self, Future};
use std::io::IoSlice;
use std::pin::Pin;
use std::str::{self, FromStr};
use std::task::{Context, Poll};
Expand All @@ -29,6 +30,7 @@ use crate::{ClientOp, ServerError, ServerOp};

const SOFT_WRITE_BUF_LIMIT: usize = 65535;
const WRITE_FLATTEN_THRESHOLD: usize = 4096;
const WRITE_VECTORED_CHUNKS: usize = 64;

/// Supertrait enabling trait object for containing both TLS and non TLS `TcpStream` connection.
pub(crate) trait AsyncReadWrite: AsyncWrite + AsyncRead + Send + Unpin {}
Expand Down Expand Up @@ -483,6 +485,14 @@ impl Connection {
}

pub(crate) fn poll_write(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
if !self.stream.is_write_vectored() {
self.poll_write_sequential(cx)
} else {
self.poll_write_vectored(cx)
}
}

fn poll_write_sequential(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
loop {
let buf = match self.write_buf.front_mut() {
Some(buf) => &**buf,
Expand Down Expand Up @@ -516,6 +526,52 @@ impl Connection {
}
}

fn poll_write_vectored(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
'outer: loop {
let mut writes = [IoSlice::new(b""); WRITE_VECTORED_CHUNKS];
let mut writes_len = 0;

self.write_buf
.iter()
.take(WRITE_VECTORED_CHUNKS)
.enumerate()
.for_each(|(i, buf)| {
writes[i] = IoSlice::new(buf);
writes_len += 1;
});

if writes_len < WRITE_VECTORED_CHUNKS && !self.flattened_writes.is_empty() {
writes[writes_len] = IoSlice::new(&self.flattened_writes);
writes_len += 1;
}

if writes_len == 0 {
return Poll::Ready(Ok(()));
}

match Pin::new(&mut self.stream).poll_write_vectored(cx, &writes[..writes_len]) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(mut n)) => {
self.write_buf_len -= n;
self.can_flush = true;

while let Some(buf) = self.write_buf.front_mut() {
if buf.len() <= n {
n -= buf.len();
self.write_buf.pop_front();
} else {
buf.advance(n);
continue 'outer;
}
}

self.flattened_writes.advance(n);
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
}
}
}

fn write(&mut self, buf: impl Into<Bytes>) {
let buf = buf.into();
if buf.is_empty() {
Expand Down

0 comments on commit 9884831

Please sign in to comment.