perf(h1): improve parsing and encoding of http1 messages
seanmonstar committed May 15, 2018
1 parent c3c35e8 commit 26417fc
Showing 13 changed files with 1,014 additions and 450 deletions.
2 changes: 1 addition & 1 deletion src/client/conn.rs
@@ -531,7 +531,7 @@ impl<T, B, R> Future for HandshakeInner<T, B, R>
where
T: AsyncRead + AsyncWrite + Send + 'static,
B: Payload,
R: proto::Http1Transaction<
R: proto::h1::Http1Transaction<
Incoming=StatusCode,
Outgoing=proto::RequestLine,
>,
1 change: 0 additions & 1 deletion src/client/mod.rs
@@ -193,7 +193,6 @@ where C: Connect + Sync + 'static,
Version::HTTP_11 => (),
other => {
error!("Request has unsupported version \"{:?}\"", other);
//TODO: replace this with a proper variant
return ResponseFuture::new(Box::new(future::err(::Error::new_user_unsupported_version())));
}
}
19 changes: 6 additions & 13 deletions src/error.rs
@@ -26,8 +26,6 @@ pub(crate) enum Kind {
Parse(Parse),
/// A message reached EOF, but is not complete.
Incomplete,
/// A protocol upgrade was encountered, but not yet supported in hyper.
Upgrade,
/// A client connection received a response when not waiting for one.
MismatchedResponse,
/// A pending item was dropped before ever being processed.
@@ -74,6 +72,9 @@ pub(crate) enum Parse {
Header,
TooLarge,
Status,

/// A protocol upgrade was encountered, but not yet supported in hyper.
UpgradeNotSupported,
}

/*
@@ -141,10 +142,6 @@ impl Error {
Error::new(Kind::Canceled, cause.map(Into::into))
}

pub(crate) fn new_upgrade() -> Error {
Error::new(Kind::Upgrade, None)
}

pub(crate) fn new_incomplete() -> Error {
Error::new(Kind::Incomplete, None)
}
@@ -161,10 +158,6 @@ impl Error {
Error::new(Kind::Parse(Parse::Status), None)
}

pub(crate) fn new_version() -> Error {
Error::new(Kind::Parse(Parse::Version), None)
}

pub(crate) fn new_version_h2() -> Error {
Error::new(Kind::Parse(Parse::VersionH2), None)
}
@@ -260,8 +253,8 @@ impl StdError for Error {
Kind::Parse(Parse::Header) => "invalid Header provided",
Kind::Parse(Parse::TooLarge) => "message head is too large",
Kind::Parse(Parse::Status) => "invalid Status provided",
Kind::Parse(Parse::UpgradeNotSupported) => "unsupported protocol upgrade",
Kind::Incomplete => "message is incomplete",
Kind::Upgrade => "unsupported protocol upgrade",
Kind::MismatchedResponse => "response received without matching request",
Kind::Closed => "connection closed",
Kind::Connect => "an error occurred trying to connect",
@@ -325,8 +318,8 @@ impl From<http::status::InvalidStatusCode> for Parse {
}
}

impl From<http::uri::InvalidUriBytes> for Parse {
fn from(_: http::uri::InvalidUriBytes) -> Parse {
impl From<http::uri::InvalidUri> for Parse {
fn from(_: http::uri::InvalidUri) -> Parse {
Parse::Uri
}
}
67 changes: 32 additions & 35 deletions src/headers.rs
@@ -2,45 +2,43 @@ use std::fmt::Write;

use bytes::BytesMut;
use http::HeaderMap;
use http::header::{CONNECTION, CONTENT_LENGTH, EXPECT, TRANSFER_ENCODING};
use http::header::{CONTENT_LENGTH, TRANSFER_ENCODING};
use http::header::{HeaderValue, OccupiedEntry, ValueIter};

/// Maximum number of bytes needed to serialize a u64 into ASCII decimal.
const MAX_DECIMAL_U64_BYTES: usize = 20;

pub fn connection_keep_alive(headers: &HeaderMap) -> bool {
for line in headers.get_all(CONNECTION) {
if let Ok(s) = line.to_str() {
for val in s.split(',') {
if eq_ascii(val.trim(), "keep-alive") {
return true;
}
}
}
}
pub fn connection_keep_alive(value: &HeaderValue) -> bool {
connection_has(value, "keep-alive")
}

false
pub fn connection_close(value: &HeaderValue) -> bool {
connection_has(value, "close")
}

pub fn connection_close(headers: &HeaderMap) -> bool {
for line in headers.get_all(CONNECTION) {
if let Ok(s) = line.to_str() {
for val in s.split(',') {
if eq_ascii(val.trim(), "close") {
return true;
}
fn connection_has(value: &HeaderValue, needle: &str) -> bool {
if let Ok(s) = value.to_str() {
for val in s.split(',') {
if eq_ascii(val.trim(), needle) {
return true;
}
}
}

false
}

pub fn content_length_parse(headers: &HeaderMap) -> Option<u64> {
content_length_parse_all(headers.get_all(CONTENT_LENGTH).into_iter())
pub fn content_length_parse(value: &HeaderValue) -> Option<u64> {
value
.to_str()
.ok()
.and_then(|s| s.parse().ok())
}

pub fn content_length_parse_all(headers: &HeaderMap) -> Option<u64> {
content_length_parse_all_values(headers.get_all(CONTENT_LENGTH).into_iter())
}

pub fn content_length_parse_all(values: ValueIter<HeaderValue>) -> Option<u64> {
pub fn content_length_parse_all_values(values: ValueIter<HeaderValue>) -> Option<u64> {
// If multiple Content-Length headers were sent, everything can still
// be alright if they all contain the same value, and all parse
// correctly. If not, then it's an error.
@@ -70,10 +68,6 @@ pub fn content_length_parse_all(values: ValueIter<HeaderValue>) -> Option<u64> {
}
}

pub fn content_length_zero(headers: &mut HeaderMap) {
headers.insert(CONTENT_LENGTH, HeaderValue::from_static("0"));
}

pub fn content_length_value(len: u64) -> HeaderValue {
let mut len_buf = BytesMut::with_capacity(MAX_DECIMAL_U64_BYTES);
write!(len_buf, "{}", len)
@@ -84,21 +78,24 @@ pub fn content_length_value(len: u64) -> HeaderValue {
}
}

pub fn expect_continue(headers: &HeaderMap) -> bool {
Some(&b"100-continue"[..]) == headers.get(EXPECT).map(|v| v.as_bytes())
}

pub fn transfer_encoding_is_chunked(headers: &HeaderMap) -> bool {
is_chunked(headers.get_all(TRANSFER_ENCODING).into_iter())
}

pub fn is_chunked(mut encodings: ValueIter<HeaderValue>) -> bool {
// chunked must always be the last encoding, according to spec
if let Some(line) = encodings.next_back() {
if let Ok(s) = line.to_str() {
if let Some(encoding) = s.rsplit(',').next() {
return eq_ascii(encoding.trim(), "chunked");
}
return is_chunked_(line);
}

false
}

pub fn is_chunked_(value: &HeaderValue) -> bool {
// chunked must always be the last encoding, according to spec
if let Ok(s) = value.to_str() {
if let Some(encoding) = s.rsplit(',').next() {
return eq_ascii(encoding.trim(), "chunked");
}
}

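Note on the headers.rs change: the Connection, Content-Length, and Transfer-Encoding helpers now take a single &HeaderValue instead of scanning a whole &HeaderMap, so callers that already hold the value skip the extra map lookup and iteration. Below is a minimal standalone sketch of the per-value token scan against the `http` crate; `connection_has` mirrors the helper in the diff, with std's `eq_ignore_ascii_case` standing in for the crate-internal `eq_ascii`, and it is illustrative rather than the file's code verbatim.

use http::header::HeaderValue;

// Per-value Connection token scan: split one header value on commas and
// compare each token case-insensitively against the needle.
fn connection_has(value: &HeaderValue, needle: &str) -> bool {
    if let Ok(s) = value.to_str() {
        s.split(',').any(|token| token.trim().eq_ignore_ascii_case(needle))
    } else {
        false
    }
}

fn main() {
    let value = HeaderValue::from_static("Upgrade, Keep-Alive");
    assert!(connection_has(&value, "keep-alive")); // case-insensitive match
    assert!(!connection_has(&value, "close"));
}
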
2 changes: 1 addition & 1 deletion src/lib.rs
@@ -1,6 +1,6 @@
#![doc(html_root_url = "https://docs.rs/hyper/0.11.22")]
#![deny(missing_docs)]
#![deny(warnings)]
//#![deny(warnings)]
#![deny(missing_debug_implementations)]
#![cfg_attr(all(test, feature = "nightly"), feature(test))]

87 changes: 43 additions & 44 deletions src/proto/h1/conn.rs
@@ -4,13 +4,13 @@ use std::marker::PhantomData;

use bytes::{Buf, Bytes};
use futures::{Async, Poll};
use http::{Method, Version};
use http::{HeaderMap, Method, Version};
use tokio_io::{AsyncRead, AsyncWrite};

use ::Chunk;
use proto::{BodyLength, Decode, Http1Transaction, MessageHead};
use proto::{BodyLength, MessageHead};
use super::io::{Buffered};
use super::{EncodedBuf, Encoder, Decoder};
use super::{EncodedBuf, Encode, Encoder, Decode, Decoder, Http1Transaction, ParseContext};

const H2_PREFACE: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";

@@ -36,6 +36,7 @@ where I: AsyncRead + AsyncWrite,
Conn {
io: Buffered::new(io),
state: State {
cached_headers: None,
error: None,
keep_alive: KA::Busy,
method: None,
@@ -118,8 +119,11 @@ where I: AsyncRead + AsyncWrite,
trace!("Conn::read_head");

loop {
let (version, head) = match self.io.parse::<T>() {
Ok(Async::Ready(head)) => (head.version, head),
let msg = match self.io.parse::<T>(ParseContext {
cached_headers: &mut self.state.cached_headers,
req_method: &mut self.state.method,
}) {
Ok(Async::Ready(msg)) => msg,
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(e) => {
// If we are currently waiting on a message, then an empty
@@ -141,48 +145,32 @@
}
};

match version {
Version::HTTP_10 |
Version::HTTP_11 => {},
_ => {
error!("unimplemented HTTP Version = {:?}", version);
self.state.close_read();
//TODO: replace this with a more descriptive error
return Err(::Error::new_version());
}
};
self.state.version = version;

let decoder = match T::decoder(&head, &mut self.state.method) {
Ok(Decode::Normal(d)) => {
self.state.version = msg.head.version;
let head = msg.head;
let decoder = match msg.decode {
Decode::Normal(d) => {
d
},
Ok(Decode::Final(d)) => {
Decode::Final(d) => {
trace!("final decoder, HTTP ending");
debug_assert!(d.is_eof());
self.state.close_read();
d
},
Ok(Decode::Ignore) => {
Decode::Ignore => {
// likely a 1xx message that we can ignore
continue;
}
Err(e) => {
debug!("decoder error = {:?}", e);
self.state.close_read();
return self.on_parse_error(e)
.map(|()| Async::NotReady);
}
};

debug!("incoming body is {}", decoder);

self.state.busy();
if head.expecting_continue() {
let msg = b"HTTP/1.1 100 Continue\r\n\r\n";
self.io.write_buf_mut().extend_from_slice(msg);
if msg.expect_continue {
let cont = b"HTTP/1.1 100 Continue\r\n\r\n";
self.io.write_buf_mut().extend_from_slice(cont);
}
let wants_keep_alive = head.should_keep_alive();
let wants_keep_alive = msg.keep_alive;
self.state.keep_alive &= wants_keep_alive;
let (body, reading) = if decoder.is_eof() {
(false, Reading::KeepAlive)
@@ -410,8 +398,17 @@ where I: AsyncRead + AsyncWrite,
self.enforce_version(&mut head);

let buf = self.io.write_buf_mut();
self.state.writing = match T::encode(head, body, &mut self.state.method, self.state.title_case_headers, buf) {
self.state.writing = match T::encode(Encode {
head: &mut head,
body,
keep_alive: self.state.wants_keep_alive(),
req_method: &mut self.state.method,
title_case_headers: self.state.title_case_headers,
}, buf) {
Ok(encoder) => {
debug_assert!(self.state.cached_headers.is_none());
debug_assert!(head.headers.is_empty());
self.state.cached_headers = Some(head.headers);
if !encoder.is_eof() {
Writing::Body(encoder)
} else if encoder.is_last() {
@@ -430,24 +427,12 @@ where I: AsyncRead + AsyncWrite,
// If we know the remote speaks an older version, we try to fix up any messages
// to work with our older peer.
fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) {
//use header::Connection;

let wants_keep_alive = if self.state.wants_keep_alive() {
let ka = head.should_keep_alive();
self.state.keep_alive &= ka;
ka
} else {
false
};

match self.state.version {
Version::HTTP_10 => {
// If the remote only knows HTTP/1.0, we should force ourselves
// to do only speak HTTP/1.0 as well.
head.version = Version::HTTP_10;
if wants_keep_alive {
//TODO: head.headers.set(Connection::keep_alive());
}
},
_ => {
// If the remote speaks HTTP/1.1, then it *should* be fine with
@@ -617,13 +602,27 @@ impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> {
}

struct State {
/// Re-usable HeaderMap to reduce allocating new ones.
cached_headers: Option<HeaderMap>,
/// If an error occurs when there wasn't a direct way to return it
/// back to the user, this is set.
error: Option<::Error>,
/// Current keep-alive status.
keep_alive: KA,
/// If mid-message, the HTTP Method that started it.
///
/// This is used to know things such as if the message can include
/// a body or not.
method: Option<Method>,
title_case_headers: bool,
/// Set to true when the Dispatcher should poll read operations
/// again. See the `maybe_notify` method for more.
notify_read: bool,
/// State of allowed reads
reading: Reading,
/// State of allowed writes
writing: Writing,
/// Either HTTP/1.0 or 1.1 connection
version: Version,
}

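Note on the conn.rs change: the new `cached_headers: Option<HeaderMap>` field on `State` is the allocation-reuse half of this perf work. After a message head is encoded and its headers drained (see the `debug_assert!(head.headers.is_empty())` above), the empty map is stashed and handed back to the parser via `ParseContext` for the next message. Below is a rough sketch of that recycling pattern using only the `http` crate; `HeaderCache` is a stand-in for `State`, not hyper's actual code.

use http::header::{HeaderValue, CONTENT_LENGTH};
use http::HeaderMap;

// Stand-in for the `cached_headers: Option<HeaderMap>` slot added to `State`.
struct HeaderCache {
    cached: Option<HeaderMap>,
}

impl HeaderCache {
    // Hand a map to the parser, reusing the cached allocation when possible.
    fn checkout(&mut self) -> HeaderMap {
        self.cached.take().unwrap_or_else(HeaderMap::new)
    }

    // Once encoding has drained the headers, keep the empty map for next time.
    fn store(&mut self, headers: HeaderMap) {
        debug_assert!(headers.is_empty());
        self.cached = Some(headers);
    }
}

fn main() {
    let mut cache = HeaderCache { cached: None };

    let mut headers = cache.checkout();
    headers.insert(CONTENT_LENGTH, HeaderValue::from_static("0"));

    // ... encode the message head; clear() stands in for draining here ...
    headers.clear();
    cache.store(headers);

    // The next message's parse reuses the same backing storage.
    assert!(cache.checkout().is_empty());
}
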
3 changes: 2 additions & 1 deletion src/proto/h1/dispatch.rs
@@ -4,7 +4,8 @@ use http::{Request, Response, StatusCode};
use tokio_io::{AsyncRead, AsyncWrite};

use body::{Body, Payload};
use proto::{BodyLength, Conn, Http1Transaction, MessageHead, RequestHead, RequestLine, ResponseHead};
use proto::{BodyLength, Conn, MessageHead, RequestHead, RequestLine, ResponseHead};
use super::Http1Transaction;
use service::Service;

pub(crate) struct Dispatcher<D, Bs: Payload, I, T> {
7 changes: 4 additions & 3 deletions src/proto/h1/encode.rs
@@ -7,7 +7,7 @@ use iovec::IoVec;
use common::StaticBuf;

/// Encoders to handle different Transfer-Encodings.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct Encoder {
kind: Kind,
is_last: bool,
@@ -70,8 +70,9 @@ impl Encoder {
}
}

pub fn set_last(&mut self) {
self.is_last = true;
pub fn set_last(mut self, is_last: bool) -> Self {
self.is_last = is_last;
self
}

pub fn is_last(&self) -> bool {
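
Note on the encode.rs change: `Encoder::set_last` switches from an in-place `&mut self` setter to a by-value, builder-style method, so the "last message on this connection" flag can be set inline in the expression that produces the encoder. A simplified sketch of that pattern follows; the `new` constructor is a placeholder, not the real encoder's API.

// Sketch of the builder-style setter `Encoder::set_last` becomes above.
#[derive(Debug, Clone, PartialEq)]
struct Encoder {
    is_last: bool,
}

impl Encoder {
    // Placeholder constructor; the real encoder is built for a specific
    // transfer-encoding rather than via a bare `new`.
    fn new() -> Encoder {
        Encoder { is_last: false }
    }

    // By-value setter: consume the encoder and return it so the flag can be
    // set inline in the expression that builds it.
    fn set_last(mut self, is_last: bool) -> Self {
        self.is_last = is_last;
        self
    }
}

fn main() {
    let encoder = Encoder::new().set_last(true);
    assert!(encoder.is_last);
}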