Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

init upgrade submodule #115

Merged
merged 8 commits into from
May 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,14 @@ mod status_code;
mod version;

cfg_unstable! {
pub mod upgrade;

mod client;
mod server;

pub use client::Client;
pub use server::Server;

}

pub use body::Body;
Expand Down
18 changes: 9 additions & 9 deletions src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ pin_project_lite::pin_project! {
url: Url,
headers: Headers,
version: Option<Version>,
sender: Option<sync::Sender<Trailers>>,
receiver: Option<sync::Receiver<Trailers>>,
#[pin]
body: Body,
local_addr: Option<String>,
peer_addr: Option<String>,
ext: Extensions,
trailers_sender: Option<sync::Sender<Trailers>>,
trailers_receiver: Option<sync::Receiver<Trailers>>,
}
}

Expand All @@ -51,18 +51,18 @@ impl Request {
U::Error: std::fmt::Debug,
{
let url = url.try_into().expect("Could not convert into a valid url");
let (sender, receiver) = sync::channel(1);
let (trailers_sender, trailers_receiver) = sync::channel(1);
Self {
method,
url,
headers: Headers::new(),
version: None,
body: Body::empty(),
sender: Some(sender),
receiver: Some(receiver),
ext: Extensions::new(),
peer_addr: None,
local_addr: None,
trailers_receiver: Some(trailers_receiver),
trailers_sender: Some(trailers_sender),
}
}

Expand Down Expand Up @@ -543,7 +543,7 @@ impl Request {
/// Sends trailers to the a receiver.
pub fn send_trailers(&mut self) -> trailers::Sender {
let sender = self
.sender
.trailers_sender
.take()
.expect("Trailers sender can only be constructed once");
trailers::Sender::new(sender)
Expand All @@ -552,7 +552,7 @@ impl Request {
/// Receive trailers from a sender.
pub async fn recv_trailers(&mut self) -> trailers::Receiver {
let receiver = self
.receiver
.trailers_receiver
.take()
.expect("Trailers receiver can only be constructed once");
trailers::Receiver::new(receiver)
Expand Down Expand Up @@ -867,8 +867,8 @@ impl Clone for Request {
url: self.url.clone(),
headers: self.headers.clone(),
version: self.version.clone(),
sender: self.sender.clone(),
receiver: self.receiver.clone(),
trailers_sender: None,
trailers_receiver: None,
body: Body::empty(),
ext: Extensions::new(),
peer_addr: self.peer_addr.clone(),
Expand Down
123 changes: 114 additions & 9 deletions src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,42 @@ use crate::mime::Mime;
use crate::trailers::{self, Trailers};
use crate::{Body, Extensions, StatusCode, Version};

cfg_unstable! {
use crate::upgrade;
}

#[cfg(not(feature = "unstable"))]
pin_project_lite::pin_project! {
/// An HTTP response.
///
/// # Examples
///
/// ```
/// # fn main() -> Result<(), http_types::Error> {
/// #
/// use http_types::{Response, StatusCode};
///
/// let mut res = Response::new(StatusCode::Ok);
/// res.set_body("Hello, Nori!");
/// #
/// # Ok(()) }
/// ```
#[derive(Debug)]
pub struct Response {
status: StatusCode,
headers: Headers,
version: Option<Version>,
trailers_sender: Option<sync::Sender<Trailers>>,
trailers_receiver: Option<sync::Receiver<Trailers>>,
#[pin]
body: Body,
ext: Extensions,
local_addr: Option<String>,
peer_addr: Option<String>,
}
}

#[cfg(feature = "unstable")]
pin_project_lite::pin_project! {
/// An HTTP response.
///
Expand All @@ -37,8 +73,11 @@ pin_project_lite::pin_project! {
status: StatusCode,
headers: Headers,
version: Option<Version>,
sender: Option<sync::Sender<Trailers>>,
receiver: Option<sync::Receiver<Trailers>>,
trailers_sender: Option<sync::Sender<Trailers>>,
trailers_receiver: Option<sync::Receiver<Trailers>>,
upgrade_sender: Option<sync::Sender<upgrade::Connection>>,
upgrade_receiver: Option<sync::Receiver<upgrade::Connection>>,
has_upgrade: bool,
#[pin]
body: Body,
ext: Extensions,
Expand All @@ -49,6 +88,7 @@ pin_project_lite::pin_project! {

impl Response {
/// Create a new response.
#[cfg(not(feature = "unstable"))]
pub fn new<S>(status: S) -> Self
where
S: TryInto<StatusCode>,
Expand All @@ -57,14 +97,42 @@ impl Response {
let status = status
.try_into()
.expect("Could not convert into a valid `StatusCode`");
let (sender, receiver) = sync::channel(1);
let (trailers_sender, trailers_receiver) = sync::channel(1);
Self {
status,
headers: Headers::new(),
version: None,
body: Body::empty(),
sender: Some(sender),
receiver: Some(receiver),
trailers_sender: Some(trailers_sender),
trailers_receiver: Some(trailers_receiver),
ext: Extensions::new(),
peer_addr: None,
local_addr: None,
}
}

/// Create a new response.
#[cfg(feature = "unstable")]
pub fn new<S>(status: S) -> Self
where
S: TryInto<StatusCode>,
S::Error: Debug,
{
let status = status
.try_into()
.expect("Could not convert into a valid `StatusCode`");
let (trailers_sender, trailers_receiver) = sync::channel(1);
let (upgrade_sender, upgrade_receiver) = sync::channel(1);
Self {
status,
headers: Headers::new(),
version: None,
body: Body::empty(),
trailers_sender: Some(trailers_sender),
trailers_receiver: Some(trailers_receiver),
upgrade_sender: Some(upgrade_sender),
upgrade_receiver: Some(upgrade_receiver),
has_upgrade: false,
ext: Extensions::new(),
peer_addr: None,
local_addr: None,
Expand Down Expand Up @@ -457,7 +525,7 @@ impl Response {
/// Sends trailers to the a receiver.
pub fn send_trailers(&mut self) -> trailers::Sender {
let sender = self
.sender
.trailers_sender
.take()
.expect("Trailers sender can only be constructed once");
trailers::Sender::new(sender)
Expand All @@ -466,12 +534,43 @@ impl Response {
/// Receive trailers from a sender.
pub async fn recv_trailers(&mut self) -> trailers::Receiver {
let receiver = self
.receiver
.trailers_receiver
.take()
.expect("Trailers receiver can only be constructed once");
trailers::Receiver::new(receiver)
}

/// Sends an upgrade connection to the a receiver.
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub fn send_upgrade(&mut self) -> upgrade::Sender {
self.has_upgrade = true;
let sender = self
.upgrade_sender
.take()
.expect("Upgrade sender can only be constructed once");
upgrade::Sender::new(sender)
}

/// Receive an upgraded connection from a sender.
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub async fn recv_upgrade(&mut self) -> upgrade::Receiver {
self.has_upgrade = true;
let receiver = self
.upgrade_receiver
.take()
.expect("Upgrade receiver can only be constructed once");
upgrade::Receiver::new(receiver)
}

/// Returns `true` if a protocol upgrade is in progress.
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub fn has_upgrade(&self) -> bool {
self.has_upgrade
}

/// An iterator visiting all header pairs in arbitrary order.
pub fn iter(&self) -> headers::Iter<'_> {
self.headers.iter()
Expand Down Expand Up @@ -526,8 +625,14 @@ impl Clone for Response {
status: self.status.clone(),
headers: self.headers.clone(),
version: self.version.clone(),
sender: self.sender.clone(),
receiver: self.receiver.clone(),
trailers_sender: self.trailers_sender.clone(),
trailers_receiver: self.trailers_receiver.clone(),
#[cfg(feature = "unstable")]
upgrade_sender: self.upgrade_sender.clone(),
#[cfg(feature = "unstable")]
upgrade_receiver: self.upgrade_receiver.clone(),
#[cfg(feature = "unstable")]
has_upgrade: false,
body: Body::empty(),
ext: Extensions::new(),
peer_addr: self.peer_addr.clone(),
Expand Down
45 changes: 45 additions & 0 deletions src/upgrade/connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use async_std::io::{self, prelude::*};

use std::pin::Pin;
use std::task::{Context, Poll};

/// An upgraded HTTP connection.
#[derive(Debug, Clone)]
pub struct RawConnection<Inner> {
inner: Inner,
}

/// A boxed upgraded HTTP connection.
pub type Connection = RawConnection<Box<dyn InnerConnection + 'static>>;

/// Trait to signal the requirements for an underlying connection type.
pub trait InnerConnection: Read + Write + Send + Sync + Unpin {}
impl<T: Read + Write + Send + Sync + Unpin> InnerConnection for T {}

impl<Inner: Read + Unpin> Read for RawConnection<Inner> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.inner).poll_read(cx, buf)
}
}

impl<Inner: Write + Unpin> Write for RawConnection<Inner> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.inner).poll_write(cx, buf)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_flush(cx)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_close(cx)
}
}
19 changes: 19 additions & 0 deletions src/upgrade/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
//! HTTP protocol upgrades.
//!
//! In HTTP it's not uncommon to convert from one protocol to another. For
//! example `HTTP/1.1` can upgrade a connection to websockets using the
//! [upgrade header](https://developer.mozilla.org/en-US/docs/Web/HTTP/Protocol_upgrade_mechanism),
//! while `HTTP/2` uses [a custom
//! handshake](https://tools.ietf.org/html/rfc8441#section-5.1). Regardless of
//! the HTTP version, changing protocols always involves some handshake,
//! after which it is turned into a stream of bytes. This module provides
//! primitives for upgrading from HTTP request-response pairs to alternate
//! protocols.

mod connection;
mod receiver;
mod sender;

pub use connection::Connection;
pub use receiver::Receiver;
pub use sender::Sender;
35 changes: 35 additions & 0 deletions src/upgrade/receiver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use async_std::prelude::*;
use async_std::sync;

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use crate::upgrade::Connection;

/// The receiving half of a channel to send an upgraded connection.
///
/// Unlike `async_std::sync::channel` the `send` method on this type can only be
/// called once, and cannot be cloned. That's because only a single instance of
/// `Connection` should be created.
#[must_use = "Futures do nothing unless polled or .awaited"]
#[derive(Debug)]
pub struct Receiver {
receiver: sync::Receiver<Connection>,
}

impl Receiver {
/// Create a new instance of `Receiver`.
#[allow(unused)]
pub(crate) fn new(receiver: sync::Receiver<Connection>) -> Self {
Self { receiver }
}
}

impl Future for Receiver {
type Output = Option<Connection>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.receiver).poll_next(cx)
}
}
28 changes: 28 additions & 0 deletions src/upgrade/sender.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use async_std::sync;

use crate::upgrade::Connection;

/// The sending half of a channel to send an upgraded connection.
///
/// Unlike `async_std::sync::channel` the `send` method on this type can only be
/// called once, and cannot be cloned. That's because only a single instance of
/// `Connection` should be created.
#[derive(Debug)]
pub struct Sender {
sender: sync::Sender<Connection>,
}

impl Sender {
/// Create a new instance of `Sender`.
#[doc(hidden)]
pub fn new(sender: sync::Sender<Connection>) -> Self {
Self { sender }
}

/// Send a `Trailer`.
///
/// The channel will be consumed after having sent trailers.
pub async fn send(self, trailers: Connection) {
self.sender.send(trailers).await
}
}