Skip to content

Commit

Permalink
perf(http1): improve parsing of sequentially partial messages
Browse files Browse the repository at this point in the history
If request headers are received in incremental partial chunks, hyper
would restart parsing each time. This is because the HTTP/1 parser is
stateless, since the most common case is a full message and stateless
parses faster.

However, if continuing to receive more partial chunks of the request,
each subsequent full parse is slower and slower. Since partial parses is
less common, we can store a little bit of state to improve performance
in general.

Now, if a partial request is received, hyper will check for the end of
the message quickly, and if not found, simply save the length to allow
the next partial chunk to start its search from there. Only once the end
is found will a fill parse happen.

Reported-by: Datong Sun <datong.sun@konghq.com>
  • Loading branch information
seanmonstar committed Oct 15, 2024
1 parent c86a6bc commit 0332887
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 1 deletion.
8 changes: 7 additions & 1 deletion src/proto/h1/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const MAX_BUF_LIST_BUFFERS: usize = 16;
pub(crate) struct Buffered<T, B> {
flush_pipeline: bool,
io: T,
partial_len: Option<usize>,
read_blocked: bool,
read_buf: BytesMut,
read_buf_strategy: ReadStrategy,
Expand Down Expand Up @@ -65,6 +66,7 @@ where
Buffered {
flush_pipeline: false,
io,
partial_len: None,
read_blocked: false,
read_buf: BytesMut::with_capacity(0),
read_buf_strategy: ReadStrategy::default(),
Expand Down Expand Up @@ -176,6 +178,7 @@ where
loop {
match super::role::parse_headers::<S>(
&mut self.read_buf,
self.partial_len,
ParseContext {
cached_headers: parse_ctx.cached_headers,
req_method: parse_ctx.req_method,
Expand All @@ -191,14 +194,17 @@ where
)? {
Some(msg) => {
debug!("parsed {} headers", msg.head.headers.len());
self.partial_len = None;
return Poll::Ready(Ok(msg));
}
None => {
let max = self.read_buf_strategy.max();
if self.read_buf.len() >= max {
let curr_len = self.read_buf.len();
if curr_len >= max {
debug!("max_buf_size ({}) reached, closing", max);
return Poll::Ready(Err(crate::Error::new_too_large()));
}
self.partial_len = Some(curr_len);
}
}
if ready!(self.poll_read_from_io(cx)).map_err(crate::Error::new_io)? == 0 {
Expand Down
56 changes: 56 additions & 0 deletions src/proto/h1/role.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ macro_rules! maybe_panic {

pub(super) fn parse_headers<T>(
bytes: &mut BytesMut,
prev_len: Option<usize>,
ctx: ParseContext<'_>,
) -> ParseResult<T::Incoming>
where
Expand All @@ -78,9 +79,42 @@ where

let _entered = trace_span!("parse_headers");

if let Some(prev_len) = prev_len {
if !is_complete_fast(bytes, prev_len) {
return Ok(None);
}
}

T::parse(bytes, ctx)
}

/// A fast scan for the end of a message.
/// Used when there was a partial read, to skip full parsing on a
/// a slow connection.
fn is_complete_fast(bytes: &[u8], prev_len: usize) -> bool {
let start = if prev_len < 3 {
0
} else {
prev_len - 3
};
let bytes = &bytes[start..];

for (i, b) in bytes.iter().copied().enumerate() {
if b == b'\r' {
if bytes[i+1..].chunks(3).next() == Some(&b"\n\r\n"[..]) {
return true;
}
} else if b == b'\n' {
if bytes.get(i + 1) == Some(&b'\n') {
return true;
}
}

}

false
}

pub(super) fn encode_headers<T>(
enc: Encode<'_, T::Outgoing>,
dst: &mut Vec<u8>,
Expand Down Expand Up @@ -2827,6 +2861,28 @@ mod tests {
parse(Some(200), 210, false);
}

#[test]
fn test_is_complete_fast() {
let s = b"GET / HTTP/1.1\r\na: b\r\n\r\n";
for n in 0..s.len() {
assert!(is_complete_fast(s, n), "{:?}; {}", s, n);
}
let s = b"GET / HTTP/1.1\na: b\n\n";
for n in 0..s.len() {
assert!(is_complete_fast(s, n));
}

// Not
let s = b"GET / HTTP/1.1\r\na: b\r\n\r";
for n in 0..s.len() {
assert!(!is_complete_fast(s, n));
}
let s = b"GET / HTTP/1.1\na: b\n";
for n in 0..s.len() {
assert!(!is_complete_fast(s, n));
}
}

#[test]
fn test_write_headers_orig_case_empty_value() {
let mut headers = HeaderMap::new();
Expand Down

0 comments on commit 0332887

Please sign in to comment.