diff --git a/src/body/body.rs b/src/body/body.rs index fca66a6f93..4a1d6210bc 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -23,7 +23,6 @@ use crate::common::{task, watch, Pin, Poll}; use crate::common::{Future, Never}; #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] use crate::proto::h2::ping; -use crate::upgrade::OnUpgrade; type BodySender = mpsc::Sender>; @@ -70,7 +69,6 @@ struct Extra { /// a brand new connection, since the pool didn't know about the idle /// connection yet. delayed_eof: Option, - on_upgrade: OnUpgrade, } #[cfg(any(feature = "http1", feature = "http2"))] @@ -187,17 +185,6 @@ impl Body { Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped)))) } - // TODO: Eventually the pending upgrade should be stored in the - // `Extensions`, and all these pieces can be removed. In v0.14, we made - // the breaking changes, so now this TODO can be done without breakage. - pub(crate) fn take_upgrade(&mut self) -> OnUpgrade { - if let Some(ref mut extra) = self.extra { - std::mem::replace(&mut extra.on_upgrade, OnUpgrade::none()) - } else { - OnUpgrade::none() - } - } - fn new(kind: Kind) -> Body { Body { kind, extra: None } } @@ -217,14 +204,6 @@ impl Body { body } - #[cfg(feature = "http1")] - pub(crate) fn set_on_upgrade(&mut self, upgrade: OnUpgrade) { - debug_assert!(!upgrade.is_none(), "set_on_upgrade with empty upgrade"); - let extra = self.extra_mut(); - debug_assert!(extra.on_upgrade.is_none(), "set_on_upgrade twice"); - extra.on_upgrade = upgrade; - } - #[cfg(any(feature = "http1", feature = "http2"))] #[cfg(feature = "client")] pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) { @@ -239,12 +218,8 @@ impl Body { #[cfg(any(feature = "http1", feature = "http2"))] fn extra_mut(&mut self) -> &mut Extra { - self.extra.get_or_insert_with(|| { - Box::new(Extra { - delayed_eof: None, - on_upgrade: OnUpgrade::none(), - }) - }) + self.extra + .get_or_insert_with(|| Box::new(Extra { delayed_eof: None })) } fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll>> { diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 595a5c4ac9..b9f658af9f 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -10,6 +10,7 @@ use crate::common::{task, Future, Pin, Poll, Unpin}; use crate::proto::{ BodyLength, Conn, Dispatched, MessageHead, RequestHead, }; +use crate::upgrade::OnUpgrade; pub(crate) struct Dispatcher { conn: Conn, @@ -243,8 +244,8 @@ where } // dispatch is ready for a message, try to read one match ready!(self.conn.poll_read_head(cx)) { - Some(Ok((head, body_len, wants))) => { - let mut body = match body_len { + Some(Ok((mut head, body_len, wants))) => { + let body = match body_len { DecodedLength::ZERO => Body::empty(), other => { let (tx, rx) = Body::new_channel(other, wants.contains(Wants::EXPECT)); @@ -253,7 +254,10 @@ where } }; if wants.contains(Wants::UPGRADE) { - body.set_on_upgrade(self.conn.on_upgrade()); + let upgrade = self.conn.on_upgrade(); + debug_assert!(!upgrade.is_none(), "empty upgrade"); + debug_assert!(head.extensions.get::().is_none(), "OnUpgrade already set"); + head.extensions.insert(upgrade); } self.dispatch.recv_msg(Ok((head, body)))?; Poll::Ready(Ok(())) @@ -488,6 +492,7 @@ cfg_server! { version: parts.version, subject: parts.status, headers: parts.headers, + extensions: http::Extensions::default(), }; Poll::Ready(Some(Ok((head, body)))) } else { @@ -506,6 +511,7 @@ cfg_server! { *req.uri_mut() = msg.subject.1; *req.headers_mut() = msg.headers; *req.version_mut() = msg.version; + *req.extensions_mut() = msg.extensions; let fut = self.service.call(req); self.in_flight.set(Some(fut)); Ok(()) @@ -570,6 +576,7 @@ cfg_client! { version: parts.version, subject: crate::proto::RequestLine(parts.method, parts.uri), headers: parts.headers, + extensions: http::Extensions::default(), }; *this.callback = Some(cb); Poll::Ready(Some(Ok((head, body)))) @@ -594,6 +601,7 @@ cfg_client! { *res.status_mut() = msg.subject; *res.headers_mut() = msg.headers; *res.version_mut() = msg.version; + *res.extensions_mut() = msg.extensions; cb.send(Ok(res)); Ok(()) } else { diff --git a/src/proto/h1/role.rs b/src/proto/h1/role.rs index bfd5f8c75c..2a3b1fdd29 100644 --- a/src/proto/h1/role.rs +++ b/src/proto/h1/role.rs @@ -270,6 +270,7 @@ impl Http1Transaction for Server { version, subject, headers, + extensions: http::Extensions::default(), }, decode: decoder, expect_continue, @@ -713,6 +714,7 @@ impl Http1Transaction for Client { version, subject: status, headers, + extensions: http::Extensions::default(), }; if let Some((decode, is_upgrade)) = Client::decoder(&head, ctx.req_method)? { return Ok(Some(ParsedMessage { diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 9ae9637b49..27b3ef6f12 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -16,7 +16,7 @@ cfg_http2! { } /// An Incoming Message head. Includes request/status line, and headers. -#[derive(Clone, Debug, Default, PartialEq)] +#[derive(Debug, Default)] pub struct MessageHead { /// HTTP version of the message. pub version: http::Version, @@ -24,6 +24,9 @@ pub struct MessageHead { pub subject: S, /// Headers of the Incoming message. pub headers: http::HeaderMap, + + /// Extensions. + extensions: http::Extensions, } /// An incoming request message. diff --git a/src/upgrade.rs b/src/upgrade.rs index eb9ed97ef2..46ce37fcf8 100644 --- a/src/upgrade.rs +++ b/src/upgrade.rs @@ -317,26 +317,34 @@ mod sealed { } impl CanUpgrade for http::Request { - fn on_upgrade(self) -> OnUpgrade { - self.into_body().take_upgrade() + fn on_upgrade(mut self) -> OnUpgrade { + self.extensions_mut() + .remove::() + .unwrap_or_else(OnUpgrade::none) } } impl CanUpgrade for &'_ mut http::Request { fn on_upgrade(self) -> OnUpgrade { - self.body_mut().take_upgrade() + self.extensions_mut() + .remove::() + .unwrap_or_else(OnUpgrade::none) } } impl CanUpgrade for http::Response { - fn on_upgrade(self) -> OnUpgrade { - self.into_body().take_upgrade() + fn on_upgrade(mut self) -> OnUpgrade { + self.extensions_mut() + .remove::() + .unwrap_or_else(OnUpgrade::none) } } impl CanUpgrade for &'_ mut http::Response { fn on_upgrade(self) -> OnUpgrade { - self.body_mut().take_upgrade() + self.extensions_mut() + .remove::() + .unwrap_or_else(OnUpgrade::none) } } }