Skip to content

Commit

Permalink
Flatten small writes
Browse files Browse the repository at this point in the history
  • Loading branch information
paolobarbolini authored and Jarema committed Sep 19, 2023
1 parent a35c60a commit 9d0eb80
Showing 1 changed file with 80 additions and 35 deletions.
115 changes: 80 additions & 35 deletions async-nats/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
//! This module provides a connection implementation for communicating with a NATS server.

use std::collections::VecDeque;
use std::fmt::Display;
use std::fmt::{self, Display, Write as _};
use std::future::{self, Future};
use std::pin::Pin;
use std::str::{self, FromStr};
Expand All @@ -28,7 +28,11 @@ use crate::status::StatusCode;
use crate::{ClientOp, ServerError, ServerOp};

/// Soft limit for the amount of bytes in [`Connection::write_buf`]
/// and [`Connection::flattened_writes`].
const SOFT_WRITE_BUF_LIMIT: usize = 65535;
/// How big a single buffer must be before it's written separately
/// instead of being flattened.
const WRITE_FLATTEN_THRESHOLD: usize = 4096;

/// Supertrait enabling trait object for containing both TLS and non TLS `TcpStream` connection.
pub(crate) trait AsyncReadWrite: AsyncWrite + AsyncRead + Send + Unpin {}
Expand Down Expand Up @@ -60,6 +64,7 @@ pub(crate) struct Connection {
read_buf: BytesMut,
write_buf: VecDeque<Bytes>,
write_buf_len: usize,
flattened_writes: BytesMut,
can_flush: bool,
}

Expand All @@ -72,6 +77,7 @@ impl Connection {
read_buf: BytesMut::with_capacity(read_buffer_capacity),
write_buf: VecDeque::new(),
write_buf_len: 0,
flattened_writes: BytesMut::new(),
can_flush: false,
}
}
Expand All @@ -83,7 +89,7 @@ impl Connection {

/// Returns `true` if [`Self::poll_flush`] should be polled.
pub(crate) fn should_flush(&self) -> bool {
self.can_flush && self.write_buf.is_empty()
self.can_flush && self.write_buf.is_empty() && self.flattened_writes.is_empty()
}

/// Attempts to read a server operation from the read buffer.
Expand Down Expand Up @@ -402,6 +408,12 @@ impl Connection {

/// Writes a client operation to the write buffer.
pub(crate) fn enqueue_write_op(&mut self, item: &ClientOp) {
macro_rules! small_write {
($dst:expr) => {
write!(self.small_write(), $dst).expect("do small write to Connection");
};
}

match item {
ClientOp::Connect(connect_info) => {
let json = serde_json::to_vec(&connect_info).expect("serialize `ConnectInfo`");
Expand All @@ -416,17 +428,15 @@ impl Connection {
respond,
headers,
} => {
self.write(match headers.as_ref() {
Some(headers) if !headers.is_empty() => "HPUB ",
_ => "PUB ",
});
let verb = match headers.as_ref() {
Some(headers) if !headers.is_empty() => "HPUB",
_ => "PUB",
};

self.write(Bytes::copy_from_slice(subject.as_bytes()));
self.write(" ");
small_write!("{verb} {subject} ");

if let Some(respond) = respond {
self.write(Bytes::copy_from_slice(respond.as_bytes()));
self.write(" ");
small_write!("{respond} ");
}

match headers {
Expand All @@ -435,12 +445,12 @@ impl Connection {

let headers_len = headers.len();
let total_len = headers_len + payload.len();
self.write(format!("{headers_len} {total_len}\r\n"));
small_write!("{headers_len} {total_len}\r\n");
self.write(headers);
}
_ => {
let payload_len = payload.len();
self.write(format!("{payload_len}\r\n"));
small_write!("{payload_len}\r\n");
}
}

Expand All @@ -452,23 +462,23 @@ impl Connection {
sid,
subject,
queue_group,
} => {
self.write(match queue_group {
Some(queue_group) => {
format!("SUB {subject} {queue_group} {sid}\r\n")
}
None => {
format!("SUB {subject} {sid}\r\n")
}
});
}
} => match queue_group {
Some(queue_group) => {
small_write!("SUB {subject} {queue_group} {sid}\r\n");
}
None => {
small_write!("SUB {subject} {sid}\r\n");
}
},

ClientOp::Unsubscribe { sid, max } => {
self.write(match max {
Some(max) => format!("UNSUB {sid} {max}\r\n"),
None => format!("UNSUB {sid}\r\n"),
});
}
ClientOp::Unsubscribe { sid, max } => match max {
Some(max) => {
small_write!("UNSUB {sid} {max}\r\n");
}
None => {
small_write!("UNSUB {sid}\r\n");
}
},
ClientOp::Ping => {
self.write("PING\r\n");
}
Expand All @@ -492,8 +502,9 @@ impl Connection {
/// may do a partial write before failing.
pub(crate) fn poll_write(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
loop {
let buf = match self.write_buf.front_mut() {
Some(buf) => buf,
let buf = match self.write_buf.front() {
Some(buf) => &**buf,
None if !self.flattened_writes.is_empty() => &self.flattened_writes,
None => return Poll::Ready(Ok(())),
};

Expand All @@ -505,10 +516,16 @@ impl Connection {
self.write_buf_len -= n;
self.can_flush = true;

if n < buf.len() {
buf.advance(n);
} else {
self.write_buf.pop_front();
match self.write_buf.front_mut() {
Some(buf) if n < buf.len() => {
buf.advance(n);
}
Some(_buf) => {
self.write_buf.pop_front();
}
None => {
self.flattened_writes.advance(n);
}
}
continue;
}
Expand All @@ -519,6 +536,9 @@ impl Connection {

/// Write `buf` into the writes buffer
///
/// If `buf` is smaller than [`WRITE_FLATTEN_THRESHOLD`]
/// flattens it, otherwise appends it to the chunks queue.
///
/// Empty `buf`s are a no-op.
fn write(&mut self, buf: impl Into<Bytes>) {
let buf = buf.into();
Expand All @@ -527,7 +547,32 @@ impl Connection {
}

self.write_buf_len += buf.len();
self.write_buf.push_back(buf);
if buf.len() < WRITE_FLATTEN_THRESHOLD {
self.flattened_writes.extend_from_slice(&buf);
} else {
if !self.flattened_writes.is_empty() {
let buf = self.flattened_writes.split().freeze();
self.write_buf.push_back(buf);
}

self.write_buf.push_back(buf);
}
}

/// Obtain an [`fmt::Write`]r for the small writes buffer.
fn small_write(&mut self) -> impl fmt::Write + '_ {
struct Writer<'a> {
this: &'a mut Connection,
}

impl<'a> fmt::Write for Writer<'a> {
fn write_str(&mut self, s: &str) -> fmt::Result {
self.this.write_buf_len += s.len();
self.this.flattened_writes.write_str(s)
}
}

Writer { this: self }
}

/// Flush the write buffer, sending all pending data down the current write stream.
Expand Down

0 comments on commit 9d0eb80

Please sign in to comment.