diff --git a/tower-layer/src/lib.rs b/tower-layer/src/lib.rs index 4ae58b912..b664011d7 100644 --- a/tower-layer/src/lib.rs +++ b/tower-layer/src/lib.rs @@ -5,7 +5,7 @@ rust_2018_idioms, unreachable_pub )] -#![deny(broken_intra_doc_links)] +#![deny(rustdoc::broken_intra_doc_links)] //! Layer traits and extensions. //! diff --git a/tower-service/src/lib.rs b/tower-service/src/lib.rs index 363189ea3..7bc9028e2 100644 --- a/tower-service/src/lib.rs +++ b/tower-service/src/lib.rs @@ -5,7 +5,7 @@ rust_2018_idioms, unreachable_pub )] -#![deny(broken_intra_doc_links)] +#![deny(rustdoc::broken_intra_doc_links)] //! Definition of the core `Service` trait to Tower //! diff --git a/tower-test/Cargo.toml b/tower-test/Cargo.toml index 497214e2f..4a2749fc6 100644 --- a/tower-test/Cargo.toml +++ b/tower-test/Cargo.toml @@ -27,7 +27,7 @@ tokio = { version = "1.0", features = ["sync"] } tokio-test = "0.4" tower-layer = { version = "0.3", path = "../tower-layer" } tower-service = { version = "0.3" } -pin-project = "1" +pin-project-lite = "0.2" [dev-dependencies] tokio = { version = "1.0", features = ["macros"] } diff --git a/tower-test/src/lib.rs b/tower-test/src/lib.rs index 1fa59da98..1f5869869 100644 --- a/tower-test/src/lib.rs +++ b/tower-test/src/lib.rs @@ -6,7 +6,7 @@ unreachable_pub )] #![allow(elided_lifetimes_in_paths)] -#![deny(broken_intra_doc_links)] +#![deny(rustdoc::broken_intra_doc_links)] //! Mock `Service` that can be used in tests. diff --git a/tower-test/src/mock/future.rs b/tower-test/src/mock/future.rs index 872df028a..cda4518e0 100644 --- a/tower-test/src/mock/future.rs +++ b/tower-test/src/mock/future.rs @@ -2,7 +2,7 @@ use crate::mock::error::{self, Error}; use futures_util::ready; -use pin_project::pin_project; +use pin_project_lite::pin_project; use tokio::sync::oneshot; use std::{ @@ -11,12 +11,13 @@ use std::{ task::{Context, Poll}, }; -/// Future of the `Mock` response. -#[pin_project] -#[derive(Debug)] -pub struct ResponseFuture { - #[pin] - rx: Option>, +pin_project! { + /// Future of the `Mock` response. + #[derive(Debug)] + pub struct ResponseFuture { + #[pin] + rx: Option>, + } } type Rx = oneshot::Receiver>; diff --git a/tower/Cargo.toml b/tower/Cargo.toml index 12ed91e17..a1c2f25ac 100644 --- a/tower/Cargo.toml +++ b/tower/Cargo.toml @@ -76,6 +76,7 @@ tokio = { version = "1", optional = true, features = ["sync"] } tokio-stream = { version = "0.1.0", optional = true } tokio-util = { version = "0.6.3", default-features = false, optional = true } tracing = { version = "0.1.2", optional = true } +pin-project-lite = "0.2.7" [dev-dependencies] futures = "0.3" diff --git a/tower/examples/tower-balance.rs b/tower/examples/tower-balance.rs index 3c9b70475..49717c1e7 100644 --- a/tower/examples/tower-balance.rs +++ b/tower/examples/tower-balance.rs @@ -3,7 +3,7 @@ use futures_core::{Stream, TryStream}; use futures_util::{stream, stream::StreamExt, stream::TryStreamExt}; use hdrhistogram::Histogram; -use pin_project::pin_project; +use pin_project_lite::pin_project; use rand::{self, Rng}; use std::hash::Hash; use std::time::Duration; @@ -78,8 +78,17 @@ type Error = Box; type Key = usize; -#[pin_project] -struct Disco(Vec<(Key, S)>); +pin_project! { + struct Disco { + services: Vec<(Key, S)> + } +} + +impl Disco { + fn new(services: Vec<(Key, S)>) -> Self { + Self { services } + } +} impl Stream for Disco where @@ -88,7 +97,7 @@ where type Item = Result, Error>; fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - match self.project().0.pop() { + match self.project().services.pop() { Some((k, service)) => Poll::Ready(Some(Ok(Change::Insert(k, service)))), None => { // there may be more later @@ -105,7 +114,7 @@ fn gen_disco() -> impl Discover< impl Service + Send, >, > + Send { - Disco( + Disco::new( MAX_ENDPOINT_LATENCIES .iter() .enumerate() diff --git a/tower/src/balance/p2c/make.rs b/tower/src/balance/p2c/make.rs index 306cd2283..84ea92577 100644 --- a/tower/src/balance/p2c/make.rs +++ b/tower/src/balance/p2c/make.rs @@ -1,7 +1,7 @@ use super::Balance; use crate::discover::Discover; use futures_core::ready; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::hash::Hash; use std::marker::PhantomData; use std::{ @@ -29,15 +29,16 @@ pub struct MakeBalance { _marker: PhantomData, } -/// A [`Balance`] in the making. -/// -/// [`Balance`]: crate::balance::p2c::Balance -#[pin_project] -#[derive(Debug)] -pub struct MakeFuture { - #[pin] - inner: F, - _marker: PhantomData, +pin_project! { + /// A [`Balance`] in the making. + /// + /// [`Balance`]: crate::balance::p2c::Balance + #[derive(Debug)] + pub struct MakeFuture { + #[pin] + inner: F, + _marker: PhantomData, + } } impl MakeBalance { diff --git a/tower/src/balance/p2c/service.rs b/tower/src/balance/p2c/service.rs index 690e2723d..d12e87323 100644 --- a/tower/src/balance/p2c/service.rs +++ b/tower/src/balance/p2c/service.rs @@ -4,7 +4,7 @@ use crate::load::Load; use crate::ready_cache::{error::Failed, ReadyCache}; use futures_core::ready; use futures_util::future::{self, TryFutureExt}; -use pin_project::pin_project; +use pin_project_lite::pin_project; use rand::{rngs::SmallRng, Rng, SeedableRng}; use std::hash::Hash; use std::marker::PhantomData; @@ -59,18 +59,19 @@ where } } -/// A Future that becomes satisfied when an `S`-typed service is ready. -/// -/// May fail due to cancelation, i.e., if [`Discover`] removes the service from the service set. -#[pin_project] -#[derive(Debug)] -struct UnreadyService { - key: Option, - #[pin] - cancel: oneshot::Receiver<()>, - service: Option, - - _req: PhantomData, +pin_project! { + /// A Future that becomes satisfied when an `S`-typed service is ready. + /// + /// May fail due to cancelation, i.e., if [`Discover`] removes the service from the service set. + #[derive(Debug)] + struct UnreadyService { + key: Option, + #[pin] + cancel: oneshot::Receiver<()>, + service: Option, + + _req: PhantomData, + } } enum Error { diff --git a/tower/src/balance/pool/mod.rs b/tower/src/balance/pool/mod.rs index 067785210..f32d25fe0 100644 --- a/tower/src/balance/pool/mod.rs +++ b/tower/src/balance/pool/mod.rs @@ -19,7 +19,7 @@ use crate::discover::Change; use crate::load::Load; use crate::make::MakeService; use futures_core::{ready, Stream}; -use pin_project::pin_project; +use pin_project_lite::pin_project; use slab::Slab; use std::{ fmt, @@ -42,23 +42,24 @@ enum Level { High, } -/// A wrapper around `MakeService` that discovers a new service when load is high, and removes a -/// service when load is low. See [`Pool`]. -#[pin_project] -pub struct PoolDiscoverer -where - MS: MakeService, -{ - maker: MS, - #[pin] - making: Option, - target: Target, - load: Level, - services: Slab<()>, - died_tx: tokio::sync::mpsc::UnboundedSender, - #[pin] - died_rx: tokio::sync::mpsc::UnboundedReceiver, - limit: Option, +pin_project! { + /// A wrapper around `MakeService` that discovers a new service when load is high, and removes a + /// service when load is low. See [`Pool`]. + pub struct PoolDiscoverer + where + MS: MakeService, + { + maker: MS, + #[pin] + making: Option, + target: Target, + load: Level, + services: Slab<()>, + died_tx: tokio::sync::mpsc::UnboundedSender, + #[pin] + died_rx: tokio::sync::mpsc::UnboundedReceiver, + limit: Option, + } } impl fmt::Debug for PoolDiscoverer diff --git a/tower/src/buffer/future.rs b/tower/src/buffer/future.rs index 40774fc9c..411789007 100644 --- a/tower/src/buffer/future.rs +++ b/tower/src/buffer/future.rs @@ -4,39 +4,50 @@ use super::{error::Closed, message}; use futures_core::ready; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::{ future::Future, pin::Pin, task::{Context, Poll}, }; -/// Future that completes when the buffered service eventually services the submitted request. -#[pin_project] -#[derive(Debug)] -pub struct ResponseFuture { - #[pin] - state: ResponseState, +pin_project! { + /// Future that completes when the buffered service eventually services the submitted request. + #[derive(Debug)] + pub struct ResponseFuture { + #[pin] + state: ResponseState, + } } -#[pin_project(project = ResponseStateProj)] -#[derive(Debug)] -enum ResponseState { - Failed(Option), - Rx(#[pin] message::Rx), - Poll(#[pin] T), +pin_project! { + #[project = ResponseStateProj] + #[derive(Debug)] + enum ResponseState { + Failed { + error: Option, + }, + Rx { + #[pin] + rx: message::Rx, + }, + Poll { + #[pin] + fut: T, + }, + } } impl ResponseFuture { pub(crate) fn new(rx: message::Rx) -> Self { ResponseFuture { - state: ResponseState::Rx(rx), + state: ResponseState::Rx { rx }, } } pub(crate) fn failed(err: crate::BoxError) -> Self { ResponseFuture { - state: ResponseState::Failed(Some(err)), + state: ResponseState::Failed { error: Some(err) }, } } } @@ -53,15 +64,15 @@ where loop { match this.state.as_mut().project() { - ResponseStateProj::Failed(e) => { - return Poll::Ready(Err(e.take().expect("polled after error"))); + ResponseStateProj::Failed { error } => { + return Poll::Ready(Err(error.take().expect("polled after error"))); } - ResponseStateProj::Rx(rx) => match ready!(rx.poll(cx)) { - Ok(Ok(f)) => this.state.set(ResponseState::Poll(f)), + ResponseStateProj::Rx { rx } => match ready!(rx.poll(cx)) { + Ok(Ok(fut)) => this.state.set(ResponseState::Poll { fut }), Ok(Err(e)) => return Poll::Ready(Err(e.into())), Err(_) => return Poll::Ready(Err(Closed::new().into())), }, - ResponseStateProj::Poll(fut) => return fut.poll(cx).map_err(Into::into), + ResponseStateProj::Poll { fut } => return fut.poll(cx).map_err(Into::into), } } } diff --git a/tower/src/buffer/worker.rs b/tower/src/buffer/worker.rs index 0a8d91705..fe7ea555c 100644 --- a/tower/src/buffer/worker.rs +++ b/tower/src/buffer/worker.rs @@ -3,7 +3,6 @@ use super::{ message::Message, }; use futures_core::ready; -use pin_project::pin_project; use std::sync::{Arc, Mutex, Weak}; use std::{ future::Future, @@ -13,27 +12,34 @@ use std::{ use tokio::sync::{mpsc, Semaphore}; use tower_service::Service; -/// Task that handles processing the buffer. This type should not be used -/// directly, instead `Buffer` requires an `Executor` that can accept this task. -/// -/// The struct is `pub` in the private module and the type is *not* re-exported -/// as part of the public API. This is the "sealed" pattern to include "private" -/// types in public traits that are not meant for consumers of the library to -/// implement (only call). -#[pin_project(PinnedDrop)] -#[derive(Debug)] -pub struct Worker -where - T: Service, - T::Error: Into, -{ - current_message: Option>, - rx: mpsc::UnboundedReceiver>, - service: T, - finish: bool, - failed: Option, - handle: Handle, - close: Option>, +pin_project_lite::pin_project! { + /// Task that handles processing the buffer. This type should not be used + /// directly, instead `Buffer` requires an `Executor` that can accept this task. + /// + /// The struct is `pub` in the private module and the type is *not* re-exported + /// as part of the public API. This is the "sealed" pattern to include "private" + /// types in public traits that are not meant for consumers of the library to + /// implement (only call). + #[derive(Debug)] + pub struct Worker + where + T: Service, + { + current_message: Option>, + rx: mpsc::UnboundedReceiver>, + service: T, + finish: bool, + failed: Option, + handle: Handle, + close: Option>, + } + + impl, Request> PinnedDrop for Worker + { + fn drop(mut this: Pin<&mut Self>) { + this.as_mut().close_semaphore(); + } + } } /// Get the error out @@ -42,6 +48,22 @@ pub(crate) struct Handle { inner: Arc>>, } +impl Worker +where + T: Service, +{ + /// Closes the buffer's semaphore if it is still open, waking any pending + /// tasks. + fn close_semaphore(&mut self) { + if let Some(close) = self.close.take().as_ref().and_then(Weak::upgrade) { + tracing::debug!("buffer closing; waking pending tasks"); + close.close(); + } else { + tracing::trace!("buffer already closed"); + } + } +} + impl Worker where T: Service, @@ -141,17 +163,6 @@ where // requests that we receive before we've exhausted the receiver receive the error: self.failed = Some(error); } - - /// Closes the buffer's semaphore if it is still open, waking any pending - /// tasks. - fn close_semaphore(&mut self) { - if let Some(close) = self.close.take().as_ref().and_then(Weak::upgrade) { - tracing::debug!("buffer closing; waking pending tasks"); - close.close(); - } else { - tracing::trace!("buffer already closed"); - } - } } impl Future for Worker @@ -225,17 +236,6 @@ where } } -#[pin_project::pinned_drop] -impl PinnedDrop for Worker -where - T: Service, - T::Error: Into, -{ - fn drop(mut self: Pin<&mut Self>) { - self.as_mut().close_semaphore(); - } -} - impl Handle { pub(crate) fn get_error_on_closed(&self) -> crate::BoxError { self.inner diff --git a/tower/src/discover/list.rs b/tower/src/discover/list.rs index 768eb54bb..b4198025f 100644 --- a/tower/src/discover/list.rs +++ b/tower/src/discover/list.rs @@ -1,6 +1,6 @@ use super::{error::Never, Change}; use futures_core::Stream; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::iter::{Enumerate, IntoIterator}; use std::{ pin::Pin, @@ -8,17 +8,18 @@ use std::{ }; use tower_service::Service; -/// Static service discovery based on a predetermined list of services. -/// -/// [`ServiceList`] is created with an initial list of services. The discovery -/// process will yield this list once and do nothing after. -#[pin_project] -#[derive(Debug)] -pub struct ServiceList -where - T: IntoIterator, -{ - inner: Enumerate, +pin_project! { + /// Static service discovery based on a predetermined list of services. + /// + /// [`ServiceList`] is created with an initial list of services. The discovery + /// process will yield this list once and do nothing after. + #[derive(Debug)] + pub struct ServiceList + where + T: IntoIterator, + { + inner: Enumerate, + } } impl ServiceList diff --git a/tower/src/filter/future.rs b/tower/src/filter/future.rs index 7577887a1..67772bbec 100644 --- a/tower/src/filter/future.rs +++ b/tower/src/filter/future.rs @@ -3,7 +3,7 @@ use super::AsyncPredicate; use crate::BoxError; use futures_core::ready; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::{ future::Future, pin::Pin, @@ -11,21 +11,22 @@ use std::{ }; use tower_service::Service; -/// Filtered response future from [`AsyncFilter`] services. -/// -/// [`AsyncFilter`]: crate::filter::AsyncFilter -#[pin_project] -#[derive(Debug)] -pub struct AsyncResponseFuture -where - P: AsyncPredicate, - S: Service, -{ - #[pin] - state: State, +pin_project! { + /// Filtered response future from [`AsyncFilter`] services. + /// + /// [`AsyncFilter`]: crate::filter::AsyncFilter + #[derive(Debug)] + pub struct AsyncResponseFuture + where + P: AsyncPredicate, + S: Service, + { + #[pin] + state: State, - /// Inner service - service: S, + // Inner service + service: S, + } } opaque_future! { @@ -39,13 +40,21 @@ opaque_future! { >; } -#[pin_project(project = StateProj)] -#[derive(Debug)] -enum State { - /// Waiting for the predicate future - Check(#[pin] F), - /// Waiting for the response future - WaitResponse(#[pin] G), +pin_project! { + #[project = StateProj] + #[derive(Debug)] + enum State { + /// Waiting for the predicate future + Check { + #[pin] + check: F + }, + /// Waiting for the response future + WaitResponse { + #[pin] + response: G + }, + } } impl AsyncResponseFuture @@ -56,7 +65,7 @@ where { pub(crate) fn new(check: P::Future, service: S) -> Self { Self { - state: State::Check(check), + state: State::Check { check }, service, } } @@ -75,12 +84,12 @@ where loop { match this.state.as_mut().project() { - StateProj::Check(mut check) => { + StateProj::Check { mut check } => { let request = ready!(check.as_mut().poll(cx))?; let response = this.service.call(request); - this.state.set(State::WaitResponse(response)); + this.state.set(State::WaitResponse { response }); } - StateProj::WaitResponse(response) => { + StateProj::WaitResponse { response } => { return response.poll(cx).map_err(Into::into); } } diff --git a/tower/src/filter/mod.rs b/tower/src/filter/mod.rs index 344c8ed2a..9ed7f63d7 100644 --- a/tower/src/filter/mod.rs +++ b/tower/src/filter/mod.rs @@ -112,7 +112,7 @@ where } fn call(&mut self, request: Request) -> Self::Future { - ResponseFuture(match self.predicate.check(request) { + ResponseFuture::new(match self.predicate.check(request) { Ok(request) => Either::Right(self.inner.call(request).err_into()), Err(e) => Either::Left(futures_util::future::ready(Err(e.into()))), }) diff --git a/tower/src/hedge/delay.rs b/tower/src/hedge/delay.rs index 7f35a7069..3d634bfaa 100644 --- a/tower/src/hedge/delay.rs +++ b/tower/src/hedge/delay.rs @@ -1,5 +1,5 @@ use futures_util::ready; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::time::Duration; use std::{ future::Future, @@ -23,22 +23,42 @@ pub struct Delay { service: S, } -#[pin_project] -#[derive(Debug)] -pub struct ResponseFuture -where - S: Service, -{ - service: Option, - #[pin] - state: State>, +pin_project! { + #[derive(Debug)] + pub struct ResponseFuture + where + S: Service, + { + service: Option, + #[pin] + state: State>, + } } -#[pin_project(project = StateProj)] -#[derive(Debug)] -enum State { - Delaying(#[pin] tokio::time::Sleep, Option), - Called(#[pin] F), +pin_project! { + #[project = StateProj] + #[derive(Debug)] + enum State { + Delaying { + #[pin] + delay: tokio::time::Sleep, + req: Option, + }, + Called { + #[pin] + fut: F, + }, + } +} + +impl State { + fn delaying(delay: tokio::time::Sleep, req: Option) -> Self { + Self::Delaying { delay, req } + } + + fn called(fut: F) -> Self { + Self::Called { fut } + } } impl Delay { @@ -73,7 +93,7 @@ where let delay = self.policy.delay(&request); ResponseFuture { service: Some(self.service.clone()), - state: State::Delaying(tokio::time::sleep(delay), Some(request)), + state: State::delaying(tokio::time::sleep(delay), Some(request)), } } } @@ -90,14 +110,14 @@ where loop { match this.state.as_mut().project() { - StateProj::Delaying(delay, req) => { + StateProj::Delaying { delay, req } => { ready!(delay.poll(cx)); let req = req.take().expect("Missing request in delay"); let svc = this.service.take().expect("Missing service in delay"); let fut = Oneshot::new(svc, req); - this.state.set(State::Called(fut)); + this.state.set(State::called(fut)); } - StateProj::Called(fut) => { + StateProj::Called { fut } => { return fut.poll(cx).map_err(Into::into); } }; diff --git a/tower/src/hedge/latency.rs b/tower/src/hedge/latency.rs index ebe3fff20..5f99642ba 100644 --- a/tower/src/hedge/latency.rs +++ b/tower/src/hedge/latency.rs @@ -1,5 +1,5 @@ use futures_util::ready; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::time::Duration; use std::{ future::Future, @@ -24,13 +24,14 @@ pub struct Latency { service: S, } -#[pin_project] -#[derive(Debug)] -pub struct ResponseFuture { - start: Instant, - rec: R, - #[pin] - inner: F, +pin_project! { + #[derive(Debug)] + pub struct ResponseFuture { + start: Instant, + rec: R, + #[pin] + inner: F, + } } impl Latency diff --git a/tower/src/hedge/mod.rs b/tower/src/hedge/mod.rs index f2fc650c0..3cd152e7e 100644 --- a/tower/src/hedge/mod.rs +++ b/tower/src/hedge/mod.rs @@ -5,7 +5,7 @@ use crate::filter::AsyncFilter; use futures_util::future; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::sync::{Arc, Mutex}; use std::time::Duration; use std::{ @@ -37,17 +37,18 @@ type Service = select::Select< #[derive(Debug)] pub struct Hedge(Service); -/// The [`Future`] returned by the [`Hedge`] service. -/// -/// [`Future`]: std::future::Future -#[pin_project] -#[derive(Debug)] -pub struct Future -where - S: tower_service::Service, -{ - #[pin] - inner: S::Future, +pin_project! { + /// The [`Future`] returned by the [`Hedge`] service. + /// + /// [`Future`]: std::future::Future + #[derive(Debug)] + pub struct Future + where + S: tower_service::Service, + { + #[pin] + inner: S::Future, + } } /// A policy which describes which requests can be cloned and then whether those diff --git a/tower/src/hedge/select.rs b/tower/src/hedge/select.rs index 2841d59cb..5d1573c08 100644 --- a/tower/src/hedge/select.rs +++ b/tower/src/hedge/select.rs @@ -1,4 +1,4 @@ -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::{ future::Future, pin::Pin, @@ -23,13 +23,14 @@ pub struct Select { b: B, } -#[pin_project] -#[derive(Debug)] -pub struct ResponseFuture { - #[pin] - a_fut: AF, - #[pin] - b_fut: Option, +pin_project! { + #[derive(Debug)] + pub struct ResponseFuture { + #[pin] + a_fut: AF, + #[pin] + b_fut: Option, + } } impl Select { diff --git a/tower/src/lib.rs b/tower/src/lib.rs index f12439e7f..458e4f585 100644 --- a/tower/src/lib.rs +++ b/tower/src/lib.rs @@ -5,7 +5,7 @@ rust_2018_idioms, unreachable_pub )] -#![deny(broken_intra_doc_links)] +#![deny(rustdoc::broken_intra_doc_links)] #![allow(elided_lifetimes_in_paths, clippy::type_complexity)] #![cfg_attr(test, allow(clippy::float_cmp))] #![cfg_attr(docsrs, feature(doc_cfg))] diff --git a/tower/src/limit/concurrency/future.rs b/tower/src/limit/concurrency/future.rs index 1fea13438..6eb0100a8 100644 --- a/tower/src/limit/concurrency/future.rs +++ b/tower/src/limit/concurrency/future.rs @@ -2,7 +2,7 @@ //! //! [`Future`]: std::future::Future use futures_core::ready; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::{ future::Future, pin::Pin, @@ -10,16 +10,17 @@ use std::{ }; use tokio::sync::OwnedSemaphorePermit; -/// Future for the [`ConcurrencyLimit`] service. -/// -/// [`ConcurrencyLimit`]: crate::limit::ConcurrencyLimit -#[pin_project] -#[derive(Debug)] -pub struct ResponseFuture { - #[pin] - inner: T, - // Keep this around so that it is dropped when the future completes - _permit: OwnedSemaphorePermit, +pin_project! { + /// Future for the [`ConcurrencyLimit`] service. + /// + /// [`ConcurrencyLimit`]: crate::limit::ConcurrencyLimit + #[derive(Debug)] + pub struct ResponseFuture { + #[pin] + inner: T, + // Keep this around so that it is dropped when the future completes + _permit: OwnedSemaphorePermit, + } } impl ResponseFuture { diff --git a/tower/src/load/completion.rs b/tower/src/load/completion.rs index 5040a4a88..3c14a7ff7 100644 --- a/tower/src/load/completion.rs +++ b/tower/src/load/completion.rs @@ -1,7 +1,7 @@ //! Application-specific request completion semantics. use futures_core::ready; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::{ future::Future, pin::Pin, @@ -44,14 +44,15 @@ pub trait TrackCompletion: Clone { #[non_exhaustive] pub struct CompleteOnResponse; -/// Attaches a `C`-typed completion tracker to the result of an `F`-typed [`Future`]. -#[pin_project] -#[derive(Debug)] -pub struct TrackCompletionFuture { - #[pin] - future: F, - handle: Option, - completion: C, +pin_project! { + /// Attaches a `C`-typed completion tracker to the result of an `F`-typed [`Future`]. + #[derive(Debug)] + pub struct TrackCompletionFuture { + #[pin] + future: F, + handle: Option, + completion: C, + } } // ===== impl InstrumentFuture ===== diff --git a/tower/src/load/constant.rs b/tower/src/load/constant.rs index a116d4379..a7c874e2b 100644 --- a/tower/src/load/constant.rs +++ b/tower/src/load/constant.rs @@ -8,18 +8,19 @@ use futures_core::{ready, Stream}; use std::pin::Pin; use super::Load; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::task::{Context, Poll}; use tower_service::Service; -/// Wraps a type so that it implements [`Load`] and returns a constant load metric. -/// -/// This load estimator is primarily useful for testing. -#[pin_project] -#[derive(Debug)] -pub struct Constant { - inner: T, - load: M, +pin_project! { + #[derive(Debug)] + /// Wraps a type so that it implements [`Load`] and returns a constant load metric. + /// + /// This load estimator is primarily useful for testing. + pub struct Constant { + inner: T, + load: M, + } } // ===== impl Constant ===== diff --git a/tower/src/load/peak_ewma.rs b/tower/src/load/peak_ewma.rs index 0568ceee7..13cb1b104 100644 --- a/tower/src/load/peak_ewma.rs +++ b/tower/src/load/peak_ewma.rs @@ -5,7 +5,7 @@ use crate::discover::{Change, Discover}; #[cfg(feature = "discover")] use futures_core::{ready, Stream}; #[cfg(feature = "discover")] -use pin_project::pin_project; +use pin_project_lite::pin_project; #[cfg(feature = "discover")] use std::pin::Pin; @@ -48,17 +48,18 @@ pub struct PeakEwma { completion: C, } -/// Wraps a `D`-typed stream of discovered services with `PeakEwma`. -#[pin_project] -#[derive(Debug)] #[cfg(feature = "discover")] #[cfg_attr(docsrs, doc(cfg(feature = "discover")))] -pub struct PeakEwmaDiscover { - #[pin] - discover: D, - decay_ns: f64, - default_rtt: Duration, - completion: C, +pin_project! { + /// Wraps a `D`-typed stream of discovered services with `PeakEwma`. + #[derive(Debug)] + pub struct PeakEwmaDiscover { + #[pin] + discover: D, + decay_ns: f64, + default_rtt: Duration, + completion: C, + } } /// Represents the relative cost of communicating with a service. @@ -378,11 +379,11 @@ mod tests { time::advance(Duration::from_millis(100)).await; let () = assert_ready_ok!(rsp0.poll()); - assert_eq!(svc.load(), Cost(404_000_000.0)); + assert_eq!(svc.load(), Cost(400_000_000.0)); time::advance(Duration::from_millis(100)).await; let () = assert_ready_ok!(rsp1.poll()); - assert_eq!(svc.load(), Cost(202_000_000.0)); + assert_eq!(svc.load(), Cost(200_000_000.0)); // Check that values decay as time elapses time::advance(Duration::from_secs(1)).await; diff --git a/tower/src/load/pending_requests.rs b/tower/src/load/pending_requests.rs index b1769e948..54550bfaa 100644 --- a/tower/src/load/pending_requests.rs +++ b/tower/src/load/pending_requests.rs @@ -5,7 +5,7 @@ use crate::discover::{Change, Discover}; #[cfg(feature = "discover")] use futures_core::{ready, Stream}; #[cfg(feature = "discover")] -use pin_project::pin_project; +use pin_project_lite::pin_project; #[cfg(feature = "discover")] use std::pin::Pin; @@ -27,15 +27,16 @@ pub struct PendingRequests { #[derive(Clone, Debug, Default)] struct RefCount(Arc<()>); -/// Wraps a `D`-typed stream of discovered services with [`PendingRequests`]. -#[pin_project] -#[derive(Debug)] #[cfg(feature = "discover")] #[cfg_attr(docsrs, doc(cfg(feature = "discover")))] -pub struct PendingRequestsDiscover { - #[pin] - discover: D, - completion: C, +pin_project! { + /// Wraps a `D`-typed stream of discovered services with [`PendingRequests`]. + #[derive(Debug)] + pub struct PendingRequestsDiscover { + #[pin] + discover: D, + completion: C, + } } /// Represents the number of currently-pending requests to a given service. diff --git a/tower/src/load_shed/future.rs b/tower/src/load_shed/future.rs index 045c197da..64f394c92 100644 --- a/tower/src/load_shed/future.rs +++ b/tower/src/load_shed/future.rs @@ -6,29 +6,35 @@ use std::pin::Pin; use std::task::{Context, Poll}; use futures_core::ready; -use pin_project::pin_project; +use pin_project_lite::pin_project; use super::error::Overloaded; -/// Future for the [`LoadShed`] service. -/// -/// [`LoadShed`]: crate::load_shed::LoadShed -#[pin_project] -pub struct ResponseFuture { - #[pin] - state: ResponseState, +pin_project! { + /// Future for the [`LoadShed`] service. + /// + /// [`LoadShed`]: crate::load_shed::LoadShed + pub struct ResponseFuture { + #[pin] + state: ResponseState, + } } -#[pin_project(project = ResponseStateProj)] -enum ResponseState { - Called(#[pin] F), - Overloaded, +pin_project! { + #[project = ResponseStateProj] + enum ResponseState { + Called { + #[pin] + fut: F + }, + Overloaded, + } } impl ResponseFuture { pub(crate) fn called(fut: F) -> Self { ResponseFuture { - state: ResponseState::Called(fut), + state: ResponseState::Called { fut }, } } @@ -48,7 +54,9 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.project().state.project() { - ResponseStateProj::Called(fut) => Poll::Ready(ready!(fut.poll(cx)).map_err(Into::into)), + ResponseStateProj::Called { fut } => { + Poll::Ready(ready!(fut.poll(cx)).map_err(Into::into)) + } ResponseStateProj::Overloaded => Poll::Ready(Err(Overloaded::new().into())), } } diff --git a/tower/src/macros.rs b/tower/src/macros.rs index e57c522dc..f30775663 100644 --- a/tower/src/macros.rs +++ b/tower/src/macros.rs @@ -6,9 +6,21 @@ ))] macro_rules! opaque_future { ($(#[$m:meta])* pub type $name:ident<$($param:ident),+> = $actual:ty;) => { - #[pin_project::pin_project] - $(#[$m])* - pub struct $name<$($param),+>(#[pin] pub(crate) $actual); + pin_project_lite::pin_project! { + $(#[$m])* + pub struct $name<$($param),+> { + #[pin] + inner: $actual + } + } + + impl<$($param),+> $name<$($param),+> { + pub(crate) fn new(inner: $actual) -> Self { + Self { + inner + } + } + } impl<$($param),+> std::fmt::Debug for $name<$($param),+> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -23,7 +35,7 @@ macro_rules! opaque_future { type Output = <$actual as std::future::Future>::Output; #[inline] fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { - self.project().0.poll(cx) + self.project().inner.poll(cx) } } } diff --git a/tower/src/make/make_service/shared.rs b/tower/src/make/make_service/shared.rs index 6c427fe7f..fd308a02a 100644 --- a/tower/src/make/make_service/shared.rs +++ b/tower/src/make/make_service/shared.rs @@ -93,7 +93,7 @@ where } fn call(&mut self, _target: T) -> Self::Future { - SharedFuture(futures_util::future::ready(Ok(self.service.clone()))) + SharedFuture::new(futures_util::future::ready(Ok(self.service.clone()))) } } diff --git a/tower/src/reconnect/future.rs b/tower/src/reconnect/future.rs index 4e13aa4e8..3c295b961 100644 --- a/tower/src/reconnect/future.rs +++ b/tower/src/reconnect/future.rs @@ -1,35 +1,53 @@ -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::{ future::Future, pin::Pin, task::{Context, Poll}, }; -/// Future that resolves to the response or failure to connect. -#[pin_project] -#[derive(Debug)] -pub struct ResponseFuture { - #[pin] - inner: Inner, +pin_project! { + /// Future that resolves to the response or failure to connect. + #[derive(Debug)] + pub struct ResponseFuture { + #[pin] + inner: Inner, + } +} + +pin_project! { + #[project = InnerProj] + #[derive(Debug)] + enum Inner { + Future { + #[pin] + fut: F, + }, + Error { + error: Option, + }, + } } -#[pin_project(project = InnerProj)] -#[derive(Debug)] -enum Inner { - Future(#[pin] F), - Error(Option), +impl Inner { + fn future(fut: F) -> Self { + Self::Future { fut } + } + + fn error(error: Option) -> Self { + Self::Error { error } + } } impl ResponseFuture { pub(crate) fn new(inner: F) -> Self { ResponseFuture { - inner: Inner::Future(inner), + inner: Inner::future(inner), } } pub(crate) fn error(error: E) -> Self { ResponseFuture { - inner: Inner::Error(Some(error)), + inner: Inner::error(Some(error)), } } } @@ -45,9 +63,9 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let me = self.project(); match me.inner.project() { - InnerProj::Future(fut) => fut.poll(cx).map_err(Into::into), - InnerProj::Error(e) => { - let e = e.take().expect("Polled after ready.").into(); + InnerProj::Future { fut } => fut.poll(cx).map_err(Into::into), + InnerProj::Error { error } => { + let e = error.take().expect("Polled after ready.").into(); Poll::Ready(Err(e)) } } diff --git a/tower/src/retry/future.rs b/tower/src/retry/future.rs index d6988ec30..d18a5abb7 100644 --- a/tower/src/retry/future.rs +++ b/tower/src/retry/future.rs @@ -2,36 +2,45 @@ use super::{Policy, Retry}; use futures_core::ready; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use tower_service::Service; -/// The [`Future`] returned by a [`Retry`] service. -#[pin_project] -#[derive(Debug)] -pub struct ResponseFuture -where - P: Policy, - S: Service, -{ - request: Option, - #[pin] - retry: Retry, - #[pin] - state: State, +pin_project! { + /// The [`Future`] returned by a [`Retry`] service. + #[derive(Debug)] + pub struct ResponseFuture + where + P: Policy, + S: Service, + { + request: Option, + #[pin] + retry: Retry, + #[pin] + state: State, + } } -#[pin_project(project = StateProj)] -#[derive(Debug)] -enum State { - /// Polling the future from [`Service::call`] - Called(#[pin] F), - /// Polling the future from [`Policy::retry`] - Checking(#[pin] P), - /// Polling [`Service::poll_ready`] after [`Checking`] was OK. - Retrying, +pin_project! { + #[project = StateProj] + #[derive(Debug)] + enum State { + // Polling the future from [`Service::call`] + Called { + #[pin] + future: F + }, + // Polling the future from [`Policy::retry`] + Checking { + #[pin] + checking: P + }, + // Polling [`Service::poll_ready`] after [`Checking`] was OK. + Retrying, + } } impl ResponseFuture @@ -47,7 +56,7 @@ where ResponseFuture { request, retry, - state: State::Called(future), + state: State::Called { future }, } } } @@ -64,12 +73,12 @@ where loop { match this.state.as_mut().project() { - StateProj::Called(future) => { + StateProj::Called { future } => { let result = ready!(future.poll(cx)); if let Some(ref req) = this.request { match this.retry.policy.retry(req, result.as_ref()) { Some(checking) => { - this.state.set(State::Checking(checking)); + this.state.set(State::Checking { checking }); } None => return Poll::Ready(result), } @@ -78,12 +87,12 @@ where return Poll::Ready(result); } } - StateProj::Checking(future) => { + StateProj::Checking { checking } => { this.retry .as_mut() .project() .policy - .set(ready!(future.poll(cx))); + .set(ready!(checking.poll(cx))); this.state.set(State::Retrying); } StateProj::Retrying => { @@ -104,9 +113,9 @@ where .take() .expect("retrying requires cloned request"); *this.request = this.retry.policy.clone_request(&req); - this.state.set(State::Called( - this.retry.as_mut().project().service.call(req), - )); + this.state.set(State::Called { + future: this.retry.as_mut().project().service.call(req), + }); } } } diff --git a/tower/src/retry/mod.rs b/tower/src/retry/mod.rs index 98fb084e9..a9e2738a1 100644 --- a/tower/src/retry/mod.rs +++ b/tower/src/retry/mod.rs @@ -9,19 +9,20 @@ pub use self::layer::RetryLayer; pub use self::policy::Policy; use self::future::ResponseFuture; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::task::{Context, Poll}; use tower_service::Service; -/// Configure retrying requests of "failed" responses. -/// -/// A [`Policy`] classifies what is a "failed" response. -#[pin_project] -#[derive(Clone, Debug)] -pub struct Retry { - #[pin] - policy: P, - service: S, +pin_project! { + /// Configure retrying requests of "failed" responses. + /// + /// A [`Policy`] classifies what is a "failed" response. + #[derive(Clone, Debug)] + pub struct Retry { + #[pin] + policy: P, + service: S, + } } // ===== impl Retry ===== diff --git a/tower/src/spawn_ready/make.rs b/tower/src/spawn_ready/make.rs index e2c1f2aed..a8adc73e9 100644 --- a/tower/src/spawn_ready/make.rs +++ b/tower/src/spawn_ready/make.rs @@ -1,6 +1,6 @@ use super::SpawnReady; use futures_core::ready; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::{ future::Future, pin::Pin, @@ -21,12 +21,13 @@ impl MakeSpawnReady { } } -/// Builds a [`SpawnReady`] with the result of an inner [`Future`]. -#[pin_project] -#[derive(Debug)] -pub struct MakeFuture { - #[pin] - inner: F, +pin_project! { + /// Builds a [`SpawnReady`] with the result of an inner [`Future`]. + #[derive(Debug)] + pub struct MakeFuture { + #[pin] + inner: F, + } } impl Service for MakeSpawnReady diff --git a/tower/src/spawn_ready/service.rs b/tower/src/spawn_ready/service.rs index 3b2af5daa..74618432a 100644 --- a/tower/src/spawn_ready/service.rs +++ b/tower/src/spawn_ready/service.rs @@ -75,7 +75,7 @@ where fn call(&mut self, request: Req) -> Self::Future { match self.inner { Inner::Service(Some(ref mut svc)) => { - ResponseFuture(svc.call(request).map_err(Into::into)) + ResponseFuture::new(svc.call(request).map_err(Into::into)) } _ => unreachable!("poll_ready must be called"), } diff --git a/tower/src/timeout/future.rs b/tower/src/timeout/future.rs index a14ad590d..b4eb3f4e3 100644 --- a/tower/src/timeout/future.rs +++ b/tower/src/timeout/future.rs @@ -1,7 +1,7 @@ //! Future types use super::error::Elapsed; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::{ future::Future, pin::Pin, @@ -9,16 +9,17 @@ use std::{ }; use tokio::time::Sleep; -/// [`Timeout`] response future -/// -/// [`Timeout`]: crate::timeout::Timeout -#[pin_project] -#[derive(Debug)] -pub struct ResponseFuture { - #[pin] - response: T, - #[pin] - sleep: Sleep, +pin_project! { + /// [`Timeout`] response future + /// + /// [`Timeout`]: crate::timeout::Timeout + #[derive(Debug)] + pub struct ResponseFuture { + #[pin] + response: T, + #[pin] + sleep: Sleep, + } } impl ResponseFuture { diff --git a/tower/src/util/and_then.rs b/tower/src/util/and_then.rs index 10e47b167..819ca273c 100644 --- a/tower/src/util/and_then.rs +++ b/tower/src/util/and_then.rs @@ -28,13 +28,21 @@ where } } -/// Response future from [`AndThen`] services. -/// -/// [`AndThen`]: crate::util::AndThen -#[pin_project::pin_project] -pub struct AndThenFuture( - #[pin] pub(crate) future::AndThen, F2, N>, -); +pin_project_lite::pin_project! { + /// Response future from [`AndThen`] services. + /// + /// [`AndThen`]: crate::util::AndThen + pub struct AndThenFuture { + #[pin] + inner: future::AndThen, F2, N>, + } +} + +impl AndThenFuture { + pub(crate) fn new(inner: future::AndThen, F2, N>) -> Self { + Self { inner } + } +} impl std::fmt::Debug for AndThenFuture { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -52,7 +60,7 @@ where #[inline] fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().0.poll(cx) + self.project().inner.poll(cx) } } @@ -96,7 +104,7 @@ where } fn call(&mut self, request: Request) -> Self::Future { - AndThenFuture(self.inner.call(request).err_into().and_then(self.f.clone())) + AndThenFuture::new(self.inner.call(request).err_into().and_then(self.f.clone())) } } diff --git a/tower/src/util/call_all/common.rs b/tower/src/util/call_all/common.rs index fde0f0452..09c458a9b 100644 --- a/tower/src/util/call_all/common.rs +++ b/tower/src/util/call_all/common.rs @@ -1,5 +1,5 @@ use futures_core::{ready, Stream}; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::{ future::Future, pin::Pin, @@ -7,15 +7,16 @@ use std::{ }; use tower_service::Service; -/// The [`Future`] returned by the [`ServiceExt::call_all`] combinator. -#[pin_project] -#[derive(Debug)] -pub(crate) struct CallAll { - service: Option, - #[pin] - stream: S, - queue: Q, - eof: bool, +pin_project! { + /// The [`Future`] returned by the [`ServiceExt::call_all`] combinator. + #[derive(Debug)] + pub(crate) struct CallAll { + service: Option, + #[pin] + stream: S, + queue: Q, + eof: bool, + } } pub(crate) trait Drive { diff --git a/tower/src/util/call_all/ordered.rs b/tower/src/util/call_all/ordered.rs index 1d91056c5..a0c9ae4bb 100644 --- a/tower/src/util/call_all/ordered.rs +++ b/tower/src/util/call_all/ordered.rs @@ -6,7 +6,7 @@ use super::common; use futures_core::Stream; use futures_util::stream::FuturesOrdered; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::{ future::Future, pin::Pin, @@ -14,84 +14,85 @@ use std::{ }; use tower_service::Service; -/// This is a [`Stream`] of responses resulting from calling the wrapped [`Service`] for each -/// request received on the wrapped [`Stream`]. -/// -/// ```rust -/// # use std::task::{Poll, Context}; -/// # use std::cell::Cell; -/// # use std::error::Error; -/// # use std::rc::Rc; -/// # -/// use futures::future::{ready, Ready}; -/// use futures::StreamExt; -/// use futures::channel::mpsc; -/// use tower_service::Service; -/// use tower::util::ServiceExt; -/// -/// // First, we need to have a Service to process our requests. -/// #[derive(Debug, Eq, PartialEq)] -/// struct FirstLetter; -/// impl Service<&'static str> for FirstLetter { -/// type Response = &'static str; -/// type Error = Box; -/// type Future = Ready>; -/// -/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { -/// Poll::Ready(Ok(())) -/// } -/// -/// fn call(&mut self, req: &'static str) -> Self::Future { -/// ready(Ok(&req[..1])) -/// } -/// } -/// -/// #[tokio::main] -/// async fn main() { -/// // Next, we need a Stream of requests. -// TODO(eliza): when `tokio-util` has a nice way to convert MPSCs to streams, -// tokio::sync::mpsc again? -/// let (mut reqs, rx) = mpsc::unbounded(); -/// // Note that we have to help Rust out here by telling it what error type to use. -/// // Specifically, it has to be From + From. -/// let mut rsps = FirstLetter.call_all(rx); -/// -/// // Now, let's send a few requests and then check that we get the corresponding responses. -/// reqs.unbounded_send("one").unwrap(); -/// reqs.unbounded_send("two").unwrap(); -/// reqs.unbounded_send("three").unwrap(); -/// drop(reqs); -/// -/// // We then loop over the response Strem that we get back from call_all. -/// let mut i = 0usize; -/// while let Some(rsp) = rsps.next().await { -/// // Each response is a Result (we could also have used TryStream::try_next) -/// match (i + 1, rsp.unwrap()) { -/// (1, "o") | -/// (2, "t") | -/// (3, "t") => {} -/// (n, i) => { -/// unreachable!("{}. response was '{}'", n, i); -/// } -/// } -/// i += 1; -/// } -/// -/// // And at the end, we can get the Service back when there are no more requests. -/// assert_eq!(rsps.into_inner(), FirstLetter); -/// } -/// ``` -/// -/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html -#[pin_project] -#[derive(Debug)] -pub struct CallAll -where - Svc: Service, - S: Stream, -{ - #[pin] - inner: common::CallAll>, +pin_project! { + /// This is a [`Stream`] of responses resulting from calling the wrapped [`Service`] for each + /// request received on the wrapped [`Stream`]. + /// + /// ```rust + /// # use std::task::{Poll, Context}; + /// # use std::cell::Cell; + /// # use std::error::Error; + /// # use std::rc::Rc; + /// # + /// use futures::future::{ready, Ready}; + /// use futures::StreamExt; + /// use futures::channel::mpsc; + /// use tower_service::Service; + /// use tower::util::ServiceExt; + /// + /// // First, we need to have a Service to process our requests. + /// #[derive(Debug, Eq, PartialEq)] + /// struct FirstLetter; + /// impl Service<&'static str> for FirstLetter { + /// type Response = &'static str; + /// type Error = Box; + /// type Future = Ready>; + /// + /// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + /// Poll::Ready(Ok(())) + /// } + /// + /// fn call(&mut self, req: &'static str) -> Self::Future { + /// ready(Ok(&req[..1])) + /// } + /// } + /// + /// #[tokio::main] + /// async fn main() { + /// // Next, we need a Stream of requests. + // TODO(eliza): when `tokio-util` has a nice way to convert MPSCs to streams, + // tokio::sync::mpsc again? + /// let (mut reqs, rx) = mpsc::unbounded(); + /// // Note that we have to help Rust out here by telling it what error type to use. + /// // Specifically, it has to be From + From. + /// let mut rsps = FirstLetter.call_all(rx); + /// + /// // Now, let's send a few requests and then check that we get the corresponding responses. + /// reqs.unbounded_send("one").unwrap(); + /// reqs.unbounded_send("two").unwrap(); + /// reqs.unbounded_send("three").unwrap(); + /// drop(reqs); + /// + /// // We then loop over the response Strem that we get back from call_all. + /// let mut i = 0usize; + /// while let Some(rsp) = rsps.next().await { + /// // Each response is a Result (we could also have used TryStream::try_next) + /// match (i + 1, rsp.unwrap()) { + /// (1, "o") | + /// (2, "t") | + /// (3, "t") => {} + /// (n, i) => { + /// unreachable!("{}. response was '{}'", n, i); + /// } + /// } + /// i += 1; + /// } + /// + /// // And at the end, we can get the Service back when there are no more requests. + /// assert_eq!(rsps.into_inner(), FirstLetter); + /// } + /// ``` + /// + /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html + #[derive(Debug)] + pub struct CallAll + where + Svc: Service, + S: Stream, + { + #[pin] + inner: common::CallAll>, + } } impl CallAll diff --git a/tower/src/util/call_all/unordered.rs b/tower/src/util/call_all/unordered.rs index 6d1fd640f..acf2c86c7 100644 --- a/tower/src/util/call_all/unordered.rs +++ b/tower/src/util/call_all/unordered.rs @@ -6,7 +6,7 @@ use super::common; use futures_core::Stream; use futures_util::stream::FuturesUnordered; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::{ future::Future, pin::Pin, @@ -14,21 +14,22 @@ use std::{ }; use tower_service::Service; -/// A stream of responses received from the inner service in received order. -/// -/// Similar to [`CallAll`] except, instead of yielding responses in request order, -/// responses are returned as they are available. -/// -/// [`CallAll`]: crate::util::CallAll -#[pin_project] -#[derive(Debug)] -pub struct CallAllUnordered -where - Svc: Service, - S: Stream, -{ - #[pin] - inner: common::CallAll>, +pin_project! { + /// A stream of responses received from the inner service in received order. + /// + /// Similar to [`CallAll`] except, instead of yielding responses in request order, + /// responses are returned as they are available. + /// + /// [`CallAll`]: crate::util::CallAll + #[derive(Debug)] + pub struct CallAllUnordered + where + Svc: Service, + S: Stream, + { + #[pin] + inner: common::CallAll>, + } } impl CallAllUnordered diff --git a/tower/src/util/map_err.rs b/tower/src/util/map_err.rs index 66e7bc266..b79c5fee2 100644 --- a/tower/src/util/map_err.rs +++ b/tower/src/util/map_err.rs @@ -72,7 +72,7 @@ where #[inline] fn call(&mut self, request: Request) -> Self::Future { - MapErrFuture(self.inner.call(request).map_err(self.f.clone())) + MapErrFuture::new(self.inner.call(request).map_err(self.f.clone())) } } diff --git a/tower/src/util/map_response.rs b/tower/src/util/map_response.rs index a8dbff9f6..249be3a4d 100644 --- a/tower/src/util/map_response.rs +++ b/tower/src/util/map_response.rs @@ -72,7 +72,7 @@ where #[inline] fn call(&mut self, request: Request) -> Self::Future { - MapResponseFuture(self.inner.call(request).map_ok(self.f.clone())) + MapResponseFuture::new(self.inner.call(request).map_ok(self.f.clone())) } } diff --git a/tower/src/util/map_result.rs b/tower/src/util/map_result.rs index 00e13f5b0..bfe16b5b4 100644 --- a/tower/src/util/map_result.rs +++ b/tower/src/util/map_result.rs @@ -73,7 +73,7 @@ where #[inline] fn call(&mut self, request: Request) -> Self::Future { - MapResultFuture(self.inner.call(request).map(self.f.clone())) + MapResultFuture::new(self.inner.call(request).map(self.f.clone())) } } diff --git a/tower/src/util/mod.rs b/tower/src/util/mod.rs index 68f683fc7..15cd0735b 100644 --- a/tower/src/util/mod.rs +++ b/tower/src/util/mod.rs @@ -133,7 +133,7 @@ pub trait ServiceExt: tower_service::Service { /// # struct DatabaseService; /// # impl DatabaseService { /// # fn new(address: &str) -> Self { - /// # DatabaseService + /// # DatabaseService /// # } /// # } /// # diff --git a/tower/src/util/oneshot.rs b/tower/src/util/oneshot.rs index 3e3ebbb1b..93b5070be 100644 --- a/tower/src/util/oneshot.rs +++ b/tower/src/util/oneshot.rs @@ -1,5 +1,5 @@ use futures_core::ready; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::{ fmt, future::Future, @@ -8,21 +8,40 @@ use std::{ }; use tower_service::Service; -/// A [`Future`] consuming a [`Service`] and request, waiting until the [`Service`] -/// is ready, and then calling [`Service::call`] with the request, and -/// waiting for that [`Future`]. -#[pin_project] -#[derive(Debug)] -pub struct Oneshot, Req> { - #[pin] - state: State, +pin_project! { + /// A [`Future`] consuming a [`Service`] and request, waiting until the [`Service`] + /// is ready, and then calling [`Service::call`] with the request, and + /// waiting for that [`Future`]. + #[derive(Debug)] + pub struct Oneshot, Req> { + #[pin] + state: State, + } +} + +pin_project! { + #[project = StateProj] + enum State, Req> { + NotReady { + svc: S, + req: Option, + }, + Called { + #[pin] + fut: S::Future, + }, + Done, + } } -#[pin_project(project = StateProj)] -enum State, Req> { - NotReady(S, Option), - Called(#[pin] S::Future), - Done, +impl, Req> State { + fn not_ready(svc: S, req: Option) -> Self { + Self::NotReady { svc, req } + } + + fn called(fut: S::Future) -> Self { + Self::Called { fut } + } } impl fmt::Debug for State @@ -32,13 +51,16 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - State::NotReady(s, Some(req)) => f + State::NotReady { + svc, + req: Some(req), + } => f .debug_tuple("State::NotReady") - .field(s) + .field(svc) .field(req) .finish(), - State::NotReady(_, None) => unreachable!(), - State::Called(_) => f.debug_tuple("State::Called").field(&"S::Future").finish(), + State::NotReady { req: None, .. } => unreachable!(), + State::Called { .. } => f.debug_tuple("State::Called").field(&"S::Future").finish(), State::Done => f.debug_tuple("State::Done").finish(), } } @@ -51,7 +73,7 @@ where #[allow(missing_docs)] pub fn new(svc: S, req: Req) -> Self { Oneshot { - state: State::NotReady(svc, Some(req)), + state: State::not_ready(svc, Some(req)), } } } @@ -66,12 +88,12 @@ where let mut this = self.project(); loop { match this.state.as_mut().project() { - StateProj::NotReady(svc, req) => { + StateProj::NotReady { svc, req } => { let _ = ready!(svc.poll_ready(cx))?; let f = svc.call(req.take().expect("already called")); - this.state.set(State::Called(f)); + this.state.set(State::called(f)); } - StateProj::Called(fut) => { + StateProj::Called { fut } => { let res = ready!(fut.poll(cx))?; this.state.set(State::Done); return Poll::Ready(Ok(res)); diff --git a/tower/src/util/optional/future.rs b/tower/src/util/optional/future.rs index 182132286..7d289b7b5 100644 --- a/tower/src/util/optional/future.rs +++ b/tower/src/util/optional/future.rs @@ -1,20 +1,21 @@ use super::error; use futures_core::ready; -use pin_project::pin_project; +use pin_project_lite::pin_project; use std::{ future::Future, pin::Pin, task::{Context, Poll}, }; -/// Response future returned by [`Optional`]. -/// -/// [`Optional`]: crate::util::Optional -#[pin_project] -#[derive(Debug)] -pub struct ResponseFuture { - #[pin] - inner: Option, +pin_project! { + /// Response future returned by [`Optional`]. + /// + /// [`Optional`]: crate::util::Optional + #[derive(Debug)] + pub struct ResponseFuture { + #[pin] + inner: Option, + } } impl ResponseFuture { diff --git a/tower/src/util/then.rs b/tower/src/util/then.rs index c59cc080a..1ec3c1492 100644 --- a/tower/src/util/then.rs +++ b/tower/src/util/then.rs @@ -77,7 +77,7 @@ where #[inline] fn call(&mut self, request: Request) -> Self::Future { - ThenFuture(self.inner.call(request).then(self.f.clone())) + ThenFuture::new(self.inner.call(request).then(self.f.clone())) } } diff --git a/tower/tests/balance/main.rs b/tower/tests/balance/main.rs index c526418da..aed51203a 100644 --- a/tower/tests/balance/main.rs +++ b/tower/tests/balance/main.rs @@ -37,7 +37,7 @@ fn stress() { let _t = support::trace_init(); let mut task = task::spawn(()); let (tx, rx) = tokio::sync::mpsc::unbounded_channel::>(); - let mut cache = Balance::<_, Req>::new(support::IntoStream(rx)); + let mut cache = Balance::<_, Req>::new(support::IntoStream::new(rx)); let mut nready = 0; let mut services = slab::Slab::<(mock::Handle, bool)>::new(); diff --git a/tower/tests/spawn_ready/main.rs b/tower/tests/spawn_ready/main.rs index be19d124d..d1890159c 100644 --- a/tower/tests/spawn_ready/main.rs +++ b/tower/tests/spawn_ready/main.rs @@ -81,5 +81,6 @@ async fn abort_on_drop() { // End the task and ensure that the inner service has been dropped. assert!(drop_tx.send(()).is_ok()); tokio_test::assert_ready!(task.poll()); + tokio::task::yield_now().await; assert!(tokio_test::assert_ready!(handle.poll_request()).is_none()); } diff --git a/tower/tests/support.rs b/tower/tests/support.rs index 150617c22..b54708229 100644 --- a/tower/tests/support.rs +++ b/tower/tests/support.rs @@ -17,15 +17,25 @@ pub(crate) fn trace_init() -> tracing::subscriber::DefaultGuard { tracing::subscriber::set_default(subscriber) } -#[pin_project::pin_project] -#[derive(Clone, Debug)] -pub struct IntoStream(#[pin] pub S); +pin_project_lite::pin_project! { + #[derive(Clone, Debug)] + pub struct IntoStream { + #[pin] + inner: S + } +} + +impl IntoStream { + pub fn new(inner: S) -> Self { + Self { inner } + } +} impl Stream for IntoStream> { type Item = I; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().0.poll_recv(cx) + self.project().inner.poll_recv(cx) } } @@ -33,7 +43,7 @@ impl Stream for IntoStream> { type Item = I; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().0.poll_recv(cx) + self.project().inner.poll_recv(cx) } } diff --git a/tower/tests/util/call_all.rs b/tower/tests/util/call_all.rs index c5d8fa0a0..6bc092918 100644 --- a/tower/tests/util/call_all.rs +++ b/tower/tests/util/call_all.rs @@ -51,7 +51,7 @@ fn ordered() { admit: admit.clone(), }; let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - let ca = srv.call_all(support::IntoStream(rx)); + let ca = srv.call_all(support::IntoStream::new(rx)); pin_mut!(ca); assert_pending!(mock.enter(|cx, _| ca.as_mut().poll_next(cx)));