From 2b7b875b1b569f99120295abff39554ecc0861ca Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Mon, 17 Feb 2020 17:54:55 +0000 Subject: [PATCH 1/5] Reimplement the Lock middleware with tokio::sync As described in tokio-rs/tokio#2237, the `tokio::sync::Semaphore` can hold unbounded memory, especially when the semaphor is being contested and consumers drop interest. Unfortunately, this use case is common in the proxy, especially when a destination service is unavailable and the proxy is timing out requests. This change reimplements the Lock middleware without using `tokio::sync::Semaphore`. This implementation is in some ways more naive and inefficient, but it appears to be better suited for the proxy's needs. Specifically, waiters are stored in a LIFO stack, which optimizes for minimizing latency. Under certain high-load scenarios, this Lock could be forced to grow its waiters set without cleaning up expired watchers. If this becomes a more serious concern, we could change the implementation to use a FIFO queue of waiters. --- Cargo.lock | 4 + linkerd/lock/Cargo.toml | 6 + linkerd/lock/src/error.rs | 44 ++++ linkerd/lock/src/layer.rs | 20 ++ linkerd/lock/src/lib.rs | 400 +----------------------------------- linkerd/lock/src/service.rs | 177 ++++++++++++++++ linkerd/lock/src/shared.rs | 166 +++++++++++++++ linkerd/lock/src/test.rs | 254 +++++++++++++++++++++++ 8 files changed, 677 insertions(+), 394 deletions(-) create mode 100644 linkerd/lock/src/error.rs create mode 100644 linkerd/lock/src/layer.rs create mode 100644 linkerd/lock/src/service.rs create mode 100644 linkerd/lock/src/shared.rs create mode 100644 linkerd/lock/src/test.rs diff --git a/Cargo.lock b/Cargo.lock index 2418c0a609..40fe035c8d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -874,9 +874,13 @@ version = "0.1.0" dependencies = [ "futures", "linkerd2-error", + "rand 0.7.2", "tokio", "tower", "tracing", + "tracing-futures", + "tracing-log", + "tracing-subscriber", ] [[package]] diff --git a/linkerd/lock/Cargo.toml b/linkerd/lock/Cargo.toml index 11ba7b0304..6d72843b46 100644 --- a/linkerd/lock/Cargo.toml +++ b/linkerd/lock/Cargo.toml @@ -14,3 +14,9 @@ linkerd2-error = { path = "../error" } tokio = "0.1" tower = "0.1" tracing = "0.1" + +[dev-dependencies] +rand = "0.7" +tracing-futures = "0.1" +tracing-log = "0.1" +tracing-subscriber = "0.2" diff --git a/linkerd/lock/src/error.rs b/linkerd/lock/src/error.rs new file mode 100644 index 0000000000..94b238dc99 --- /dev/null +++ b/linkerd/lock/src/error.rs @@ -0,0 +1,44 @@ +pub use linkerd2_error::Error; +use std::sync::Arc; + +#[derive(Debug)] +pub struct Poisoned(()); + +#[derive(Debug)] +pub struct ServiceError(Arc); + +// === impl POisoned === + +impl Poisoned { + pub fn new() -> Self { + Poisoned(()) + } +} + +impl std::fmt::Display for Poisoned { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "poisoned") + } +} + +impl std::error::Error for Poisoned {} + +// === impl ServiceError === + +impl ServiceError { + pub(crate) fn new(e: Arc) -> Self { + ServiceError(e) + } + + pub fn inner(&self) -> &Error { + self.0.as_ref() + } +} + +impl std::fmt::Display for ServiceError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + +impl std::error::Error for ServiceError {} diff --git a/linkerd/lock/src/layer.rs b/linkerd/lock/src/layer.rs new file mode 100644 index 0000000000..ed68e289f8 --- /dev/null +++ b/linkerd/lock/src/layer.rs @@ -0,0 +1,20 @@ +use super::Lock; + +#[derive(Clone, Debug, Default)] +pub struct LockLayer(()); + +// === impl Layer === + +impl LockLayer { + pub fn new() -> Self { + LockLayer(()) + } +} + +impl tower::layer::Layer for LockLayer { + type Service = Lock; + + fn layer(&self, inner: S) -> Self::Service { + Self::Service::new(inner) + } +} diff --git a/linkerd/lock/src/lib.rs b/linkerd/lock/src/lib.rs index eb74c071b2..a1a628d8c4 100644 --- a/linkerd/lock/src/lib.rs +++ b/linkerd/lock/src/lib.rs @@ -2,399 +2,11 @@ #![deny(warnings, rust_2018_idioms)] -use futures::{future, Async, Future, Poll}; -use linkerd2_error::Error; -use tokio::sync::lock; -use tracing::trace; - -#[derive(Clone, Debug)] -pub struct Layer { - _marker: std::marker::PhantomData, -} - -/// Guards access to an inner service with a `tokio::sync::lock::Lock`. -/// -/// As the service is polled to readiness, the lock is acquired and the inner -/// service is polled. If the sevice is cloned, the service's lock state is not -/// retained by the clone. -/// -/// The inner service's errors are coerced to the cloneable `C`-typed error so -/// that the error may be returned to all clones of the lock. By default, errors -/// are propagated through the `Poisoned` type, but they may be propagated -/// through custom types as well. -pub struct Lock { - lock: lock::Lock>, - locked: Option>>, -} - -/// The lock holds either an inner service or, if it failed, an error salvaged -/// from the failure. -enum State { - Service(S), - Error(E), -} - -/// A default error type that propagates the inner service's error message to -/// consumers. -#[derive(Clone, Debug)] -pub struct Poisoned(String); - -// === impl Layer === - -impl Default for Layer { - fn default() -> Self { - Self::new() - } -} - -impl Layer { - /// Sets the error type to be returned to consumers when poll_ready fails. - pub fn new() -> Layer - where - E: Clone + From + Into, - { - Layer { - _marker: std::marker::PhantomData, - } - } -} - -impl + Clone> tower::layer::Layer for Layer { - type Service = Lock; - - fn layer(&self, inner: S) -> Self::Service { - Self::Service { - lock: lock::Lock::new(State::Service(inner)), - locked: None, - } - } -} - -// === impl Lock === - -impl Clone for Lock { - fn clone(&self) -> Self { - Self { - locked: None, - lock: self.lock.clone(), - } - } -} - -impl tower::Service for Lock -where - S: tower::Service, - S::Error: Into, - E: Clone + From + Into, -{ - type Response = S::Response; - type Error = Error; - type Future = future::MapErr Error>; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - loop { - if let Some(state) = self.locked.as_mut() { - // Drive the service to readiness if it is already locked. - if let State::Service(ref mut inner) = **state { - return match inner.poll_ready() { - Ok(ok) => { - trace!(ready = ok.is_ready(), "service"); - Ok(ok) - } - Err(inner) => { - // Coerce the error into `E` and clone it into the - // locked state so that it can be returned from all - // clones of the lock. - let err = E::from(inner.into()); - **state = State::Error(err.clone()); - self.locked = None; - Err(err.into()) - } - }; - } - - // If an error occured above,the locked state is dropped and - // cannot be acquired again. - unreachable!("must not lock on error"); - } - - // Acquire the inner service exclusively so that the service can be - // driven to readiness. - match self.lock.poll_lock() { - Async::NotReady => { - trace!(locked = false); - return Ok(Async::NotReady); - } - Async::Ready(locked) => { - if let State::Error(ref e) = *locked { - return Err(e.clone().into()); - } - - trace!(locked = true); - self.locked = Some(locked); - } - } - } - } - - fn call(&mut self, t: T) -> Self::Future { - if let Some(mut state) = self.locked.take() { - if let State::Service(ref mut inner) = *state { - return inner.call(t).map_err(Into::into); - } - } - - unreachable!("called before ready"); - } -} - -// === impl Poisoned === - -impl From for Poisoned { - fn from(e: Error) -> Self { - Poisoned(e.to_string()) - } -} - -impl std::fmt::Display for Poisoned { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.0.fmt(f) - } -} - -impl std::error::Error for Poisoned {} - +pub mod error; +mod layer; +mod service; +mod shared; #[cfg(test)] -mod test { - use super::*; - use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::Arc; - use tokio::runtime::current_thread; - use tower::layer::Layer as _Layer; - use tower::Service as _Service; - - #[test] - fn exclusive_access() { - current_thread::run(future::lazy(|| { - let ready = Arc::new(AtomicBool::new(false)); - let mut svc0 = Layer::default().layer(Decr::new(2, ready.clone())); - - // svc0 grabs the lock, but the inner service isn't ready. - assert!(svc0.poll_ready().expect("must not fail").is_not_ready()); - - // Cloning a locked service does not preserve the lock. - let mut svc1 = svc0.clone(); - - // svc1 can't grab the lock. - assert!(svc1.poll_ready().expect("must not fail").is_not_ready()); - - // svc0 holds the lock and becomes ready with the inner service. - ready.store(true, Ordering::SeqCst); - assert!(svc0.poll_ready().expect("must not fail").is_ready()); - - // svc1 still can't grab the lock. - assert!(svc1.poll_ready().expect("must not fail").is_not_ready()); - - // svc0 remains ready. - svc0.call(1) - .and_then(move |_| { - // svc1 grabs the lock and is immediately ready. - assert!(svc1.poll_ready().expect("must not fail").is_ready()); - // svc0 cannot grab the lock. - assert!(svc0.poll_ready().expect("must not fail").is_not_ready()); - - svc1.call(1) - }) - .map(|_| ()) - .map_err(|_| panic!("must not fail")) - })); - } - - #[test] - fn propagates_poisoned_errors() { - current_thread::run(future::lazy(|| { - let mut svc0 = Layer::default().layer(Decr::from(1)); - - // svc0 grabs the lock and we decr the service so it will fail. - assert!(svc0.poll_ready().expect("must not fail").is_ready()); - // svc0 remains ready. - svc0.call(1) - .map_err(|_| panic!("must not fail")) - .map(move |_| { - // svc1 grabs the lock and fails immediately. - let mut svc1 = svc0.clone(); - assert_eq!( - svc1.poll_ready() - .expect_err("mut fail") - .downcast_ref::() - .expect("must fail with Poisoned") - .to_string(), - "underflow" - ); - - // svc0 suffers the same fate. - assert_eq!( - svc0.poll_ready() - .expect_err("mut fail") - .downcast_ref::() - .expect("must fail with Poisoned") - .to_string(), - "underflow" - ); - }) - })); - } - - #[test] - fn propagates_custom_errors() { - current_thread::run(future::lazy(|| { - #[derive(Clone, Debug)] - enum Custom { - Underflow, - Wtf, - } - impl From for Custom { - fn from(e: Error) -> Self { - if e.is::() { - Custom::Underflow - } else { - Custom::Wtf - } - } - } - impl std::fmt::Display for Custom { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Custom::Underflow => write!(f, "custom underflow"), - Custom::Wtf => write!(f, "wtf"), - } - } - } - impl std::error::Error for Custom {} - - let mut svc0 = Layer::new::().layer(Decr::from(1)); - - // svc0 grabs the lock and we decr the service so it will fail. - assert!(svc0.poll_ready().expect("must not fail").is_ready()); - // svc0 remains ready. - svc0.call(1) - .map_err(|_| panic!("must not fail")) - .map(move |_| { - let mut svc1 = svc0.clone(); - // svc1 grabs the lock and fails immediately. - assert_eq!( - svc1.poll_ready() - .expect_err("mut fail") - .downcast_ref::() - .expect("must fail with Custom") - .to_string(), - "custom underflow" - ); - - // svc0 suffers the same fate. - assert_eq!( - svc0.poll_ready() - .expect_err("mut fail") - .downcast_ref::() - .expect("must fail with Custom") - .to_string(), - "custom underflow" - ); - }) - })); - } - - #[test] - fn dropping_releases_access() { - use tower::util::ServiceExt; - - current_thread::run(future::lazy(|| { - let ready = Arc::new(AtomicBool::new(false)); - let mut svc0 = Layer::default().layer(Decr::new(2, ready.clone())); - - // svc0 grabs the lock, but the inner service isn't ready. - assert!(svc0.poll_ready().expect("must not fail").is_not_ready()); - - // Cloning a locked service does not preserve the lock. - let mut svc1 = svc0.clone(); - - // svc1 can't grab the lock. - assert!(svc1.poll_ready().expect("must not fail").is_not_ready()); - - // svc0 holds the lock and becomes ready with the inner service. - ready.store(true, Ordering::SeqCst); - assert!(svc0.poll_ready().expect("must not fail").is_ready()); - - // svc1 still can't grab the lock. - assert!(svc1.poll_ready().expect("must not fail").is_not_ready()); - - let mut fut = svc1.oneshot(1); - - assert!(fut.poll().expect("must not fail").is_not_ready()); - - drop(svc0); - - // svc1 grabs the lock and is immediately ready. - assert_eq!(fut.poll().expect("must not fail"), Async::Ready(1)); - - Ok(().into()) - })); - } - - #[derive(Debug, Default)] - struct Decr { - value: usize, - ready: Arc, - } - - #[derive(Copy, Clone, Debug)] - struct Underflow; - - impl From for Decr { - fn from(value: usize) -> Self { - Self::new(value, Arc::new(AtomicBool::new(true))) - } - } - - impl Decr { - fn new(value: usize, ready: Arc) -> Self { - Decr { value, ready } - } - } - - impl tower::Service for Decr { - type Response = usize; - type Error = Underflow; - type Future = futures::future::FutureResult; - - fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> { - if self.value == 0 { - return Err(Underflow); - } - - if !self.ready.load(Ordering::SeqCst) { - return Ok(Async::NotReady); - } - - Ok(().into()) - } - - fn call(&mut self, decr: usize) -> Self::Future { - if self.value < decr { - self.value = 0; - return futures::future::err(Underflow); - } - - self.value -= decr; - futures::future::ok(self.value) - } - } - - impl std::fmt::Display for Underflow { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "underflow") - } - } +mod test; - impl std::error::Error for Underflow {} -} +pub use self::{layer::LockLayer, service::Lock}; diff --git a/linkerd/lock/src/service.rs b/linkerd/lock/src/service.rs new file mode 100644 index 0000000000..09a6e448a8 --- /dev/null +++ b/linkerd/lock/src/service.rs @@ -0,0 +1,177 @@ +use crate::error::{Error, Poisoned, ServiceError}; +use crate::shared::{Shared, Wait}; +use futures::{future, Async, Future, Poll}; +use std::sync::{Arc, Mutex}; +use tracing::trace; + +/// Guards access to an inner service with a `tokio::sync::lock::Lock`. +/// +/// As the service is polled to readiness, the lock is acquired and the inner +/// service is polled. If the sevice is cloned, the service's lock state is not +/// retained by the clone. +pub struct Lock { + state: State, + shared: Arc>>, +} + +/// The state of a single `Lock` consumer. +enum State { + /// This lock has not registered interest in the inner service. + Released, + + /// This lock is interested in the inner service. + Waiting(Wait), + + /// This lock instance has exclusive ownership of the inner service. + Acquired(S), + + /// The inner service has failed. + Failed(Arc), +} + +// === impl Lock === + +impl Lock { + pub fn new(service: S) -> Self { + Self { + state: State::Released, + shared: Arc::new(Mutex::new(Shared::new(service))), + } + } +} + +impl Clone for Lock { + fn clone(&self) -> Self { + Self { + // Clones have an independent local lock state. + state: State::Released, + shared: self.shared.clone(), + } + } +} + +impl tower::Service for Lock +where + S: tower::Service, + S::Error: Into, +{ + type Response = S::Response; + type Error = Error; + type Future = future::MapErr Self::Error>; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + loop { + trace!(state = ?self.state, "Polling"); + self.state = match self.state { + // This instance has exlcusive access to the inner service. + State::Acquired(ref mut svc) => match svc.poll_ready() { + Ok(ok) => { + trace!(ready = ok.is_ready()); + return Ok(ok); + } + Err(inner) => { + // If the inner service fails to become ready, share + // that error with all other consumers and update this + // lock's state to prevent trying to acquire the shared + // state again. + let error = Arc::new(inner.into()); + trace!(%error); + if let Ok(mut shared) = self.shared.lock() { + shared.fail(error.clone()); + } + State::Failed(error) + } + }, + + // This instance has not registered interest in the lock. + State::Released => match self.shared.lock() { + Err(_) => return Err(Poisoned::new().into()), + // First, try to acquire the lock withotu creating a waiter. + // If the lock isn't available, create a waiter and try + // again, registering interest> + Ok(mut shared) => match shared.try_acquire() { + Ok(None) => State::Waiting(Wait::default()), + Ok(Some(svc)) => State::Acquired(svc), + Err(error) => State::Failed(error), + }, + }, + + // This instance is interested in the lock. + State::Waiting(ref waiter) => match self.shared.lock() { + Err(_) => return Err(Poisoned::new().into()), + Ok(mut shared) => match shared.poll_acquire(waiter) { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(svc)) => State::Acquired(svc), + Err(error) => State::Failed(error), + }, + }, + + // The inner service failed, so share that failure with all consumers. + State::Failed(ref error) => return Err(ServiceError::new(error.clone()).into()), + }; + } + } + + fn call(&mut self, req: T) -> Self::Future { + // The service must have been acquired by poll_ready. Reset this lock's + // state so that it must reacquire the service via poll_ready. + let mut svc = match std::mem::replace(&mut self.state, State::Released) { + State::Acquired(svc) => svc, + _ => panic!("Called before ready"), + }; + + let fut = svc.call(req); + + // Return the service to the shared state, notifying waiters as needed. + // + // The service is dropped if the inner mutex has been poisoned, and + // subsequent calsl to poll_ready will return a Poisioned error. + if let Ok(mut shared) = self.shared.lock() { + trace!("Releasing acquired lock after use"); + shared.release_and_notify(svc); + } + + // The inner service's error type is *not* wrapped with a ServiceError. + fut.map_err(Into::into) + } +} + +impl Drop for Lock { + fn drop(&mut self) { + let state = std::mem::replace(&mut self.state, State::Released); + trace!(?state, "Dropping"); + match state { + // If this lock was holding the service, return it back back to the + // shared state so another lock may acquire it. Waiters are notified. + State::Acquired(service) => { + if let Ok(mut shared) = self.shared.lock() { + shared.release_and_notify(service); + } + } + + // If this lock is waiting but the waiter isn't registered, it must + // have been notified. Notify the next waiter to prevent deadlock. + State::Waiting(wait) => { + if let Ok(mut shared) = self.shared.lock() { + if wait.is_not_waiting() { + shared.notify_next_waiter(); + } + } + } + + // No state to cleanup. + State::Released | State::Failed(_) => {} + } + } +} + +impl std::fmt::Debug for State { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + State::Released => write!(f, "Released"), + State::Acquired(_) => write!(f, "Acquired(..)"), + State::Waiting(ref w) => write!(f, "Waiting(pending={})", w.is_waiting()), + State::Failed(ref e) => write!(f, "Failed({:?})", e), + } + } +} diff --git a/linkerd/lock/src/shared.rs b/linkerd/lock/src/shared.rs new file mode 100644 index 0000000000..17f3e4147a --- /dev/null +++ b/linkerd/lock/src/shared.rs @@ -0,0 +1,166 @@ +use self::waiter::Notify; +pub(crate) use self::waiter::Wait; +use crate::error::Error; +use futures::{Async, Poll}; +use std::sync::Arc; +use tracing::trace; + +/// The shared state between one or more Lock instances. +/// +/// When multiple lock instances are contending for the inner value, waiters are +/// stored in a LIFO stack. This is done to bias for latency instead of fairness +pub(crate) struct Shared { + state: State, + waiters: Vec, +} + +enum State { + /// A Lock is holding the value. + Claimed, + + /// The inner value is available. + Unclaimed(T), + + /// The lock has failed. + Failed(Arc), +} + +// === impl Shared === + +impl Shared { + pub fn new(value: T) -> Self { + Self { + waiters: Vec::new(), + state: State::Unclaimed(value), + } + } + + /// Try to claim a value without registering a waiter. + pub fn try_acquire(&mut self) -> Result, Arc> { + match std::mem::replace(&mut self.state, State::Claimed) { + // This lock has acquired the value. + State::Unclaimed(v) => { + trace!("acquired"); + Ok(Some(v)) + } + // The value is already claimed by a lock. + State::Claimed => Ok(None), + // The locks failed, so reset the state immediately so that all + // instances may be notified. + State::Failed(error) => { + self.state = State::Failed(error.clone()); + Err(error) + } + } + } + + /// Try to acquire a value or register the given waiter to be notified when + /// the lock is available. + pub fn poll_acquire(&mut self, wait: &Wait) -> Poll> { + match self.try_acquire() { + Ok(Some(svc)) => Ok(Async::Ready(svc)), + Ok(None) => { + // Register the current task to be notified. + wait.register(); + // Register the waiter's notify handle if one isn't already registered. + if let Some(notify) = wait.get_notify() { + trace!("Registering waiter"); + self.waiters.push(notify); + } + debug_assert!(wait.is_waiting()); + Ok(Async::NotReady) + } + Err(error) => Err(error), + } + } + + pub fn release_and_notify(&mut self, value: T) { + trace!(waiters = self.waiters.len(), "Releasing"); + debug_assert!(match self.state { + State::Claimed => true, + _ => false, + }); + self.state = State::Unclaimed(value); + self.notify_next_waiter(); + } + + pub fn notify_next_waiter(&mut self) { + while let Some(waiter) = self.waiters.pop() { + if waiter.notify() { + trace!("Notified waiter"); + return; + } + } + } + + pub fn fail(&mut self, error: Arc) { + trace!(waiters = self.waiters.len(), %error, "Failing"); + debug_assert!(match self.state { + State::Claimed => true, + _ => false, + }); + self.state = State::Failed(error); + + while let Some(waiter) = self.waiters.pop() { + waiter.notify(); + } + } +} + +mod waiter { + use futures::task::AtomicTask; + use std::sync::{Arc, Weak}; + + /// A handle held by Lock instances when waiting to be notified. + #[derive(Default)] + pub(crate) struct Wait(Arc); + + /// A handle held by shared lock state to notify a waiting Lock. + /// + /// There may be at most one `Notify` instance per `Wait` instance at any one + /// time. + pub(super) struct Notify(Weak); + + impl Wait { + /// If a `Notify` handle does not currently exist for this waiter, create + /// a new one. + pub(super) fn get_notify(&self) -> Option { + if self.is_not_waiting() { + let n = Notify(Arc::downgrade(&self.0)); + debug_assert!(self.is_waiting()); + Some(n) + } else { + None + } + } + + /// Register this waiter with the current task. + pub(super) fn register(&self) { + self.0.register(); + } + + /// Returns true iff there is currently a `Notify` handle for this waiter. + pub fn is_waiting(&self) -> bool { + Arc::weak_count(&self.0) == 1 + } + + /// Returns true iff there is not currently a `Notify` handle for this waiter. + pub fn is_not_waiting(&self) -> bool { + Arc::weak_count(&self.0) == 0 + } + } + + impl Notify { + /// Attempt to notify the waiter. + /// + /// Returns true if a waiter was notified and false otherwise. + pub fn notify(self) -> bool { + if let Some(task) = self.0.upgrade() { + task.notify(); + true + } else { + false + } + } + } +} diff --git a/linkerd/lock/src/test.rs b/linkerd/lock/src/test.rs new file mode 100644 index 0000000000..1f5968e5bf --- /dev/null +++ b/linkerd/lock/src/test.rs @@ -0,0 +1,254 @@ +use crate::error::ServiceError; +use crate::Lock; +use futures::{future, try_ready, Async, Future, Poll, Stream}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use tokio::sync::{mpsc, oneshot}; +use tower::Service as _Service; +use tracing::{info_span, trace}; +use tracing_futures::Instrument; + +#[test] +fn exclusive_access() { + run(future::lazy(|| { + let ready = Arc::new(AtomicBool::new(false)); + let mut svc0 = Lock::new(Decr::new(2, ready.clone())); + + // svc0 grabs the lock, but the inner service isn't ready. + assert!(svc0.poll_ready().expect("must not fail").is_not_ready()); + + // Cloning a locked service does not preserve the lock. + let mut svc1 = svc0.clone(); + + // svc1 can't grab the lock. + assert!(svc1.poll_ready().expect("must not fail").is_not_ready()); + + // svc0 holds the lock and becomes ready with the inner service. + ready.store(true, Ordering::SeqCst); + assert!(svc0.poll_ready().expect("must not fail").is_ready()); + + // svc1 still can't grab the lock. + assert!(svc1.poll_ready().expect("must not fail").is_not_ready()); + + // svc0 remains ready. + let fut0 = svc0.call(1); + + // svc1 grabs the lock and is immediately ready. + assert!(svc1.poll_ready().expect("must not fail").is_ready()); + // svc0 cannot grab the lock. + assert!(svc0.poll_ready().expect("must not fail").is_not_ready()); + + let fut1 = svc1.call(1); + + fut0.join(fut1) + .map(|_| ()) + .map_err(|_| panic!("must not fail")) + })); +} + +#[test] +fn propagates_errors() { + run(future::lazy(|| { + let mut svc0 = Lock::new(Decr::from(1)); + + // svc0 grabs the lock and we decr the service so it will fail. + assert!(svc0.poll_ready().expect("must not fail").is_ready()); + // svc0 remains ready. + svc0.call(1) + .map_err(|_| panic!("must not fail")) + .map(move |_| { + // svc1 grabs the lock and fails immediately. + let mut svc1 = svc0.clone(); + assert!(svc1 + .poll_ready() + .expect_err("mut fail") + .downcast_ref::() + .expect("must fail with service error") + .inner() + .is::()); + + // svc0 suffers the same fate. + assert!(svc0 + .poll_ready() + .expect_err("mut fail") + .downcast_ref::() + .expect("must fail with service error") + .inner() + .is::()); + }) + })); +} + +#[test] +fn dropping_releases_access() { + use tower::util::ServiceExt; + run(future::lazy(|| { + let mut svc0 = Lock::new(Decr::new(3, Arc::new(true.into()))); + + // svc0 grabs the lock, but the inner service isn't ready. + assert!(svc0.poll_ready().expect("must not fail").is_ready()); + + let svc1 = svc0.clone(); + let (tx1, rx1) = oneshot::channel(); + tokio::spawn( + svc1.oneshot(1) + .then(move |_| { + trace!("complete"); + tx1.send(()).map_err(|_| ()) + }) + .instrument(info_span!("Svc1")), + ); + + let svc2 = svc0.clone(); + let (tx2, rx2) = oneshot::channel(); + tokio::spawn( + svc2.oneshot(1) + .then(move |_| { + trace!("complete"); + tx2.send(()).map_err(|_| ()) + }) + .instrument(info_span!("Svc2")), + ); + + // svc3 will be the notified waiter when svc0 completes; but it drops + // svc3 before polling the waiter. This test ensures that svc2 is + // notified by svc3's drop. + let svc3 = svc0.clone(); + let (tx3, rx3) = oneshot::channel(); + tokio::spawn(PollAndDrop(svc3, rx3).instrument(info_span!("Svc3"))); + + tokio::spawn( + svc0.ready() + .then(move |_| tx3.send(()).map_err(|_| ())) + .instrument(info_span!("Svc0")), + ); + // svc3 notified; but it is dropped before it can be polled + + rx2.then(move |_| rx1).map_err(|_| ()) + })); + + struct PollAndDrop(Lock, oneshot::Receiver<()>); + impl Future for PollAndDrop { + type Item = (); + type Error = (); + fn poll(&mut self) -> Poll { + trace!("Polling"); + if self.1.poll().map_err(|_| ())?.is_ready() { + trace!("Dropping"); + return Ok(Async::Ready(())); + } + self.0.poll_ready().map_err(|_| ()) + } + } +} + +#[test] +fn fuzz() { + const ITERS: usize = 100_000; + for (concurrency, iterations) in &[(1, ITERS), (3, ITERS), (100, ITERS)] { + tokio::run(future::lazy(move || { + let svc = Lock::new(Decr::new(*iterations, Arc::new(true.into()))); + let (tx, rx) = mpsc::channel(1); + for _ in 0..*concurrency { + tokio::spawn(Loop { + lock: svc.clone(), + _tx: tx.clone(), + }); + } + rx.fold((), |(), ()| Ok(())).map_err(|_| ()) + })); + } + + struct Loop { + lock: Lock, + _tx: mpsc::Sender<()>, + } + impl Future for Loop { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + loop { + try_ready!(self.lock.poll_ready().map_err(|_| ())); + + // Randomly be busy while holding the lock. + if rand::random::() { + futures::task::current().notify(); + return Ok(Async::NotReady); + } + + tokio::spawn(self.lock.call(1).then(|_| Ok(()))); + } + } + } +} + +fn run(future: F) +where + F: Future + 'static, +{ + let subscriber = tracing_subscriber::fmt::Subscriber::builder() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_writer(std::io::stdout) + .finish(); + tracing::subscriber::with_default(subscriber, move || { + tokio::runtime::current_thread::run(future) + }); +} + +#[derive(Debug, Default)] +struct Decr { + value: usize, + ready: Arc, +} + +#[derive(Copy, Clone, Debug)] +struct Underflow; + +impl From for Decr { + fn from(value: usize) -> Self { + Self::new(value, Arc::new(AtomicBool::new(true))) + } +} + +impl Decr { + fn new(value: usize, ready: Arc) -> Self { + Decr { value, ready } + } +} + +impl tower::Service for Decr { + type Response = usize; + type Error = Underflow; + type Future = futures::future::FutureResult; + + fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> { + if self.value == 0 { + return Err(Underflow); + } + + if !self.ready.load(Ordering::SeqCst) { + return Ok(Async::NotReady); + } + + Ok(().into()) + } + + fn call(&mut self, decr: usize) -> Self::Future { + if self.value < decr { + self.value = 0; + return futures::future::err(Underflow); + } + + self.value -= decr; + futures::future::ok(self.value) + } +} + +impl std::fmt::Display for Underflow { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "underflow") + } +} + +impl std::error::Error for Underflow {} From 9e273c3ae02fb8d0389fa4a3b755858e6990e72a Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Mon, 17 Feb 2020 18:57:46 +0000 Subject: [PATCH 2/5] Review feedback/cleanup --- linkerd/lock/src/error.rs | 2 +- linkerd/lock/src/service.rs | 12 ++++-------- linkerd/lock/src/shared.rs | 38 ++++++++++++++++++++++++++----------- 3 files changed, 32 insertions(+), 20 deletions(-) diff --git a/linkerd/lock/src/error.rs b/linkerd/lock/src/error.rs index 94b238dc99..56f946edb6 100644 --- a/linkerd/lock/src/error.rs +++ b/linkerd/lock/src/error.rs @@ -7,7 +7,7 @@ pub struct Poisoned(()); #[derive(Debug)] pub struct ServiceError(Arc); -// === impl POisoned === +// === impl Poisoned === impl Poisoned { pub fn new() -> Self { diff --git a/linkerd/lock/src/service.rs b/linkerd/lock/src/service.rs index 09a6e448a8..70a93ab5ae 100644 --- a/linkerd/lock/src/service.rs +++ b/linkerd/lock/src/service.rs @@ -4,7 +4,7 @@ use futures::{future, Async, Future, Poll}; use std::sync::{Arc, Mutex}; use tracing::trace; -/// Guards access to an inner service with a `tokio::sync::lock::Lock`. +/// A middlware that safely shares an inner service among clones. /// /// As the service is polled to readiness, the lock is acquired and the inner /// service is polled. If the sevice is cloned, the service's lock state is not @@ -86,7 +86,7 @@ where // This instance has not registered interest in the lock. State::Released => match self.shared.lock() { Err(_) => return Err(Poisoned::new().into()), - // First, try to acquire the lock withotu creating a waiter. + // First, try to acquire the lock without creating a waiter. // If the lock isn't available, create a waiter and try // again, registering interest> Ok(mut shared) => match shared.try_acquire() { @@ -149,13 +149,9 @@ impl Drop for Lock { } } - // If this lock is waiting but the waiter isn't registered, it must - // have been notified. Notify the next waiter to prevent deadlock. State::Waiting(wait) => { if let Ok(mut shared) = self.shared.lock() { - if wait.is_not_waiting() { - shared.notify_next_waiter(); - } + shared.release_waiter(wait); } } @@ -170,7 +166,7 @@ impl std::fmt::Debug for State { match self { State::Released => write!(f, "Released"), State::Acquired(_) => write!(f, "Acquired(..)"), - State::Waiting(ref w) => write!(f, "Waiting(pending={})", w.is_waiting()), + State::Waiting(ref w) => write!(f, "Waiting({:?})", w), State::Failed(ref e) => write!(f, "Failed({:?})", e), } } diff --git a/linkerd/lock/src/shared.rs b/linkerd/lock/src/shared.rs index 17f3e4147a..fccbdc5b66 100644 --- a/linkerd/lock/src/shared.rs +++ b/linkerd/lock/src/shared.rs @@ -67,7 +67,7 @@ impl Shared { trace!("Registering waiter"); self.waiters.push(notify); } - debug_assert!(wait.is_waiting()); + debug_assert!(wait.has_notify()); Ok(Async::NotReady) } Err(error) => Err(error), @@ -76,7 +76,7 @@ impl Shared { pub fn release_and_notify(&mut self, value: T) { trace!(waiters = self.waiters.len(), "Releasing"); - debug_assert!(match self.state { + assert!(match self.state { State::Claimed => true, _ => false, }); @@ -84,7 +84,17 @@ impl Shared { self.notify_next_waiter(); } - pub fn notify_next_waiter(&mut self) { + pub fn release_waiter(&mut self, wait: Wait) { + // If a waiter is being released and it does not have a notify, then it must be being + // relased after being notified. Notify the next waiter to prevent deadlock. + if let State::Unclaimed(_) = self.state { + if !wait.has_notify() { + self.notify_next_waiter(); + } + } + } + + fn notify_next_waiter(&mut self) { while let Some(waiter) = self.waiters.pop() { if waiter.notify() { trace!("Notified waiter"); @@ -95,7 +105,7 @@ impl Shared { pub fn fail(&mut self, error: Arc) { trace!(waiters = self.waiters.len(), %error, "Failing"); - debug_assert!(match self.state { + assert!(match self.state { State::Claimed => true, _ => false, }); @@ -125,9 +135,9 @@ mod waiter { /// If a `Notify` handle does not currently exist for this waiter, create /// a new one. pub(super) fn get_notify(&self) -> Option { - if self.is_not_waiting() { + if !self.has_notify() { let n = Notify(Arc::downgrade(&self.0)); - debug_assert!(self.is_waiting()); + debug_assert!(self.has_notify()); Some(n) } else { None @@ -140,13 +150,19 @@ mod waiter { } /// Returns true iff there is currently a `Notify` handle for this waiter. - pub fn is_waiting(&self) -> bool { - Arc::weak_count(&self.0) == 1 + pub(super) fn has_notify(&self) -> bool { + let weaks = Arc::weak_count(&self.0); + debug_assert!( + weaks == 0 || weaks == 1, + "There must only be at most one Notify per Wait" + ); + weaks == 1 } + } - /// Returns true iff there is not currently a `Notify` handle for this waiter. - pub fn is_not_waiting(&self) -> bool { - Arc::weak_count(&self.0) == 0 + impl std::fmt::Debug for Wait { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Wait(notify={})", self.has_notify()) } } From 4aed2a45aa8109c2ac3e97123f1eb54df8935157 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Mon, 17 Feb 2020 20:31:27 +0000 Subject: [PATCH 3/5] typos and commentary --- linkerd/lock/src/service.rs | 29 +++++++++++++---------------- linkerd/lock/src/shared.rs | 21 ++++++++++++--------- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/linkerd/lock/src/service.rs b/linkerd/lock/src/service.rs index 70a93ab5ae..59de19fe2f 100644 --- a/linkerd/lock/src/service.rs +++ b/linkerd/lock/src/service.rs @@ -4,11 +4,10 @@ use futures::{future, Async, Future, Poll}; use std::sync::{Arc, Mutex}; use tracing::trace; -/// A middlware that safely shares an inner service among clones. +/// A middleware that safely shares an inner service among clones. /// -/// As the service is polled to readiness, the lock is acquired and the inner -/// service is polled. If the sevice is cloned, the service's lock state is not -/// retained by the clone. +/// As the service is polled to readiness, the lock is acquired and the inner service is polled. If +/// the service is cloned, the service's lock state isnot retained by the clone. pub struct Lock { state: State, shared: Arc>>, @@ -63,17 +62,16 @@ where loop { trace!(state = ?self.state, "Polling"); self.state = match self.state { - // This instance has exlcusive access to the inner service. + // This instance has exclusive access to the inner service. State::Acquired(ref mut svc) => match svc.poll_ready() { Ok(ok) => { trace!(ready = ok.is_ready()); return Ok(ok); } Err(inner) => { - // If the inner service fails to become ready, share - // that error with all other consumers and update this - // lock's state to prevent trying to acquire the shared - // state again. + // If the inner service fails to become ready, share that error with all + // other consumers and update this lock's state to prevent tryingto acquire + // the shared state again. let error = Arc::new(inner.into()); trace!(%error); if let Ok(mut shared) = self.shared.lock() { @@ -86,9 +84,8 @@ where // This instance has not registered interest in the lock. State::Released => match self.shared.lock() { Err(_) => return Err(Poisoned::new().into()), - // First, try to acquire the lock without creating a waiter. - // If the lock isn't available, create a waiter and try - // again, registering interest> + // First, try to acquire the lock without creating a waiter. If the lock isn't + // available, create a waiter and try again, registering interest. Ok(mut shared) => match shared.try_acquire() { Ok(None) => State::Waiting(Wait::default()), Ok(Some(svc)) => State::Acquired(svc), @@ -124,8 +121,8 @@ where // Return the service to the shared state, notifying waiters as needed. // - // The service is dropped if the inner mutex has been poisoned, and - // subsequent calsl to poll_ready will return a Poisioned error. + // The service is dropped if the inner mutex has been poisoned, and subsequent calls to + // poll_ready will return a Poisoned error. if let Ok(mut shared) = self.shared.lock() { trace!("Releasing acquired lock after use"); shared.release_and_notify(svc); @@ -141,8 +138,8 @@ impl Drop for Lock { let state = std::mem::replace(&mut self.state, State::Released); trace!(?state, "Dropping"); match state { - // If this lock was holding the service, return it back back to the - // shared state so another lock may acquire it. Waiters are notified. + // If this lock was holding the service, return it back to the shared state so another + // lock may acquire it. Waiters are notified. State::Acquired(service) => { if let Ok(mut shared) = self.shared.lock() { shared.release_and_notify(service); diff --git a/linkerd/lock/src/shared.rs b/linkerd/lock/src/shared.rs index fccbdc5b66..1aaa152e40 100644 --- a/linkerd/lock/src/shared.rs +++ b/linkerd/lock/src/shared.rs @@ -7,8 +7,13 @@ use tracing::trace; /// The shared state between one or more Lock instances. /// -/// When multiple lock instances are contending for the inner value, waiters are -/// stored in a LIFO stack. This is done to bias for latency instead of fairness +/// When multiple lock instances are contending for the inner value, waiters are stored in a LIFO +/// stack. This is done to bias for latency instead of fairness +/// +/// N.B. Waiters capacity is released lazily as waiters are notified and, i.e., not when a waiter is +/// dropped. In high-load scenarios where the lock _always_ has new waiters, this could potentially +/// manifest as unbounded memory growth. This situation is not expected toarise in normal operation, +/// however. pub(crate) struct Shared { state: State, waiters: Vec, @@ -45,8 +50,8 @@ impl Shared { } // The value is already claimed by a lock. State::Claimed => Ok(None), - // The locks failed, so reset the state immediately so that all - // instances may be notified. + // The lock has failed, so reset the state immediately so that all instances may be + // notified. State::Failed(error) => { self.state = State::Failed(error.clone()); Err(error) @@ -86,7 +91,7 @@ impl Shared { pub fn release_waiter(&mut self, wait: Wait) { // If a waiter is being released and it does not have a notify, then it must be being - // relased after being notified. Notify the next waiter to prevent deadlock. + // released after being notified. Notify the next waiter to prevent deadlock. if let State::Unclaimed(_) = self.state { if !wait.has_notify() { self.notify_next_waiter(); @@ -127,13 +132,11 @@ mod waiter { /// A handle held by shared lock state to notify a waiting Lock. /// - /// There may be at most one `Notify` instance per `Wait` instance at any one - /// time. + /// There may be at most one `Notify` instance per `Wait` instance at any one time. pub(super) struct Notify(Weak); impl Wait { - /// If a `Notify` handle does not currently exist for this waiter, create - /// a new one. + /// If a `Notify` handle does not currently exist for this waiter, create a new one. pub(super) fn get_notify(&self) -> Option { if !self.has_notify() { let n = Notify(Arc::downgrade(&self.0)); From 9bdae7641cda86a4da4611cf5038853e131a7a62 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Mon, 17 Feb 2020 20:33:04 +0000 Subject: [PATCH 4/5] typo, because of course --- linkerd/lock/src/shared.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/linkerd/lock/src/shared.rs b/linkerd/lock/src/shared.rs index 1aaa152e40..c4ff4bc901 100644 --- a/linkerd/lock/src/shared.rs +++ b/linkerd/lock/src/shared.rs @@ -12,8 +12,7 @@ use tracing::trace; /// /// N.B. Waiters capacity is released lazily as waiters are notified and, i.e., not when a waiter is /// dropped. In high-load scenarios where the lock _always_ has new waiters, this could potentially -/// manifest as unbounded memory growth. This situation is not expected toarise in normal operation, -/// however. +/// manifest as unbounded memory growth. This situation is not expected to arise in normal operation. pub(crate) struct Shared { state: State, waiters: Vec, From d6c7f33ae51a2f280180c9c1694bd4d4bae12dd6 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Mon, 17 Feb 2020 21:40:55 +0000 Subject: [PATCH 5/5] Clarify contract --- linkerd/lock/src/shared.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/linkerd/lock/src/shared.rs b/linkerd/lock/src/shared.rs index c4ff4bc901..98d832c761 100644 --- a/linkerd/lock/src/shared.rs +++ b/linkerd/lock/src/shared.rs @@ -40,6 +40,8 @@ impl Shared { } /// Try to claim a value without registering a waiter. + /// + /// Once a value is acquired it **must** be returned via `release_and_notify`. pub fn try_acquire(&mut self) -> Result, Arc> { match std::mem::replace(&mut self.state, State::Claimed) { // This lock has acquired the value. @@ -60,6 +62,12 @@ impl Shared { /// Try to acquire a value or register the given waiter to be notified when /// the lock is available. + /// + /// Once a value is acquired it **must** be returned via `release_and_notify`. + /// + /// If `Async::NotReady` is returned, the polling task, once notified, **must** either call + /// `poll_acquire` again to obtain a value, or the waiter **must** be returned via + /// `release_waiter`. pub fn poll_acquire(&mut self, wait: &Wait) -> Poll> { match self.try_acquire() { Ok(Some(svc)) => Ok(Async::Ready(svc)),