Skip to content

Commit

Permalink
perf(http1): implement an adaptive read buffer strategy
Browse files Browse the repository at this point in the history
The default read strategy for HTTP/1 connections is now adaptive. It
increases or decreases the size of the read buffer depending on the
number of bytes that are received in a `read` call. If a transport
continuously fills the read buffer, it will continue to grow (up to the
`max_buf_size`), allowing for reading faster. If the transport
consistently only fills a portion of the read buffer, it will be shrunk.

This doesn't provide much benefit to small requests/responses, but
benchmarks show it to be a noticeable improvement to throughput when
streaming larger bodies.

Closes #1708
  • Loading branch information
seanmonstar committed Nov 28, 2018
1 parent a6fff13 commit fd25129
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 29 deletions.
28 changes: 28 additions & 0 deletions benches/end_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,26 @@ fn http1_post(b: &mut test::Bencher) {
.bench(b)
}

#[bench]
fn http1_body_100kb(b: &mut test::Bencher) {
let body = &[b'x'; 1024 * 100];
opts()
.method(Method::POST)
.request_body(body)
.response_body(body)
.bench(b)
}

#[bench]
fn http1_body_10mb(b: &mut test::Bencher) {
let body = &[b'x'; 1024 * 1024 * 10];
opts()
.method(Method::POST)
.request_body(body)
.response_body(body)
.bench(b)
}

#[bench]
fn http1_get_parallel(b: &mut test::Bencher) {
opts()
Expand Down Expand Up @@ -96,6 +116,11 @@ impl Opts {
self
}

fn response_body(mut self, body: &'static [u8]) -> Self {
self.response_body = body;
self
}

fn parallel(mut self, cnt: u32) -> Self {
assert!(cnt > 0, "parallel count must be larger than 0");
self.parallel_cnt = cnt;
Expand All @@ -105,6 +130,9 @@ impl Opts {
fn bench(self, b: &mut test::Bencher) {
let _ = pretty_env_logger::try_init();
let mut rt = Runtime::new().unwrap();

b.bytes = self.response_body.len() as u64 + self.request_body.map(|b| b.len()).unwrap_or(0) as u64;

let addr = spawn_hello(&mut rt, self.response_body);

let connector = HttpConnector::new(1);
Expand Down
203 changes: 174 additions & 29 deletions src/proto/h1/io.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::cell::Cell;
use std::cmp;
use std::collections::VecDeque;
use std::fmt;
use std::io;
Expand Down Expand Up @@ -60,9 +61,7 @@ where
io: io,
read_blocked: false,
read_buf: BytesMut::with_capacity(0),
read_buf_strategy: ReadStrategy::Adaptive {
max: DEFAULT_MAX_BUFFER_SIZE,
},
read_buf_strategy: ReadStrategy::default(),
write_buf: WriteBuf::new(),
}
}
Expand All @@ -81,9 +80,7 @@ where
"The max_buf_size cannot be smaller than {}.",
MINIMUM_MAX_BUFFER_SIZE,
);
self.read_buf_strategy = ReadStrategy::Adaptive {
max,
};
self.read_buf_strategy = ReadStrategy::with_max(max);
self.write_buf.max_buf_size = max;
}

