Skip to content

Commit

Permalink
feat(h2): implement CONNECT support (fixes #2508)
Browse files Browse the repository at this point in the history
  • Loading branch information
nox committed May 19, 2021
1 parent 90c14ee commit 845c9c9
Show file tree
Hide file tree
Showing 10 changed files with 767 additions and 86 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ http = "0.2"
http-body = "0.4"
httpdate = "1.0"
httparse = "1.4"
h2 = { version = "0.3", optional = true }
h2 = { version = "0.3.3", optional = true }
itoa = "0.4.1"
tracing = { version = "0.1", default-features = false, features = ["std"] }
pin-project = "1.0"
Expand Down
11 changes: 11 additions & 0 deletions src/body/length.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,17 @@ use std::fmt;
#[derive(Clone, Copy, PartialEq, Eq)]
pub(crate) struct DecodedLength(u64);

#[cfg(any(feature = "http1", feature = "http2"))]
impl From<Option<u64>> for DecodedLength {
fn from(len: Option<u64>) -> Self {
len.and_then(|len| {
// If the length is u64::MAX, oh well, just reported chunked.
Self::checked_new(len).ok()
})
.unwrap_or(DecodedLength::CHUNKED)
}
}

#[cfg(any(feature = "http1", feature = "http2", test))]
const MAX_LEN: u64 = std::u64::MAX - 2;

Expand Down
21 changes: 8 additions & 13 deletions src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,20 +243,15 @@ where
.expect("uri host is valid header value")
});
}
}

