Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add higher-level HTTP upgrade support to Client and Server #1563

Merged
merged 1 commit into from
Jun 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ iovec = "0.1"
log = "0.4"
net2 = { version = "0.2.32", optional = true }
time = "0.1"
tokio = { version = "0.1.5", optional = true }
tokio = { version = "0.1.7", optional = true }
tokio-executor = { version = "0.1.0", optional = true }
tokio-io = "0.1"
tokio-reactor = { version = "0.1", optional = true }
Expand Down Expand Up @@ -101,6 +101,12 @@ name = "send_file"
path = "examples/send_file.rs"
required-features = ["runtime"]

[[example]]
name = "upgrades"
path = "examples/upgrades.rs"
required-features = ["runtime"]


[[example]]
name = "web_api"
path = "examples/web_api.rs"
Expand Down
2 changes: 2 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@ Run examples with `cargo run --example example_name`.

* [`send_file`](send_file.rs) - A server that sends back content of files using tokio_fs to read the files asynchronously.

* [`upgrades`](upgrades.rs) - A server and client demonstrating how to do HTTP upgrades (such as WebSockets or `CONNECT` tunneling).

* [`web_api`](web_api.rs) - A server consisting in a service that returns incoming POST request's content in the response in uppercase and a service that call that call the first service and includes the first service response in its own response.
127 changes: 127 additions & 0 deletions examples/upgrades.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Note: `hyper::upgrade` docs link to this upgrade.
extern crate futures;
extern crate hyper;
extern crate tokio;

use std::str;

use futures::sync::oneshot;

use hyper::{Body, Client, Request, Response, Server, StatusCode};
use hyper::header::{UPGRADE, HeaderValue};
use hyper::rt::{self, Future};
use hyper::service::service_fn_ok;

/// Our server HTTP handler to initiate HTTP upgrades.
fn server_upgrade(req: Request<Body>) -> Response<Body> {
let mut res = Response::new(Body::empty());

// Send a 400 to any request that doesn't have
// an `Upgrade` header.
if !req.headers().contains_key(UPGRADE) {
*res.status_mut() = StatusCode::BAD_REQUEST;
return res;
}

// Setup a future that will eventually receive the upgraded
// connection and talk a new protocol, and spawn the future
// into the runtime.
//
// Note: This can't possibly be fulfilled until the 101 response
// is returned below, so it's better to spawn this future instead
// waiting for it to complete to then return a response.
let on_upgrade = req
.into_body()
.on_upgrade()
.map_err(|err| eprintln!("upgrade error: {}", err))
.and_then(|upgraded| {
// We have an upgraded connection that we can read and
// write on directly.
//
// Since we completely control this example, we know exactly
// how many bytes the client will write, so just read exact...
tokio::io::read_exact(upgraded, vec![0; 7])
.and_then(|(upgraded, vec)| {
println!("server[foobar] recv: {:?}", str::from_utf8(&vec));

// And now write back the server 'foobar' protocol's
// response...
tokio::io::write_all(upgraded, b"bar=foo")
})
.map(|_| println!("server[foobar] sent"))
.map_err(|e| eprintln!("server foobar io error: {}", e))
});

rt::spawn(on_upgrade);


// Now return a 101 Response saying we agree to the upgrade to some
// made-up 'foobar' protocol.
*res.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
res.headers_mut().insert(UPGRADE, HeaderValue::from_static("foobar"));
res
}

fn main() {
// For this example, we just make a server and our own client to talk to
// it, so the exact port isn't important. Instead, let the OS give us an
// unused port.
let addr = ([127, 0, 0, 1], 0).into();

let server = Server::bind(&addr)
.serve(|| service_fn_ok(server_upgrade));

// We need the assigned address for the client to send it messages.
let addr = server.local_addr();


// For this example, a oneshot is used to signal that after 1 request,
// the server should be shutdown.
let (tx, rx) = oneshot::channel();

let server = server
.map_err(|e| eprintln!("server error: {}", e))
.select2(rx)
.then(|_| Ok(()));

rt::run(rt::lazy(move || {
rt::spawn(server);

let req = Request::builder()
.uri(format!("http://{}/", addr))
.header(UPGRADE, "foobar")
.body(Body::empty())
.unwrap();

Client::new()
.request(req)
.and_then(|res| {
if res.status() != StatusCode::SWITCHING_PROTOCOLS {
panic!("Our server didn't upgrade: {}", res.status());
}

res
.into_body()
.on_upgrade()
})
.map_err(|e| eprintln!("client error: {}", e))
.and_then(|upgraded| {
// We've gotten an upgraded connection that we can read
// and write directly on. Let's start out 'foobar' protocol.
tokio::io::write_all(upgraded, b"foo=bar")
.and_then(|(upgraded, _)| {
println!("client[foobar] sent");
tokio::io::read_to_end(upgraded, Vec::new())
})
.map(|(_upgraded, vec)| {
println!("client[foobar] recv: {:?}", str::from_utf8(&vec));


// Complete the oneshot so that the server stops
// listening and the process can close down.
let _ = tx.send(());
})
.map_err(|e| eprintln!("client foobar io error: {}", e))
})
}));
}
72 changes: 56 additions & 16 deletions src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use http::HeaderMap;
use common::Never;
use super::{Chunk, Payload};
use super::internal::{FullDataArg, FullDataRet};
use upgrade::OnUpgrade;

