Skip to content

Commit

Permalink
feat(http1): Add higher-level HTTP upgrade support to Client and Serv…
Browse files Browse the repository at this point in the history
…er (#1563)

- Adds `Body::on_upgrade()` that returns an `OnUpgrade` future.
- Adds `hyper::upgrade` module containing types for dealing with
  upgrades.
- Adds `server::conn::Connection::with_upgrades()` method to enable
  these upgrades when using lower-level API (because of a missing
  `Send` bound on the transport generic).
- Client connections are automatically enabled.
- Optimizes request parsing, to make up for extra work to look for
  upgrade requests.
  - Returns a smaller `DecodedLength` type instead of the fatter
    `Decoder`, which should also allow a couple fewer branches.
  - Removes the `Decode::Ignore` wrapper enum, and instead ignoring
    1xx responses is handled directly in the response parsing code.

Ref #1563 

Closes #1395
  • Loading branch information
seanmonstar committed Jun 14, 2018
1 parent 1c3fbfd commit fea29b2
Show file tree
Hide file tree
Showing 26 changed files with 1,271 additions and 574 deletions.
8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ iovec = "0.1"
log = "0.4"
net2 = { version = "0.2.32", optional = true }
time = "0.1"
tokio = { version = "0.1.5", optional = true }
tokio = { version = "0.1.7", optional = true }
tokio-executor = { version = "0.1.0", optional = true }
tokio-io = "0.1"
tokio-reactor = { version = "0.1", optional = true }
Expand Down Expand Up @@ -101,6 +101,12 @@ name = "send_file"
path = "examples/send_file.rs"
required-features = ["runtime"]

[[example]]
name = "upgrades"
path = "examples/upgrades.rs"
required-features = ["runtime"]


[[example]]
name = "web_api"
path = "examples/web_api.rs"
Expand Down
2 changes: 2 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@ Run examples with `cargo run --example example_name`.

* [`send_file`](send_file.rs) - A server that sends back content of files using tokio_fs to read the files asynchronously.

* [`upgrades`](upgrades.rs) - A server and client demonstrating how to do HTTP upgrades (such as WebSockets or `CONNECT` tunneling).

* [`web_api`](web_api.rs) - A server consisting in a service that returns incoming POST request's content in the response in uppercase and a service that call that call the first service and includes the first service response in its own response.
127 changes: 127 additions & 0 deletions examples/upgrades.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Note: `hyper::upgrade` docs link to this upgrade.
extern crate futures;
extern crate hyper;
extern crate tokio;

use std::str;

use futures::sync::oneshot;

use hyper::{Body, Client, Request, Response, Server, StatusCode};
use hyper::header::{UPGRADE, HeaderValue};
use hyper::rt::{self, Future};
use hyper::service::service_fn_ok;

/// Our server HTTP handler to initiate HTTP upgrades.
fn server_upgrade(req: Request<Body>) -> Response<Body> {
let mut res = Response::new(Body::empty());

// Send a 400 to any request that doesn't have
// an `Upgrade` header.
if !req.headers().contains_key(UPGRADE) {
*res.status_mut() = StatusCode::BAD_REQUEST;
return res;
}

// Setup a future that will eventually receive the upgraded
// connection and talk a new protocol, and spawn the future
// into the runtime.
//
// Note: This can't possibly be fulfilled until the 101 response
// is returned below, so it's better to spawn this future instead
// waiting for it to complete to then return a response.
let on_upgrade = req
.into_body()
.on_upgrade()
.map_err(|err| eprintln!("upgrade error: {}", err))
.and_then(|upgraded| {
// We have an upgraded connection that we can read and
// write on directly.
//
// Since we completely control this example, we know exactly
// how many bytes the client will write, so just read exact...
tokio::io::read_exact(upgraded, vec![0; 7])
.and_then(|(upgraded, vec)| {
println!("server[foobar] recv: {:?}", str::from_utf8(&vec));

// And now write back the server 'foobar' protocol's
// response...
tokio::io::write_all(upgraded, b"bar=foo")
})
.map(|_| println!("server[foobar] sent"))
.map_err(|e| eprintln!("server foobar io error: {}", e))
});

rt::spawn(on_upgrade);


// Now return a 101 Response saying we agree to the upgrade to some
// made-up 'foobar' protocol.
*res.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
res.headers_mut().insert(UPGRADE, HeaderValue::from_static("foobar"));
res
}

fn main() {
// For this example, we just make a server and our own client to talk to
// it, so the exact port isn't important. Instead, let the OS give us an
// unused port.
let addr = ([127, 0, 0, 1], 0).into();

let server = Server::bind(&addr)
.serve(|| service_fn_ok(server_upgrade));

// We need the assigned address for the client to send it messages.
let addr = server.local_addr();


// For this example, a oneshot is used to signal that after 1 request,
// the server should be shutdown.
let (tx, rx) = oneshot::channel();

let server = server
.map_err(|e| eprintln!("server error: {}", e))
.select2(rx)
.then(|_| Ok(()));

rt::run(rt::lazy(move || {
rt::spawn(server);

let req = Request::builder()
.uri(format!("http://{}/", addr))
.header(UPGRADE, "foobar")
.body(Body::empty())
.unwrap();

Client::new()
.request(req)
.and_then(|res| {
if res.status() != StatusCode::SWITCHING_PROTOCOLS {
panic!("Our server didn't upgrade: {}", res.status());
}

res
.into_body()
.on_upgrade()
})
.map_err(|e| eprintln!("client error: {}", e))
.and_then(|upgraded| {
// We've gotten an upgraded connection that we can read
// and write directly on. Let's start out 'foobar' protocol.
tokio::io::write_all(upgraded, b"foo=bar")
.and_then(|(upgraded, _)| {
println!("client[foobar] sent");
tokio::io::read_to_end(upgraded, Vec::new())
})
.map(|(_upgraded, vec)| {
println!("client[foobar] recv: {:?}", str::from_utf8(&vec));


// Complete the oneshot so that the server stops
// listening and the process can close down.
let _ = tx.send(());
})
.map_err(|e| eprintln!("client foobar io error: {}", e))
})
}));
}
72 changes: 56 additions & 16 deletions src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use http::HeaderMap;
use common::Never;
use super::{Chunk, Payload};
use super::internal::{FullDataArg, FullDataRet};
use upgrade::OnUpgrade;

type BodySender = mpsc::Sender<Result<Chunk, ::Error>>;

Expand All @@ -21,15 +22,9 @@ type BodySender = mpsc::Sender<Result<Chunk, ::Error>>;
#[must_use = "streams do nothing unless polled"]
pub struct Body {
kind: Kind,
/// Allow the client to pass a future to delay the `Body` from returning
/// EOF. This allows the `Client` to try to put the idle connection
/// back into the pool before the body is "finished".
///
/// The reason for this is so that creating a new request after finishing
/// streaming the body of a response could sometimes result in creating
/// a brand new connection, since the pool didn't know about the idle
/// connection yet.
delayed_eof: Option<DelayEof>,
/// Keep the extra bits in an `Option<Box<Extra>>`, so that
/// Body stays small in the common case (no extras needed).
extra: Option<Box<Extra>>,
}

enum Kind {
Expand All @@ -43,6 +38,19 @@ enum Kind {
Wrapped(Box<Stream<Item=Chunk, Error=Box<::std::error::Error + Send + Sync>> + Send>),
}

struct Extra {
/// Allow the client to pass a future to delay the `Body` from returning
/// EOF. This allows the `Client` to try to put the idle connection
/// back into the pool before the body is "finished".
///
/// The reason for this is so that creating a new request after finishing
/// streaming the body of a response could sometimes result in creating
/// a brand new connection, since the pool didn't know about the idle
/// connection yet.
delayed_eof: Option<DelayEof>,
on_upgrade: OnUpgrade,
}

type DelayEofUntil = oneshot::Receiver<Never>;

enum DelayEof {
Expand Down Expand Up @@ -89,7 +97,6 @@ impl Body {
Self::new_channel(None)
}

#[inline]
pub(crate) fn new_channel(content_length: Option<u64>) -> (Sender, Body) {
let (tx, rx) = mpsc::channel(0);
let (abort_tx, abort_rx) = oneshot::channel();
Expand Down Expand Up @@ -139,34 +146,67 @@ impl Body {
Body::new(Kind::Wrapped(Box::new(mapped)))
}

/// Converts this `Body` into a `Future` of a pending HTTP upgrade.
///
/// See [the `upgrade` module](::upgrade) for more.
pub fn on_upgrade(self) -> OnUpgrade {
self
.extra
.map(|ex| ex.on_upgrade)
.unwrap_or_else(OnUpgrade::none)
}

fn new(kind: Kind) -> Body {
Body {
kind: kind,
delayed_eof: None,
extra: None,
}
}

pub(crate) fn h2(recv: h2::RecvStream) -> Self {
Body::new(Kind::H2(recv))
}

pub(crate) fn set_on_upgrade(&mut self, upgrade: OnUpgrade) {
debug_assert!(!upgrade.is_none(), "set_on_upgrade with empty upgrade");
let extra = self.extra_mut();
debug_assert!(extra.on_upgrade.is_none(), "set_on_upgrade twice");
extra.on_upgrade = upgrade;
}

pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) {
self.delayed_eof = Some(DelayEof::NotEof(fut));
self.extra_mut().delayed_eof = Some(DelayEof::NotEof(fut));
}

fn take_delayed_eof(&mut self) -> Option<DelayEof> {
self
.extra
.as_mut()
.and_then(|extra| extra.delayed_eof.take())
}

fn extra_mut(&mut self) -> &mut Extra {
self
.extra
.get_or_insert_with(|| Box::new(Extra {
delayed_eof: None,
on_upgrade: OnUpgrade::none(),
}))
}

fn poll_eof(&mut self) -> Poll<Option<Chunk>, ::Error> {
match self.delayed_eof.take() {
match self.take_delayed_eof() {
Some(DelayEof::NotEof(mut delay)) => {
match self.poll_inner() {
ok @ Ok(Async::Ready(Some(..))) |
ok @ Ok(Async::NotReady) => {
self.delayed_eof = Some(DelayEof::NotEof(delay));
self.extra_mut().delayed_eof = Some(DelayEof::NotEof(delay));
ok
},
Ok(Async::Ready(None)) => match delay.poll() {
Ok(Async::Ready(never)) => match never {},
Ok(Async::NotReady) => {
self.delayed_eof = Some(DelayEof::Eof(delay));
self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
Ok(Async::NotReady)
},
Err(_done) => {
Expand All @@ -180,7 +220,7 @@ impl Body {
match delay.poll() {
Ok(Async::Ready(never)) => match never {},
Ok(Async::NotReady) => {
self.delayed_eof = Some(DelayEof::Eof(delay));
self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
Ok(Async::NotReady)
},
Err(_done) => {
Expand Down
Loading

0 comments on commit fea29b2

Please sign in to comment.