Expand Down Expand Up @@ -149,18 +146,11 @@ where
debug!("parsed {} headers", msg.head.headers.len());
return Ok(Async::Ready(msg))
},
None => match self.read_buf_strategy {
ReadStrategy::Adaptive { max } => {
if self.read_buf.len() >= max {
debug!("max_buf_size ({}) reached, closing", max);
return Err(::Error::new_too_large());
}
},
ReadStrategy::Exact(exact) => {
if self.read_buf.len() >= exact {
debug!("exact buf size ({}) filled, closing", exact);
return Err(::Error::new_too_large());
}
None => {
let max = self.read_buf_strategy.max();
if self.read_buf.len() >= max {
debug!("max_buf_size ({}) reached, closing", max);
return Err(::Error::new_too_large());
}
},
}
Expand All @@ -177,22 +167,15 @@ where
pub fn read_from_io(&mut self) -> Poll<usize, io::Error> {
use bytes::BufMut;
self.read_blocked = false;
match self.read_buf_strategy {
ReadStrategy::Adaptive { .. } => {
if self.read_buf.remaining_mut() < INIT_BUFFER_SIZE {
self.read_buf.reserve(INIT_BUFFER_SIZE);
}
},
ReadStrategy::Exact(exact) => {
if self.read_buf.capacity() < exact {
self.read_buf.reserve(exact);
}
},
let next = self.read_buf_strategy.next();
if self.read_buf.remaining_mut() < next {
self.read_buf.reserve(next);
}
self.io.read_buf(&mut self.read_buf).map(|ok| {
match ok {
Async::Ready(n) => {
debug!("read {} bytes", n);
self.read_buf_strategy.record(n);
Async::Ready(n)
},
Async::NotReady => {
Expand Down Expand Up @@ -285,11 +268,82 @@ where
#[derive(Clone, Copy, Debug)]
enum ReadStrategy {
Adaptive {
decrease_now: bool,
next: usize,
max: usize
},
Exact(usize),
}

impl ReadStrategy {
fn with_max(max: usize) -> ReadStrategy {
ReadStrategy::Adaptive {
decrease_now: false,
next: INIT_BUFFER_SIZE,
max,
}
}

fn next(&self) -> usize {
match *self {
ReadStrategy::Adaptive { next, .. } => next,
ReadStrategy::Exact(exact) => exact,
}
}

fn max(&self) -> usize {
match *self {
ReadStrategy::Adaptive { max, .. } => max,
ReadStrategy::Exact(exact) => exact,
}
}

fn record(&mut self, bytes_read: usize) {
match *self {
ReadStrategy::Adaptive { ref mut decrease_now, ref mut next, max, .. } => {
if bytes_read >= *next {
*next = cmp::min(incr_power_of_two(*next), max);
*decrease_now = false;
} else {
let decr_to = prev_power_of_two(*next);
if bytes_read < decr_to {
if *decrease_now {
*next = cmp::max(decr_to, INIT_BUFFER_SIZE);
*decrease_now = false;
} else {
// Decreasing is a two "record" process.
*decrease_now = true;
}
} else {
// A read within the current range should cancel
// a potential decrease, since we just saw proof
// that we still need this size.
*decrease_now = false;
}
}
},
_ => (),
}
}
}

fn incr_power_of_two(n: usize) -> usize {
n.saturating_mul(2)
}

fn prev_power_of_two(n: usize) -> usize {
// Only way this shift can underflow is if n is less than 4.
// (Which would means `usize::MAX >> 64` and underflowed!)
debug_assert!(n >= 4);
(::std::usize::MAX >> (n.leading_zeros() + 2)) + 1
}

impl Default for ReadStrategy {
fn default() -> ReadStrategy {
ReadStrategy::with_max(DEFAULT_MAX_BUFFER_SIZE)
}
}

#[derive(Clone)]
pub struct Cursor<T> {
bytes: T,
Expand Down Expand Up @@ -637,6 +691,97 @@ mod tests {
assert!(buffered.io.blocked());
}

#[test]
fn read_strategy_adaptive_increments() {
let mut strategy = ReadStrategy::default();
assert_eq!(strategy.next(), 8192);

// Grows if record == next
strategy.record(8192);
assert_eq!(strategy.next(), 16384);

strategy.record(16384);
assert_eq!(strategy.next(), 32768);

// Enormous records still increment at same rate
strategy.record(::std::usize::MAX);
assert_eq!(strategy.next(), 65536);

let max = strategy.max();
while strategy.next() < max {
strategy.record(max);
}

assert_eq!(strategy.next(), max, "never goes over max");
strategy.record(max + 1);
assert_eq!(strategy.next(), max, "never goes over max");
}

#[test]
fn read_strategy_adaptive_decrements() {
let mut strategy = ReadStrategy::default();
strategy.record(8192);
assert_eq!(strategy.next(), 16384);

strategy.record(1);
assert_eq!(strategy.next(), 16384, "first smaller record doesn't decrement yet");
strategy.record(8192);
assert_eq!(strategy.next(), 16384, "record was with range");

strategy.record(1);
assert_eq!(strategy.next(), 16384, "in-range record should make this the 'first' again");

strategy.record(1);
assert_eq!(strategy.next(), 8192, "second smaller record decrements");

strategy.record(1);
assert_eq!(strategy.next(), 8192, "first doesn't decrement");
strategy.record(1);
assert_eq!(strategy.next(), 8192, "doesn't decrement under minimum");
}

#[test]
fn read_strategy_adaptive_stays_the_same() {
let mut strategy = ReadStrategy::default();
strategy.record(8192);
assert_eq!(strategy.next(), 16384);

strategy.record(8193);
assert_eq!(strategy.next(), 16384, "first smaller record doesn't decrement yet");

strategy.record(8193);
assert_eq!(strategy.next(), 16384, "with current step does not decrement");
}

#[test]
fn read_strategy_adaptive_max_fuzz() {
fn fuzz(max: usize) {
let mut strategy = ReadStrategy::with_max(max);
while strategy.next() < max {
strategy.record(::std::usize::MAX);
}
let mut next = strategy.next();
while next > 8192 {
strategy.record(1);
strategy.record(1);
next = strategy.next();
assert!(
next.is_power_of_two(),
"decrement should be powers of two: {} (max = {})",
next,
max,
);
}
}

let mut max = 8192;
while max < ::std::usize::MAX {
fuzz(max);
max = (max / 2).saturating_mul(3);
}
fuzz(::std::usize::MAX);
}

#[test]
#[should_panic]
fn write_buf_requires_non_empty_bufs() {
Expand Down

0 comments on commit fd25129

Please sign in to comment.