type BodySender = mpsc::Sender<Result<Chunk, ::Error>>;

Expand All @@ -21,15 +22,9 @@ type BodySender = mpsc::Sender<Result<Chunk, ::Error>>;
#[must_use = "streams do nothing unless polled"]
pub struct Body {
kind: Kind,
/// Allow the client to pass a future to delay the `Body` from returning
/// EOF. This allows the `Client` to try to put the idle connection
/// back into the pool before the body is "finished".
///
/// The reason for this is so that creating a new request after finishing
/// streaming the body of a response could sometimes result in creating
/// a brand new connection, since the pool didn't know about the idle
/// connection yet.
delayed_eof: Option<DelayEof>,
/// Keep the extra bits in an `Option<Box<Extra>>`, so that
/// Body stays small in the common case (no extras needed).
extra: Option<Box<Extra>>,
}

enum Kind {
Expand All @@ -43,6 +38,19 @@ enum Kind {
Wrapped(Box<Stream<Item=Chunk, Error=Box<::std::error::Error + Send + Sync>> + Send>),
}

struct Extra {
/// Allow the client to pass a future to delay the `Body` from returning
/// EOF. This allows the `Client` to try to put the idle connection
/// back into the pool before the body is "finished".
///
/// The reason for this is so that creating a new request after finishing
/// streaming the body of a response could sometimes result in creating
/// a brand new connection, since the pool didn't know about the idle
/// connection yet.
delayed_eof: Option<DelayEof>,
on_upgrade: OnUpgrade,
}

type DelayEofUntil = oneshot::Receiver<Never>;

enum DelayEof {
Expand Down Expand Up @@ -89,7 +97,6 @@ impl Body {
Self::new_channel(None)
}

#[inline]
pub(crate) fn new_channel(content_length: Option<u64>) -> (Sender, Body) {
let (tx, rx) = mpsc::channel(0);
let (abort_tx, abort_rx) = oneshot::channel();
Expand Down Expand Up @@ -139,34 +146,67 @@ impl Body {
Body::new(Kind::Wrapped(Box::new(mapped)))
}

/// Converts this `Body` into a `Future` of a pending HTTP upgrade.
///
/// See [the `upgrade` module](::upgrade) for more.
pub fn on_upgrade(self) -> OnUpgrade {
self
.extra
.map(|ex| ex.on_upgrade)
.unwrap_or_else(OnUpgrade::none)
}

fn new(kind: Kind) -> Body {
Body {
kind: kind,
delayed_eof: None,
extra: None,
}
}

pub(crate) fn h2(recv: h2::RecvStream) -> Self {
Body::new(Kind::H2(recv))
}

pub(crate) fn set_on_upgrade(&mut self, upgrade: OnUpgrade) {
debug_assert!(!upgrade.is_none(), "set_on_upgrade with empty upgrade");
let extra = self.extra_mut();
debug_assert!(extra.on_upgrade.is_none(), "set_on_upgrade twice");
extra.on_upgrade = upgrade;
}

pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) {
self.delayed_eof = Some(DelayEof::NotEof(fut));
self.extra_mut().delayed_eof = Some(DelayEof::NotEof(fut));
}

fn take_delayed_eof(&mut self) -> Option<DelayEof> {
self
.extra
.as_mut()
.and_then(|extra| extra.delayed_eof.take())
}

fn extra_mut(&mut self) -> &mut Extra {
self
.extra
.get_or_insert_with(|| Box::new(Extra {
delayed_eof: None,
on_upgrade: OnUpgrade::none(),
}))
}

fn poll_eof(&mut self) -> Poll<Option<Chunk>, ::Error> {
match self.delayed_eof.take() {
match self.take_delayed_eof() {
Some(DelayEof::NotEof(mut delay)) => {
match self.poll_inner() {
ok @ Ok(Async::Ready(Some(..))) |
ok @ Ok(Async::NotReady) => {
self.delayed_eof = Some(DelayEof::NotEof(delay));
self.extra_mut().delayed_eof = Some(DelayEof::NotEof(delay));
ok
},
Ok(Async::Ready(None)) => match delay.poll() {
Ok(Async::Ready(never)) => match never {},
Ok(Async::NotReady) => {
self.delayed_eof = Some(DelayEof::Eof(delay));
self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
Ok(Async::NotReady)
},
Err(_done) => {
Expand All @@ -180,7 +220,7 @@ impl Body {
match delay.poll() {
Ok(Async::Ready(never)) => match never {},
Ok(Async::NotReady) => {
self.delayed_eof = Some(DelayEof::Eof(delay));
self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
Ok(Async::NotReady)
},
Err(_done) => {
Expand Down
Loading