diff --git a/Cargo.lock b/Cargo.lock index a99c1d4e5a..9402214fac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -719,7 +719,7 @@ dependencies = [ "ring", "rustls", "tokio 0.1.22", - "tokio 0.2.17", + "tokio 0.2.19", "tokio-compat", "tokio-connect", "tokio-current-thread", @@ -790,7 +790,7 @@ dependencies = [ "rand 0.7.2", "regex 1.0.0", "tokio 0.1.22", - "tokio 0.2.17", + "tokio 0.2.19", "tokio-compat", "tokio-timer", "tower 0.3.1", @@ -899,7 +899,7 @@ dependencies = [ "futures 0.3.4", "linkerd2-error", "pin-project", - "tokio 0.2.17", + "tokio 0.2.19", "tokio-test", "tower 0.3.1", "tower-test", @@ -925,10 +925,10 @@ dependencies = [ name = "linkerd2-concurrency-limit" version = "0.1.0" dependencies = [ - "futures 0.1.26", - "linkerd2-error", - "tokio-sync", - "tower 0.1.1", + "futures 0.3.4", + "pin-project", + "tokio 0.2.19", + "tower 0.3.1", "tracing", ] @@ -964,7 +964,7 @@ dependencies = [ "futures 0.3.4", "linkerd2-error", "pin-project", - "tokio 0.2.17", + "tokio 0.2.19", "tokio-test", ] @@ -1174,7 +1174,7 @@ dependencies = [ "futures 0.3.4", "linkerd2-error", "pin-project", - "tokio 0.2.17", + "tokio 0.2.19", "tower 0.3.1", "tracing-futures 0.2.3", ] @@ -1188,7 +1188,7 @@ dependencies = [ "linkerd2-io", "linkerd2-proxy-core", "pin-project", - "tokio 0.2.17", + "tokio 0.2.19", "tower 0.3.1", ] @@ -1234,7 +1234,7 @@ dependencies = [ "rand 0.7.2", "task-compat", "tokio 0.1.22", - "tokio 0.2.17", + "tokio 0.2.19", "tokio-connect", "tokio-timer", "tower 0.1.1", @@ -1437,7 +1437,7 @@ dependencies = [ "futures 0.3.4", "linkerd2-error", "pin-project", - "tokio 0.2.17", + "tokio 0.2.19", "tower 0.3.1", ] @@ -1485,7 +1485,7 @@ dependencies = [ "linkerd2-error", "linkerd2-stack", "pin-project", - "tokio 0.2.17", + "tokio 0.2.19", "tokio-connect", "tokio-test", "tower 0.3.1", @@ -2446,9 +2446,9 @@ dependencies = [ [[package]] name = "tokio" -version = "0.2.17" +version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39fb9142eb6e9cc37f4f29144e62618440b149a138eee01a7bbe9b9226aaf17c" +checksum = "7d9c43f1bb96970e153bcbae39a65e249ccb942bd9d36dbdf086024920417c9c" dependencies = [ "bytes 0.5.3", "fnv", @@ -2493,7 +2493,7 @@ dependencies = [ "futures-core", "futures-util", "pin-project-lite", - "tokio 0.2.17", + "tokio 0.2.19", "tokio-current-thread", "tokio-executor", "tokio-reactor", @@ -2643,7 +2643,7 @@ checksum = "09cf9705471976fa5fc6817d3fbc9c4ff9696a6647af0e5c1870c81ca7445b05" dependencies = [ "bytes 0.5.3", "futures-core", - "tokio 0.2.17", + "tokio 0.2.19", ] [[package]] @@ -2719,7 +2719,7 @@ dependencies = [ "futures-sink", "log", "pin-project-lite", - "tokio 0.2.17", + "tokio 0.2.19", ] [[package]] @@ -2748,7 +2748,7 @@ dependencies = [ "futures-core", "futures-util", "pin-project", - "tokio 0.2.17", + "tokio 0.2.19", "tower-layer 0.3.0 (git+https://github.com/tower-rs/tower?rev=8752a3811788e94670c62dc0acbc9613207931b1)", "tower-service 0.3.0", "tracing", @@ -2965,7 +2965,7 @@ checksum = "9ba4bbc2c1e4a8543c30d4c13a4c8314ed72d6e07581910f665aa13fde0153c8" dependencies = [ "futures-util", "pin-project", - "tokio 0.2.17", + "tokio 0.2.19", "tokio-test", "tower-layer 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "tower-service 0.3.0", diff --git a/linkerd/app/core/src/svc.rs b/linkerd/app/core/src/svc.rs index c57eebce51..a4f93334e9 100644 --- a/linkerd/app/core/src/svc.rs +++ b/linkerd/app/core/src/svc.rs @@ -213,12 +213,12 @@ impl Stack { self.push(SpawnReadyLayer::new()) } - // pub fn push_concurrency_limit( - // self, - // max: usize, - // ) -> Stack> { - // self.push(concurrency_limit::Layer::new(max)) - // } + pub fn push_concurrency_limit( + self, + max: usize, + ) -> Stack> { + self.push(concurrency_limit::Layer::new(max)) + } pub fn push_timeout(self, timeout: Duration) -> Stack> { self.push(tower::timeout::TimeoutLayer::new(timeout)) diff --git a/linkerd/concurrency-limit/Cargo.toml b/linkerd/concurrency-limit/Cargo.toml index 2f12635834..303b25e609 100644 --- a/linkerd/concurrency-limit/Cargo.toml +++ b/linkerd/concurrency-limit/Cargo.toml @@ -7,8 +7,8 @@ publish = false [dependencies] -futures = "0.1" -linkerd2-error = { path = "../error" } -tokio-sync = "0.1" -tower = "0.1" +futures = "0.3" +tokio = { version = "0.2.19", features = ["sync"] } +tower = { version = "0.3", default-features = false } tracing = "0.1" +pin-project = "0.4" diff --git a/linkerd/concurrency-limit/src/lib.rs b/linkerd/concurrency-limit/src/lib.rs index d90e49c819..6b62d6965e 100644 --- a/linkerd/concurrency-limit/src/lib.rs +++ b/linkerd/concurrency-limit/src/lib.rs @@ -6,10 +6,13 @@ #![deny(warnings, rust_2018_idioms)] -use futures::{try_ready, Future, Poll}; -use linkerd2_error::Error; +use pin_project::pin_project; +use std::future::Future; +use std::pin::Pin; use std::sync::Arc; -use tokio_sync::semaphore::{Permit, Semaphore}; +use std::task::{Context, Poll}; +use std::{fmt, mem}; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tower::Service; use tracing::trace; @@ -22,20 +25,24 @@ pub struct Layer { #[derive(Debug)] pub struct ConcurrencyLimit { inner: T, - limit: Limit, + semaphore: Arc, + state: State, } -#[derive(Debug)] -struct Limit { - semaphore: Arc, - permit: Permit, +enum State { + Waiting(Pin + 'static>>), + Ready(OwnedSemaphorePermit), + Empty, } /// Future for the `ConcurrencyLimit` service. +#[pin_project] #[derive(Debug)] pub struct ResponseFuture { + #[pin] inner: T, - semaphore: Arc, + // We only keep this around so that it is dropped when the future completes. + _permit: OwnedSemaphorePermit, } impl From> for Layer { @@ -63,10 +70,8 @@ impl ConcurrencyLimit { pub fn new(inner: T, semaphore: Arc) -> Self { ConcurrencyLimit { inner, - limit: Limit { - semaphore, - permit: Permit::new(), - }, + semaphore, + state: State::Empty, } } @@ -89,45 +94,42 @@ impl ConcurrencyLimit { impl Service for ConcurrencyLimit where S: Service, - S::Error: Into, { type Response = S::Response; - type Error = Error; + type Error = S::Error; type Future = ResponseFuture; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - trace!(available = %self.limit.semaphore.available_permits(), "acquiring permit"); - try_ready!(self - .limit - .permit - .poll_acquire(&self.limit.semaphore) - .map_err(Error::from)); - - self.inner.poll_ready().map_err(Into::into) + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + trace!(available = %self.semaphore.available_permits(), "acquiring permit"); + loop { + self.state = match self.state { + State::Ready(_) => { + trace!(available = %self.semaphore.available_permits(), "permit acquired"); + return Poll::Ready(Ok(())); + } + State::Waiting(ref mut fut) => { + tokio::pin!(fut); + let permit = futures::ready!(fut.poll(cx)); + State::Ready(permit) + } + State::Empty => State::Waiting(Box::pin(self.semaphore.clone().acquire_owned())), + }; + } } fn call(&mut self, request: Request) -> Self::Future { // Make sure a permit has been acquired - if self - .limit - .permit - .try_acquire(&self.limit.semaphore) - .is_err() - { - panic!("max requests in-flight; poll_ready must be called first"); - } + let _permit = match mem::replace(&mut self.state, State::Empty) { + // Take the permit. + State::Ready(permit) => permit, + // whoopsie! + _ => panic!("max requests in-flight; poll_ready must be called first"), + }; // Call the inner service let inner = self.inner.call(request); - // Forget the permit, the permit will be returned when - // `future::ResponseFuture` is dropped. - self.limit.permit.forget(); - - ResponseFuture { - inner, - semaphore: self.limit.semaphore.clone(), - } + ResponseFuture { inner, _permit } } } @@ -138,36 +140,32 @@ where fn clone(&self) -> ConcurrencyLimit { ConcurrencyLimit { inner: self.inner.clone(), - limit: Limit { - semaphore: self.limit.semaphore.clone(), - permit: Permit::new(), - }, + semaphore: self.semaphore.clone(), + state: State::Empty, } } } -impl Drop for Limit { - fn drop(&mut self) { - self.permit.release(&self.semaphore); - } -} - impl Future for ResponseFuture where T: Future, - T::Error: Into, { - type Item = T::Item; - type Error = Error; + type Output = T::Output; - fn poll(&mut self) -> Poll { - self.inner.poll().map_err(Into::into) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().inner.poll(cx) } } -impl Drop for ResponseFuture { - fn drop(&mut self) { - trace!(available = %self.semaphore.available_permits() + 1, "releasing permit"); - self.semaphore.add_permits(1); +impl fmt::Debug for State { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + State::Waiting(_) => f + .debug_tuple("State::Waiting") + .field(&format_args!("...")) + .finish(), + State::Ready(ref r) => f.debug_tuple("State::Ready").field(&r).finish(), + State::Empty => f.debug_tuple("State::Empty").finish(), + } } }