Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update concurrency-limit to std::future #492

Merged
merged 2 commits into from
Apr 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ dependencies = [
"ring",
"rustls",
"tokio 0.1.22",
"tokio 0.2.17",
"tokio 0.2.19",
"tokio-compat",
"tokio-connect",
"tokio-current-thread",
Expand Down Expand Up @@ -790,7 +790,7 @@ dependencies = [
"rand 0.7.2",
"regex 1.0.0",
"tokio 0.1.22",
"tokio 0.2.17",
"tokio 0.2.19",
"tokio-compat",
"tokio-timer",
"tower 0.3.1",
Expand Down Expand Up @@ -899,7 +899,7 @@ dependencies = [
"futures 0.3.4",
"linkerd2-error",
"pin-project",
"tokio 0.2.17",
"tokio 0.2.19",
"tokio-test",
"tower 0.3.1",
"tower-test",
Expand All @@ -925,10 +925,10 @@ dependencies = [
name = "linkerd2-concurrency-limit"
version = "0.1.0"
dependencies = [
"futures 0.1.26",
"linkerd2-error",
"tokio-sync",
"tower 0.1.1",
"futures 0.3.4",
"pin-project",
"tokio 0.2.19",
"tower 0.3.1",
"tracing",
]

Expand Down Expand Up @@ -964,7 +964,7 @@ dependencies = [
"futures 0.3.4",
"linkerd2-error",
"pin-project",
"tokio 0.2.17",
"tokio 0.2.19",
"tokio-test",
]

Expand Down Expand Up @@ -1174,7 +1174,7 @@ dependencies = [
"futures 0.3.4",
"linkerd2-error",
"pin-project",
"tokio 0.2.17",
"tokio 0.2.19",
"tower 0.3.1",
"tracing-futures 0.2.3",
]
Expand All @@ -1188,7 +1188,7 @@ dependencies = [
"linkerd2-io",
"linkerd2-proxy-core",
"pin-project",
"tokio 0.2.17",
"tokio 0.2.19",
"tower 0.3.1",
]

Expand Down Expand Up @@ -1234,7 +1234,7 @@ dependencies = [
"rand 0.7.2",
"task-compat",
"tokio 0.1.22",
"tokio 0.2.17",
"tokio 0.2.19",
"tokio-connect",
"tokio-timer",
"tower 0.1.1",
Expand Down Expand Up @@ -1437,7 +1437,7 @@ dependencies = [
"futures 0.3.4",
"linkerd2-error",
"pin-project",
"tokio 0.2.17",
"tokio 0.2.19",
"tower 0.3.1",
]

Expand Down Expand Up @@ -1485,7 +1485,7 @@ dependencies = [
"linkerd2-error",
"linkerd2-stack",
"pin-project",
"tokio 0.2.17",
"tokio 0.2.19",
"tokio-connect",
"tokio-test",
"tower 0.3.1",
Expand Down Expand Up @@ -2446,9 +2446,9 @@ dependencies = [

[[package]]
name = "tokio"
version = "0.2.17"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39fb9142eb6e9cc37f4f29144e62618440b149a138eee01a7bbe9b9226aaf17c"
checksum = "7d9c43f1bb96970e153bcbae39a65e249ccb942bd9d36dbdf086024920417c9c"
dependencies = [
"bytes 0.5.3",
"fnv",
Expand Down Expand Up @@ -2493,7 +2493,7 @@ dependencies = [
"futures-core",
"futures-util",
"pin-project-lite",
"tokio 0.2.17",
"tokio 0.2.19",
"tokio-current-thread",
"tokio-executor",
"tokio-reactor",
Expand Down Expand Up @@ -2643,7 +2643,7 @@ checksum = "09cf9705471976fa5fc6817d3fbc9c4ff9696a6647af0e5c1870c81ca7445b05"
dependencies = [
"bytes 0.5.3",
"futures-core",
"tokio 0.2.17",
"tokio 0.2.19",
]

[[package]]
Expand Down Expand Up @@ -2719,7 +2719,7 @@ dependencies = [
"futures-sink",
"log",
"pin-project-lite",
"tokio 0.2.17",
"tokio 0.2.19",
]

[[package]]
Expand Down Expand Up @@ -2748,7 +2748,7 @@ dependencies = [
"futures-core",
"futures-util",
"pin-project",
"tokio 0.2.17",
"tokio 0.2.19",
"tower-layer 0.3.0 (git+https://github.com/tower-rs/tower?rev=8752a3811788e94670c62dc0acbc9613207931b1)",
"tower-service 0.3.0",
"tracing",
Expand Down Expand Up @@ -2965,7 +2965,7 @@ checksum = "9ba4bbc2c1e4a8543c30d4c13a4c8314ed72d6e07581910f665aa13fde0153c8"
dependencies = [
"futures-util",
"pin-project",
"tokio 0.2.17",
"tokio 0.2.19",
"tokio-test",
"tower-layer 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tower-service 0.3.0",
Expand Down
12 changes: 6 additions & 6 deletions linkerd/app/core/src/svc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,12 @@ impl<S> Stack<S> {
// self.push(SpawnReadyLayer::new())
// }

// pub fn push_concurrency_limit(
// self,
// max: usize,
// ) -> Stack<concurrency_limit::ConcurrencyLimit<S>> {
// self.push(concurrency_limit::Layer::new(max))
// }
pub fn push_concurrency_limit(
self,
max: usize,
) -> Stack<concurrency_limit::ConcurrencyLimit<S>> {
self.push(concurrency_limit::Layer::new(max))
}

pub fn push_timeout(self, timeout: Duration) -> Stack<tower::timeout::Timeout<S>> {
self.push(tower::timeout::TimeoutLayer::new(timeout))
Expand Down
8 changes: 4 additions & 4 deletions linkerd/concurrency-limit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ publish = false


[dependencies]
futures = "0.1"
linkerd2-error = { path = "../error" }
tokio-sync = "0.1"
tower = "0.1"
futures = "0.3"
tokio = { version = "0.2.19", features = ["sync"] }
tower = { version = "0.3", default-features = false }
tracing = "0.1"
pin-project = "0.4"
116 changes: 57 additions & 59 deletions linkerd/concurrency-limit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@

#![deny(warnings, rust_2018_idioms)]

use futures::{try_ready, Future, Poll};
use linkerd2_error::Error;
use pin_project::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio_sync::semaphore::{Permit, Semaphore};
use std::task::{Context, Poll};
use std::{fmt, mem};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tower::Service;
use tracing::trace;

Expand All @@ -22,20 +25,24 @@ pub struct Layer {
#[derive(Debug)]
pub struct ConcurrencyLimit<T> {
inner: T,
limit: Limit,
semaphore: Arc<Semaphore>,
state: State,
}

#[derive(Debug)]
struct Limit {
semaphore: Arc<Semaphore>,
permit: Permit,
enum State {
Waiting(Pin<Box<dyn Future<Output = OwnedSemaphorePermit> + 'static>>),
Ready(OwnedSemaphorePermit),
Empty,
}

/// Future for the `ConcurrencyLimit` service.
#[pin_project]
#[derive(Debug)]
pub struct ResponseFuture<T> {
#[pin]
inner: T,
semaphore: Arc<Semaphore>,
// We only keep this around so that it is dropped when the future completes.
_permit: OwnedSemaphorePermit,
}

impl From<Arc<Semaphore>> for Layer {
Expand Down Expand Up @@ -63,10 +70,8 @@ impl<T> ConcurrencyLimit<T> {
pub fn new(inner: T, semaphore: Arc<Semaphore>) -> Self {
ConcurrencyLimit {
inner,
limit: Limit {
semaphore,
permit: Permit::new(),
},
semaphore,
state: State::Empty,
}
}

Expand All @@ -89,45 +94,42 @@ impl<T> ConcurrencyLimit<T> {
impl<S, Request> Service<Request> for ConcurrencyLimit<S>
where
S: Service<Request>,
S::Error: Into<Error>,
{
type Response = S::Response;
type Error = Error;
type Error = S::Error;
type Future = ResponseFuture<S::Future>;

fn poll_ready(&mut self) -> Poll<(), Self::Error> {
trace!(available = %self.limit.semaphore.available_permits(), "acquiring permit");
try_ready!(self
.limit
.permit
.poll_acquire(&self.limit.semaphore)
.map_err(Error::from));

self.inner.poll_ready().map_err(Into::into)
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
trace!(available = %self.semaphore.available_permits(), "acquiring permit");
loop {
self.state = match self.state {
State::Ready(_) => {
trace!(available = %self.semaphore.available_permits(), "permit acquired");
return Poll::Ready(Ok(()));
}
State::Waiting(ref mut fut) => {
tokio::pin!(fut);
let permit = futures::ready!(fut.poll(cx));
State::Ready(permit)
}
State::Empty => State::Waiting(Box::pin(self.semaphore.clone().acquire_owned())),
olix0r marked this conversation as resolved.
Show resolved Hide resolved
};
}
}

fn call(&mut self, request: Request) -> Self::Future {
// Make sure a permit has been acquired
if self
.limit
.permit
.try_acquire(&self.limit.semaphore)
.is_err()
{
panic!("max requests in-flight; poll_ready must be called first");
}
let _permit = match mem::replace(&mut self.state, State::Empty) {
// Take the permit.
State::Ready(permit) => permit,
// whoopsie!
_ => panic!("max requests in-flight; poll_ready must be called first"),
};

// Call the inner service
let inner = self.inner.call(request);

// Forget the permit, the permit will be returned when
// `future::ResponseFuture` is dropped.
self.limit.permit.forget();

ResponseFuture {
inner,
semaphore: self.limit.semaphore.clone(),
}
ResponseFuture { inner, _permit }
}
}

Expand All @@ -138,36 +140,32 @@ where
fn clone(&self) -> ConcurrencyLimit<S> {
ConcurrencyLimit {
inner: self.inner.clone(),
limit: Limit {
semaphore: self.limit.semaphore.clone(),
permit: Permit::new(),
},
semaphore: self.semaphore.clone(),
state: State::Empty,
}
}
}

impl Drop for Limit {
fn drop(&mut self) {
self.permit.release(&self.semaphore);
}
}

impl<T> Future for ResponseFuture<T>
where
T: Future,
T::Error: Into<Error>,
{
type Item = T::Item;
type Error = Error;
type Output = T::Output;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll().map_err(Into::into)
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().inner.poll(cx)
}
}

impl<T> Drop for ResponseFuture<T> {
fn drop(&mut self) {
trace!(available = %self.semaphore.available_permits() + 1, "releasing permit");
self.semaphore.add_permits(1);
impl fmt::Debug for State {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
State::Waiting(_) => f
.debug_tuple("State::Waiting")
.field(&format_args!("..."))
.finish(),
State::Ready(ref r) => f.debug_tuple("State::Ready").field(&r).finish(),
State::Empty => f.debug_tuple("State::Empty").finish(),
}
}
}