// CONNECT always sends authority-form, so check it first...
if req.method() == Method::CONNECT {
authority_form(req.uri_mut());
} else if pooled.conn_info.is_proxied {
absolute_form(req.uri_mut());
} else {
origin_form(req.uri_mut());
};
} else if req.method() == Method::CONNECT {
debug!("client does not support CONNECT requests over HTTP2");
return Err(ClientError::Normal(
crate::Error::new_user_unsupported_request_method(),
));
// CONNECT always sends authority-form, so check it first...
if req.method() == Method::CONNECT {
authority_form(req.uri_mut());
} else if pooled.conn_info.is_proxied {
absolute_form(req.uri_mut());
} else {
origin_form(req.uri_mut());
}

let fut = pooled
Expand Down
6 changes: 3 additions & 3 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub(super) enum User {
/// User tried to send a certain header in an unexpected context.
///
/// For example, sending both `content-length` and `transfer-encoding`.
#[cfg(feature = "http1")]
#[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "server")]
UnexpectedHeader,
/// User tried to create a Request with bad version.
Expand Down Expand Up @@ -290,7 +290,7 @@ impl Error {
Error::new(Kind::User(user))
}

#[cfg(feature = "http1")]
#[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "server")]
pub(super) fn new_user_header() -> Error {
Error::new_user(User::UnexpectedHeader)
Expand Down Expand Up @@ -405,7 +405,7 @@ impl Error {
Kind::User(User::MakeService) => "error from user's MakeService",
#[cfg(any(feature = "http1", feature = "http2"))]
Kind::User(User::Service) => "error from user's Service",
#[cfg(feature = "http1")]
#[cfg(any(feature = "http1", feature = "http2"))]
#[cfg(feature = "server")]
Kind::User(User::UnexpectedHeader) => "user sent unexpected header",
#[cfg(any(feature = "http1", feature = "http2"))]
Expand Down
111 changes: 79 additions & 32 deletions src/proto/h2/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@ use std::error::Error as StdError;
#[cfg(feature = "runtime")]
use std::time::Duration;

use bytes::{Bytes};
use futures_channel::{mpsc, oneshot};
use futures_util::future::{self, Either, FutureExt as _, TryFutureExt as _};
use futures_util::stream::StreamExt as _;
use h2::client::{Builder, SendRequest};
use http::Method;
use tokio::io::{AsyncRead, AsyncWrite};

use super::{decode_content_length, ping, PipeToSendStream, SendBuf};
use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
use crate::body::HttpBody;
use crate::common::{exec::Exec, task, Future, Never, Pin, Poll};
use crate::headers;
use crate::proto::Dispatched;
use crate::proto::h2::UpgradedSendStream;
use crate::upgrade::Upgraded;
use crate::{Body, Request, Response};

type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<Body>>;
Expand Down Expand Up @@ -233,8 +237,20 @@ where
headers::set_content_length_if_missing(req.headers_mut(), len);
}
}

let is_connect = req.method() == Method::CONNECT;
let eos = body.is_end_stream();
let (fut, body_tx) = match self.h2_tx.send_request(req, eos) {
let ping = self.ping.clone();

if is_connect {
if headers::content_length_parse_all(req.headers()).map_or(false, |len| len != 0) {
warn!("h2 connect request with non-zero body not supported");
cb.send(Err((crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()), None)));
continue;
}
}

let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
Ok(ok) => ok,
Err(err) => {
debug!("client send request error: {}", err);
Expand All @@ -243,45 +259,76 @@ where
}
};

let ping = self.ping.clone();
if !eos {
let mut pipe = Box::pin(PipeToSendStream::new(body, body_tx)).map(|res| {
if let Err(e) = res {
debug!("client request body error: {}", e);
}
});

// eagerly see if the body pipe is ready and
// can thus skip allocating in the executor
match Pin::new(&mut pipe).poll(cx) {
Poll::Ready(_) => (),
Poll::Pending => {
let conn_drop_ref = self.conn_drop_ref.clone();
// keep the ping recorder's knowledge of an
// "open stream" alive while this body is
// still sending...
let ping = ping.clone();
let pipe = pipe.map(move |x| {
drop(conn_drop_ref);
drop(ping);
x
let send_stream = if !is_connect {
if !eos {
let mut pipe =
Box::pin(PipeToSendStream::new(body, body_tx)).map(|res| {
if let Err(e) = res {
debug!("client request body error: {}", e);
}
});
self.executor.execute(pipe);

// eagerly see if the body pipe is ready and
// can thus skip allocating in the executor
match Pin::new(&mut pipe).poll(cx) {
Poll::Ready(_) => (),
Poll::Pending => {
let conn_drop_ref = self.conn_drop_ref.clone();
// keep the ping recorder's knowledge of an
// "open stream" alive while this body is
// still sending...
let ping = ping.clone();
let pipe = pipe.map(move |x| {
drop(conn_drop_ref);
drop(ping);
x
});
self.executor.execute(pipe);
}
}
}
}

None
} else {
Some(body_tx)
};

let fut = fut.map(move |result| match result {
Ok(res) => {
// record that we got the response headers
ping.record_non_data();

let content_length = decode_content_length(res.headers());
let res = res.map(|stream| {
let ping = ping.for_stream(&stream);
crate::Body::h2(stream, content_length, ping)
});
Ok(res)
let content_length = headers::content_length_parse_all(res.headers());
if let Some(mut send_stream) = send_stream {
if content_length.map_or(false, |len| len != 0) {
warn!("h2 connect response with non-zero body not supported");

send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
return Err((crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()), None));
}
let (parts, recv_stream) = res.into_parts();
let mut res = Response::from_parts(parts, Body::empty());

let (pending, on_upgrade) = crate::upgrade::pending();
let io = H2Upgraded {
ping,
send_stream: unsafe { UpgradedSendStream::new(send_stream) },
recv_stream,
buf: Bytes::new(),
};
let upgraded = Upgraded::new(io, Bytes::new());

pending.fulfill(upgraded);
res.extensions_mut().insert(on_upgrade);

Ok(res)
} else {
let res = res.map(|stream| {
let ping = ping.for_stream(&stream);
crate::Body::h2(stream, content_length.into(), ping)
});
Ok(res)
}
}
Err(err) => {
ping.ensure_not_timed_out().map_err(|e| (e, None))?;
Expand Down
Loading

0 comments on commit 845c9c9

Please sign in to comment.