From 32ad0600510d6bbe3d9d00565e0d4a8ac2ec990e Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Thu, 23 Apr 2020 20:40:50 +0200 Subject: [PATCH 1/8] init upgrade submodule --- src/lib.rs | 3 ++ src/request.rs | 75 ++++++++++++++++++++++++++++++++++----- src/upgrade/connection.rs | 42 ++++++++++++++++++++++ src/upgrade/mod.rs | 19 ++++++++++ src/upgrade/receiver.rs | 35 ++++++++++++++++++ src/upgrade/sender.rs | 28 +++++++++++++++ 6 files changed, 193 insertions(+), 9 deletions(-) create mode 100644 src/upgrade/connection.rs create mode 100644 src/upgrade/mod.rs create mode 100644 src/upgrade/receiver.rs create mode 100644 src/upgrade/sender.rs diff --git a/src/lib.rs b/src/lib.rs index e917fb64..df0d7619 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -131,11 +131,14 @@ mod status_code; mod version; cfg_unstable! { + pub mod upgrade; + mod client; mod server; pub use client::Client; pub use server::Server; + } pub use body::Body; diff --git a/src/request.rs b/src/request.rs index 4423971f..d43cb692 100644 --- a/src/request.rs +++ b/src/request.rs @@ -16,6 +16,10 @@ use crate::mime::Mime; use crate::trailers::{self, Trailers}; use crate::{Body, Extensions, Method, StatusCode, Url, Version}; +cfg_unstable! { + use crate::upgrade; +} + pin_project_lite::pin_project! { /// An HTTP request. /// @@ -33,8 +37,10 @@ pin_project_lite::pin_project! { url: Url, headers: Headers, version: Option, - sender: Option>, - receiver: Option>, + trailers_sender: Option>, + trailers_receiver: Option>, + upgrade_sender: Option>, + upgrade_receiver: Option>, #[pin] body: Body, local_addr: Option, @@ -45,24 +51,51 @@ pin_project_lite::pin_project! { impl Request { /// Create a new request. + #[cfg(feature = "unstable")] pub fn new(method: Method, url: U) -> Self where U: TryInto, U::Error: std::fmt::Debug, { let url = url.try_into().expect("Could not convert into a valid url"); - let (sender, receiver) = sync::channel(1); + let (trailers_sender, trailers_receiver) = sync::channel(1); + let (upgrade_sender, upgrade_receiver) = sync::channel(1); Self { method, url, headers: Headers::new(), version: None, body: Body::empty(), - sender: Some(sender), - receiver: Some(receiver), ext: Extensions::new(), peer_addr: None, local_addr: None, + trailers_receiver: Some(trailers_receiver), + trailers_sender: Some(trailers_sender), + upgrade_sender: Some(upgrade_sender), + upgrade_receiver: Some(upgrade_receiver), + } + } + + /// Create a new request. + #[cfg(not(feature = "unstable"))] + pub fn new(method: Method, url: U) -> Self + where + U: TryInto, + U::Error: std::fmt::Debug, + { + let url = url.try_into().expect("Could not convert into a valid url"); + let (trailers_sender, trailers_receiver) = sync::channel(1); + Self { + method, + url, + headers: Headers::new(), + version: None, + body: Body::empty(), + ext: Extensions::new(), + peer_addr: None, + local_addr: None, + trailers_receiver: Some(trailers_receiver), + trailers_sender: Some(trailers_sender), } } @@ -543,7 +576,7 @@ impl Request { /// Sends trailers to the a receiver. pub fn send_trailers(&mut self) -> trailers::Sender { let sender = self - .sender + .trailers_sender .take() .expect("Trailers sender can only be constructed once"); trailers::Sender::new(sender) @@ -552,12 +585,34 @@ impl Request { /// Receive trailers from a sender. pub async fn recv_trailers(&mut self) -> trailers::Receiver { let receiver = self - .receiver + .trailers_receiver .take() .expect("Trailers receiver can only be constructed once"); trailers::Receiver::new(receiver) } + /// Sends an upgrade connection to the a receiver. + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + pub fn send_upgrade(&mut self) -> upgrade::Sender { + let sender = self + .upgrade_sender + .take() + .expect("Upgrade sender can only be constructed once"); + upgrade::Sender::new(sender) + } + + /// Receive an upgraded connection from a sender. + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + pub async fn recv_upgrade(&mut self) -> upgrade::Receiver { + let receiver = self + .upgrade_receiver + .take() + .expect("Upgrade receiver can only be constructed once"); + upgrade::Receiver::new(receiver) + } + /// An iterator visiting all header pairs in arbitrary order. pub fn iter(&self) -> headers::Iter<'_> { self.headers.iter() @@ -867,8 +922,10 @@ impl Clone for Request { url: self.url.clone(), headers: self.headers.clone(), version: self.version.clone(), - sender: self.sender.clone(), - receiver: self.receiver.clone(), + trailers_sender: None, + trailers_receiver: None, + upgrade_sender: None, + upgrade_receiver: None, body: Body::empty(), ext: Extensions::new(), peer_addr: self.peer_addr.clone(), diff --git a/src/upgrade/connection.rs b/src/upgrade/connection.rs new file mode 100644 index 00000000..d8331a57 --- /dev/null +++ b/src/upgrade/connection.rs @@ -0,0 +1,42 @@ +use async_std::io::{self, prelude::*}; + +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// An upgraded HTTP connection. +#[derive(Debug, Clone)] +pub struct Connection { + inner: Pin>, +} + +impl Read for Connection { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let this = self.project(); + Pin::new(this.inner).poll_read(cx, buf) + } +} + +impl Write for Connection { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let this = self.project(); + Pin::new(this.inner).poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + Pin::new(this.inner).poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + Pin::new(this.inner).poll_close(cx) + } +} diff --git a/src/upgrade/mod.rs b/src/upgrade/mod.rs new file mode 100644 index 00000000..4519483f --- /dev/null +++ b/src/upgrade/mod.rs @@ -0,0 +1,19 @@ +//! HTTP protocol upgrades. +//! +//! In HTTP it's not uncommon to convert from one protocol to another. For +//! example `HTTP/1.1` can upgrade a connection to websockets using the +//! [upgrade header](https://developer.mozilla.org/en-US/docs/Web/HTTP/Protocol_upgrade_mechanism), +//! while `HTTP/2` uses [a custom +//! handshake](https://tools.ietf.org/html/rfc8441#section-5.1). Regardless of +//! the HTTP version, changing protocols always involves some handshake, +//! after which it is turned into a stream of bytes. This module provides +//! primitives for upgrading from HTTP request-response pairs to alternate +//! protocols. + +mod connection; +mod receiver; +mod sender; + +pub use connection::Connection; +pub use receiver::Receiver; +pub use sender::Sender; diff --git a/src/upgrade/receiver.rs b/src/upgrade/receiver.rs new file mode 100644 index 00000000..44f4522a --- /dev/null +++ b/src/upgrade/receiver.rs @@ -0,0 +1,35 @@ +use async_std::prelude::*; +use async_std::sync; + +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::upgrade::Connection; + +/// The receiving half of a channel to send an upgraded connection. +/// +/// Unlike `async_std::sync::channel` the `send` method on this type can only be +/// called once, and cannot be cloned. That's because only a single instance of +/// `Connection` should be created. +#[must_use = "Futures do nothing unless polled or .awaited"] +#[derive(Debug)] +pub struct Receiver { + receiver: sync::Receiver, +} + +impl Receiver { + /// Create a new instance of `Receiver`. + #[allow(unused)] + pub(crate) fn new(receiver: sync::Receiver) -> Self { + Self { receiver } + } +} + +impl Future for Receiver { + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.receiver).poll_next(cx) + } +} diff --git a/src/upgrade/sender.rs b/src/upgrade/sender.rs new file mode 100644 index 00000000..562a455b --- /dev/null +++ b/src/upgrade/sender.rs @@ -0,0 +1,28 @@ +use async_std::sync; + +use crate::upgrade::Connection; + +/// The sending half of a channel to send an upgraded connection. +/// +/// Unlike `async_std::sync::channel` the `send` method on this type can only be +/// called once, and cannot be cloned. That's because only a single instance of +/// `Connection` should be created. +#[derive(Debug)] +pub struct Sender { + sender: sync::Sender, +} + +impl Sender { + /// Create a new instance of `Sender`. + #[doc(hidden)] + pub fn new(sender: sync::Sender) -> Self { + Self { sender } + } + + /// Send a `Trailer`. + /// + /// The channel will be consumed after having sent trailers. + pub async fn send(self, trailers: Connection) { + self.sender.send(trailers).await + } +} From 1a993a93e8c0d3ae9df7eb45386530cfc1a8eade Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Thu, 14 May 2020 22:22:08 +0200 Subject: [PATCH 2/8] make it compile --- src/request.rs | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/request.rs b/src/request.rs index d43cb692..50b577c0 100644 --- a/src/request.rs +++ b/src/request.rs @@ -20,6 +20,33 @@ cfg_unstable! { use crate::upgrade; } +#[cfg(not(feature = "unstable"))] +pin_project_lite::pin_project! { + /// An HTTP request. + /// + /// # Examples + /// + /// ``` + /// use http_types::{Url, Method, Request}; + /// + /// let mut req = Request::new(Method::Get, Url::parse("https://example.com").unwrap()); + /// req.set_body("Hello, Nori!"); + /// ``` + #[derive(Debug)] + pub struct Request { + method: Method, + url: Url, + headers: Headers, + version: Option, + trailers_sender: Option>, + trailers_receiver: Option>, + #[pin] + body: Body, + local: TypeMap, + } +} + +#[cfg(feature = "unstable")] pin_project_lite::pin_project! { /// An HTTP request. /// @@ -924,7 +951,9 @@ impl Clone for Request { version: self.version.clone(), trailers_sender: None, trailers_receiver: None, + #[cfg(feature = "unstable")] upgrade_sender: None, + #[cfg(feature = "unstable")] upgrade_receiver: None, body: Body::empty(), ext: Extensions::new(), From cf4f77eddf7cb66c6a5c35be924c5df996a3a488 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Tue, 19 May 2020 10:57:01 +0200 Subject: [PATCH 3/8] get it actually to compile --- src/upgrade/connection.rs | 35 +++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/src/upgrade/connection.rs b/src/upgrade/connection.rs index d8331a57..a46edc48 100644 --- a/src/upgrade/connection.rs +++ b/src/upgrade/connection.rs @@ -1,42 +1,49 @@ use async_std::io::{self, prelude::*}; +use std::fmt; use std::pin::Pin; use std::task::{Context, Poll}; /// An upgraded HTTP connection. -#[derive(Debug, Clone)] pub struct Connection { - inner: Pin>, + inner: Pin>, } +pub(crate) trait InnerConnection: Read + Write + Send + Sync + Unpin {} +impl InnerConnection for T {} + impl Read for Connection { fn poll_read( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - let this = self.project(); - Pin::new(this.inner).poll_read(cx, buf) + Pin::new(&mut self.inner).poll_read(cx, buf) } } impl Write for Connection { fn poll_write( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - let this = self.project(); - Pin::new(this.inner).poll_write(cx, buf) + Pin::new(&mut self.inner).poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - Pin::new(this.inner).poll_flush(cx) + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_close(cx) } +} - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - Pin::new(this.inner).poll_close(cx) +impl fmt::Debug for Connection { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Connection") + .field("inner", &"Pin>") + .finish() } } From 2a162373ee14ecf614f308bea230d490dd940f75 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Tue, 19 May 2020 11:03:15 +0200 Subject: [PATCH 4/8] improve structure using generic --- src/upgrade/connection.rs | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/src/upgrade/connection.rs b/src/upgrade/connection.rs index a46edc48..abd47ec6 100644 --- a/src/upgrade/connection.rs +++ b/src/upgrade/connection.rs @@ -1,18 +1,22 @@ use async_std::io::{self, prelude::*}; -use std::fmt; use std::pin::Pin; use std::task::{Context, Poll}; /// An upgraded HTTP connection. -pub struct Connection { - inner: Pin>, +#[derive(Debug, Clone)] +pub struct RawConnection { + inner: Inner, } -pub(crate) trait InnerConnection: Read + Write + Send + Sync + Unpin {} +/// A boxed upgraded HTTP connection. +pub type Connection = RawConnection>; + +/// Trait to signal the requirements for an underlying connection type. +pub trait InnerConnection: Read + Write + Send + Sync + Unpin {} impl InnerConnection for T {} -impl Read for Connection { +impl Read for RawConnection { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -22,7 +26,7 @@ impl Read for Connection { } } -impl Write for Connection { +impl Write for RawConnection { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -39,11 +43,3 @@ impl Write for Connection { Pin::new(&mut self.inner).poll_close(cx) } } - -impl fmt::Debug for Connection { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Connection") - .field("inner", &"Pin>") - .finish() - } -} From e6421c19dc9d6b2ca189197200ba41f34ed9ac63 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 29 May 2020 14:39:32 +0200 Subject: [PATCH 5/8] Finish rebasing on master --- src/request.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/request.rs b/src/request.rs index 50b577c0..587438a4 100644 --- a/src/request.rs +++ b/src/request.rs @@ -38,11 +38,13 @@ pin_project_lite::pin_project! { url: Url, headers: Headers, version: Option, - trailers_sender: Option>, - trailers_receiver: Option>, #[pin] body: Body, - local: TypeMap, + local_addr: Option, + peer_addr: Option, + ext: Extensions, + trailers_sender: Option>, + trailers_receiver: Option>, } } @@ -64,15 +66,15 @@ pin_project_lite::pin_project! { url: Url, headers: Headers, version: Option, - trailers_sender: Option>, - trailers_receiver: Option>, - upgrade_sender: Option>, - upgrade_receiver: Option>, #[pin] body: Body, local_addr: Option, peer_addr: Option, ext: Extensions, + trailers_sender: Option>, + trailers_receiver: Option>, + upgrade_sender: Option>, + upgrade_receiver: Option>, } } From e2b8f64b78309f1ecd3a1c8bf676ca781d3c2d12 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 29 May 2020 14:54:33 +0200 Subject: [PATCH 6/8] Move upgrade methods from Request to Response --- src/request.rs | 88 -------------------------------------- src/response.rs | 110 ++++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 101 insertions(+), 97 deletions(-) diff --git a/src/request.rs b/src/request.rs index 587438a4..fb007600 100644 --- a/src/request.rs +++ b/src/request.rs @@ -16,39 +16,6 @@ use crate::mime::Mime; use crate::trailers::{self, Trailers}; use crate::{Body, Extensions, Method, StatusCode, Url, Version}; -cfg_unstable! { - use crate::upgrade; -} - -#[cfg(not(feature = "unstable"))] -pin_project_lite::pin_project! { - /// An HTTP request. - /// - /// # Examples - /// - /// ``` - /// use http_types::{Url, Method, Request}; - /// - /// let mut req = Request::new(Method::Get, Url::parse("https://example.com").unwrap()); - /// req.set_body("Hello, Nori!"); - /// ``` - #[derive(Debug)] - pub struct Request { - method: Method, - url: Url, - headers: Headers, - version: Option, - #[pin] - body: Body, - local_addr: Option, - peer_addr: Option, - ext: Extensions, - trailers_sender: Option>, - trailers_receiver: Option>, - } -} - -#[cfg(feature = "unstable")] pin_project_lite::pin_project! { /// An HTTP request. /// @@ -73,40 +40,11 @@ pin_project_lite::pin_project! { ext: Extensions, trailers_sender: Option>, trailers_receiver: Option>, - upgrade_sender: Option>, - upgrade_receiver: Option>, } } impl Request { /// Create a new request. - #[cfg(feature = "unstable")] - pub fn new(method: Method, url: U) -> Self - where - U: TryInto, - U::Error: std::fmt::Debug, - { - let url = url.try_into().expect("Could not convert into a valid url"); - let (trailers_sender, trailers_receiver) = sync::channel(1); - let (upgrade_sender, upgrade_receiver) = sync::channel(1); - Self { - method, - url, - headers: Headers::new(), - version: None, - body: Body::empty(), - ext: Extensions::new(), - peer_addr: None, - local_addr: None, - trailers_receiver: Some(trailers_receiver), - trailers_sender: Some(trailers_sender), - upgrade_sender: Some(upgrade_sender), - upgrade_receiver: Some(upgrade_receiver), - } - } - - /// Create a new request. - #[cfg(not(feature = "unstable"))] pub fn new(method: Method, url: U) -> Self where U: TryInto, @@ -620,28 +558,6 @@ impl Request { trailers::Receiver::new(receiver) } - /// Sends an upgrade connection to the a receiver. - #[cfg(feature = "unstable")] - #[cfg_attr(feature = "docs", doc(cfg(unstable)))] - pub fn send_upgrade(&mut self) -> upgrade::Sender { - let sender = self - .upgrade_sender - .take() - .expect("Upgrade sender can only be constructed once"); - upgrade::Sender::new(sender) - } - - /// Receive an upgraded connection from a sender. - #[cfg(feature = "unstable")] - #[cfg_attr(feature = "docs", doc(cfg(unstable)))] - pub async fn recv_upgrade(&mut self) -> upgrade::Receiver { - let receiver = self - .upgrade_receiver - .take() - .expect("Upgrade receiver can only be constructed once"); - upgrade::Receiver::new(receiver) - } - /// An iterator visiting all header pairs in arbitrary order. pub fn iter(&self) -> headers::Iter<'_> { self.headers.iter() @@ -953,10 +869,6 @@ impl Clone for Request { version: self.version.clone(), trailers_sender: None, trailers_receiver: None, - #[cfg(feature = "unstable")] - upgrade_sender: None, - #[cfg(feature = "unstable")] - upgrade_receiver: None, body: Body::empty(), ext: Extensions::new(), peer_addr: self.peer_addr.clone(), diff --git a/src/response.rs b/src/response.rs index a1ce1931..1ea19fbd 100644 --- a/src/response.rs +++ b/src/response.rs @@ -17,6 +17,42 @@ use crate::mime::Mime; use crate::trailers::{self, Trailers}; use crate::{Body, Extensions, StatusCode, Version}; +cfg_unstable! { + use crate::upgrade; +} + +#[cfg(not(feature = "unstable"))] +pin_project_lite::pin_project! { + /// An HTTP response. + /// + /// # Examples + /// + /// ``` + /// # fn main() -> Result<(), http_types::Error> { + /// # + /// use http_types::{Response, StatusCode}; + /// + /// let mut res = Response::new(StatusCode::Ok); + /// res.set_body("Hello, Nori!"); + /// # + /// # Ok(()) } + /// ``` + #[derive(Debug)] + pub struct Response { + status: StatusCode, + headers: Headers, + version: Option, + trailers_sender: Option>, + trailers_receiver: Option>, + #[pin] + body: Body, + ext: Extensions, + local_addr: Option, + peer_addr: Option, + } +} + +#[cfg(feature = "unstable")] pin_project_lite::pin_project! { /// An HTTP response. /// @@ -37,8 +73,10 @@ pin_project_lite::pin_project! { status: StatusCode, headers: Headers, version: Option, - sender: Option>, - receiver: Option>, + trailers_sender: Option>, + trailers_receiver: Option>, + upgrade_sender: Option>, + upgrade_receiver: Option>, #[pin] body: Body, ext: Extensions, @@ -49,6 +87,7 @@ pin_project_lite::pin_project! { impl Response { /// Create a new response. + #[cfg(not(feature = "unstable"))] pub fn new(status: S) -> Self where S: TryInto, @@ -57,14 +96,41 @@ impl Response { let status = status .try_into() .expect("Could not convert into a valid `StatusCode`"); - let (sender, receiver) = sync::channel(1); + let (trailers_sender, trailers_receiver) = sync::channel(1); Self { status, headers: Headers::new(), version: None, body: Body::empty(), - sender: Some(sender), - receiver: Some(receiver), + trailers_sender: Some(trailers_sender), + trailers_receiver: Some(trailers_receiver), + ext: Extensions::new(), + peer_addr: None, + local_addr: None, + } + } + + /// Create a new response. + #[cfg(feature = "unstable")] + pub fn new(status: S) -> Self + where + S: TryInto, + S::Error: Debug, + { + let status = status + .try_into() + .expect("Could not convert into a valid `StatusCode`"); + let (trailers_sender, trailers_receiver) = sync::channel(1); + let (upgrade_sender, upgrade_receiver) = sync::channel(1); + Self { + status, + headers: Headers::new(), + version: None, + body: Body::empty(), + trailers_sender: Some(trailers_sender), + trailers_receiver: Some(trailers_receiver), + upgrade_sender: Some(upgrade_sender), + upgrade_receiver: Some(upgrade_receiver), ext: Extensions::new(), peer_addr: None, local_addr: None, @@ -457,7 +523,7 @@ impl Response { /// Sends trailers to the a receiver. pub fn send_trailers(&mut self) -> trailers::Sender { let sender = self - .sender + .trailers_sender .take() .expect("Trailers sender can only be constructed once"); trailers::Sender::new(sender) @@ -466,12 +532,34 @@ impl Response { /// Receive trailers from a sender. pub async fn recv_trailers(&mut self) -> trailers::Receiver { let receiver = self - .receiver + .trailers_receiver .take() .expect("Trailers receiver can only be constructed once"); trailers::Receiver::new(receiver) } + /// Sends an upgrade connection to the a receiver. + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + pub fn send_upgrade(&mut self) -> upgrade::Sender { + let sender = self + .upgrade_sender + .take() + .expect("Upgrade sender can only be constructed once"); + upgrade::Sender::new(sender) + } + + /// Receive an upgraded connection from a sender. + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + pub async fn recv_upgrade(&mut self) -> upgrade::Receiver { + let receiver = self + .upgrade_receiver + .take() + .expect("Upgrade receiver can only be constructed once"); + upgrade::Receiver::new(receiver) + } + /// An iterator visiting all header pairs in arbitrary order. pub fn iter(&self) -> headers::Iter<'_> { self.headers.iter() @@ -526,8 +614,12 @@ impl Clone for Response { status: self.status.clone(), headers: self.headers.clone(), version: self.version.clone(), - sender: self.sender.clone(), - receiver: self.receiver.clone(), + trailers_sender: self.trailers_sender.clone(), + trailers_receiver: self.trailers_receiver.clone(), + #[cfg(feature = "unstable")] + upgrade_sender: self.upgrade_sender.clone(), + #[cfg(feature = "unstable")] + upgrade_receiver: self.upgrade_receiver.clone(), body: Body::empty(), ext: Extensions::new(), peer_addr: self.peer_addr.clone(), From d0a6054f97b3620540bbb266a2ef98d1cafb6c0c Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 29 May 2020 15:00:19 +0200 Subject: [PATCH 7/8] Add Response::is_upgrade --- src/response.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/response.rs b/src/response.rs index 1ea19fbd..1163b0ed 100644 --- a/src/response.rs +++ b/src/response.rs @@ -77,6 +77,7 @@ pin_project_lite::pin_project! { trailers_receiver: Option>, upgrade_sender: Option>, upgrade_receiver: Option>, + is_upgrade: bool, #[pin] body: Body, ext: Extensions, @@ -131,6 +132,7 @@ impl Response { trailers_receiver: Some(trailers_receiver), upgrade_sender: Some(upgrade_sender), upgrade_receiver: Some(upgrade_receiver), + is_upgrade: false, ext: Extensions::new(), peer_addr: None, local_addr: None, @@ -542,6 +544,7 @@ impl Response { #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub fn send_upgrade(&mut self) -> upgrade::Sender { + self.is_upgrade = true; let sender = self .upgrade_sender .take() @@ -553,6 +556,7 @@ impl Response { #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub async fn recv_upgrade(&mut self) -> upgrade::Receiver { + self.is_upgrade = true; let receiver = self .upgrade_receiver .take() @@ -560,6 +564,13 @@ impl Response { upgrade::Receiver::new(receiver) } + /// Returns `true` if a protocol upgrade is in progress. + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + pub fn is_upgrade(&self) -> bool { + self.is_upgrade + } + /// An iterator visiting all header pairs in arbitrary order. pub fn iter(&self) -> headers::Iter<'_> { self.headers.iter() @@ -620,6 +631,8 @@ impl Clone for Response { upgrade_sender: self.upgrade_sender.clone(), #[cfg(feature = "unstable")] upgrade_receiver: self.upgrade_receiver.clone(), + #[cfg(feature = "unstable")] + is_upgrade: false, body: Body::empty(), ext: Extensions::new(), peer_addr: self.peer_addr.clone(), From 17ce3e5a11c79966cf1eff50c0811692651d5c6a Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 29 May 2020 15:01:34 +0200 Subject: [PATCH 8/8] Rename is_upgrade to has_upgrade --- src/response.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/response.rs b/src/response.rs index 1163b0ed..d3ba2bf0 100644 --- a/src/response.rs +++ b/src/response.rs @@ -77,7 +77,7 @@ pin_project_lite::pin_project! { trailers_receiver: Option>, upgrade_sender: Option>, upgrade_receiver: Option>, - is_upgrade: bool, + has_upgrade: bool, #[pin] body: Body, ext: Extensions, @@ -132,7 +132,7 @@ impl Response { trailers_receiver: Some(trailers_receiver), upgrade_sender: Some(upgrade_sender), upgrade_receiver: Some(upgrade_receiver), - is_upgrade: false, + has_upgrade: false, ext: Extensions::new(), peer_addr: None, local_addr: None, @@ -544,7 +544,7 @@ impl Response { #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub fn send_upgrade(&mut self) -> upgrade::Sender { - self.is_upgrade = true; + self.has_upgrade = true; let sender = self .upgrade_sender .take() @@ -556,7 +556,7 @@ impl Response { #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub async fn recv_upgrade(&mut self) -> upgrade::Receiver { - self.is_upgrade = true; + self.has_upgrade = true; let receiver = self .upgrade_receiver .take() @@ -567,8 +567,8 @@ impl Response { /// Returns `true` if a protocol upgrade is in progress. #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] - pub fn is_upgrade(&self) -> bool { - self.is_upgrade + pub fn has_upgrade(&self) -> bool { + self.has_upgrade } /// An iterator visiting all header pairs in arbitrary order. @@ -632,7 +632,7 @@ impl Clone for Response { #[cfg(feature = "unstable")] upgrade_receiver: self.upgrade_receiver.clone(), #[cfg(feature = "unstable")] - is_upgrade: false, + has_upgrade: false, body: Body::empty(), ext: Extensions::new(), peer_addr: self.peer_addr.clone(),