From 6a6a24030ec53321ce026fe294c56c86f21e19d4 Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Fri, 4 Jun 2021 23:57:27 +0200 Subject: [PATCH] refactor(lib): Switch from pin-project to pin-project-lite (#2566) Note the practical affects of this change: - Dependency count with --features full dropped from 65 to 55. - Time to compile after a clean dropped from 48s to 35s (on a pretty underpowered VM). Closes #2388 --- Cargo.toml | 2 +- src/client/conn.rs | 74 +++++++---- src/client/connect/http.rs | 27 ++-- src/client/pool.rs | 25 ++-- src/common/drain.rs | 19 +-- src/common/lazy.rs | 44 ++++--- src/proto/h1/dispatch.rs | 11 +- src/proto/h2/mod.rs | 21 +-- src/proto/h2/server.rs | 76 ++++++----- src/server/accept.rs | 14 +- src/server/conn.rs | 255 +++++++++++++++++++++++-------------- src/server/server.rs | 23 ++-- src/server/shutdown.rs | 43 ++++--- src/server/tcp.rs | 15 ++- src/service/oneshot.rs | 48 ++++--- 15 files changed, 416 insertions(+), 281 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 42358c41bb..be44c3081f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,7 @@ httparse = "1.4" h2 = { version = "0.3.3", optional = true } itoa = "0.4.1" tracing = { version = "0.1", default-features = false, features = ["std"] } -pin-project = "1.0" +pin-project-lite = "0.2.4" tower-service = "0.3" tokio = { version = "1", features = ["sync"] } want = "0.3" diff --git a/src/client/conn.rs b/src/client/conn.rs index 70c1dad248..c6170007a2 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -48,7 +48,7 @@ use std::error::Error as StdError; use std::fmt; -#[cfg(feature = "http2")] +#[cfg(not(all(feature = "http1", feature = "http2")))] use std::marker::PhantomData; use std::sync::Arc; #[cfg(all(feature = "runtime", feature = "http2"))] @@ -57,12 +57,14 @@ use std::time::Duration; use bytes::Bytes; use futures_util::future::{self, Either, FutureExt as _}; use httparse::ParserConfig; -use pin_project::pin_project; +use pin_project_lite::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; use tower_service::Service; use super::dispatch; use crate::body::HttpBody; +#[cfg(not(all(feature = "http1", feature = "http2")))] +use crate::common::Never; use crate::common::{ exec::{BoxSendFuture, Exec}, task, Future, Pin, Poll, @@ -74,17 +76,33 @@ use crate::upgrade::Upgraded; use crate::{Body, Request, Response}; #[cfg(feature = "http1")] -type Http1Dispatcher = proto::dispatch::Dispatcher, B, T, R>; +type Http1Dispatcher = + proto::dispatch::Dispatcher, B, T, proto::h1::ClientTransaction>; -#[pin_project(project = ProtoClientProj)] -enum ProtoClient -where - B: HttpBody, -{ - #[cfg(feature = "http1")] - H1(#[pin] Http1Dispatcher), - #[cfg(feature = "http2")] - H2(#[pin] proto::h2::ClientTask, PhantomData), +#[cfg(not(feature = "http1"))] +type Http1Dispatcher = (Never, PhantomData<(T, Pin>)>); + +#[cfg(feature = "http2")] +type Http2ClientTask = proto::h2::ClientTask; + +#[cfg(not(feature = "http2"))] +type Http2ClientTask = (Never, PhantomData>>); + +pin_project! { + #[project = ProtoClientProj] + enum ProtoClient + where + B: HttpBody, + { + H1 { + #[pin] + h1: Http1Dispatcher, + }, + H2 { + #[pin] + h2: Http2ClientTask, + }, + } } /// Returns a handshake future over some IO. @@ -405,7 +423,7 @@ where pub fn into_parts(self) -> Parts { match self.inner.expect("already upgraded") { #[cfg(feature = "http1")] - ProtoClient::H1(h1) => { + ProtoClient::H1 { h1 } => { let (io, read_buf, _) = h1.into_inner(); Parts { io, @@ -413,10 +431,12 @@ where _inner: (), } } - #[cfg(feature = "http2")] - ProtoClient::H2(..) => { + ProtoClient::H2 { .. } => { panic!("http2 cannot into_inner"); } + + #[cfg(not(feature = "http1"))] + ProtoClient::H1 { h1 } => match h1.0 {}, } } @@ -434,9 +454,14 @@ where pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll> { match *self.inner.as_mut().expect("already upgraded") { #[cfg(feature = "http1")] - ProtoClient::H1(ref mut h1) => h1.poll_without_shutdown(cx), + ProtoClient::H1 { ref mut h1 } => h1.poll_without_shutdown(cx), #[cfg(feature = "http2")] - ProtoClient::H2(ref mut h2, _) => Pin::new(h2).poll(cx).map_ok(|_| ()), + ProtoClient::H2 { ref mut h2, .. } => Pin::new(h2).poll(cx).map_ok(|_| ()), + + #[cfg(not(feature = "http1"))] + ProtoClient::H1 { ref mut h1 } => match h1.0 {}, + #[cfg(not(feature = "http2"))] + ProtoClient::H2 { ref mut h2, .. } => match h2.0 {}, } } @@ -465,7 +490,7 @@ where proto::Dispatched::Shutdown => Poll::Ready(Ok(())), #[cfg(feature = "http1")] proto::Dispatched::Upgrade(pending) => match self.inner.take() { - Some(ProtoClient::H1(h1)) => { + Some(ProtoClient::H1 { h1 }) => { let (io, buf, _) = h1.into_inner(); pending.fulfill(Upgraded::new(io, buf)); Poll::Ready(Ok(())) @@ -756,14 +781,14 @@ impl Builder { } let cd = proto::h1::dispatch::Client::new(rx); let dispatch = proto::h1::Dispatcher::new(cd, conn); - ProtoClient::H1(dispatch) + ProtoClient::H1 { h1: dispatch } } #[cfg(feature = "http2")] Proto::Http2 => { let h2 = proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec.clone()) .await?; - ProtoClient::H2(h2, PhantomData) + ProtoClient::H2 { h2 } } }; @@ -817,9 +842,14 @@ where fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { match self.project() { #[cfg(feature = "http1")] - ProtoClientProj::H1(c) => c.poll(cx), + ProtoClientProj::H1 { h1 } => h1.poll(cx), #[cfg(feature = "http2")] - ProtoClientProj::H2(c, _) => c.poll(cx), + ProtoClientProj::H2 { h2, .. } => h2.poll(cx), + + #[cfg(not(feature = "http1"))] + ProtoClientProj::H1 { h1 } => match h1.0 {}, + #[cfg(not(feature = "http2"))] + ProtoClientProj::H2 { h2, .. } => match h2.0 {}, } } } diff --git a/src/client/connect/http.rs b/src/client/connect/http.rs index 4437c86380..0f1a487adb 100644 --- a/src/client/connect/http.rs +++ b/src/client/connect/http.rs @@ -11,7 +11,7 @@ use std::time::Duration; use futures_util::future::Either; use http::uri::{Scheme, Uri}; -use pin_project::pin_project; +use pin_project_lite::pin_project; use tokio::net::{TcpSocket, TcpStream}; use tokio::time::Sleep; @@ -373,18 +373,19 @@ impl HttpInfo { } } -// Not publicly exported (so missing_docs doesn't trigger). -// -// We return this `Future` instead of the `Pin>` directly -// so that users don't rely on it fitting in a `Pin>` slot -// (and thus we can change the type in the future). -#[must_use = "futures do nothing unless polled"] -#[pin_project] -#[allow(missing_debug_implementations)] -pub struct HttpConnecting { - #[pin] - fut: BoxConnecting, - _marker: PhantomData, +pin_project! { + // Not publicly exported (so missing_docs doesn't trigger). + // + // We return this `Future` instead of the `Pin>` directly + // so that users don't rely on it fitting in a `Pin>` slot + // (and thus we can change the type in the future). + #[must_use = "futures do nothing unless polled"] + #[allow(missing_debug_implementations)] + pub struct HttpConnecting { + #[pin] + fut: BoxConnecting, + _marker: PhantomData, + } } type ConnectResult = Result; diff --git a/src/client/pool.rs b/src/client/pool.rs index 0f22657bd4..94f73f6afd 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -11,7 +11,7 @@ use futures_channel::oneshot; use tokio::time::{Duration, Instant, Interval}; use super::client::Ver; -use crate::common::{task, exec::Exec, Future, Pin, Poll, Unpin}; +use crate::common::{exec::Exec, task, Future, Pin, Poll, Unpin}; // FIXME: allow() required due to `impl Trait` leaking types to this lint #[allow(missing_debug_implementations)] @@ -714,16 +714,17 @@ impl Expiration { } #[cfg(feature = "runtime")] -#[pin_project::pin_project] -struct IdleTask { - #[pin] - interval: Interval, - pool: WeakOpt>>, - // This allows the IdleTask to be notified as soon as the entire - // Pool is fully dropped, and shutdown. This channel is never sent on, - // but Err(Canceled) will be received when the Pool is dropped. - #[pin] - pool_drop_notifier: oneshot::Receiver, +pin_project_lite::pin_project! { + struct IdleTask { + #[pin] + interval: Interval, + pool: WeakOpt>>, + // This allows the IdleTask to be notified as soon as the entire + // Pool is fully dropped, and shutdown. This channel is never sent on, + // but Err(Canceled) will be received when the Pool is dropped. + #[pin] + pool_drop_notifier: oneshot::Receiver, + } } #[cfg(feature = "runtime")] @@ -776,7 +777,7 @@ mod tests { use std::time::Duration; use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt}; - use crate::common::{task, exec::Exec, Future, Pin}; + use crate::common::{exec::Exec, task, Future, Pin}; /// Test unique reservations. #[derive(Debug, PartialEq, Eq)] diff --git a/src/common/drain.rs b/src/common/drain.rs index 4bb2ecc118..174da876df 100644 --- a/src/common/drain.rs +++ b/src/common/drain.rs @@ -1,6 +1,6 @@ use std::mem; -use pin_project::pin_project; +use pin_project_lite::pin_project; use tokio::sync::watch; use super::{task, Future, Pin, Poll}; @@ -21,14 +21,15 @@ pub(crate) struct Watch { rx: watch::Receiver<()>, } -#[allow(missing_debug_implementations)] -#[pin_project] -pub struct Watching { - #[pin] - future: F, - state: State, - watch: Pin + Send + Sync>>, - _rx: watch::Receiver<()>, +pin_project! { + #[allow(missing_debug_implementations)] + pub struct Watching { + #[pin] + future: F, + state: State, + watch: Pin + Send + Sync>>, + _rx: watch::Receiver<()>, + } } enum State { diff --git a/src/common/lazy.rs b/src/common/lazy.rs index 6bf87c4355..2722077303 100644 --- a/src/common/lazy.rs +++ b/src/common/lazy.rs @@ -1,4 +1,4 @@ -use pin_project::pin_project; +use pin_project_lite::pin_project; use super::{task, Future, Pin, Poll}; @@ -12,23 +12,27 @@ where R: Future + Unpin, { Lazy { - inner: Inner::Init(func), + inner: Inner::Init { func }, } } // FIXME: allow() required due to `impl Trait` leaking types to this lint -#[allow(missing_debug_implementations)] -#[pin_project] -pub(crate) struct Lazy { - #[pin] - inner: Inner, +pin_project! { + #[allow(missing_debug_implementations)] + pub(crate) struct Lazy { + #[pin] + inner: Inner, + } } -#[pin_project(project = InnerProj, project_replace = InnerProjReplace)] -enum Inner { - Init(F), - Fut(#[pin] R), - Empty, +pin_project! { + #[project = InnerProj] + #[project_replace = InnerProjReplace] + enum Inner { + Init { func: F }, + Fut { #[pin] fut: R }, + Empty, + } } impl Started for Lazy @@ -38,8 +42,8 @@ where { fn started(&self) -> bool { match self.inner { - Inner::Init(_) => false, - Inner::Fut(_) | Inner::Empty => true, + Inner::Init { .. } => false, + Inner::Fut { .. } | Inner::Empty => true, } } } @@ -54,15 +58,15 @@ where fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { let mut this = self.project(); - if let InnerProj::Fut(f) = this.inner.as_mut().project() { - return f.poll(cx); + if let InnerProj::Fut { fut } = this.inner.as_mut().project() { + return fut.poll(cx); } match this.inner.as_mut().project_replace(Inner::Empty) { - InnerProjReplace::Init(func) => { - this.inner.set(Inner::Fut(func())); - if let InnerProj::Fut(f) = this.inner.project() { - return f.poll(cx); + InnerProjReplace::Init { func } => { + this.inner.set(Inner::Fut { fut: func() }); + if let InnerProj::Fut { fut } = this.inner.project() { + return fut.poll(cx); } unreachable!() } diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 88e641e9a4..1a72450b15 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -44,10 +44,13 @@ cfg_server! { } cfg_client! { - pub(crate) struct Client { - callback: Option, http::Response>>, - rx: ClientRx, - rx_closed: bool, + pin_project_lite::pin_project! { + pub(crate) struct Client { + callback: Option, http::Response>>, + #[pin] + rx: ClientRx, + rx_closed: bool, + } } type ClientRx = crate::client::dispatch::Receiver, http::Response>; diff --git a/src/proto/h2/mod.rs b/src/proto/h2/mod.rs index 0dbcc8d466..b410bab60c 100644 --- a/src/proto/h2/mod.rs +++ b/src/proto/h2/mod.rs @@ -5,7 +5,7 @@ use http::header::{ TRANSFER_ENCODING, UPGRADE, }; use http::HeaderMap; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::error::Error as StdError; use std::io::{self, Cursor, IoSlice}; use std::mem; @@ -88,15 +88,16 @@ fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) { // body adapters used by both Client and Server -#[pin_project] -struct PipeToSendStream -where - S: HttpBody, -{ - body_tx: SendStream>, - data_done: bool, - #[pin] - stream: S, +pin_project! { + struct PipeToSendStream + where + S: HttpBody, + { + body_tx: SendStream>, + data_done: bool, + #[pin] + stream: S, + } } impl PipeToSendStream diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index de77eaa232..1222663dda 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -7,7 +7,7 @@ use bytes::Bytes; use h2::server::{Connection, Handshake, SendResponse}; use h2::{Reason, RecvStream}; use http::{Method, Request}; -use pin_project::pin_project; +use pin_project_lite::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; use super::{ping, PipeToSendStream, SendBuf}; @@ -62,15 +62,16 @@ impl Default for Config { } } -#[pin_project] -pub(crate) struct Server -where - S: HttpService, - B: HttpBody, -{ - exec: E, - service: S, - state: State, +pin_project! { + pub(crate) struct Server + where + S: HttpService, + B: HttpBody, + { + exec: E, + service: S, + state: State, + } } enum State @@ -348,24 +349,34 @@ where } } -#[allow(missing_debug_implementations)] -#[pin_project] -pub struct H2Stream -where - B: HttpBody, -{ - reply: SendResponse>, - #[pin] - state: H2StreamState, +pin_project! { + #[allow(missing_debug_implementations)] + pub struct H2Stream + where + B: HttpBody, + { + reply: SendResponse>, + #[pin] + state: H2StreamState, + } } -#[pin_project(project = H2StreamStateProj)] -enum H2StreamState -where - B: HttpBody, -{ - Service(#[pin] F, Option), - Body(#[pin] PipeToSendStream), +pin_project! { + #[project = H2StreamStateProj] + enum H2StreamState + where + B: HttpBody, + { + Service { + #[pin] + fut: F, + connect_parts: Option, + }, + Body { + #[pin] + pipe: PipeToSendStream, + }, + } } struct ConnectParts { @@ -385,7 +396,7 @@ where ) -> H2Stream { H2Stream { reply: respond, - state: H2StreamState::Service(fut, connect_parts), + state: H2StreamState::Service { fut, connect_parts }, } } } @@ -415,7 +426,10 @@ where let mut me = self.project(); loop { let next = match me.state.as_mut().project() { - H2StreamStateProj::Service(h, connect_parts) => { + H2StreamStateProj::Service { + fut: h, + connect_parts, + } => { let res = match h.poll(cx) { Poll::Ready(Ok(r)) => r, Poll::Pending => { @@ -476,13 +490,15 @@ where if !body.is_end_stream() { let body_tx = reply!(me, res, false); - H2StreamState::Body(PipeToSendStream::new(body, body_tx)) + H2StreamState::Body { + pipe: PipeToSendStream::new(body, body_tx), + } } else { reply!(me, res, true); return Poll::Ready(Ok(())); } } - H2StreamStateProj::Body(pipe) => { + H2StreamStateProj::Body { pipe } => { return pipe.poll(cx); } }; diff --git a/src/server/accept.rs b/src/server/accept.rs index 4ec287129d..4b7a1487dd 100644 --- a/src/server/accept.rs +++ b/src/server/accept.rs @@ -9,7 +9,7 @@ #[cfg(feature = "stream")] use futures_core::Stream; #[cfg(feature = "stream")] -use pin_project::pin_project; +use pin_project_lite::pin_project; use crate::common::{ task::{self, Poll}, @@ -86,8 +86,12 @@ pub fn from_stream(stream: S) -> impl Accept where S: Stream>, { - #[pin_project] - struct FromStream(#[pin] S); + pin_project! { + struct FromStream { + #[pin] + stream: S, + } + } impl Accept for FromStream where @@ -99,9 +103,9 @@ where self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll>> { - self.project().0.poll_next(cx) + self.project().stream.poll_next(cx) } } - FromStream(stream) + FromStream { stream } } diff --git a/src/server/conn.rs b/src/server/conn.rs index c1a52f1c0d..085f890139 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -45,7 +45,7 @@ use std::error::Error as StdError; use std::fmt; -#[cfg(feature = "http1")] +#[cfg(not(all(feature = "http1", feature = "http2")))] use std::marker::PhantomData; #[cfg(feature = "tcp")] use std::net::SocketAddr; @@ -53,7 +53,7 @@ use std::net::SocketAddr; use std::time::Duration; use bytes::Bytes; -use pin_project::pin_project; +use pin_project_lite::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; use super::accept::Accept; @@ -61,6 +61,8 @@ use crate::body::{Body, HttpBody}; use crate::common::exec::{ConnStreamExec, Exec, NewSvcExec}; #[cfg(feature = "http2")] use crate::common::io::Rewind; +#[cfg(not(all(feature = "http1", feature = "http2")))] +use crate::common::Never; use crate::common::{task, Future, Pin, Poll, Unpin}; #[cfg(all(feature = "http1", feature = "http2"))] use crate::error::{Kind, Parse}; @@ -111,77 +113,93 @@ enum ConnectionMode { Fallback, } -/// A stream mapping incoming IOs to new services. -/// -/// Yields `Connecting`s that are futures that should be put on a reactor. -#[must_use = "streams do nothing unless polled"] -#[pin_project] -#[derive(Debug)] -pub(super) struct Serve { - #[pin] - incoming: I, - make_service: S, - protocol: Http, +pin_project! { + /// A stream mapping incoming IOs to new services. + /// + /// Yields `Connecting`s that are futures that should be put on a reactor. + #[must_use = "streams do nothing unless polled"] + #[derive(Debug)] + pub(super) struct Serve { + #[pin] + incoming: I, + make_service: S, + protocol: Http, + } } -/// A future building a new `Service` to a `Connection`. -/// -/// Wraps the future returned from `MakeService` into one that returns -/// a `Connection`. -#[must_use = "futures do nothing unless polled"] -#[pin_project] -#[derive(Debug)] -pub struct Connecting { - #[pin] - future: F, - io: Option, - protocol: Http, +pin_project! { + /// A future building a new `Service` to a `Connection`. + /// + /// Wraps the future returned from `MakeService` into one that returns + /// a `Connection`. + #[must_use = "futures do nothing unless polled"] + #[derive(Debug)] + pub struct Connecting { + #[pin] + future: F, + io: Option, + protocol: Http, + } } -#[must_use = "futures do nothing unless polled"] -#[pin_project] -#[derive(Debug)] -pub(super) struct SpawnAll { - // TODO: re-add `pub(super)` once rustdoc can handle this. - // - // See https://github.com/rust-lang/rust/issues/64705 - #[pin] - pub(super) serve: Serve, +pin_project! { + #[must_use = "futures do nothing unless polled"] + #[derive(Debug)] + pub(super) struct SpawnAll { + // TODO: re-add `pub(super)` once rustdoc can handle this. + // + // See https://github.com/rust-lang/rust/issues/64705 + #[pin] + pub(super) serve: Serve, + } } -/// A future binding a connection with a Service. -/// -/// Polling this future will drive HTTP forward. -#[must_use = "futures do nothing unless polled"] -#[pin_project] -pub struct Connection -where - S: HttpService, -{ - pub(super) conn: Option>, - #[cfg(all(feature = "http1", feature = "http2"))] - fallback: Fallback, +pin_project! { + /// A future binding a connection with a Service. + /// + /// Polling this future will drive HTTP forward. + #[must_use = "futures do nothing unless polled"] + pub struct Connection + where + S: HttpService, + { + pub(super) conn: Option>, + fallback: Fallback, + } } -#[pin_project(project = ProtoServerProj)] -pub(super) enum ProtoServer -where - S: HttpService, - B: HttpBody, -{ - #[cfg(feature = "http1")] - H1( - #[pin] - proto::h1::Dispatcher< - proto::h1::dispatch::Server, - B, - T, - proto::ServerTransaction, - >, - PhantomData, - ), - #[cfg(feature = "http2")] - H2(#[pin] proto::h2::Server, S, B, E>), +#[cfg(feature = "http1")] +type Http1Dispatcher = + proto::h1::Dispatcher, B, T, proto::ServerTransaction>; + +#[cfg(not(feature = "http1"))] +type Http1Dispatcher = (Never, PhantomData<(T, Box>, Box>)>); + +#[cfg(feature = "http2")] +type Http2Server = proto::h2::Server, S, B, E>; + +#[cfg(not(feature = "http2"))] +type Http2Server = ( + Never, + PhantomData<(T, Box>, Box>, Box>)>, +); + +pin_project! { + #[project = ProtoServerProj] + pub(super) enum ProtoServer + where + S: HttpService, + B: HttpBody, + { + H1 { + #[pin] + h1: Http1Dispatcher, + }, + H2 { + #[pin] + h2: Http2Server, + }, + } } #[cfg(all(feature = "http1", feature = "http2"))] @@ -191,6 +209,9 @@ enum Fallback { Http1Only, } +#[cfg(not(all(feature = "http1", feature = "http2")))] +type Fallback = PhantomData; + #[cfg(all(feature = "http1", feature = "http2"))] impl Fallback { fn to_h2(&self) -> bool { @@ -557,7 +578,9 @@ impl Http { conn.set_max_buf_size(max); } let sd = proto::h1::dispatch::Server::new(service); - ProtoServer::H1(proto::h1::Dispatcher::new(sd, conn), PhantomData) + ProtoServer::H1 { + h1: proto::h1::Dispatcher::new(sd, conn), + } }}; } @@ -573,19 +596,20 @@ impl Http { let rewind_io = Rewind::new(io); let h2 = proto::h2::Server::new(rewind_io, service, &self.h2_builder, self.exec.clone()); - ProtoServer::H2(h2) + ProtoServer::H2 { h2 } } }; Connection { conn: Some(proto), - #[cfg(feature = "http1")] - #[cfg(feature = "http2")] + #[cfg(all(feature = "http1", feature = "http2"))] fallback: if self.mode == ConnectionMode::Fallback { Fallback::ToHttp2(self.h2_builder.clone(), self.exec.clone()) } else { Fallback::Http1Only }, + #[cfg(not(all(feature = "http1", feature = "http2")))] + fallback: PhantomData, } } @@ -628,17 +652,22 @@ where /// This should only be called while the `Connection` future is still /// pending. If called after `Connection::poll` has resolved, this does /// nothing. - pub fn graceful_shutdown(self: Pin<&mut Self>) { - match self.project().conn { + pub fn graceful_shutdown(mut self: Pin<&mut Self>) { + match self.conn { #[cfg(feature = "http1")] - Some(ProtoServer::H1(ref mut h1, _)) => { + Some(ProtoServer::H1 { ref mut h1, .. }) => { h1.disable_keep_alive(); } #[cfg(feature = "http2")] - Some(ProtoServer::H2(ref mut h2)) => { + Some(ProtoServer::H2 { ref mut h2 }) => { h2.graceful_shutdown(); } None => (), + + #[cfg(not(feature = "http1"))] + Some(ProtoServer::H1 { ref mut h1, .. }) => match h1.0 {}, + #[cfg(not(feature = "http2"))] + Some(ProtoServer::H2 { ref mut h2 }) => match h2.0 {}, } } @@ -662,7 +691,7 @@ where pub fn try_into_parts(self) -> Option> { match self.conn.unwrap() { #[cfg(feature = "http1")] - ProtoServer::H1(h1, _) => { + ProtoServer::H1 { h1, .. } => { let (io, read_buf, dispatch) = h1.into_inner(); Some(Parts { io, @@ -671,8 +700,10 @@ where _inner: (), }) } - #[cfg(feature = "http2")] - ProtoServer::H2(_h2) => None, + ProtoServer::H2 { .. } => None, + + #[cfg(not(feature = "http1"))] + ProtoServer::H1 { h1, .. } => match h1.0 {}, } } @@ -696,7 +727,7 @@ where loop { match *self.conn.as_mut().unwrap() { #[cfg(feature = "http1")] - ProtoServer::H1(ref mut h1, _) => match ready!(h1.poll_without_shutdown(cx)) { + ProtoServer::H1 { ref mut h1, .. } => match ready!(h1.poll_without_shutdown(cx)) { Ok(()) => return Poll::Ready(Ok(())), Err(e) => { #[cfg(feature = "http2")] @@ -712,7 +743,12 @@ where } }, #[cfg(feature = "http2")] - ProtoServer::H2(ref mut h2) => return Pin::new(h2).poll(cx).map_ok(|_| ()), + ProtoServer::H2 { ref mut h2 } => return Pin::new(h2).poll(cx).map_ok(|_| ()), + + #[cfg(not(feature = "http1"))] + ProtoServer::H1 { ref mut h1, .. } => match h1.0 {}, + #[cfg(not(feature = "http2"))] + ProtoServer::H2 { ref mut h2 } => match h2.0 {}, }; } } @@ -738,8 +774,8 @@ where let conn = self.conn.take(); let (io, read_buf, dispatch) = match conn.unwrap() { - ProtoServer::H1(h1, _) => h1.into_inner(), - ProtoServer::H2(_h2) => { + ProtoServer::H1 { h1, .. } => h1.into_inner(), + ProtoServer::H2 { .. } => { panic!("h2 cannot into_inner"); } }; @@ -752,7 +788,7 @@ where let h2 = proto::h2::Server::new(rewind_io, dispatch.into_service(), builder, exec.clone()); debug_assert!(self.conn.is_none()); - self.conn = Some(ProtoServer::H2(h2)); + self.conn = Some(ProtoServer::H2 { h2 }); } /// Enable this connection to support higher-level HTTP upgrades. @@ -986,9 +1022,14 @@ where fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { match self.project() { #[cfg(feature = "http1")] - ProtoServerProj::H1(s, _) => s.poll(cx), + ProtoServerProj::H1 { h1, .. } => h1.poll(cx), #[cfg(feature = "http2")] - ProtoServerProj::H2(s) => s.poll(cx), + ProtoServerProj::H2 { h2 } => h2.poll(cx), + + #[cfg(not(feature = "http1"))] + ProtoServerProj::H1 { h1, .. } => match h1.0 {}, + #[cfg(not(feature = "http2"))] + ProtoServerProj::H2 { h2 } => match h2.0 {}, } } } @@ -1002,7 +1043,7 @@ pub(crate) mod spawn_all { use crate::common::exec::ConnStreamExec; use crate::common::{task, Future, Pin, Poll, Unpin}; use crate::service::HttpService; - use pin_project::pin_project; + use pin_project_lite::pin_project; // Used by `SpawnAll` to optionally watch a `Connection` future. // @@ -1047,23 +1088,36 @@ pub(crate) mod spawn_all { // Users cannot import this type, nor the associated `NewSvcExec`. Instead, // a blanket implementation for `Executor` is sufficient. - #[pin_project] - #[allow(missing_debug_implementations)] - pub struct NewSvcTask, E, W: Watcher> { - #[pin] - state: State, + pin_project! { + #[allow(missing_debug_implementations)] + pub struct NewSvcTask, E, W: Watcher> { + #[pin] + state: State, + } } - #[pin_project(project = StateProj)] - pub(super) enum State, E, W: Watcher> { - Connecting(#[pin] Connecting, W), - Connected(#[pin] W::Future), + pin_project! { + #[project = StateProj] + pub(super) enum State, E, W: Watcher> { + Connecting { + #[pin] + connecting: Connecting, + watcher: W, + }, + Connected { + #[pin] + future: W::Future, + }, + } } impl, E, W: Watcher> NewSvcTask { pub(super) fn new(connecting: Connecting, watcher: W) -> Self { NewSvcTask { - state: State::Connecting(connecting, watcher), + state: State::Connecting { + connecting, + watcher, + }, } } } @@ -1090,7 +1144,10 @@ pub(crate) mod spawn_all { loop { let next = { match me.state.as_mut().project() { - StateProj::Connecting(connecting, watcher) => { + StateProj::Connecting { + connecting, + watcher, + } => { let res = ready!(connecting.poll(cx)); let conn = match res { Ok(conn) => conn, @@ -1100,10 +1157,10 @@ pub(crate) mod spawn_all { return Poll::Ready(()); } }; - let connected = watcher.watch(conn.with_upgrades()); - State::Connected(connected) + let future = watcher.watch(conn.with_upgrades()); + State::Connected { future } } - StateProj::Connected(future) => { + StateProj::Connected { future } => { return future.poll(cx).map(|res| { if let Err(err) = res { debug!("connection error: {}", err); @@ -1171,7 +1228,7 @@ mod upgrades { #[cfg(feature = "http1")] Ok(proto::Dispatched::Upgrade(pending)) => { match self.inner.conn.take() { - Some(ProtoServer::H1(h1, _)) => { + Some(ProtoServer::H1 { h1, .. }) => { let (io, buf, _) = h1.into_inner(); pending.fulfill(Upgraded::new(io, buf)); return Poll::Ready(Ok(())); diff --git a/src/server/server.rs b/src/server/server.rs index 20c993e8ac..bdd517808b 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -6,7 +6,7 @@ use std::net::{SocketAddr, TcpListener as StdTcpListener}; #[cfg(feature = "tcp")] use std::time::Duration; -use pin_project::pin_project; +use pin_project_lite::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; use super::accept::Accept; @@ -21,16 +21,17 @@ use super::shutdown::{Graceful, GracefulWatcher}; #[cfg(feature = "tcp")] use super::tcp::AddrIncoming; -/// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default. -/// -/// `Server` is a `Future` mapping a bound listener with a set of service -/// handlers. It is built using the [`Builder`](Builder), and the future -/// completes when the server has been shutdown. It should be run by an -/// `Executor`. -#[pin_project] -pub struct Server { - #[pin] - spawn_all: SpawnAll, +pin_project! { + /// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default. + /// + /// `Server` is a `Future` mapping a bound listener with a set of service + /// handlers. It is built using the [`Builder`](Builder), and the future + /// completes when the server has been shutdown. It should be run by an + /// `Executor`. + pub struct Server { + #[pin] + spawn_all: SpawnAll, + } } /// A builder for a [`Server`](Server). diff --git a/src/server/shutdown.rs b/src/server/shutdown.rs index e54ba42104..122853ac17 100644 --- a/src/server/shutdown.rs +++ b/src/server/shutdown.rs @@ -1,33 +1,36 @@ use std::error::Error as StdError; -use pin_project::pin_project; +use pin_project_lite::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; -use super::conn::{SpawnAll, UpgradeableConnection, Watcher}; use super::accept::Accept; +use super::conn::{SpawnAll, UpgradeableConnection, Watcher}; use crate::body::{Body, HttpBody}; use crate::common::drain::{self, Draining, Signal, Watch, Watching}; use crate::common::exec::{ConnStreamExec, NewSvcExec}; use crate::common::{task, Future, Pin, Poll, Unpin}; use crate::service::{HttpService, MakeServiceRef}; -#[allow(missing_debug_implementations)] -#[pin_project] -pub struct Graceful { - #[pin] - state: State, +pin_project! { + #[allow(missing_debug_implementations)] + pub struct Graceful { + #[pin] + state: State, + } } -#[pin_project(project = StateProj)] -pub(super) enum State { - Running { - drain: Option<(Signal, Watch)>, - #[pin] - spawn_all: SpawnAll, - #[pin] - signal: F, - }, - Draining(Draining), +pin_project! { + #[project = StateProj] + pub(super) enum State { + Running { + drain: Option<(Signal, Watch)>, + #[pin] + spawn_all: SpawnAll, + #[pin] + signal: F, + }, + Draining { draining: Draining }, + } } impl Graceful { @@ -71,14 +74,16 @@ where Poll::Ready(()) => { debug!("signal received, starting graceful shutdown"); let sig = drain.take().expect("drain channel").0; - State::Draining(sig.drain()) + State::Draining { + draining: sig.drain(), + } } Poll::Pending => { let watch = drain.as_ref().expect("drain channel").1.clone(); return spawn_all.poll_watch(cx, &GracefulWatcher(watch)); } }, - StateProj::Draining(ref mut draining) => { + StateProj::Draining { ref mut draining } => { return Pin::new(draining).poll(cx).map(Ok); } } diff --git a/src/server/tcp.rs b/src/server/tcp.rs index 7b2f68b3a9..792e0034f3 100644 --- a/src/server/tcp.rs +++ b/src/server/tcp.rs @@ -199,13 +199,14 @@ mod addr_stream { use crate::common::{task, Pin, Poll}; - /// A transport returned yieled by `AddrIncoming`. - #[pin_project::pin_project] - #[derive(Debug)] - pub struct AddrStream { - #[pin] - inner: TcpStream, - pub(super) remote_addr: SocketAddr, + pin_project_lite::pin_project! { + /// A transport returned yieled by `AddrIncoming`. + #[derive(Debug)] + pub struct AddrStream { + #[pin] + inner: TcpStream, + pub(super) remote_addr: SocketAddr, + } } impl AddrStream { diff --git a/src/service/oneshot.rs b/src/service/oneshot.rs index 766d0c4689..2697af8f4c 100644 --- a/src/service/oneshot.rs +++ b/src/service/oneshot.rs @@ -1,6 +1,6 @@ // TODO: Eventually to be replaced with tower_util::Oneshot. -use pin_project::pin_project; +use pin_project_lite::pin_project; use tower_service::Service; use crate::common::{task, Future, Pin, Poll}; @@ -10,25 +10,35 @@ where S: Service, { Oneshot { - state: State::NotReady(svc, req), + state: State::NotReady { svc, req }, } } -// A `Future` consuming a `Service` and request, waiting until the `Service` -// is ready, and then calling `Service::call` with the request, and -// waiting for that `Future`. -#[allow(missing_debug_implementations)] -#[pin_project] -pub struct Oneshot, Req> { - #[pin] - state: State, +pin_project! { + // A `Future` consuming a `Service` and request, waiting until the `Service` + // is ready, and then calling `Service::call` with the request, and + // waiting for that `Future`. + #[allow(missing_debug_implementations)] + pub struct Oneshot, Req> { + #[pin] + state: State, + } } -#[pin_project(project = StateProj, project_replace = StateProjOwn)] -enum State, Req> { - NotReady(S, Req), - Called(#[pin] S::Future), - Tmp, +pin_project! { + #[project = StateProj] + #[project_replace = StateProjOwn] + enum State, Req> { + NotReady { + svc: S, + req: Req, + }, + Called { + #[pin] + fut: S::Future, + }, + Tmp, + } } impl Future for Oneshot @@ -42,19 +52,19 @@ where loop { match me.state.as_mut().project() { - StateProj::NotReady(ref mut svc, _) => { + StateProj::NotReady { ref mut svc, .. } => { ready!(svc.poll_ready(cx))?; // fallthrough out of the match's borrow } - StateProj::Called(fut) => { + StateProj::Called { fut } => { return fut.poll(cx); } StateProj::Tmp => unreachable!(), } match me.state.as_mut().project_replace(State::Tmp) { - StateProjOwn::NotReady(mut svc, req) => { - me.state.set(State::Called(svc.call(req))); + StateProjOwn::NotReady { mut svc, req } => { + me.state.set(State::Called { fut: svc.call(req) }); } _ => unreachable!(), }