From f72ae29d61ac0883fa4d51beffc146482e056a34 Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Tue, 12 Jan 2021 00:12:21 +0100 Subject: [PATCH] refactor(lib): Switch from pin-project to pin-project-lite --- Cargo.toml | 1 + src/client/conn.rs | 49 ++++---- src/client/connect/http.rs | 27 ++--- src/client/pool.rs | 25 +++-- src/common/drain.rs | 19 ++-- src/proto/h1/dispatch.rs | 11 +- src/proto/h2/mod.rs | 21 ++-- src/proto/h2/server.rs | 72 +++++++----- src/server/accept.rs | 14 ++- src/server/conn.rs | 222 +++++++++++++++++++++---------------- src/server/server.rs | 23 ++-- src/server/shutdown.rs | 43 +++---- src/server/tcp.rs | 15 +-- src/service/oneshot.rs | 48 ++++---- 14 files changed, 336 insertions(+), 254 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f5357ed5ba..69d6709b86 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ h2 = { version = "0.3", optional = true } itoa = "0.4.1" tracing = { version = "0.1", default-features = false, features = ["std"] } pin-project = "1.0" +pin-project-lite = "0.2.4" tower-service = "0.3" tokio = { version = "1", features = ["sync"] } want = "0.3" diff --git a/src/client/conn.rs b/src/client/conn.rs index 2799c61eff..62cde0c068 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -56,7 +56,7 @@ use std::time::Duration; use bytes::Bytes; use futures_util::future::{self, Either, FutureExt as _}; -use pin_project::pin_project; +use pin_project_lite::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; use tower_service::Service; @@ -75,15 +75,23 @@ use crate::{Body, Request, Response}; #[cfg(feature = "http1")] type Http1Dispatcher = proto::dispatch::Dispatcher, B, T, R>; -#[pin_project(project = ProtoClientProj)] -enum ProtoClient -where - B: HttpBody, -{ - #[cfg(feature = "http1")] - H1(#[pin] Http1Dispatcher), - #[cfg(feature = "http2")] - H2(#[pin] proto::h2::ClientTask, PhantomData), +pin_project! { + #[project = ProtoClientProj] + enum ProtoClient + where + B: HttpBody, + { + #[cfg(feature = "http1")] + H1 { + #[pin] + h1: Http1Dispatcher, + }, + #[cfg(feature = "http2")] + H2 { + #[pin] + h2: proto::h2::ClientTask, _phantom: PhantomData, + }, + } } /// Returns a handshake future over some IO. @@ -400,7 +408,7 @@ where pub fn into_parts(self) -> Parts { match self.inner.expect("already upgraded") { #[cfg(feature = "http1")] - ProtoClient::H1(h1) => { + ProtoClient::H1 { h1 } => { let (io, read_buf, _) = h1.into_inner(); Parts { io, @@ -409,7 +417,7 @@ where } } #[cfg(feature = "http2")] - ProtoClient::H2(..) => { + ProtoClient::H2 { .. } => { panic!("http2 cannot into_inner"); } } @@ -429,9 +437,9 @@ where pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll> { match *self.inner.as_mut().expect("already upgraded") { #[cfg(feature = "http1")] - ProtoClient::H1(ref mut h1) => h1.poll_without_shutdown(cx), + ProtoClient::H1 { ref mut h1 } => h1.poll_without_shutdown(cx), #[cfg(feature = "http2")] - ProtoClient::H2(ref mut h2, _) => Pin::new(h2).poll(cx).map_ok(|_| ()), + ProtoClient::H2 { ref mut h2, .. } => Pin::new(h2).poll(cx).map_ok(|_| ()), } } @@ -460,7 +468,7 @@ where proto::Dispatched::Shutdown => Poll::Ready(Ok(())), #[cfg(feature = "http1")] proto::Dispatched::Upgrade(pending) => match self.inner.take() { - Some(ProtoClient::H1(h1)) => { + Some(ProtoClient::H1 { h1 }) => { let (io, buf, _) = h1.into_inner(); pending.fulfill(Upgraded::new(io, buf)); Poll::Ready(Ok(())) @@ -707,14 +715,17 @@ impl Builder { } let cd = proto::h1::dispatch::Client::new(rx); let dispatch = proto::h1::Dispatcher::new(cd, conn); - ProtoClient::H1(dispatch) + ProtoClient::H1 { h1: dispatch } } #[cfg(feature = "http2")] Proto::Http2 => { let h2 = proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec.clone()) .await?; - ProtoClient::H2(h2, PhantomData) + ProtoClient::H2 { + h2, + _phantom: PhantomData, + } } }; @@ -768,9 +779,9 @@ where fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { match self.project() { #[cfg(feature = "http1")] - ProtoClientProj::H1(c) => c.poll(cx), + ProtoClientProj::H1 { h1 } => h1.poll(cx), #[cfg(feature = "http2")] - ProtoClientProj::H2(c, _) => c.poll(cx), + ProtoClientProj::H2 { h2, .. } => h2.poll(cx), } } } diff --git a/src/client/connect/http.rs b/src/client/connect/http.rs index 734aea188a..17339f4179 100644 --- a/src/client/connect/http.rs +++ b/src/client/connect/http.rs @@ -11,7 +11,7 @@ use std::time::Duration; use futures_util::future::Either; use http::uri::{Scheme, Uri}; -use pin_project::pin_project; +use pin_project_lite::pin_project; use tokio::net::{TcpSocket, TcpStream}; use tokio::time::Sleep; @@ -373,18 +373,19 @@ impl HttpInfo { } } -// Not publicly exported (so missing_docs doesn't trigger). -// -// We return this `Future` instead of the `Pin>` directly -// so that users don't rely on it fitting in a `Pin>` slot -// (and thus we can change the type in the future). -#[must_use = "futures do nothing unless polled"] -#[pin_project] -#[allow(missing_debug_implementations)] -pub struct HttpConnecting { - #[pin] - fut: BoxConnecting, - _marker: PhantomData, +pin_project! { + // Not publicly exported (so missing_docs doesn't trigger). + // + // We return this `Future` instead of the `Pin>` directly + // so that users don't rely on it fitting in a `Pin>` slot + // (and thus we can change the type in the future). + #[must_use = "futures do nothing unless polled"] + #[allow(missing_debug_implementations)] + pub struct HttpConnecting { + #[pin] + fut: BoxConnecting, + _marker: PhantomData, + } } type ConnectResult = Result; diff --git a/src/client/pool.rs b/src/client/pool.rs index 0f22657bd4..94f73f6afd 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -11,7 +11,7 @@ use futures_channel::oneshot; use tokio::time::{Duration, Instant, Interval}; use super::client::Ver; -use crate::common::{task, exec::Exec, Future, Pin, Poll, Unpin}; +use crate::common::{exec::Exec, task, Future, Pin, Poll, Unpin}; // FIXME: allow() required due to `impl Trait` leaking types to this lint #[allow(missing_debug_implementations)] @@ -714,16 +714,17 @@ impl Expiration { } #[cfg(feature = "runtime")] -#[pin_project::pin_project] -struct IdleTask { - #[pin] - interval: Interval, - pool: WeakOpt>>, - // This allows the IdleTask to be notified as soon as the entire - // Pool is fully dropped, and shutdown. This channel is never sent on, - // but Err(Canceled) will be received when the Pool is dropped. - #[pin] - pool_drop_notifier: oneshot::Receiver, +pin_project_lite::pin_project! { + struct IdleTask { + #[pin] + interval: Interval, + pool: WeakOpt>>, + // This allows the IdleTask to be notified as soon as the entire + // Pool is fully dropped, and shutdown. This channel is never sent on, + // but Err(Canceled) will be received when the Pool is dropped. + #[pin] + pool_drop_notifier: oneshot::Receiver, + } } #[cfg(feature = "runtime")] @@ -776,7 +777,7 @@ mod tests { use std::time::Duration; use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt}; - use crate::common::{task, exec::Exec, Future, Pin}; + use crate::common::{exec::Exec, task, Future, Pin}; /// Test unique reservations. #[derive(Debug, PartialEq, Eq)] diff --git a/src/common/drain.rs b/src/common/drain.rs index 4bb2ecc118..174da876df 100644 --- a/src/common/drain.rs +++ b/src/common/drain.rs @@ -1,6 +1,6 @@ use std::mem; -use pin_project::pin_project; +use pin_project_lite::pin_project; use tokio::sync::watch; use super::{task, Future, Pin, Poll}; @@ -21,14 +21,15 @@ pub(crate) struct Watch { rx: watch::Receiver<()>, } -#[allow(missing_debug_implementations)] -#[pin_project] -pub struct Watching { - #[pin] - future: F, - state: State, - watch: Pin + Send + Sync>>, - _rx: watch::Receiver<()>, +pin_project! { + #[allow(missing_debug_implementations)] + pub struct Watching { + #[pin] + future: F, + state: State, + watch: Pin + Send + Sync>>, + _rx: watch::Receiver<()>, + } } enum State { diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 88e641e9a4..1a72450b15 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -44,10 +44,13 @@ cfg_server! { } cfg_client! { - pub(crate) struct Client { - callback: Option, http::Response>>, - rx: ClientRx, - rx_closed: bool, + pin_project_lite::pin_project! { + pub(crate) struct Client { + callback: Option, http::Response>>, + #[pin] + rx: ClientRx, + rx_closed: bool, + } } type ClientRx = crate::client::dispatch::Receiver, http::Response>; diff --git a/src/proto/h2/mod.rs b/src/proto/h2/mod.rs index cf06592903..cf78e3f18c 100644 --- a/src/proto/h2/mod.rs +++ b/src/proto/h2/mod.rs @@ -5,7 +5,7 @@ use http::header::{ TRANSFER_ENCODING, UPGRADE, }; use http::HeaderMap; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::error::Error as StdError; use std::io::IoSlice; @@ -94,15 +94,16 @@ fn decode_content_length(headers: &HeaderMap) -> DecodedLength { // body adapters used by both Client and Server -#[pin_project] -struct PipeToSendStream -where - S: HttpBody, -{ - body_tx: SendStream>, - data_done: bool, - #[pin] - stream: S, +pin_project! { + struct PipeToSendStream + where + S: HttpBody, + { + body_tx: SendStream>, + data_done: bool, + #[pin] + stream: S, + } } impl PipeToSendStream diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index eea52e3e4b..167dd90dbb 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -5,7 +5,7 @@ use std::time::Duration; use h2::server::{Connection, Handshake, SendResponse}; use h2::Reason; -use pin_project::pin_project; +use pin_project_lite::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; use super::{decode_content_length, ping, PipeToSendStream, SendBuf}; @@ -57,15 +57,16 @@ impl Default for Config { } } -#[pin_project] -pub(crate) struct Server -where - S: HttpService, - B: HttpBody, -{ - exec: E, - service: S, - state: State, +pin_project! { + pub(crate) struct Server + where + S: HttpService, + B: HttpBody, + { + exec: E, + service: S, + state: State, + } } enum State @@ -315,24 +316,33 @@ where } } -#[allow(missing_debug_implementations)] -#[pin_project] -pub struct H2Stream -where - B: HttpBody, -{ - reply: SendResponse>, - #[pin] - state: H2StreamState, +pin_project! { + #[allow(missing_debug_implementations)] + pub struct H2Stream + where + B: HttpBody, + { + reply: SendResponse>, + #[pin] + state: H2StreamState, + } } -#[pin_project(project = H2StreamStateProj)] -enum H2StreamState -where - B: HttpBody, -{ - Service(#[pin] F), - Body(#[pin] PipeToSendStream), +pin_project! { + #[project = H2StreamStateProj] + enum H2StreamState + where + B: HttpBody, + { + Service { + #[pin] + fut: F, + }, + Body { + #[pin] + pipe: PipeToSendStream, + }, + } } impl H2Stream @@ -342,7 +352,7 @@ where fn new(fut: F, respond: SendResponse>) -> H2Stream { H2Stream { reply: respond, - state: H2StreamState::Service(fut), + state: H2StreamState::Service { fut }, } } } @@ -371,7 +381,7 @@ where let mut me = self.project(); loop { let next = match me.state.as_mut().project() { - H2StreamStateProj::Service(h) => { + H2StreamStateProj::Service { fut: h } => { let res = match h.poll(cx) { Poll::Ready(Ok(r)) => r, Poll::Pending => { @@ -409,13 +419,15 @@ where if !body.is_end_stream() { let body_tx = reply!(me, res, false); - H2StreamState::Body(PipeToSendStream::new(body, body_tx)) + H2StreamState::Body { + pipe: PipeToSendStream::new(body, body_tx), + } } else { reply!(me, res, true); return Poll::Ready(Ok(())); } } - H2StreamStateProj::Body(pipe) => { + H2StreamStateProj::Body { pipe } => { return pipe.poll(cx); } }; diff --git a/src/server/accept.rs b/src/server/accept.rs index 4ec287129d..4b7a1487dd 100644 --- a/src/server/accept.rs +++ b/src/server/accept.rs @@ -9,7 +9,7 @@ #[cfg(feature = "stream")] use futures_core::Stream; #[cfg(feature = "stream")] -use pin_project::pin_project; +use pin_project_lite::pin_project; use crate::common::{ task::{self, Poll}, @@ -86,8 +86,12 @@ pub fn from_stream(stream: S) -> impl Accept where S: Stream>, { - #[pin_project] - struct FromStream(#[pin] S); + pin_project! { + struct FromStream { + #[pin] + stream: S, + } + } impl Accept for FromStream where @@ -99,9 +103,9 @@ where self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll>> { - self.project().0.poll_next(cx) + self.project().stream.poll_next(cx) } } - FromStream(stream) + FromStream { stream } } diff --git a/src/server/conn.rs b/src/server/conn.rs index 5137708fcb..112bbe535d 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -45,7 +45,6 @@ use std::error::Error as StdError; use std::fmt; -#[cfg(feature = "http1")] use std::marker::PhantomData; #[cfg(feature = "tcp")] use std::net::SocketAddr; @@ -53,7 +52,7 @@ use std::net::SocketAddr; use std::time::Duration; use bytes::Bytes; -use pin_project::pin_project; +use pin_project_lite::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; use super::accept::Accept; @@ -109,77 +108,85 @@ enum ConnectionMode { Fallback, } -/// A stream mapping incoming IOs to new services. -/// -/// Yields `Connecting`s that are futures that should be put on a reactor. -#[must_use = "streams do nothing unless polled"] -#[pin_project] -#[derive(Debug)] -pub(super) struct Serve { - #[pin] - incoming: I, - make_service: S, - protocol: Http, +pin_project! { + /// A stream mapping incoming IOs to new services. + /// + /// Yields `Connecting`s that are futures that should be put on a reactor. + #[must_use = "streams do nothing unless polled"] + #[derive(Debug)] + pub(super) struct Serve { + #[pin] + incoming: I, + make_service: S, + protocol: Http, + } } -/// A future building a new `Service` to a `Connection`. -/// -/// Wraps the future returned from `MakeService` into one that returns -/// a `Connection`. -#[must_use = "futures do nothing unless polled"] -#[pin_project] -#[derive(Debug)] -pub struct Connecting { - #[pin] - future: F, - io: Option, - protocol: Http, +pin_project! { + /// A future building a new `Service` to a `Connection`. + /// + /// Wraps the future returned from `MakeService` into one that returns + /// a `Connection`. + #[must_use = "futures do nothing unless polled"] + #[derive(Debug)] + pub struct Connecting { + #[pin] + future: F, + io: Option, + protocol: Http, + } } -#[must_use = "futures do nothing unless polled"] -#[pin_project] -#[derive(Debug)] -pub(super) struct SpawnAll { - // TODO: re-add `pub(super)` once rustdoc can handle this. - // - // See https://github.com/rust-lang/rust/issues/64705 - #[pin] - pub(super) serve: Serve, +pin_project! { + #[must_use = "futures do nothing unless polled"] + #[derive(Debug)] + pub(super) struct SpawnAll { + // TODO: re-add `pub(super)` once rustdoc can handle this. + // + // See https://github.com/rust-lang/rust/issues/64705 + #[pin] + pub(super) serve: Serve, + } } -/// A future binding a connection with a Service. -/// -/// Polling this future will drive HTTP forward. -#[must_use = "futures do nothing unless polled"] -#[pin_project] -pub struct Connection -where - S: HttpService, -{ - pub(super) conn: Option>, - #[cfg(all(feature = "http1", feature = "http2"))] - fallback: Fallback, +pin_project! { + /// A future binding a connection with a Service. + /// + /// Polling this future will drive HTTP forward. + #[must_use = "futures do nothing unless polled"] + pub struct Connection + where + S: HttpService, + { + pub(super) conn: Option>, + fallback: Fallback, + } } -#[pin_project(project = ProtoServerProj)] -pub(super) enum ProtoServer -where - S: HttpService, - B: HttpBody, -{ - #[cfg(feature = "http1")] - H1( - #[pin] - proto::h1::Dispatcher< - proto::h1::dispatch::Server, - B, - T, - proto::ServerTransaction, - >, - PhantomData, - ), - #[cfg(feature = "http2")] - H2(#[pin] proto::h2::Server, S, B, E>), +pin_project! { + #[project = ProtoServerProj] + pub(super) enum ProtoServer + where + S: HttpService, + B: HttpBody, + { + #[cfg(feature = "http1")] + H1 { + #[pin] + h1: proto::h1::Dispatcher< + proto::h1::dispatch::Server, + B, + T, + proto::ServerTransaction, + >, + _phantom: PhantomData, + }, + #[cfg(feature = "http2")] + H2 { + #[pin] + h2: proto::h2::Server, S, B, E>, + }, + } } #[cfg(all(feature = "http1", feature = "http2"))] @@ -189,6 +196,10 @@ enum Fallback { Http1Only, } +#[cfg(not(all(feature = "http1", feature = "http2")))] +#[derive(Clone, Debug)] +struct Fallback(PhantomData); + #[cfg(all(feature = "http1", feature = "http2"))] impl Fallback { fn to_h2(&self) -> bool { @@ -519,7 +530,10 @@ impl Http { conn.set_max_buf_size(max); } let sd = proto::h1::dispatch::Server::new(service); - ProtoServer::H1(proto::h1::Dispatcher::new(sd, conn), PhantomData) + ProtoServer::H1 { + h1: proto::h1::Dispatcher::new(sd, conn), + _phantom: PhantomData, + } }}; } @@ -535,7 +549,7 @@ impl Http { let rewind_io = Rewind::new(io); let h2 = proto::h2::Server::new(rewind_io, service, &self.h2_builder, self.exec.clone()); - ProtoServer::H2(h2) + ProtoServer::H2 { h2 } } }; @@ -590,14 +604,14 @@ where /// This should only be called while the `Connection` future is still /// pending. If called after `Connection::poll` has resolved, this does /// nothing. - pub fn graceful_shutdown(self: Pin<&mut Self>) { - match self.project().conn { + pub fn graceful_shutdown(mut self: Pin<&mut Self>) { + match self.conn { #[cfg(feature = "http1")] - Some(ProtoServer::H1(ref mut h1, _)) => { + Some(ProtoServer::H1 { ref mut h1, .. }) => { h1.disable_keep_alive(); } #[cfg(feature = "http2")] - Some(ProtoServer::H2(ref mut h2)) => { + Some(ProtoServer::H2 { ref mut h2 }) => { h2.graceful_shutdown(); } None => (), @@ -624,7 +638,7 @@ where pub fn try_into_parts(self) -> Option> { match self.conn.unwrap() { #[cfg(feature = "http1")] - ProtoServer::H1(h1, _) => { + ProtoServer::H1 { h1, .. } => { let (io, read_buf, dispatch) = h1.into_inner(); Some(Parts { io, @@ -634,7 +648,7 @@ where }) } #[cfg(feature = "http2")] - ProtoServer::H2(_h2) => None, + ProtoServer::H2 { .. } => None, } } @@ -658,7 +672,7 @@ where loop { match *self.conn.as_mut().unwrap() { #[cfg(feature = "http1")] - ProtoServer::H1(ref mut h1, _) => match ready!(h1.poll_without_shutdown(cx)) { + ProtoServer::H1 { ref mut h1, .. } => match ready!(h1.poll_without_shutdown(cx)) { Ok(()) => return Poll::Ready(Ok(())), Err(e) => { #[cfg(feature = "http2")] @@ -674,7 +688,7 @@ where } }, #[cfg(feature = "http2")] - ProtoServer::H2(ref mut h2) => return Pin::new(h2).poll(cx).map_ok(|_| ()), + ProtoServer::H2 { ref mut h2 } => return Pin::new(h2).poll(cx).map_ok(|_| ()), }; } } @@ -700,8 +714,8 @@ where let conn = self.conn.take(); let (io, read_buf, dispatch) = match conn.unwrap() { - ProtoServer::H1(h1, _) => h1.into_inner(), - ProtoServer::H2(_h2) => { + ProtoServer::H1 { h1, .. } => h1.into_inner(), + ProtoServer::H2 { .. } => { panic!("h2 cannot into_inner"); } }; @@ -714,7 +728,7 @@ where let h2 = proto::h2::Server::new(rewind_io, dispatch.into_service(), builder, exec.clone()); debug_assert!(self.conn.is_none()); - self.conn = Some(ProtoServer::H2(h2)); + self.conn = Some(ProtoServer::H2 { h2 }); } /// Enable this connection to support higher-level HTTP upgrades. @@ -948,9 +962,9 @@ where fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { match self.project() { #[cfg(feature = "http1")] - ProtoServerProj::H1(s, _) => s.poll(cx), + ProtoServerProj::H1 { h1, .. } => h1.poll(cx), #[cfg(feature = "http2")] - ProtoServerProj::H2(s) => s.poll(cx), + ProtoServerProj::H2 { h2 } => h2.poll(cx), } } } @@ -964,7 +978,7 @@ pub(crate) mod spawn_all { use crate::common::exec::ConnStreamExec; use crate::common::{task, Future, Pin, Poll, Unpin}; use crate::service::HttpService; - use pin_project::pin_project; + use pin_project_lite::pin_project; // Used by `SpawnAll` to optionally watch a `Connection` future. // @@ -1009,23 +1023,36 @@ pub(crate) mod spawn_all { // Users cannot import this type, nor the associated `NewSvcExec`. Instead, // a blanket implementation for `Executor` is sufficient. - #[pin_project] - #[allow(missing_debug_implementations)] - pub struct NewSvcTask, E, W: Watcher> { - #[pin] - state: State, + pin_project! { + #[allow(missing_debug_implementations)] + pub struct NewSvcTask, E, W: Watcher> { + #[pin] + state: State, + } } - #[pin_project(project = StateProj)] - pub(super) enum State, E, W: Watcher> { - Connecting(#[pin] Connecting, W), - Connected(#[pin] W::Future), + pin_project! { + #[project = StateProj] + pub(super) enum State, E, W: Watcher> { + Connecting { + #[pin] + connecting: Connecting, + watcher: W, + }, + Connected { + #[pin] + future: W::Future, + }, + } } impl, E, W: Watcher> NewSvcTask { pub(super) fn new(connecting: Connecting, watcher: W) -> Self { NewSvcTask { - state: State::Connecting(connecting, watcher), + state: State::Connecting { + connecting, + watcher, + }, } } } @@ -1052,7 +1079,10 @@ pub(crate) mod spawn_all { loop { let next = { match me.state.as_mut().project() { - StateProj::Connecting(connecting, watcher) => { + StateProj::Connecting { + connecting, + watcher, + } => { let res = ready!(connecting.poll(cx)); let conn = match res { Ok(conn) => conn, @@ -1062,10 +1092,10 @@ pub(crate) mod spawn_all { return Poll::Ready(()); } }; - let connected = watcher.watch(conn.with_upgrades()); - State::Connected(connected) + let future = watcher.watch(conn.with_upgrades()); + State::Connected { future } } - StateProj::Connected(future) => { + StateProj::Connected { future } => { return future.poll(cx).map(|res| { if let Err(err) = res { debug!("connection error: {}", err); @@ -1133,7 +1163,7 @@ mod upgrades { #[cfg(feature = "http1")] Ok(proto::Dispatched::Upgrade(pending)) => { match self.inner.conn.take() { - Some(ProtoServer::H1(h1, _)) => { + Some(ProtoServer::H1 { h1, .. }) => { let (io, buf, _) = h1.into_inner(); pending.fulfill(Upgraded::new(io, buf)); return Poll::Ready(Ok(())); diff --git a/src/server/server.rs b/src/server/server.rs index 48cc6e2803..07d9e5fbb0 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -6,7 +6,7 @@ use std::net::{SocketAddr, TcpListener as StdTcpListener}; #[cfg(feature = "tcp")] use std::time::Duration; -use pin_project::pin_project; +use pin_project_lite::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; use super::accept::Accept; @@ -21,16 +21,17 @@ use super::shutdown::{Graceful, GracefulWatcher}; #[cfg(feature = "tcp")] use super::tcp::AddrIncoming; -/// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default. -/// -/// `Server` is a `Future` mapping a bound listener with a set of service -/// handlers. It is built using the [`Builder`](Builder), and the future -/// completes when the server has been shutdown. It should be run by an -/// `Executor`. -#[pin_project] -pub struct Server { - #[pin] - spawn_all: SpawnAll, +pin_project! { + /// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default. + /// + /// `Server` is a `Future` mapping a bound listener with a set of service + /// handlers. It is built using the [`Builder`](Builder), and the future + /// completes when the server has been shutdown. It should be run by an + /// `Executor`. + pub struct Server { + #[pin] + spawn_all: SpawnAll, + } } /// A builder for a [`Server`](Server). diff --git a/src/server/shutdown.rs b/src/server/shutdown.rs index e54ba42104..122853ac17 100644 --- a/src/server/shutdown.rs +++ b/src/server/shutdown.rs @@ -1,33 +1,36 @@ use std::error::Error as StdError; -use pin_project::pin_project; +use pin_project_lite::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; -use super::conn::{SpawnAll, UpgradeableConnection, Watcher}; use super::accept::Accept; +use super::conn::{SpawnAll, UpgradeableConnection, Watcher}; use crate::body::{Body, HttpBody}; use crate::common::drain::{self, Draining, Signal, Watch, Watching}; use crate::common::exec::{ConnStreamExec, NewSvcExec}; use crate::common::{task, Future, Pin, Poll, Unpin}; use crate::service::{HttpService, MakeServiceRef}; -#[allow(missing_debug_implementations)] -#[pin_project] -pub struct Graceful { - #[pin] - state: State, +pin_project! { + #[allow(missing_debug_implementations)] + pub struct Graceful { + #[pin] + state: State, + } } -#[pin_project(project = StateProj)] -pub(super) enum State { - Running { - drain: Option<(Signal, Watch)>, - #[pin] - spawn_all: SpawnAll, - #[pin] - signal: F, - }, - Draining(Draining), +pin_project! { + #[project = StateProj] + pub(super) enum State { + Running { + drain: Option<(Signal, Watch)>, + #[pin] + spawn_all: SpawnAll, + #[pin] + signal: F, + }, + Draining { draining: Draining }, + } } impl Graceful { @@ -71,14 +74,16 @@ where Poll::Ready(()) => { debug!("signal received, starting graceful shutdown"); let sig = drain.take().expect("drain channel").0; - State::Draining(sig.drain()) + State::Draining { + draining: sig.drain(), + } } Poll::Pending => { let watch = drain.as_ref().expect("drain channel").1.clone(); return spawn_all.poll_watch(cx, &GracefulWatcher(watch)); } }, - StateProj::Draining(ref mut draining) => { + StateProj::Draining { ref mut draining } => { return Pin::new(draining).poll(cx).map(Ok); } } diff --git a/src/server/tcp.rs b/src/server/tcp.rs index 91afc40120..46c570decd 100644 --- a/src/server/tcp.rs +++ b/src/server/tcp.rs @@ -229,13 +229,14 @@ mod addr_stream { use crate::common::{task, Pin, Poll}; - /// A transport returned yieled by `AddrIncoming`. - #[pin_project::pin_project] - #[derive(Debug)] - pub struct AddrStream { - #[pin] - inner: TcpStream, - pub(super) remote_addr: SocketAddr, + pin_project_lite::pin_project! { + /// A transport returned yieled by `AddrIncoming`. + #[derive(Debug)] + pub struct AddrStream { + #[pin] + inner: TcpStream, + pub(super) remote_addr: SocketAddr, + } } impl AddrStream { diff --git a/src/service/oneshot.rs b/src/service/oneshot.rs index 766d0c4689..2697af8f4c 100644 --- a/src/service/oneshot.rs +++ b/src/service/oneshot.rs @@ -1,6 +1,6 @@ // TODO: Eventually to be replaced with tower_util::Oneshot. -use pin_project::pin_project; +use pin_project_lite::pin_project; use tower_service::Service; use crate::common::{task, Future, Pin, Poll}; @@ -10,25 +10,35 @@ where S: Service, { Oneshot { - state: State::NotReady(svc, req), + state: State::NotReady { svc, req }, } } -// A `Future` consuming a `Service` and request, waiting until the `Service` -// is ready, and then calling `Service::call` with the request, and -// waiting for that `Future`. -#[allow(missing_debug_implementations)] -#[pin_project] -pub struct Oneshot, Req> { - #[pin] - state: State, +pin_project! { + // A `Future` consuming a `Service` and request, waiting until the `Service` + // is ready, and then calling `Service::call` with the request, and + // waiting for that `Future`. + #[allow(missing_debug_implementations)] + pub struct Oneshot, Req> { + #[pin] + state: State, + } } -#[pin_project(project = StateProj, project_replace = StateProjOwn)] -enum State, Req> { - NotReady(S, Req), - Called(#[pin] S::Future), - Tmp, +pin_project! { + #[project = StateProj] + #[project_replace = StateProjOwn] + enum State, Req> { + NotReady { + svc: S, + req: Req, + }, + Called { + #[pin] + fut: S::Future, + }, + Tmp, + } } impl Future for Oneshot @@ -42,19 +52,19 @@ where loop { match me.state.as_mut().project() { - StateProj::NotReady(ref mut svc, _) => { + StateProj::NotReady { ref mut svc, .. } => { ready!(svc.poll_ready(cx))?; // fallthrough out of the match's borrow } - StateProj::Called(fut) => { + StateProj::Called { fut } => { return fut.poll(cx); } StateProj::Tmp => unreachable!(), } match me.state.as_mut().project_replace(State::Tmp) { - StateProjOwn::NotReady(mut svc, req) => { - me.state.set(State::Called(svc.call(req))); + StateProjOwn::NotReady { mut svc, req } => { + me.state.set(State::Called { fut: svc.call(req) }); } _ => unreachable!(), }