diff --git a/Cargo.lock b/Cargo.lock
index a36a96a9d3..bfb9d43821 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -670,7 +670,7 @@ dependencies = [
  "tracing",
  "tracing-futures",
  "tracing-log",
- "tracing-subscriber",
+ "tracing-subscriber 0.2.0-alpha.6",
 ]
 
 [[package]]
@@ -872,11 +872,16 @@ dependencies = [
 name = "linkerd2-lock"
 version = "0.1.0"
 dependencies = [
+ "fnv",
  "futures",
  "linkerd2-error",
+ "rand 0.7.2",
  "tokio",
  "tower",
  "tracing",
+ "tracing-futures",
+ "tracing-log",
+ "tracing-subscriber 0.1.6",
 ]
 
 [[package]]
@@ -1115,7 +1120,7 @@ dependencies = [
  "tower",
  "tower-util",
  "tracing",
- "tracing-subscriber",
+ "tracing-subscriber 0.2.0-alpha.6",
  "untrusted",
  "webpki",
 ]
@@ -1426,6 +1431,15 @@ dependencies = [
  "tower-grpc-build",
 ]
 
+[[package]]
+name = "owning_ref"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "49a4b8ea2179e6a2e27411d3bca09ca6dd630821cf6894c6c7c8467a8ee7ef13"
+dependencies = [
+ "stable_deref_trait",
+]
+
 [[package]]
 name = "percent-encoding"
 version = "1.0.1"
@@ -1947,6 +1961,12 @@ version = "0.5.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
 
+[[package]]
+name = "stable_deref_trait"
+version = "1.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dba1a27d3efae4351c8051072d619e3ade2820635c3958d826bfea39d59b54c8"
+
 [[package]]
 name = "string"
 version = "0.2.0"
@@ -2563,6 +2583,23 @@ dependencies = [
  "tracing-core",
 ]
 
+[[package]]
+name = "tracing-subscriber"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "192ca16595cdd0661ce319e8eede9c975f227cdaabc4faaefdc256f43d852e45"
+dependencies = [
+ "ansi_term",
+ "chrono",
+ "lazy_static",
+ "matchers",
+ "owning_ref",
+ "regex 1.0.0",
+ "smallvec 0.6.10",
+ "tracing-core",
+ "tracing-log",
+]
+
 [[package]]
 name = "tracing-subscriber"
 version = "0.2.0-alpha.6"
diff --git a/linkerd/lock/Cargo.toml b/linkerd/lock/Cargo.toml
index 11ba7b0304..7342281e58 100644
--- a/linkerd/lock/Cargo.toml
+++ b/linkerd/lock/Cargo.toml
@@ -9,8 +9,15 @@ A middleware that provides mutual exclusion.
""" [dependencies] +fnv = "1.0" futures = "0.1" linkerd2-error = { path = "../error" } tokio = "0.1" tower = "0.1" tracing = "0.1" + +[dev-dependencies] +rand = "0.7" +tracing-futures = "0.1" +tracing-log = "0.1" +tracing-subscriber = "0.1" diff --git a/linkerd/lock/src/error.rs b/linkerd/lock/src/error.rs new file mode 100644 index 0000000000..94b238dc99 --- /dev/null +++ b/linkerd/lock/src/error.rs @@ -0,0 +1,44 @@ +pub use linkerd2_error::Error; +use std::sync::Arc; + +#[derive(Debug)] +pub struct Poisoned(()); + +#[derive(Debug)] +pub struct ServiceError(Arc); + +// === impl POisoned === + +impl Poisoned { + pub fn new() -> Self { + Poisoned(()) + } +} + +impl std::fmt::Display for Poisoned { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "poisoned") + } +} + +impl std::error::Error for Poisoned {} + +// === impl ServiceError === + +impl ServiceError { + pub(crate) fn new(e: Arc) -> Self { + ServiceError(e) + } + + pub fn inner(&self) -> &Error { + self.0.as_ref() + } +} + +impl std::fmt::Display for ServiceError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + +impl std::error::Error for ServiceError {} diff --git a/linkerd/lock/src/layer.rs b/linkerd/lock/src/layer.rs new file mode 100644 index 0000000000..ed68e289f8 --- /dev/null +++ b/linkerd/lock/src/layer.rs @@ -0,0 +1,20 @@ +use super::Lock; + +#[derive(Clone, Debug, Default)] +pub struct LockLayer(()); + +// === impl Layer === + +impl LockLayer { + pub fn new() -> Self { + LockLayer(()) + } +} + +impl tower::layer::Layer for LockLayer { + type Service = Lock; + + fn layer(&self, inner: S) -> Self::Service { + Self::Service::new(inner) + } +} diff --git a/linkerd/lock/src/lib.rs b/linkerd/lock/src/lib.rs index eb74c071b2..a1a628d8c4 100644 --- a/linkerd/lock/src/lib.rs +++ b/linkerd/lock/src/lib.rs @@ -2,399 +2,11 @@ #![deny(warnings, rust_2018_idioms)] -use futures::{future, Async, Future, Poll}; -use linkerd2_error::Error; -use tokio::sync::lock; -use tracing::trace; - -#[derive(Clone, Debug)] -pub struct Layer { - _marker: std::marker::PhantomData, -} - -/// Guards access to an inner service with a `tokio::sync::lock::Lock`. -/// -/// As the service is polled to readiness, the lock is acquired and the inner -/// service is polled. If the sevice is cloned, the service's lock state is not -/// retained by the clone. -/// -/// The inner service's errors are coerced to the cloneable `C`-typed error so -/// that the error may be returned to all clones of the lock. By default, errors -/// are propagated through the `Poisoned` type, but they may be propagated -/// through custom types as well. -pub struct Lock { - lock: lock::Lock>, - locked: Option>>, -} - -/// The lock holds either an inner service or, if it failed, an error salvaged -/// from the failure. -enum State { - Service(S), - Error(E), -} - -/// A default error type that propagates the inner service's error message to -/// consumers. -#[derive(Clone, Debug)] -pub struct Poisoned(String); - -// === impl Layer === - -impl Default for Layer { - fn default() -> Self { - Self::new() - } -} - -impl Layer { - /// Sets the error type to be returned to consumers when poll_ready fails. 
-    pub fn new<E>() -> Layer<E>
-    where
-        E: Clone + From<Error> + Into<Error>,
-    {
-        Layer {
-            _marker: std::marker::PhantomData,
-        }
-    }
-}
-
-impl<S, E: From<Error> + Clone> tower::layer::Layer<S> for Layer<E> {
-    type Service = Lock<S, E>;
-
-    fn layer(&self, inner: S) -> Self::Service {
-        Self::Service {
-            lock: lock::Lock::new(State::Service(inner)),
-            locked: None,
-        }
-    }
-}
-
-// === impl Lock ===
-
-impl<S, E> Clone for Lock<S, E> {
-    fn clone(&self) -> Self {
-        Self {
-            locked: None,
-            lock: self.lock.clone(),
-        }
-    }
-}
-
-impl<T, S, E> tower::Service<T> for Lock<S, E>
-where
-    S: tower::Service<T>,
-    S::Error: Into<Error>,
-    E: Clone + From<Error> + Into<Error>,
-{
-    type Response = S::Response;
-    type Error = Error;
-    type Future = future::MapErr<S::Future, fn(S::Error) -> Error>;
-
-    fn poll_ready(&mut self) -> Poll<(), Self::Error> {
-        loop {
-            if let Some(state) = self.locked.as_mut() {
-                // Drive the service to readiness if it is already locked.
-                if let State::Service(ref mut inner) = **state {
-                    return match inner.poll_ready() {
-                        Ok(ok) => {
-                            trace!(ready = ok.is_ready(), "service");
-                            Ok(ok)
-                        }
-                        Err(inner) => {
-                            // Coerce the error into `E` and clone it into the
-                            // locked state so that it can be returned from all
-                            // clones of the lock.
-                            let err = E::from(inner.into());
-                            **state = State::Error(err.clone());
-                            self.locked = None;
-                            Err(err.into())
-                        }
-                    };
-                }
-
-                // If an error occured above,the locked state is dropped and
-                // cannot be acquired again.
-                unreachable!("must not lock on error");
-            }
-
-            // Acquire the inner service exclusively so that the service can be
-            // driven to readiness.
-            match self.lock.poll_lock() {
-                Async::NotReady => {
-                    trace!(locked = false);
-                    return Ok(Async::NotReady);
-                }
-                Async::Ready(locked) => {
-                    if let State::Error(ref e) = *locked {
-                        return Err(e.clone().into());
-                    }
-
-                    trace!(locked = true);
-                    self.locked = Some(locked);
-                }
-            }
-        }
-    }
-
-    fn call(&mut self, t: T) -> Self::Future {
-        if let Some(mut state) = self.locked.take() {
-            if let State::Service(ref mut inner) = *state {
-                return inner.call(t).map_err(Into::into);
-            }
-        }
-
-        unreachable!("called before ready");
-    }
-}
-
-// === impl Poisoned ===
-
-impl From<Error> for Poisoned {
-    fn from(e: Error) -> Self {
-        Poisoned(e.to_string())
-    }
-}
-
-impl std::fmt::Display for Poisoned {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        self.0.fmt(f)
-    }
-}
-
-impl std::error::Error for Poisoned {}
-
+pub mod error;
+mod layer;
+mod service;
+mod shared;
 #[cfg(test)]
-mod test {
-    use super::*;
-    use std::sync::atomic::{AtomicBool, Ordering};
-    use std::sync::Arc;
-    use tokio::runtime::current_thread;
-    use tower::layer::Layer as _Layer;
-    use tower::Service as _Service;
-
-    #[test]
-    fn exclusive_access() {
-        current_thread::run(future::lazy(|| {
-            let ready = Arc::new(AtomicBool::new(false));
-            let mut svc0 = Layer::default().layer(Decr::new(2, ready.clone()));
-
-            // svc0 grabs the lock, but the inner service isn't ready.
-            assert!(svc0.poll_ready().expect("must not fail").is_not_ready());
-
-            // Cloning a locked service does not preserve the lock.
-            let mut svc1 = svc0.clone();
-
-            // svc1 can't grab the lock.
-            assert!(svc1.poll_ready().expect("must not fail").is_not_ready());
-
-            // svc0 holds the lock and becomes ready with the inner service.
-            ready.store(true, Ordering::SeqCst);
-            assert!(svc0.poll_ready().expect("must not fail").is_ready());
-
-            // svc1 still can't grab the lock.
-            assert!(svc1.poll_ready().expect("must not fail").is_not_ready());
-
-            // svc0 remains ready.
-            svc0.call(1)
-                .and_then(move |_| {
-                    // svc1 grabs the lock and is immediately ready.
-                    assert!(svc1.poll_ready().expect("must not fail").is_ready());
-                    // svc0 cannot grab the lock.
-                    assert!(svc0.poll_ready().expect("must not fail").is_not_ready());
-
-                    svc1.call(1)
-                })
-                .map(|_| ())
-                .map_err(|_| panic!("must not fail"))
-        }));
-    }
-
-    #[test]
-    fn propagates_poisoned_errors() {
-        current_thread::run(future::lazy(|| {
-            let mut svc0 = Layer::default().layer(Decr::from(1));
-
-            // svc0 grabs the lock and we decr the service so it will fail.
-            assert!(svc0.poll_ready().expect("must not fail").is_ready());
-            // svc0 remains ready.
-            svc0.call(1)
-                .map_err(|_| panic!("must not fail"))
-                .map(move |_| {
-                    // svc1 grabs the lock and fails immediately.
-                    let mut svc1 = svc0.clone();
-                    assert_eq!(
-                        svc1.poll_ready()
-                            .expect_err("mut fail")
-                            .downcast_ref::<Poisoned>()
-                            .expect("must fail with Poisoned")
-                            .to_string(),
-                        "underflow"
-                    );
-
-                    // svc0 suffers the same fate.
-                    assert_eq!(
-                        svc0.poll_ready()
-                            .expect_err("mut fail")
-                            .downcast_ref::<Poisoned>()
-                            .expect("must fail with Poisoned")
-                            .to_string(),
-                        "underflow"
-                    );
-                })
-        }));
-    }
-
-    #[test]
-    fn propagates_custom_errors() {
-        current_thread::run(future::lazy(|| {
-            #[derive(Clone, Debug)]
-            enum Custom {
-                Underflow,
-                Wtf,
-            }
-            impl From<Error> for Custom {
-                fn from(e: Error) -> Self {
-                    if e.is::<Underflow>() {
-                        Custom::Underflow
-                    } else {
-                        Custom::Wtf
-                    }
-                }
-            }
-            impl std::fmt::Display for Custom {
-                fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-                    match self {
-                        Custom::Underflow => write!(f, "custom underflow"),
-                        Custom::Wtf => write!(f, "wtf"),
-                    }
-                }
-            }
-            impl std::error::Error for Custom {}
-
-            let mut svc0 = Layer::new::<Custom>().layer(Decr::from(1));
-
-            // svc0 grabs the lock and we decr the service so it will fail.
-            assert!(svc0.poll_ready().expect("must not fail").is_ready());
-            // svc0 remains ready.
-            svc0.call(1)
-                .map_err(|_| panic!("must not fail"))
-                .map(move |_| {
-                    let mut svc1 = svc0.clone();
-                    // svc1 grabs the lock and fails immediately.
-                    assert_eq!(
-                        svc1.poll_ready()
-                            .expect_err("mut fail")
-                            .downcast_ref::<Custom>()
-                            .expect("must fail with Custom")
-                            .to_string(),
-                        "custom underflow"
-                    );
-
-                    // svc0 suffers the same fate.
-                    assert_eq!(
-                        svc0.poll_ready()
-                            .expect_err("mut fail")
-                            .downcast_ref::<Custom>()
-                            .expect("must fail with Custom")
-                            .to_string(),
-                        "custom underflow"
-                    );
-                })
-        }));
-    }
-
-    #[test]
-    fn dropping_releases_access() {
-        use tower::util::ServiceExt;
-
-        current_thread::run(future::lazy(|| {
-            let ready = Arc::new(AtomicBool::new(false));
-            let mut svc0 = Layer::default().layer(Decr::new(2, ready.clone()));
-
-            // svc0 grabs the lock, but the inner service isn't ready.
-            assert!(svc0.poll_ready().expect("must not fail").is_not_ready());
-
-            // Cloning a locked service does not preserve the lock.
-            let mut svc1 = svc0.clone();
-
-            // svc1 can't grab the lock.
-            assert!(svc1.poll_ready().expect("must not fail").is_not_ready());
-
-            // svc0 holds the lock and becomes ready with the inner service.
-            ready.store(true, Ordering::SeqCst);
-            assert!(svc0.poll_ready().expect("must not fail").is_ready());
-
-            // svc1 still can't grab the lock.
-            assert!(svc1.poll_ready().expect("must not fail").is_not_ready());
-
-            let mut fut = svc1.oneshot(1);
-
-            assert!(fut.poll().expect("must not fail").is_not_ready());
-
-            drop(svc0);
-
-            // svc1 grabs the lock and is immediately ready.
-            assert_eq!(fut.poll().expect("must not fail"), Async::Ready(1));
-
-            Ok(().into())
-        }));
-    }
-
-    #[derive(Debug, Default)]
-    struct Decr {
-        value: usize,
-        ready: Arc<AtomicBool>,
-    }
-
-    #[derive(Copy, Clone, Debug)]
-    struct Underflow;
-
-    impl From<usize> for Decr {
-        fn from(value: usize) -> Self {
-            Self::new(value, Arc::new(AtomicBool::new(true)))
-        }
-    }
-
-    impl Decr {
-        fn new(value: usize, ready: Arc<AtomicBool>) -> Self {
-            Decr { value, ready }
-        }
-    }
-
-    impl tower::Service<usize> for Decr {
-        type Response = usize;
-        type Error = Underflow;
-        type Future = futures::future::FutureResult<usize, Underflow>;
-
-        fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> {
-            if self.value == 0 {
-                return Err(Underflow);
-            }
-
-            if !self.ready.load(Ordering::SeqCst) {
-                return Ok(Async::NotReady);
-            }
-
-            Ok(().into())
-        }
-
-        fn call(&mut self, decr: usize) -> Self::Future {
-            if self.value < decr {
-                self.value = 0;
-                return futures::future::err(Underflow);
-            }
-
-            self.value -= decr;
-            futures::future::ok(self.value)
-        }
-    }
-
-    impl std::fmt::Display for Underflow {
-        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-            write!(f, "underflow")
-        }
-    }
+mod test;
 
-    impl std::error::Error for Underflow {}
-}
+pub use self::{layer::LockLayer, service::Lock};
diff --git a/linkerd/lock/src/service.rs b/linkerd/lock/src/service.rs
new file mode 100644
index 0000000000..09a6e448a8
--- /dev/null
+++ b/linkerd/lock/src/service.rs
@@ -0,0 +1,177 @@
+use crate::error::{Error, Poisoned, ServiceError};
+use crate::shared::{Shared, Wait};
+use futures::{future, Async, Future, Poll};
+use std::sync::{Arc, Mutex};
+use tracing::trace;
+
+/// Guards access to an inner service, ensuring that only one clone may use it
+/// at a time.
+///
+/// As the service is polled to readiness, the lock is acquired and the inner
+/// service is polled. If the service is cloned, the service's lock state is
+/// not retained by the clone.
+pub struct Lock<S> {
+    state: State<S>,
+    shared: Arc<Mutex<Shared<S>>>,
+}
+
+/// The state of a single `Lock` consumer.
+enum State<S> {
+    /// This lock has not registered interest in the inner service.
+    Released,
+
+    /// This lock is interested in the inner service.
+    Waiting(Wait),
+
+    /// This lock instance has exclusive ownership of the inner service.
+    Acquired(S),
+
+    /// The inner service has failed.
+    Failed(Arc<Error>),
+}
+
+// === impl Lock ===
+
+impl<S> Lock<S> {
+    pub fn new(service: S) -> Self {
+        Self {
+            state: State::Released,
+            shared: Arc::new(Mutex::new(Shared::new(service))),
+        }
+    }
+}
+
+impl<S> Clone for Lock<S> {
+    fn clone(&self) -> Self {
+        Self {
+            // Clones have an independent local lock state.
+            state: State::Released,
+            shared: self.shared.clone(),
+        }
+    }
+}
+
+impl<T, S> tower::Service<T> for Lock<S>
+where
+    S: tower::Service<T>,
+    S::Error: Into<Error>,
+{
+    type Response = S::Response;
+    type Error = Error;
+    type Future = future::MapErr<S::Future, fn(S::Error) -> Self::Error>;
+
+    fn poll_ready(&mut self) -> Poll<(), Self::Error> {
+        loop {
+            trace!(state = ?self.state, "Polling");
+            self.state = match self.state {
+                // This instance has exclusive access to the inner service.
+                State::Acquired(ref mut svc) => match svc.poll_ready() {
+                    Ok(ok) => {
+                        trace!(ready = ok.is_ready());
+                        return Ok(ok);
+                    }
+                    Err(inner) => {
+                        // If the inner service fails to become ready, share
+                        // that error with all other consumers and update this
+                        // lock's state to prevent trying to acquire the shared
+                        // state again.
+                        let error = Arc::new(inner.into());
+                        trace!(%error);
+                        if let Ok(mut shared) = self.shared.lock() {
+                            shared.fail(error.clone());
+                        }
+                        State::Failed(error)
+                    }
+                },
+
+                // This instance has not registered interest in the lock.
+                State::Released => match self.shared.lock() {
+                    Err(_) => return Err(Poisoned::new().into()),
+                    // First, try to acquire the lock without creating a waiter.
+                    // If the lock isn't available, create a waiter and try
+                    // again, registering interest.
+                    Ok(mut shared) => match shared.try_acquire() {
+                        Ok(None) => State::Waiting(Wait::default()),
+                        Ok(Some(svc)) => State::Acquired(svc),
+                        Err(error) => State::Failed(error),
+                    },
+                },
+
+                // This instance is interested in the lock.
+                State::Waiting(ref waiter) => match self.shared.lock() {
+                    Err(_) => return Err(Poisoned::new().into()),
+                    Ok(mut shared) => match shared.poll_acquire(waiter) {
+                        Ok(Async::NotReady) => return Ok(Async::NotReady),
+                        Ok(Async::Ready(svc)) => State::Acquired(svc),
+                        Err(error) => State::Failed(error),
+                    },
+                },
+
+                // The inner service failed, so share that failure with all consumers.
+                State::Failed(ref error) => return Err(ServiceError::new(error.clone()).into()),
+            };
+        }
+    }
+
+    fn call(&mut self, req: T) -> Self::Future {
+        // The service must have been acquired by poll_ready. Reset this lock's
+        // state so that it must reacquire the service via poll_ready.
+        let mut svc = match std::mem::replace(&mut self.state, State::Released) {
+            State::Acquired(svc) => svc,
+            _ => panic!("Called before ready"),
+        };
+
+        let fut = svc.call(req);
+
+        // Return the service to the shared state, notifying waiters as needed.
+        //
+        // The service is dropped if the inner mutex has been poisoned, and
+        // subsequent calls to poll_ready will return a Poisoned error.
+        if let Ok(mut shared) = self.shared.lock() {
+            trace!("Releasing acquired lock after use");
+            shared.release_and_notify(svc);
+        }
+
+        // The inner service's error type is *not* wrapped with a ServiceError.
+        fut.map_err(Into::into)
+    }
+}
+
+impl<S> Drop for Lock<S> {
+    fn drop(&mut self) {
+        let state = std::mem::replace(&mut self.state, State::Released);
+        trace!(?state, "Dropping");
+        match state {
+            // If this lock was holding the service, return it back to the
+            // shared state so another lock may acquire it. Waiters are
+            // notified.
+            State::Acquired(service) => {
+                if let Ok(mut shared) = self.shared.lock() {
+                    shared.release_and_notify(service);
+                }
+            }
+
+            // If this lock is waiting but the waiter isn't registered, it must
+            // have been notified already. Notify the next waiter to prevent
+            // deadlock.
+            State::Waiting(wait) => {
+                if let Ok(mut shared) = self.shared.lock() {
+                    if wait.is_not_waiting() {
+                        shared.notify_next_waiter();
+                    }
+                }
+            }
+
+            // No state to clean up.
+            State::Released | State::Failed(_) => {}
+        }
+    }
+}
+
+impl<S> std::fmt::Debug for State<S> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            State::Released => write!(f, "Released"),
+            State::Acquired(_) => write!(f, "Acquired(..)"),
+            State::Waiting(ref w) => write!(f, "Waiting(pending={})", w.is_waiting()),
+            State::Failed(ref e) => write!(f, "Failed({:?})", e),
+        }
+    }
+}
diff --git a/linkerd/lock/src/shared.rs b/linkerd/lock/src/shared.rs
new file mode 100644
index 0000000000..17f3e4147a
--- /dev/null
+++ b/linkerd/lock/src/shared.rs
@@ -0,0 +1,166 @@
+use self::waiter::Notify;
+pub(crate) use self::waiter::Wait;
+use crate::error::Error;
+use futures::{Async, Poll};
+use std::sync::Arc;
+use tracing::trace;
+
+/// The shared state between one or more Lock instances.
+///
+/// When multiple lock instances are contending for the inner value, waiters
+/// are stored in a LIFO stack. This is done to bias for latency instead of
+/// fairness.
+pub(crate) struct Shared<T> {
+    state: State<T>,
+    waiters: Vec<Notify>,
+}
+
+enum State<T> {
+    /// A Lock is holding the value.
+    Claimed,
+
+    /// The inner value is available.
+    Unclaimed(T),
+
+    /// The lock has failed.
+    Failed(Arc<Error>),
+}
+
+// === impl Shared ===
+
+impl<T> Shared<T> {
+    pub fn new(value: T) -> Self {
+        Self {
+            waiters: Vec::new(),
+            state: State::Unclaimed(value),
+        }
+    }
+
+    /// Try to claim the value without registering a waiter.
+    pub fn try_acquire(&mut self) -> Result<Option<T>, Arc<Error>> {
+        match std::mem::replace(&mut self.state, State::Claimed) {
+            // This lock has acquired the value.
+            State::Unclaimed(v) => {
+                trace!("acquired");
+                Ok(Some(v))
+            }
+            // The value is already claimed by a lock.
+            State::Claimed => Ok(None),
+            // The lock failed, so reset the state immediately so that all
+            // instances may be notified.
+            State::Failed(error) => {
+                self.state = State::Failed(error.clone());
+                Err(error)
+            }
+        }
+    }
+
+    /// Try to acquire the value, or register the given waiter to be notified
+    /// when the lock becomes available.
+    pub fn poll_acquire(&mut self, wait: &Wait) -> Poll<T, Arc<Error>> {
+        match self.try_acquire() {
+            Ok(Some(svc)) => Ok(Async::Ready(svc)),
+            Ok(None) => {
+                // Register the current task to be notified.
+                wait.register();
+                // Register the waiter's notify handle if one isn't already registered.
+                if let Some(notify) = wait.get_notify() {
+                    trace!("Registering waiter");
+                    self.waiters.push(notify);
+                }
+                debug_assert!(wait.is_waiting());
+                Ok(Async::NotReady)
+            }
+            Err(error) => Err(error),
+        }
+    }
+
+    pub fn release_and_notify(&mut self, value: T) {
+        trace!(waiters = self.waiters.len(), "Releasing");
+        debug_assert!(match self.state {
+            State::Claimed => true,
+            _ => false,
+        });
+        self.state = State::Unclaimed(value);
+        self.notify_next_waiter();
+    }
+
+    pub fn notify_next_waiter(&mut self) {
+        while let Some(waiter) = self.waiters.pop() {
+            if waiter.notify() {
+                trace!("Notified waiter");
+                return;
+            }
+        }
+    }
+
+    pub fn fail(&mut self, error: Arc<Error>) {
+        trace!(waiters = self.waiters.len(), %error, "Failing");
+        debug_assert!(match self.state {
+            State::Claimed => true,
+            _ => false,
+        });
+        self.state = State::Failed(error);
+
+        while let Some(waiter) = self.waiters.pop() {
+            waiter.notify();
+        }
+    }
+}
+
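+// How the pieces below fit together: when a `Lock` loses the race for the
+// inner value, `Shared::poll_acquire` registers the lock's `Wait` handle and
+// pushes a paired `Notify` onto the `waiters` stack. `release_and_notify`
+// then pops entries in LIFO order until it finds one whose `Wait` is still
+// alive; a waiter that has since been dropped fails to upgrade its `Weak`
+// task handle and is skipped.
+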
+mod waiter {
+    use futures::task::AtomicTask;
+    use std::sync::{Arc, Weak};
+
+    /// A handle held by Lock instances when waiting to be notified.
+    #[derive(Default)]
+    pub(crate) struct Wait(Arc<AtomicTask>);
+
+    /// A handle held by the shared lock state to notify a waiting Lock.
+    ///
+    /// There may be at most one `Notify` instance per `Wait` instance at any
+    /// one time.
+    pub(super) struct Notify(Weak<AtomicTask>);
+
+    impl Wait {
+        /// If a `Notify` handle does not currently exist for this waiter,
+        /// create a new one.
+        pub(super) fn get_notify(&self) -> Option<Notify> {
+            if self.is_not_waiting() {
+                let n = Notify(Arc::downgrade(&self.0));
+                debug_assert!(self.is_waiting());
+                Some(n)
+            } else {
+                None
+            }
+        }
+
+        /// Register this waiter with the current task.
+        pub(super) fn register(&self) {
+            self.0.register();
+        }
+
+        /// Returns true iff there is currently a `Notify` handle for this waiter.
+        pub fn is_waiting(&self) -> bool {
+            Arc::weak_count(&self.0) == 1
+        }
+
+        /// Returns true iff there is not currently a `Notify` handle for this waiter.
+        pub fn is_not_waiting(&self) -> bool {
+            Arc::weak_count(&self.0) == 0
+        }
+    }
+
+    impl Notify {
+        /// Attempt to notify the waiter.
+        ///
+        /// Returns true if a waiter was notified and false otherwise.
+        pub fn notify(self) -> bool {
+            if let Some(task) = self.0.upgrade() {
+                task.notify();
+                true
+            } else {
+                false
+            }
+        }
+    }
+}
diff --git a/linkerd/lock/src/test.rs b/linkerd/lock/src/test.rs
new file mode 100644
index 0000000000..1f5968e5bf
--- /dev/null
+++ b/linkerd/lock/src/test.rs
@@ -0,0 +1,254 @@
+use crate::error::ServiceError;
+use crate::Lock;
+use futures::{future, try_ready, Async, Future, Poll, Stream};
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use tokio::sync::{mpsc, oneshot};
+use tower::Service as _Service;
+use tracing::{info_span, trace};
+use tracing_futures::Instrument;
+
+#[test]
+fn exclusive_access() {
+    run(future::lazy(|| {
+        let ready = Arc::new(AtomicBool::new(false));
+        let mut svc0 = Lock::new(Decr::new(2, ready.clone()));
+
+        // svc0 grabs the lock, but the inner service isn't ready.
+        assert!(svc0.poll_ready().expect("must not fail").is_not_ready());
+
+        // Cloning a locked service does not preserve the lock.
+        let mut svc1 = svc0.clone();
+
+        // svc1 can't grab the lock.
+        assert!(svc1.poll_ready().expect("must not fail").is_not_ready());
+
+        // svc0 holds the lock and becomes ready with the inner service.
+        ready.store(true, Ordering::SeqCst);
+        assert!(svc0.poll_ready().expect("must not fail").is_ready());
+
+        // svc1 still can't grab the lock.
+        assert!(svc1.poll_ready().expect("must not fail").is_not_ready());
+
+        // Calling svc0 uses the service and releases the lock.
+        let fut0 = svc0.call(1);
+
+        // svc1 grabs the lock and is immediately ready.
+        assert!(svc1.poll_ready().expect("must not fail").is_ready());
+        // svc0 cannot grab the lock.
+        assert!(svc0.poll_ready().expect("must not fail").is_not_ready());
+
+        let fut1 = svc1.call(1);
+
+        fut0.join(fut1)
+            .map(|_| ())
+            .map_err(|_| panic!("must not fail"))
+    }));
+}
+
+#[test]
+fn propagates_errors() {
+    run(future::lazy(|| {
+        let mut svc0 = Lock::new(Decr::from(1));
+
+        // svc0 grabs the lock and we decr the service so it will fail.
+        assert!(svc0.poll_ready().expect("must not fail").is_ready());
+        // Decrement the service to zero so that subsequent polls fail.
+        svc0.call(1)
+            .map_err(|_| panic!("must not fail"))
+            .map(move |_| {
+                // svc1 grabs the lock and fails immediately.
+                let mut svc1 = svc0.clone();
+                assert!(svc1
+                    .poll_ready()
+                    .expect_err("must fail")
+                    .downcast_ref::<ServiceError>()
+                    .expect("must fail with service error")
+                    .inner()
+                    .is::<Underflow>());
+
+                // svc0 suffers the same fate.
+                assert!(svc0
+                    .poll_ready()
+                    .expect_err("must fail")
+                    .downcast_ref::<ServiceError>()
+                    .expect("must fail with service error")
+                    .inner()
+                    .is::<Underflow>());
+            })
+    }));
+}
+
+#[test]
+fn dropping_releases_access() {
+    use tower::util::ServiceExt;
+    run(future::lazy(|| {
+        let mut svc0 = Lock::new(Decr::new(3, Arc::new(true.into())));
+
+        // svc0 grabs the lock; the inner service is ready.
+        assert!(svc0.poll_ready().expect("must not fail").is_ready());
+
+        let svc1 = svc0.clone();
+        let (tx1, rx1) = oneshot::channel();
+        tokio::spawn(
+            svc1.oneshot(1)
+                .then(move |_| {
+                    trace!("complete");
+                    tx1.send(()).map_err(|_| ())
+                })
+                .instrument(info_span!("Svc1")),
+        );
+
+        let svc2 = svc0.clone();
+        let (tx2, rx2) = oneshot::channel();
+        tokio::spawn(
+            svc2.oneshot(1)
+                .then(move |_| {
+                    trace!("complete");
+                    tx2.send(()).map_err(|_| ())
+                })
+                .instrument(info_span!("Svc2")),
+        );
+
+        // svc3 will be the waiter notified when svc0 releases the lock, but
+        // svc3 is dropped before it can poll the lock again. This test
+        // ensures that svc2 is notified by svc3's drop.
+        let svc3 = svc0.clone();
+        let (tx3, rx3) = oneshot::channel();
+        tokio::spawn(PollAndDrop(svc3, rx3).instrument(info_span!("Svc3")));
+
+        tokio::spawn(
+            svc0.ready()
+                .then(move |_| tx3.send(()).map_err(|_| ()))
+                .instrument(info_span!("Svc0")),
+        );
+        // svc3 is notified, but is dropped before it can be polled.
+
+        rx2.then(move |_| rx1).map_err(|_| ())
+    }));
+
+    struct PollAndDrop(Lock<Decr>, oneshot::Receiver<()>);
+    impl Future for PollAndDrop {
+        type Item = ();
+        type Error = ();
+        fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+            trace!("Polling");
+            if self.1.poll().map_err(|_| ())?.is_ready() {
+                trace!("Dropping");
+                return Ok(Async::Ready(()));
+            }
+            self.0.poll_ready().map_err(|_| ())
+        }
+    }
+}
+
+#[test]
+fn fuzz() {
+    const ITERS: usize = 100_000;
+    for (concurrency, iterations) in &[(1, ITERS), (3, ITERS), (100, ITERS)] {
+        tokio::run(future::lazy(move || {
+            let svc = Lock::new(Decr::new(*iterations, Arc::new(true.into())));
+            let (tx, rx) = mpsc::channel(1);
+            for _ in 0..*concurrency {
+                tokio::spawn(Loop {
+                    lock: svc.clone(),
+                    _tx: tx.clone(),
+                });
+            }
+            rx.fold((), |(), ()| Ok(())).map_err(|_| ())
+        }));
+    }
+
+    struct Loop {
+        lock: Lock<Decr>,
+        _tx: mpsc::Sender<()>,
+    }
+    impl Future for Loop {
+        type Item = ();
+        type Error = ();
+
+        fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+            loop {
+                try_ready!(self.lock.poll_ready().map_err(|_| ()));
+
+                // Randomly be busy while holding the lock.
+                if rand::random::<bool>() {
+                    futures::task::current().notify();
+                    return Ok(Async::NotReady);
+                }
+
+                tokio::spawn(self.lock.call(1).then(|_| Ok(())));
+            }
+        }
+    }
+}
+
+fn run<F>(future: F)
+where
+    F: Future<Item = (), Error = ()> + 'static,
+{
+    let subscriber = tracing_subscriber::fmt::Subscriber::builder()
+        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+        .with_writer(std::io::stdout)
+        .finish();
+    tracing::subscriber::with_default(subscriber, move || {
+        tokio::runtime::current_thread::run(future)
+    });
+}
+
+#[derive(Debug, Default)]
+struct Decr {
+    value: usize,
+    ready: Arc<AtomicBool>,
+}
+
+#[derive(Copy, Clone, Debug)]
+struct Underflow;
+
+impl From<usize> for Decr {
+    fn from(value: usize) -> Self {
+        Self::new(value, Arc::new(AtomicBool::new(true)))
+    }
+}
+
+impl Decr {
+    fn new(value: usize, ready: Arc<AtomicBool>) -> Self {
+        Decr { value, ready }
+    }
+}
+
+impl tower::Service<usize> for Decr {
+    type Response = usize;
+    type Error = Underflow;
+    type Future = futures::future::FutureResult<usize, Underflow>;
+
+    fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> {
+        if self.value == 0 {
+            return Err(Underflow);
+        }
+
+        if !self.ready.load(Ordering::SeqCst) {
+            return Ok(Async::NotReady);
+        }
+
+        Ok(().into())
+    }
+
+    fn call(&mut self, decr: usize) -> Self::Future {
+        if self.value < decr {
+            self.value = 0;
+            return futures::future::err(Underflow);
+        }
+
+        self.value -= decr;
+        futures::future::ok(self.value)
+    }
+}
+
+impl std::fmt::Display for Underflow {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "underflow")
+    }
+}
+
+impl std::error::Error for Underflow {}
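
A minimal usage sketch of the new public API (the `AddOne` service and the `main` harness below are invented for illustration; `Lock` and its acquire-on-poll_ready / release-on-call semantics are as introduced in this change):

use futures::{future, Async, Future, Poll};
use linkerd2_lock::Lock;
use tower::Service;

/// A trivial inner service, invented for this example: adds one to a request.
struct AddOne;

impl Service<usize> for AddOne {
    type Response = usize;
    type Error = linkerd2_lock::error::Error;
    type Future = future::FutureResult<usize, Self::Error>;

    fn poll_ready(&mut self) -> Poll<(), Self::Error> {
        Ok(Async::Ready(()))
    }

    fn call(&mut self, n: usize) -> Self::Future {
        future::ok(n + 1)
    }
}

fn main() {
    tokio::runtime::current_thread::run(future::lazy(|| {
        // All clones share a single inner AddOne; readiness is exclusive.
        let mut svc0 = Lock::new(AddOne);
        let mut svc1 = svc0.clone();

        // svc0 acquires the shared service...
        assert!(svc0.poll_ready().expect("must not fail").is_ready());
        // ...so svc1 must wait its turn.
        assert!(svc1.poll_ready().expect("must not fail").is_not_ready());

        // Calling svc0 returns the inner service to the shared state,
        // letting svc1 acquire it.
        svc0.call(1)
            .map(move |n| {
                assert_eq!(n, 2);
                assert!(svc1.poll_ready().expect("must not fail").is_ready());
            })
            .map_err(|_| panic!("must not fail"))
    }));
}

Note that `call()` hands the inner service back to the shared state as soon as the response future is created, so the lock guards service readiness rather than the entire request lifetime.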