From d26b478871e16a1d779882a6acf4bcccf91e722f Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 24 Apr 2020 11:33:05 -0700 Subject: [PATCH 01/13] port lock to std::future Signed-off-by: Eliza Weisman --- Cargo.lock | 102 +++++----- linkerd/lock/Cargo.toml | 13 +- linkerd/lock/src/lib.rs | 1 - linkerd/lock/src/lock.rs | 115 ++++++------ linkerd/lock/src/service.rs | 36 ++-- linkerd/lock/src/shared.rs | 152 --------------- linkerd/lock/src/test.rs | 359 ++++++++++++++++++------------------ 7 files changed, 324 insertions(+), 454 deletions(-) delete mode 100644 linkerd/lock/src/shared.rs diff --git a/Cargo.lock b/Cargo.lock index 71194f3d4e..9598e2d5c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -123,9 +123,9 @@ dependencies = [ [[package]] name = "bytes" -version = "0.5.3" +version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10004c15deb332055f7a4a208190aed362cf9a7c2f6ab70a305fba50e1105f38" +checksum = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1" [[package]] name = "c2-chacha" @@ -670,9 +670,6 @@ name = "lazy_static" version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc5729f27f159ddd61f4df6228e827e86643d4d3e7c32183cb30a1c08f604a14" -dependencies = [ - "spin", -] [[package]] name = "libc" @@ -719,7 +716,7 @@ dependencies = [ "ring", "rustls", "tokio 0.1.22", - "tokio 0.2.17", + "tokio 0.2.18", "tokio-compat", "tokio-connect", "tokio-current-thread", @@ -790,7 +787,7 @@ dependencies = [ "rand 0.7.2", "regex 1.0.0", "tokio 0.1.22", - "tokio 0.2.17", + "tokio 0.2.18", "tokio-compat", "tokio-timer", "tower 0.3.1", @@ -801,7 +798,7 @@ dependencies = [ "tower-request-modifier", "tower-spawn-ready", "tracing", - "tracing-futures 0.2.3", + "tracing-futures 0.2.4", "tracing-log", "tracing-subscriber", ] @@ -962,7 +959,7 @@ dependencies = [ "futures 0.3.4", "linkerd2-error", "pin-project", - "tokio 0.2.17", + "tokio 0.2.18", "tokio-test", ] @@ -1083,11 +1080,13 @@ dependencies = [ name = "linkerd2-lock" version = "0.1.0" dependencies = [ - "futures 0.1.26", + "futures 0.3.4", "linkerd2-error", "rand 0.7.2", - "tokio 0.1.22", - "tower 0.1.1", + "tokio 0.2.18", + "tokio-test", + "tower 0.3.1", + "tower-test 0.3.0", "tracing", "tracing-futures 0.1.0", "tracing-log", @@ -1172,9 +1171,9 @@ dependencies = [ "futures 0.3.4", "linkerd2-error", "pin-project", - "tokio 0.2.17", + "tokio 0.2.18", "tower 0.3.1", - "tracing-futures 0.2.3", + "tracing-futures 0.2.4", ] [[package]] @@ -1186,7 +1185,7 @@ dependencies = [ "linkerd2-io", "linkerd2-proxy-core", "pin-project", - "tokio 0.2.17", + "tokio 0.2.18", "tower 0.3.1", ] @@ -1232,7 +1231,7 @@ dependencies = [ "rand 0.7.2", "task-compat", "tokio 0.1.22", - "tokio 0.2.17", + "tokio 0.2.18", "tokio-connect", "tokio-timer", "tower 0.1.1", @@ -1435,7 +1434,7 @@ dependencies = [ "futures 0.3.4", "linkerd2-error", "pin-project", - "tokio 0.2.17", + "tokio 0.2.18", "tower 0.3.1", ] @@ -1483,7 +1482,7 @@ dependencies = [ "linkerd2-error", "linkerd2-stack", "pin-project", - "tokio 0.2.17", + "tokio 0.2.18", "tokio-connect", "tokio-test", "tower 0.3.1", @@ -1763,18 +1762,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "0.4.8" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7804a463a8d9572f13453c516a5faea534a2403d7ced2f0c7e100eeff072772c" +checksum = "6f6a7f5eee6292c559c793430c55c00aea9d3b3d1905e855806ca4d7253426a2" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "0.4.8" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "385322a45f2ecf3410c68d2a549a4a2685e8051d0f278e39743ff4e451cb9b3f" +checksum = "8988430ce790d8682672117bc06dda364c0be32d3abd738234f19f3240bad99a" dependencies = [ "proc-macro2 1.0.10", "quote 1.0.2", @@ -1789,9 +1788,9 @@ checksum = "237844750cfbb86f67afe27eee600dfbbcb6188d734139b534cbfbf4f96792ae" [[package]] name = "pin-utils" -version = "0.1.0-alpha.4" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5894c618ce612a3fa23881b152b608bafb8c56cfc22f434a3ba3120b40f7b587" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "ppv-lite86" @@ -2166,9 +2165,9 @@ dependencies = [ [[package]] name = "ring" -version = "0.16.7" +version = "0.16.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "796ae8317a07b04dffb1983bdc7045ccd02f741f0b411704f07fd35dbf99f757" +checksum = "1ba5a8ec64ee89a76c98c549af81ff14813df09c3e6dc4766c3856da48597a0c" dependencies = [ "cc", "lazy_static", @@ -2263,9 +2262,9 @@ dependencies = [ [[package]] name = "sharded-slab" -version = "0.0.8" +version = "0.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae75d0445b5d3778c9da3d1f840faa16d0627c8607f78a74daf69e5b988c39a1" +checksum = "06d5a3f5166fb5b42a5439f2eee8b9de149e235961e3eb21c5808fc3ea17ff3e" dependencies = [ "lazy_static", ] @@ -2444,11 +2443,11 @@ dependencies = [ [[package]] name = "tokio" -version = "0.2.17" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39fb9142eb6e9cc37f4f29144e62618440b149a138eee01a7bbe9b9226aaf17c" +checksum = "34ef16d072d2b6dc8b4a56c70f5c5ced1a37752116f8e7c1e80c659aa7cb6713" dependencies = [ - "bytes 0.5.3", + "bytes 0.5.4", "fnv", "futures-core", "lazy_static", @@ -2491,7 +2490,7 @@ dependencies = [ "futures-core", "futures-util", "pin-project-lite", - "tokio 0.2.17", + "tokio 0.2.18", "tokio-current-thread", "tokio-executor", "tokio-reactor", @@ -2635,13 +2634,13 @@ dependencies = [ [[package]] name = "tokio-test" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09cf9705471976fa5fc6817d3fbc9c4ff9696a6647af0e5c1870c81ca7445b05" +checksum = "ed0049c119b6d505c4447f5c64873636c7af6c75ab0d45fd9f618d82acb8016d" dependencies = [ - "bytes 0.5.3", + "bytes 0.5.4", "futures-core", - "tokio 0.2.17", + "tokio 0.2.18", ] [[package]] @@ -2711,13 +2710,13 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499" dependencies = [ - "bytes 0.5.3", + "bytes 0.5.4", "futures-core", "futures-io", "futures-sink", "log", "pin-project-lite", - "tokio 0.2.17", + "tokio 0.2.18", ] [[package]] @@ -2746,7 +2745,7 @@ dependencies = [ "futures-core", "futures-util", "pin-project", - "tokio 0.2.17", + "tokio 0.2.18", "tower-layer 0.3.0 (git+https://github.com/tower-rs/tower?rev=8752a3811788e94670c62dc0acbc9613207931b1)", "tower-service 0.3.0", "tracing", @@ -2974,7 +2973,7 @@ checksum = "9ba4bbc2c1e4a8543c30d4c13a4c8314ed72d6e07581910f665aa13fde0153c8" dependencies = [ "futures-util", "pin-project", - "tokio 0.2.17", + "tokio 0.2.18", "tokio-test", "tower-layer 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "tower-service 0.3.0", @@ -3029,12 +3028,11 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.6" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "528c8ebaaa16cdac34795180b046c031775b0d56402704d98c096788f33d646a" +checksum = "0aa83a9a47081cd522c09c81b31aec2c9273424976f922ad61c053b58350b715" dependencies = [ "lazy_static", - "spin", ] [[package]] @@ -3051,9 +3049,9 @@ dependencies = [ [[package]] name = "tracing-futures" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58b0b7fd92dc7b71f29623cc6836dd7200f32161a2313dd78be233a8405694f6" +checksum = "ab7bb6f14721aa00656086e9335d363c5c8747bae02ebe32ea2c7dece5689b4c" dependencies = [ "futures 0.1.26", "pin-project", @@ -3083,9 +3081,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.2.3" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dedebcf5813b02261d6bab3a12c6a8ae702580c0405a2e8ec16c3713caf14c20" +checksum = "1d53c40489aa69c9aed21ff483f26886ca8403df33bdc2d2f87c60c1617826d2" dependencies = [ "ansi_term", "chrono", @@ -3333,8 +3331,9 @@ dependencies = [ [[package]] name = "webpki" -version = "0.21.0" -source = "git+https://github.com/seanmonstar/webpki?branch=cert-dns-names-0.21#c4d77fd78a48a5daf05fd7ce2c18d34f9a077e4a" +version = "0.21.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1f50e1972865d6b1adb54167d1c8ed48606004c2c9d0ea5f1eeb34d95e863ef" dependencies = [ "ring", "untrusted", @@ -3426,3 +3425,8 @@ dependencies = [ "winapi 0.2.8", "winapi-build", ] + +[[patch.unused]] +name = "webpki" +version = "0.21.0" +source = "git+https://github.com/seanmonstar/webpki?branch=cert-dns-names-0.21#c4d77fd78a48a5daf05fd7ce2c18d34f9a077e4a" diff --git a/linkerd/lock/Cargo.toml b/linkerd/lock/Cargo.toml index 4ab4ef0224..98ea84b8a3 100644 --- a/linkerd/lock/Cargo.toml +++ b/linkerd/lock/Cargo.toml @@ -9,14 +9,17 @@ A middleware that provides mutual exclusion. """ [dependencies] -futures = "0.1" +futures = "0.3" linkerd2-error = { path = "../error" } -tower = "0.1" +tower = { version = "0.3", default-features = false } tracing = "0.1" +tokio = { version = "0.2", features = ["sync", "macros"] } [dev-dependencies] rand = "0.7" -tokio = "0.1" -tracing-futures = "0.1" +tracing-futures = { version = "0.1", features = ["std-future"] } tracing-log = "0.1" -tracing-subscriber = "0.2.3" +tracing-subscriber = "0.2.5" +tower = { version = "0.3", default-features = false, features = ["util"] } +tokio-test = "0.2" +tower-test = "0.3" \ No newline at end of file diff --git a/linkerd/lock/src/lib.rs b/linkerd/lock/src/lib.rs index 1692281d0e..5de5d14178 100644 --- a/linkerd/lock/src/lib.rs +++ b/linkerd/lock/src/lib.rs @@ -6,7 +6,6 @@ pub mod error; mod layer; mod lock; mod service; -mod shared; #[cfg(test)] mod test; diff --git a/linkerd/lock/src/lock.rs b/linkerd/lock/src/lock.rs index d3f0f6dbcc..e4afdac507 100644 --- a/linkerd/lock/src/lock.rs +++ b/linkerd/lock/src/lock.rs @@ -1,32 +1,34 @@ -use crate::shared::{Shared, Wait}; -use futures::Async; -use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; +use std::{cell::UnsafeCell, future::Future, pin::Pin, sync::Arc}; +use tokio::sync::Semaphore; /// Provides mutually exclusive to a `T`-typed value, asynchronously. -/// -/// Note that, when the lock is contested, waiters are notified in a LIFO -/// fashion. This is done to minimize latency at the expense of unfairness. pub struct Lock { /// Set when this Lock is interested in acquiring the value. - waiting: Option, - shared: Arc>>, + waiting: Option> + 'static>>>, + shared: Arc>, } /// Guards access to a `T`-typed value, ensuring the value is released on Drop. pub struct Guard { - /// Must always be Some; Used to reclaim the value in Drop. - value: Option, + shared: Arc>, +} - shared: Arc>>, +struct Shared { + sem: Semaphore, + value: UnsafeCell, } // === impl Lock === impl Lock { - pub fn new(service: S) -> Self { + pub fn new(value: S) -> Self { Self { waiting: None, - shared: Arc::new(Mutex::new(Shared::new(service))), + shared: Arc::new(Shared { + sem: Semaphore::new(1), + value: UnsafeCell::new(value), + }), } } } @@ -41,62 +43,52 @@ impl Clone for Lock { } } -impl Lock { - fn guard(&self, value: T) -> Guard { - Guard { - value: Some(value), - shared: self.shared.clone(), - } - } - - pub fn poll_acquire(&mut self) -> Async> { - let mut shared = self.shared.lock().expect("Lock poisoned"); - +impl Lock { + pub fn poll_acquire(&mut self, cx: &mut Context<'_>) -> Poll> { loop { self.waiting = match self.waiting { // This instance has not registered interest in the lock. - None => match shared.acquire() { - // This instance has exclusive access to the inner service. - Some(value) => { - // The state remains Released. - return Async::Ready(self.guard(value)); - } - None => Some(Wait::default()), - }, + None => { + let shared = self.shared.clone(); + Some(Box::pin(async move { + let permit = shared.sem.acquire().await; + // We cannot use the `tokio::sync::semaphore::Permit` + // type for the release-on-drop behavior, because it borrows + // the semaphore. Instead, we will manually release the + // permit when dropping a guard. + permit.forget(); + Guard { shared } + })) + } // This instance is interested in the lock. - Some(ref waiter) => match shared.poll_acquire(waiter) { - Async::NotReady => return Async::NotReady, - Async::Ready(value) => { - self.waiting = None; - return Async::Ready(self.guard(value)); - } - }, + Some(ref mut waiter) => { + tokio::pin!(waiter); + let guard = futures::ready!(waiter.poll(cx)); + self.waiting = None; + return Poll::Ready(guard); + } }; } } } -impl Drop for Lock { - fn drop(&mut self) { - if let Some(wait) = self.waiting.take() { - if let Ok(mut shared) = self.shared.lock() { - shared.release_waiter(wait); - } - } - } -} - impl std::ops::Deref for Guard { type Target = T; fn deref(&self) -> &Self::Target { - self.value.as_ref().expect("Value dropped from guard") + // Safety: creating a `Guard` means that the single permit in the + // semaphore has been acquired and we have exclusive access to the + // value. + unsafe { &*self.shared.value.get() } } } impl std::ops::DerefMut for Guard { fn deref_mut(&mut self) -> &mut Self::Target { - self.value.as_mut().expect("Value dropped from guard") + // Safety: creating a `Guard` means that the single permit in the + // semaphore has been acquired and we have exclusive access to the + // value. + unsafe { &mut *self.shared.value.get() } } } @@ -112,11 +104,24 @@ impl std::fmt::Display for Guard { } } +impl Drop for Lock { + fn drop(&mut self) { + // Release the single permit back to the semaphore. + self.shared.sem.add_permits(0); + } +} + impl Drop for Guard { fn drop(&mut self) { - let value = self.value.take().expect("Guard may not be dropped twice"); - if let Ok(mut shared) = self.shared.lock() { - shared.release_and_notify(value); - } + // Release the single permit back to the semaphore. + self.shared.sem.add_permits(1); } } + +// Safety: As long as T: Send, it's fine to send and share Lock between threads. +// If T was not Send, sending and sharing a Lock would be bad, since you can access T through +// Lock. +unsafe impl Send for Lock {} +unsafe impl Sync for Lock {} +unsafe impl Send for Guard {} +unsafe impl Sync for Guard {} diff --git a/linkerd/lock/src/service.rs b/linkerd/lock/src/service.rs index f30cabcca9..e644c004dc 100644 --- a/linkerd/lock/src/service.rs +++ b/linkerd/lock/src/service.rs @@ -1,7 +1,8 @@ use crate::error::{Error, ServiceError}; use crate::{Guard, Lock}; -use futures::{future, Async, Future, Poll}; +use futures::{future, TryFutureExt}; use std::sync::Arc; +use std::task::{Context, Poll}; use tracing::trace; /// A middleware that safely shares an inner service among clones. @@ -33,43 +34,44 @@ impl Clone for LockService { impl tower::Service for LockService where - S: tower::Service, + S: tower::Service + 'static, S::Error: Into, { type Response = S::Response; type Error = Error; type Future = future::MapErr Self::Error>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { loop { trace!(acquired = self.guard.is_some()); if let Some(guard) = self.guard.as_mut() { return match guard.as_mut() { - Err(err) => Err(err.clone().into()), - Ok(ref mut svc) => match svc.poll_ready() { - Ok(ok) => { - trace!(ready = ok.is_ready()); - Ok(ok) - } - Err(inner) => { + Err(err) => Poll::Ready(Err(err.clone().into())), + Ok(ref mut svc) => match svc.poll_ready(cx) { + Poll::Ready(Err(inner)) => { let error = ServiceError::new(Arc::new(inner.into())); **guard = Err(error.clone()); // Drop the guard. self.guard = None; - Err(error.into()) + Poll::Ready(Err(error.into())) + } + Poll::Pending => { + trace!(ready = false); + Poll::Pending + } + Poll::Ready(Ok(())) => { + trace!(ready = true); + debug_assert!(self.guard.is_some()); + Poll::Ready(Ok(())) } }, }; } debug_assert!(self.guard.is_none()); - match self.lock.poll_acquire() { - Async::NotReady => return Ok(Async::NotReady), - Async::Ready(guard) => { - self.guard = Some(guard); - } - } + let guard = futures::ready!(self.lock.poll_acquire(cx)); + self.guard = Some(guard); } } diff --git a/linkerd/lock/src/shared.rs b/linkerd/lock/src/shared.rs deleted file mode 100644 index a3cf3c699f..0000000000 --- a/linkerd/lock/src/shared.rs +++ /dev/null @@ -1,152 +0,0 @@ -use self::waiter::Notify; -pub(crate) use self::waiter::Wait; -use futures::Async; -use tracing::trace; - -/// The shared state between one or more Lock instances. -/// -/// When multiple lock instances are contending for the inner value, waiters are stored in a LIFO -/// stack. This is done to bias for latency instead of fairness -/// -/// N.B. Waiters capacity is released lazily as waiters are notified and, i.e., not when a waiter is -/// dropped. In high-load scenarios where the lock _always_ has new waiters, this could potentially -/// manifest as unbounded memory growth. This situation is not expected to arise in normal operation. -pub(crate) struct Shared { - /// Set when the value is available to be acquired; None when the value is acquired. - value: Option, - - /// A LIFO stack of waiters to be notified when the value is available. - waiters: Vec, -} - -// === impl Shared === - -impl Shared { - pub fn new(value: T) -> Self { - Self { - waiters: Vec::new(), - value: Some(value), - } - } - - /// Try to claim a value without registering a waiter. - /// - /// Once a value is acquired it **must** be returned via `release_and_notify`. - pub fn acquire(&mut self) -> Option { - trace!(acquired = %self.value.is_some()); - self.value.take() - } - - /// Try to acquire a value or register the given waiter to be notified when - /// the lock is available. - /// - /// Once a value is acquired it **must** be returned via `release_and_notify`. - /// - /// If `Async::NotReady` is returned, the polling task, once notified, **must** either call - /// `poll_acquire` again to obtain a value, or the waiter **must** be returned via - /// `release_waiter`. - pub fn poll_acquire(&mut self, wait: &Wait) -> Async { - if let Some(value) = self.acquire() { - return Async::Ready(value); - } - - // Register the current task to be notified. - wait.register(); - // Register the waiter's notify handle if one isn't already registered. - if let Some(notify) = wait.get_notify() { - self.waiters.push(notify); - } - debug_assert!(wait.has_notify()); - - trace!(waiters = self.waiters.len(), "Waiting"); - Async::NotReady - } - - pub fn release_and_notify(&mut self, value: T) { - trace!(waiters = self.waiters.len(), "Releasing"); - assert!(self.value.is_none()); - self.value = Some(value); - self.notify_next_waiter(); - } - - pub fn release_waiter(&mut self, wait: Wait) { - // If a waiter is being released and it does not have a notify, then it must be being - // released after being notified. Notify the next waiter to prevent deadlock. - if self.value.is_some() { - if !wait.has_notify() { - self.notify_next_waiter(); - } - } - } - - fn notify_next_waiter(&mut self) { - while let Some(waiter) = self.waiters.pop() { - if waiter.notify() { - trace!("Notified waiter"); - return; - } - } - } -} - -mod waiter { - use futures::task::AtomicTask; - use std::sync::{Arc, Weak}; - - /// A handle held by Lock instances when waiting to be notified. - #[derive(Default)] - pub(crate) struct Wait(Arc); - - /// A handle held by shared lock state to notify a waiting Lock. - /// - /// There may be at most one `Notify` instance per `Wait` instance at any one time. - pub(super) struct Notify(Weak); - - impl Wait { - /// If a `Notify` handle does not currently exist for this waiter, create a new one. - pub(super) fn get_notify(&self) -> Option { - if !self.has_notify() { - let n = Notify(Arc::downgrade(&self.0)); - debug_assert!(self.has_notify()); - Some(n) - } else { - None - } - } - - /// Register this waiter with the current task. - pub(super) fn register(&self) { - self.0.register(); - } - - /// Returns true iff there is currently a `Notify` handle for this waiter. - pub(super) fn has_notify(&self) -> bool { - let weaks = Arc::weak_count(&self.0); - debug_assert!( - weaks == 0 || weaks == 1, - "There must only be at most one Notify per Wait" - ); - weaks == 1 - } - } - - impl std::fmt::Debug for Wait { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Wait(notify={})", self.has_notify()) - } - } - - impl Notify { - /// Attempt to notify the waiter. - /// - /// Returns true if a waiter was notified and false otherwise. - pub fn notify(self) -> bool { - if let Some(task) = self.0.upgrade() { - task.notify(); - true - } else { - false - } - } - } -} diff --git a/linkerd/lock/src/test.rs b/linkerd/lock/src/test.rs index 398398d3d0..f294315f8a 100644 --- a/linkerd/lock/src/test.rs +++ b/linkerd/lock/src/test.rs @@ -1,199 +1,200 @@ -// use crate::error::ServiceError; +use crate::error::ServiceError; use crate::LockService; -use futures::{future, try_ready, Async, Future, Poll, Stream}; +use futures::{StreamExt, future}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::task::{Context, Poll}; +use std::{future::Future, pin::Pin, sync::Arc}; use tokio::sync::{mpsc, oneshot}; +use tokio_test::{assert_pending, assert_ready, assert_ready_ok}; use tower::Service as _Service; +use tower_test::mock::Spawn; use tracing::{info_span, trace}; use tracing_futures::Instrument; -#[test] -fn exclusive_access() { - run(future::lazy(|| { - let ready = Arc::new(AtomicBool::new(false)); - let mut svc0 = LockService::new(Decr::new(2, ready.clone())); +#[tokio::test] +async fn exclusive_access() { + let ready = Arc::new(AtomicBool::new(false)); + let mut svc0 = Spawn::new(LockService::new(Decr::new(2, ready.clone()))); - // svc0 grabs the lock, but the inner service isn't ready. - assert!(svc0.poll_ready().expect("must not fail").is_not_ready()); + // svc0 grabs the lock, but the inner service isn't ready. + assert_pending!(svc0.poll_ready()); - // Cloning a locked service does not preserve the lock. - let mut svc1 = svc0.clone(); + // Cloning a locked service does not preserve the lock. + let mut svc1 = svc0.clone(); - // svc1 can't grab the lock. - assert!(svc1.poll_ready().expect("must not fail").is_not_ready()); + // svc1 can't grab the lock. + assert_pending!(svc1.poll_ready()); - // svc0 holds the lock and becomes ready with the inner service. - ready.store(true, Ordering::SeqCst); - assert!(svc0.poll_ready().expect("must not fail").is_ready()); + // svc0 holds the lock and becomes ready with the inner service. + ready.store(true, Ordering::SeqCst); + assert_ready_ok!(svc0.poll_ready()); - // svc1 still can't grab the lock. - assert!(svc1.poll_ready().expect("must not fail").is_not_ready()); + // svc1 still can't grab the lock. + assert_pending!(svc1.poll_ready()); - // svc0 remains ready. - let fut0 = svc0.call(1); + // svc0 remains ready. + let fut0 = svc0.call(1); - // svc1 grabs the lock and is immediately ready. - assert!(svc1.poll_ready().expect("must not fail").is_ready()); - // svc0 cannot grab the lock. - assert!(svc0.poll_ready().expect("must not fail").is_not_ready()); + // svc1 grabs the lock and is immediately ready. + assert_ready_ok!(svc1.poll_ready()); + // svc0 cannot grab the lock. + assert_pending!(svc0.poll_ready()); - let fut1 = svc1.call(1); + let fut1 = svc1.call(1); - fut0.join(fut1) - .map(|_| ()) - .map_err(|_| panic!("must not fail")) - })); + tokio::try_join!(fut0, fut1).expect("must not fail!"); } -// #[test] -// fn propagates_errors() { -// run(future::lazy(|| { -// let mut svc0 = LockService::new(Decr::from(1)); - -// // svc0 grabs the lock and we decr the service so it will fail. -// assert!(svc0.poll_ready().expect("must not fail").is_ready()); -// // svc0 remains ready. -// svc0.call(1) -// .map_err(|_| panic!("must not fail")) -// .map(move |_| { -// // svc1 grabs the lock and fails immediately. -// let mut svc1 = svc0.clone(); -// assert!(svc1 -// .poll_ready() -// .expect_err("mut fail") -// .downcast_ref::() -// .expect("must fail with service error") -// .inner() -// .is::()); - -// // svc0 suffers the same fate. -// assert!(svc0 -// .poll_ready() -// .expect_err("mut fail") -// .downcast_ref::() -// .expect("must fail with service error") -// .inner() -// .is::()); -// }) -// })); -// } - -#[test] -fn dropping_releases_access() { +#[tokio::test] +async fn propagates_errors() { + let mut svc0 = Spawn::new(LockService::new(Decr::from(1))); + + // svc0 grabs the lock and we decr the service so it will fail. + assert_ready_ok!(svc0.poll_ready()); + + // svc0 remains ready. + let _ = svc0.call(1).await.expect("must not fail!"); + + // svc1 grabs the lock and fails immediately. + let mut svc1 = svc0.clone(); + assert_ready!(svc1.poll_ready()) + .expect_err("must fail") + .downcast_ref::() + .expect("must fail with service error") + .inner() + .is::(); + + // svc0 suffers the same fate. + assert_ready!(svc0.poll_ready()) + .expect_err("mut fail") + .downcast_ref::() + .expect("must fail with service error") + .inner() + .is::(); +} + +#[tokio::test] +async fn dropping_releases_access() { + let _ = tracing_subscriber::fmt().with_env_filter(tracing_subscriber::EnvFilter::from_default_env()).try_init(); use tower::util::ServiceExt; - run(future::lazy(|| { - let mut svc0 = LockService::new(Decr::new(3, Arc::new(true.into()))); - - // svc0 grabs the lock, but the inner service isn't ready. - assert!(svc0.poll_ready().expect("must not fail").is_ready()); - - let svc1 = svc0.clone(); - let (tx1, rx1) = oneshot::channel(); - tokio::spawn( - svc1.oneshot(1) - .then(move |_| { - trace!("complete"); - tx1.send(()).map_err(|_| ()) - }) - .instrument(info_span!("Svc1")), - ); - - let svc2 = svc0.clone(); - let (tx2, rx2) = oneshot::channel(); - tokio::spawn( - svc2.oneshot(1) - .then(move |_| { - trace!("complete"); - tx2.send(()).map_err(|_| ()) - }) - .instrument(info_span!("Svc2")), - ); - - // svc3 will be the notified waiter when svc0 completes; but it drops - // svc3 before polling the waiter. This test ensures that svc2 is - // notified by svc3's drop. - let svc3 = svc0.clone(); - let (tx3, rx3) = oneshot::channel(); - tokio::spawn(PollAndDrop(svc3, rx3).instrument(info_span!("Svc3"))); - - tokio::spawn( - svc0.ready() - .then(move |_| tx3.send(()).map_err(|_| ())) - .instrument(info_span!("Svc0")), - ); - // svc3 notified; but it is dropped before it can be polled - - rx2.then(move |_| rx1).map_err(|_| ()) - })); - - struct PollAndDrop(LockService, oneshot::Receiver<()>); - impl Future for PollAndDrop { - type Item = (); - type Error = (); - fn poll(&mut self) -> Poll { - trace!("Polling"); - if self.1.poll().map_err(|_| ())?.is_ready() { - trace!("Dropping"); - return Ok(Async::Ready(())); - } - self.0.poll_ready().map_err(|_| ()) + let mut svc0 = LockService::new(Decr::new(3, Arc::new(true.into()))); + + // svc0 grabs the lock, but the inner service isn't ready. + future::poll_fn(|cx| { + assert_ready_ok!(svc0.poll_ready(cx)); + Poll::Ready(()) + }) + .await; + + let svc1 = svc0.clone(); + + let (tx1, rx1) = oneshot::channel(); + tokio::spawn( + async move { + let svc1 = svc1; + trace!("started"); + let _f = svc1.oneshot(1).instrument(info_span!("1shot")).await; + trace!("sending"); + tx1.send(()).unwrap(); + trace!("done"); } - } + .instrument(info_span!("Svc1")), + ); + + let (tx2, rx2) = oneshot::channel(); + let svc2 = svc0.clone(); + tokio::spawn( + async move { + trace!("started"); + let _ = svc2.oneshot(1).await; + trace!("sending"); + tx2.send(()).unwrap(); + trace!("done"); + } + .instrument(info_span!("Svc2")), + ); + + // svc3 will be the notified waiter when svc0 completes; but it drops + // svc3 before polling the waiter. This test ensures that svc2 is + // notified by svc3's drop. + let svc3 = svc0.clone(); + let (tx3, rx3) = oneshot::channel(); + tokio::spawn( + async move { + let mut svc3 = Some(svc3); + let mut rx3 = rx3; + let _ = future::poll_fn(|cx| { + let rx3 = &mut rx3; + tokio::pin!(rx3); + trace!("Polling"); + if let Poll::Ready(ready) = rx3.poll(cx) { + trace!(?ready, "Dropping"); + drop(svc3.take()); + return Poll::Ready(Ok(())); + } + svc3.as_mut() + .expect("polled after ready?") + .poll_ready(cx) + .map_err(|_| ()) + }) + .await; + } + .instrument(info_span!("Svc3")), + ); + + tokio::spawn( + async move { + trace!("started"); + let _ = svc0.ready_and().await; + trace!("ready"); + tx3.send(()).map_err(|_| ())?; + trace!("sent"); + Ok::<(), ()>(()) + } + .instrument(info_span!("Svc0")), + ); + // svc3 notified; but it is dropped before it can be polled + + rx2.await.unwrap(); + rx1.await.unwrap(); } -#[test] -fn fuzz() { +#[tokio::test] +async fn fuzz() { + let _ = tracing_subscriber::fmt().with_env_filter(tracing_subscriber::EnvFilter::from_default_env()).try_init(); const ITERS: usize = 100_000; - for (concurrency, iterations) in &[(1, ITERS), (3, ITERS), (100, ITERS)] { - tokio::run(future::lazy(move || { + for (concurrency, iterations) in &[(1usize, ITERS), (3, ITERS), (100, ITERS)] { + async { + tracing::info!("starting"); let svc = LockService::new(Decr::new(*iterations, Arc::new(true.into()))); let (tx, rx) = mpsc::channel(1); - for _ in 0..*concurrency { - tokio::spawn(Loop { - lock: svc.clone(), - _tx: tx.clone(), - }); + for i in 0..*concurrency { + let lock = svc.clone(); + let tx = tx.clone(); + tokio::spawn(async move { + let mut lock = lock; + let _tx = tx; + future::poll_fn::, _>(|cx| { + loop { + futures::ready!(lock.poll_ready(cx)).map_err(|_|())?; + + // Randomly be busy while holding the lock. + if rand::random::() { + cx.waker().wake_by_ref(); + return Poll::Pending; + } + + tokio::spawn(lock.call(1)); + } + }).await + }.instrument(tracing::trace_span!("task", number = i))); } - rx.fold((), |(), ()| Ok(())).map_err(|_| ()) - })); - } - struct Loop { - lock: LockService, - _tx: mpsc::Sender<()>, + rx.fold((), |(), ()| async { () }).await; + tracing::info!("done"); + }.instrument(tracing::info_span!("fuzz", concurrency, iterations)) + .await } - impl Future for Loop { - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll { - loop { - try_ready!(self.lock.poll_ready().map_err(|_| ())); - - // Randomly be busy while holding the lock. - if rand::random::() { - futures::task::current().notify(); - return Ok(Async::NotReady); - } - - tokio::spawn(self.lock.call(1).then(|_| Ok(()))); - } - } - } -} - -fn run(future: F) -where - F: Future + 'static, -{ - let subscriber = tracing_subscriber::fmt::Subscriber::builder() - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) - .with_writer(std::io::stdout) - .finish(); - tracing::subscriber::with_default(subscriber, move || { - tokio::runtime::current_thread::run(future) - }); } #[derive(Debug, Default)] @@ -220,28 +221,36 @@ impl Decr { impl tower::Service for Decr { type Response = usize; type Error = Underflow; - type Future = futures::future::FutureResult; + type Future = + Pin> + Send + Sync + 'static>>; - fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> { + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + let span = tracing::trace_span!("Decr::poll_ready", self.value); + let _g = span.enter(); if self.value == 0 { - return Err(Underflow); + tracing::trace!(ready = true, "underflow"); + return Poll::Ready(Err(Underflow)); } if !self.ready.load(Ordering::SeqCst) { - return Ok(Async::NotReady); + tracing::trace!(ready = false); + return Poll::Pending; } - Ok(().into()) + tracing::trace!(ready = true); + Poll::Ready(Ok(())) } fn call(&mut self, decr: usize) -> Self::Future { if self.value < decr { self.value = 0; - return futures::future::err(Underflow); + + return Box::pin(async { Err(Underflow) }); } self.value -= decr; - futures::future::ok(self.value) + let value = self.value; + Box::pin(async move { Ok(value) }) } } From f084934d8cdac8ae2c00d79007a0e837515da2a9 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 24 Apr 2020 11:51:54 -0700 Subject: [PATCH 02/13] port cache to std::future Signed-off-by: Eliza Weisman --- Cargo.lock | 16 ++++--------- linkerd/cache/Cargo.toml | 6 ++--- linkerd/cache/benches/cache.rs | 31 +++++++++++++----------- linkerd/cache/src/layer.rs | 6 ++--- linkerd/cache/src/lib.rs | 43 +++++++++++++++------------------- 5 files changed, 47 insertions(+), 55 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9598e2d5c8..0500b2d437 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -906,12 +906,12 @@ dependencies = [ name = "linkerd2-cache" version = "0.1.0" dependencies = [ - "futures 0.1.26", + "futures 0.3.4", "linkerd2-error", "linkerd2-lock", "linkerd2-stack", - "tokio 0.1.22", - "tower 0.1.1", + "tokio 0.2.18", + "tower 0.3.1", "tracing", "tracing-futures 0.1.0", ] @@ -3331,9 +3331,8 @@ dependencies = [ [[package]] name = "webpki" -version = "0.21.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1f50e1972865d6b1adb54167d1c8ed48606004c2c9d0ea5f1eeb34d95e863ef" +version = "0.21.0" +source = "git+https://github.com/seanmonstar/webpki?branch=cert-dns-names-0.21#c4d77fd78a48a5daf05fd7ce2c18d34f9a077e4a" dependencies = [ "ring", "untrusted", @@ -3425,8 +3424,3 @@ dependencies = [ "winapi 0.2.8", "winapi-build", ] - -[[patch.unused]] -name = "webpki" -version = "0.21.0" -source = "git+https://github.com/seanmonstar/webpki?branch=cert-dns-names-0.21#c4d77fd78a48a5daf05fd7ce2c18d34f9a077e4a" diff --git a/linkerd/cache/Cargo.toml b/linkerd/cache/Cargo.toml index a2b5139d2f..6bd6a9aa78 100644 --- a/linkerd/cache/Cargo.toml +++ b/linkerd/cache/Cargo.toml @@ -6,11 +6,11 @@ edition = "2018" publish = false [dependencies] -futures = "0.1" +futures = "0.3" linkerd2-error = { path = "../error" } linkerd2-lock = { path = "../lock" } linkerd2-stack = { path = "../stack" } -tokio = "0.1" -tower = "0.1" +tokio = "0.2" +tower = { version = "0.3", default-features = false, features = ["util"] } tracing = "0.1" tracing-futures = "0.1" diff --git a/linkerd/cache/benches/cache.rs b/linkerd/cache/benches/cache.rs index 84db80da1c..5f651ff61d 100644 --- a/linkerd/cache/benches/cache.rs +++ b/linkerd/cache/benches/cache.rs @@ -1,11 +1,12 @@ #![feature(test)] - -extern crate linkerd2_cache; -use futures::{future, Async, Future, Poll}; +use futures::future; use linkerd2_cache::{Cache, Handle}; use linkerd2_error::Never; use linkerd2_stack::NewService; +use std::future::Future; +use std::task::{Context, Poll}; use tower::Service; +use tower::util::ServiceExt; extern crate test; use test::Bencher; @@ -37,10 +38,10 @@ struct NewSometimesEvict; impl Service for NeverEvict { type Response = usize; type Error = Never; - type Future = futures::future::FutureResult; + type Future = future::Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, rhs: usize) -> Self::Future { @@ -62,10 +63,10 @@ impl NewService<(usize, Handle)> for NewNeverEvict { impl Service for AlwaysEvict { type Response = usize; type Error = Never; - type Future = futures::future::FutureResult; + type Future = future::Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, rhs: usize) -> Self::Future { @@ -84,10 +85,10 @@ impl NewService<(usize, Handle)> for NewAlwaysEvict { impl Service for SometimesEvict { type Response = usize; type Error = Never; - type Future = futures::future::FutureResult; + type Future = future::Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, rhs: usize) -> Self::Future { @@ -122,8 +123,10 @@ where let mut cache = Cache::new(new_svc); b.iter(|| { for n in 0..num_svc { - cache.poll_ready().unwrap(); - let _r = cache.call(n).wait().unwrap().call(n); + futures::executor::block_on(async { + let mut svc = cache.ready_and().await.unwrap(); + svc.call(n).await.unwrap().call(n).await.unwrap(); + }) } }); } diff --git a/linkerd/cache/src/layer.rs b/linkerd/cache/src/layer.rs index 34b0865a7d..d07a1211cd 100644 --- a/linkerd/cache/src/layer.rs +++ b/linkerd/cache/src/layer.rs @@ -1,7 +1,7 @@ use crate::Cache; use crate::Handle; -use futures::Poll; use linkerd2_stack::NewService; +use std::task::{Context, Poll}; pub struct CacheLayer { track_layer: L, @@ -85,8 +85,8 @@ impl> tower::Service for Track { type Error = S::Error; type Future = S::Future; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.inner.poll_ready() + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) } fn call(&mut self, req: T) -> Self::Future { diff --git a/linkerd/cache/src/lib.rs b/linkerd/cache/src/lib.rs index da0b2fb1f6..a232a915ec 100644 --- a/linkerd/cache/src/lib.rs +++ b/linkerd/cache/src/lib.rs @@ -1,12 +1,12 @@ #![deny(warnings, rust_2018_idioms)] - -use futures::{future, Async, Poll}; +use futures::future; use linkerd2_error::Never; use linkerd2_lock::{Guard, Lock}; use linkerd2_stack::NewService; use std::collections::HashMap; use std::hash::Hash; use std::sync::{Arc, Weak}; +use std::task::{Context, Poll}; use tracing::{debug, trace}; pub mod layer; @@ -63,38 +63,33 @@ where impl tower::Service for Cache where - T: Clone + Eq + Hash, + T: Clone + Eq + Hash + 'static, N: NewService<(T, Handle)>, - N::Service: Clone, + N::Service: Clone + 'static, { type Response = N::Service; type Error = Never; - type Future = future::FutureResult; + type Future = future::Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { if self.guard.is_none() { - match self.lock.poll_acquire() { - Async::NotReady => return Ok(Async::NotReady), - Async::Ready(mut services) => { - // Drop defunct services before interacting with the cache. - let n = services.len(); - services.retain(|_, (_, weak)| { - if weak.strong_count() > 0 { - true - } else { - debug!("Dropping defunct service"); - false - } - }); - trace!(services = services.len(), dropped = n - services.len()); - - self.guard = Some(services); + let mut services = futures::ready!(self.lock.poll_acquire(cx)); + // Drop defunct services before interacting with the cache. + let n = services.len(); + services.retain(|_, (_, weak)| { + if weak.strong_count() > 0 { + true + } else { + trace!("Dropping defunct service"); + false } - } + }); + debug!(services = services.len(), dropped = n - services.len()); + self.guard = Some(services); } debug_assert!(self.guard.is_some(), "guard must be acquired"); - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } fn call(&mut self, target: T) -> Self::Future { From 30b537e7490eafc91ba010a0085b5d669ef6f720 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 24 Apr 2020 12:01:10 -0700 Subject: [PATCH 03/13] bring back push Signed-off-by: Eliza Weisman --- linkerd/app/core/src/svc.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/linkerd/app/core/src/svc.rs b/linkerd/app/core/src/svc.rs index 5eeb552a46..7db068a49c 100644 --- a/linkerd/app/core/src/svc.rs +++ b/linkerd/app/core/src/svc.rs @@ -251,15 +251,16 @@ impl Stack { // self.push(http::insert::target::layer()) // } - // pub fn cache(self, track: L) -> Stack>> - // where - // T: Eq + std::hash::Hash, - // S: NewService + Clone, - // L: tower::layer::Layer> + Clone, - // L::Service: NewService, - // { - // self.push(cache::CacheLayer::new(track)) - // } + pub fn cache(self, track: L) -> Stack>> + where + T: Eq + std::hash::Hash + 'static, + S: NewService + Clone, + S::Service: 'static, + L: tower::layer::Layer> + Clone, + L::Service: NewService, + { + self.push(cache::CacheLayer::new(track)) + } pub fn push_fallback(self, fallback: F) -> Stack> { self.push(stack::FallbackLayer::new(fallback)) From 767d1605dff2eca9375eb6fa0a604d17d7d436af Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 24 Apr 2020 14:52:14 -0700 Subject: [PATCH 04/13] tokio has owned permits now Signed-off-by: Eliza Weisman --- linkerd/cache/src/lib.rs | 4 ++-- linkerd/lock/Cargo.toml | 2 +- linkerd/lock/src/lock.rs | 2 +- linkerd/lock/src/service.rs | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/linkerd/cache/src/lib.rs b/linkerd/cache/src/lib.rs index a232a915ec..3df6a81738 100644 --- a/linkerd/cache/src/lib.rs +++ b/linkerd/cache/src/lib.rs @@ -63,9 +63,9 @@ where impl tower::Service for Cache where - T: Clone + Eq + Hash + 'static, + T: Clone + Eq + Hash, N: NewService<(T, Handle)>, - N::Service: Clone + 'static, + N::Service: Clone, { type Response = N::Service; type Error = Never; diff --git a/linkerd/lock/Cargo.toml b/linkerd/lock/Cargo.toml index 98ea84b8a3..6f6b645836 100644 --- a/linkerd/lock/Cargo.toml +++ b/linkerd/lock/Cargo.toml @@ -13,7 +13,7 @@ futures = "0.3" linkerd2-error = { path = "../error" } tower = { version = "0.3", default-features = false } tracing = "0.1" -tokio = { version = "0.2", features = ["sync", "macros"] } +tokio = { version = "0.2.19", features = ["sync", "macros"] } [dev-dependencies] rand = "0.7" diff --git a/linkerd/lock/src/lock.rs b/linkerd/lock/src/lock.rs index e4afdac507..d4a191fd57 100644 --- a/linkerd/lock/src/lock.rs +++ b/linkerd/lock/src/lock.rs @@ -5,7 +5,7 @@ use tokio::sync::Semaphore; /// Provides mutually exclusive to a `T`-typed value, asynchronously. pub struct Lock { /// Set when this Lock is interested in acquiring the value. - waiting: Option> + 'static>>>, + waiting: Option + 'static>>>, shared: Arc>, } diff --git a/linkerd/lock/src/service.rs b/linkerd/lock/src/service.rs index e644c004dc..c05de1095d 100644 --- a/linkerd/lock/src/service.rs +++ b/linkerd/lock/src/service.rs @@ -34,7 +34,7 @@ impl Clone for LockService { impl tower::Service for LockService where - S: tower::Service + 'static, + S: tower::Service, S::Error: Into, { type Response = S::Response; From 37fcad0be3d6e520b36520621de7487b930d03d4 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 24 Apr 2020 17:10:43 -0700 Subject: [PATCH 05/13] use new acquire_owned API in tokio Signed-off-by: Eliza Weisman --- Cargo.lock | 34 ++++++++++----------- linkerd/lock/src/lock.rs | 66 ++++++++++++++-------------------------- 2 files changed, 39 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0500b2d437..672b8bb709 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -716,7 +716,7 @@ dependencies = [ "ring", "rustls", "tokio 0.1.22", - "tokio 0.2.18", + "tokio 0.2.19", "tokio-compat", "tokio-connect", "tokio-current-thread", @@ -787,7 +787,7 @@ dependencies = [ "rand 0.7.2", "regex 1.0.0", "tokio 0.1.22", - "tokio 0.2.18", + "tokio 0.2.19", "tokio-compat", "tokio-timer", "tower 0.3.1", @@ -910,7 +910,7 @@ dependencies = [ "linkerd2-error", "linkerd2-lock", "linkerd2-stack", - "tokio 0.2.18", + "tokio 0.2.19", "tower 0.3.1", "tracing", "tracing-futures 0.1.0", @@ -959,7 +959,7 @@ dependencies = [ "futures 0.3.4", "linkerd2-error", "pin-project", - "tokio 0.2.18", + "tokio 0.2.19", "tokio-test", ] @@ -1083,7 +1083,7 @@ dependencies = [ "futures 0.3.4", "linkerd2-error", "rand 0.7.2", - "tokio 0.2.18", + "tokio 0.2.19", "tokio-test", "tower 0.3.1", "tower-test 0.3.0", @@ -1171,7 +1171,7 @@ dependencies = [ "futures 0.3.4", "linkerd2-error", "pin-project", - "tokio 0.2.18", + "tokio 0.2.19", "tower 0.3.1", "tracing-futures 0.2.4", ] @@ -1185,7 +1185,7 @@ dependencies = [ "linkerd2-io", "linkerd2-proxy-core", "pin-project", - "tokio 0.2.18", + "tokio 0.2.19", "tower 0.3.1", ] @@ -1231,7 +1231,7 @@ dependencies = [ "rand 0.7.2", "task-compat", "tokio 0.1.22", - "tokio 0.2.18", + "tokio 0.2.19", "tokio-connect", "tokio-timer", "tower 0.1.1", @@ -1434,7 +1434,7 @@ dependencies = [ "futures 0.3.4", "linkerd2-error", "pin-project", - "tokio 0.2.18", + "tokio 0.2.19", "tower 0.3.1", ] @@ -1482,7 +1482,7 @@ dependencies = [ "linkerd2-error", "linkerd2-stack", "pin-project", - "tokio 0.2.18", + "tokio 0.2.19", "tokio-connect", "tokio-test", "tower 0.3.1", @@ -2443,9 +2443,9 @@ dependencies = [ [[package]] name = "tokio" -version = "0.2.18" +version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34ef16d072d2b6dc8b4a56c70f5c5ced1a37752116f8e7c1e80c659aa7cb6713" +checksum = "7d9c43f1bb96970e153bcbae39a65e249ccb942bd9d36dbdf086024920417c9c" dependencies = [ "bytes 0.5.4", "fnv", @@ -2490,7 +2490,7 @@ dependencies = [ "futures-core", "futures-util", "pin-project-lite", - "tokio 0.2.18", + "tokio 0.2.19", "tokio-current-thread", "tokio-executor", "tokio-reactor", @@ -2640,7 +2640,7 @@ checksum = "ed0049c119b6d505c4447f5c64873636c7af6c75ab0d45fd9f618d82acb8016d" dependencies = [ "bytes 0.5.4", "futures-core", - "tokio 0.2.18", + "tokio 0.2.19", ] [[package]] @@ -2716,7 +2716,7 @@ dependencies = [ "futures-sink", "log", "pin-project-lite", - "tokio 0.2.18", + "tokio 0.2.19", ] [[package]] @@ -2745,7 +2745,7 @@ dependencies = [ "futures-core", "futures-util", "pin-project", - "tokio 0.2.18", + "tokio 0.2.19", "tower-layer 0.3.0 (git+https://github.com/tower-rs/tower?rev=8752a3811788e94670c62dc0acbc9613207931b1)", "tower-service 0.3.0", "tracing", @@ -2973,7 +2973,7 @@ checksum = "9ba4bbc2c1e4a8543c30d4c13a4c8314ed72d6e07581910f665aa13fde0153c8" dependencies = [ "futures-util", "pin-project", - "tokio 0.2.18", + "tokio 0.2.19", "tokio-test", "tower-layer 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "tower-service 0.3.0", diff --git a/linkerd/lock/src/lock.rs b/linkerd/lock/src/lock.rs index d4a191fd57..e3eb2a8775 100644 --- a/linkerd/lock/src/lock.rs +++ b/linkerd/lock/src/lock.rs @@ -1,22 +1,20 @@ use std::task::{Context, Poll}; use std::{cell::UnsafeCell, future::Future, pin::Pin, sync::Arc}; -use tokio::sync::Semaphore; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; /// Provides mutually exclusive to a `T`-typed value, asynchronously. pub struct Lock { /// Set when this Lock is interested in acquiring the value. waiting: Option + 'static>>>, - shared: Arc>, + sem: Arc, + value: Arc>, } /// Guards access to a `T`-typed value, ensuring the value is released on Drop. pub struct Guard { - shared: Arc>, -} - -struct Shared { - sem: Semaphore, - value: UnsafeCell, + value: Arc>, + // Hang onto this to drop it when the access ends. + _permit: OwnedSemaphorePermit, } // === impl Lock === @@ -25,10 +23,11 @@ impl Lock { pub fn new(value: S) -> Self { Self { waiting: None, - shared: Arc::new(Shared { - sem: Semaphore::new(1), - value: UnsafeCell::new(value), - }), + // XXX: Bummer that these have to be arced separately and we can't + // them in a single `Arc`, but `Semaphore::acquire_owned` needs an + // `Arc` receiver... + sem: Arc::new(Semaphore::new(1)), + value: Arc::new(UnsafeCell::new(value)), } } } @@ -38,35 +37,28 @@ impl Clone for Lock { Self { // Clones have an independent local lock state. waiting: None, - shared: self.shared.clone(), + sem: self.sem.clone(), + value: self.value.clone(), } } } -impl Lock { +impl Lock { pub fn poll_acquire(&mut self, cx: &mut Context<'_>) -> Poll> { loop { self.waiting = match self.waiting { // This instance has not registered interest in the lock. - None => { - let shared = self.shared.clone(); - Some(Box::pin(async move { - let permit = shared.sem.acquire().await; - // We cannot use the `tokio::sync::semaphore::Permit` - // type for the release-on-drop behavior, because it borrows - // the semaphore. Instead, we will manually release the - // permit when dropping a guard. - permit.forget(); - Guard { shared } - })) - } + None => Some(Box::pin(self.sem.clone().acquire_owned())), // This instance is interested in the lock. Some(ref mut waiter) => { tokio::pin!(waiter); - let guard = futures::ready!(waiter.poll(cx)); + let _permit = futures::ready!(waiter.poll(cx)); self.waiting = None; - return Poll::Ready(guard); + return Poll::Ready(Guard { + value: self.value.clone(), + _permit, + }); } }; } @@ -79,7 +71,7 @@ impl std::ops::Deref for Guard { // Safety: creating a `Guard` means that the single permit in the // semaphore has been acquired and we have exclusive access to the // value. - unsafe { &*self.shared.value.get() } + unsafe { &*self.value.get() } } } @@ -88,7 +80,7 @@ impl std::ops::DerefMut for Guard { // Safety: creating a `Guard` means that the single permit in the // semaphore has been acquired and we have exclusive access to the // value. - unsafe { &mut *self.shared.value.get() } + unsafe { &mut *self.value.get() } } } @@ -104,20 +96,6 @@ impl std::fmt::Display for Guard { } } -impl Drop for Lock { - fn drop(&mut self) { - // Release the single permit back to the semaphore. - self.shared.sem.add_permits(0); - } -} - -impl Drop for Guard { - fn drop(&mut self) { - // Release the single permit back to the semaphore. - self.shared.sem.add_permits(1); - } -} - // Safety: As long as T: Send, it's fine to send and share Lock between threads. // If T was not Send, sending and sharing a Lock would be bad, since you can access T through // Lock. From 9351ef6a52e667bcf6bee6d642541f608a9d31f7 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 27 Apr 2020 09:42:03 -0700 Subject: [PATCH 06/13] fix webpki patch not being applied Signed-off-by: Eliza Weisman --- linkerd/app/integration/Cargo.toml | 2 +- linkerd/dns/name/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/linkerd/app/integration/Cargo.toml b/linkerd/app/integration/Cargo.toml index 61408b38fb..a130416ace 100644 --- a/linkerd/app/integration/Cargo.toml +++ b/linkerd/app/integration/Cargo.toml @@ -44,7 +44,7 @@ tower = "0.1" tower-grpc = { version = "0.1", default-features = false, features = ["protobuf"] } tracing = "0.1.9" tracing-futures = "0.1" -webpki = "0.21" +webpki = "=0.21.0" futures-03 = { package = "futures", version = "0.3", features = ["compat"]} [dev-dependencies] diff --git a/linkerd/dns/name/Cargo.toml b/linkerd/dns/name/Cargo.toml index ad331b42de..f5441ca617 100644 --- a/linkerd/dns/name/Cargo.toml +++ b/linkerd/dns/name/Cargo.toml @@ -6,5 +6,5 @@ edition = "2018" publish = false [dependencies] -webpki = "0.21" +webpki = "=0.21.0" untrusted = "0.7" From fa72900486159cb6eaa26a2f029a4a661208b961 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 27 Apr 2020 09:43:13 -0700 Subject: [PATCH 07/13] rustfmt Signed-off-by: Eliza Weisman --- linkerd/cache/benches/cache.rs | 2 +- linkerd/lock/src/test.rs | 49 ++++++++++++++++++++-------------- 2 files changed, 30 insertions(+), 21 deletions(-) diff --git a/linkerd/cache/benches/cache.rs b/linkerd/cache/benches/cache.rs index 5f651ff61d..5cec08c16e 100644 --- a/linkerd/cache/benches/cache.rs +++ b/linkerd/cache/benches/cache.rs @@ -5,8 +5,8 @@ use linkerd2_error::Never; use linkerd2_stack::NewService; use std::future::Future; use std::task::{Context, Poll}; -use tower::Service; use tower::util::ServiceExt; +use tower::Service; extern crate test; use test::Bencher; diff --git a/linkerd/lock/src/test.rs b/linkerd/lock/src/test.rs index f294315f8a..f50e9a9dd7 100644 --- a/linkerd/lock/src/test.rs +++ b/linkerd/lock/src/test.rs @@ -1,6 +1,6 @@ use crate::error::ServiceError; use crate::LockService; -use futures::{StreamExt, future}; +use futures::{future, StreamExt}; use std::sync::atomic::{AtomicBool, Ordering}; use std::task::{Context, Poll}; use std::{future::Future, pin::Pin, sync::Arc}; @@ -75,7 +75,9 @@ async fn propagates_errors() { #[tokio::test] async fn dropping_releases_access() { - let _ = tracing_subscriber::fmt().with_env_filter(tracing_subscriber::EnvFilter::from_default_env()).try_init(); + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); use tower::util::ServiceExt; let mut svc0 = LockService::new(Decr::new(3, Arc::new(true.into()))); @@ -161,7 +163,9 @@ async fn dropping_releases_access() { #[tokio::test] async fn fuzz() { - let _ = tracing_subscriber::fmt().with_env_filter(tracing_subscriber::EnvFilter::from_default_env()).try_init(); + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); const ITERS: usize = 100_000; for (concurrency, iterations) in &[(1usize, ITERS), (3, ITERS), (100, ITERS)] { async { @@ -171,28 +175,33 @@ async fn fuzz() { for i in 0..*concurrency { let lock = svc.clone(); let tx = tx.clone(); - tokio::spawn(async move { - let mut lock = lock; - let _tx = tx; - future::poll_fn::, _>(|cx| { - loop { - futures::ready!(lock.poll_ready(cx)).map_err(|_|())?; - - // Randomly be busy while holding the lock. - if rand::random::() { - cx.waker().wake_by_ref(); - return Poll::Pending; + tokio::spawn( + async move { + let mut lock = lock; + let _tx = tx; + future::poll_fn::, _>(|cx| { + loop { + futures::ready!(lock.poll_ready(cx)).map_err(|_| ())?; + + // Randomly be busy while holding the lock. + if rand::random::() { + cx.waker().wake_by_ref(); + return Poll::Pending; + } + + tokio::spawn(lock.call(1)); } - - tokio::spawn(lock.call(1)); - } - }).await - }.instrument(tracing::trace_span!("task", number = i))); + }) + .await + } + .instrument(tracing::trace_span!("task", number = i)), + ); } rx.fold((), |(), ()| async { () }).await; tracing::info!("done"); - }.instrument(tracing::info_span!("fuzz", concurrency, iterations)) + } + .instrument(tracing::info_span!("fuzz", concurrency, iterations)) .await } } From 6a98b08353d4c90d13960fd54a3ed851fd6598f0 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 29 Apr 2020 17:48:23 -0700 Subject: [PATCH 08/13] just punt the unsafety to tokio Signed-off-by: Eliza Weisman --- Cargo.lock | 38 +++++---- Cargo.toml | 3 +- linkerd/cache/benches/cache.rs | 3 +- linkerd/cache/src/lib.rs | 4 +- linkerd/lock/src/lock.rs | 82 +++++--------------- linkerd/lock/src/service.rs | 2 +- linkerd/service-profiles/src/http/service.rs | 2 +- 7 files changed, 45 insertions(+), 89 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 672b8bb709..906ec6796f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -716,7 +716,7 @@ dependencies = [ "ring", "rustls", "tokio 0.1.22", - "tokio 0.2.19", + "tokio 0.2.20", "tokio-compat", "tokio-connect", "tokio-current-thread", @@ -787,7 +787,7 @@ dependencies = [ "rand 0.7.2", "regex 1.0.0", "tokio 0.1.22", - "tokio 0.2.19", + "tokio 0.2.20", "tokio-compat", "tokio-timer", "tower 0.3.1", @@ -910,7 +910,7 @@ dependencies = [ "linkerd2-error", "linkerd2-lock", "linkerd2-stack", - "tokio 0.2.19", + "tokio 0.2.20", "tower 0.3.1", "tracing", "tracing-futures 0.1.0", @@ -959,7 +959,7 @@ dependencies = [ "futures 0.3.4", "linkerd2-error", "pin-project", - "tokio 0.2.19", + "tokio 0.2.20", "tokio-test", ] @@ -1083,7 +1083,7 @@ dependencies = [ "futures 0.3.4", "linkerd2-error", "rand 0.7.2", - "tokio 0.2.19", + "tokio 0.2.20", "tokio-test", "tower 0.3.1", "tower-test 0.3.0", @@ -1171,7 +1171,7 @@ dependencies = [ "futures 0.3.4", "linkerd2-error", "pin-project", - "tokio 0.2.19", + "tokio 0.2.20", "tower 0.3.1", "tracing-futures 0.2.4", ] @@ -1185,7 +1185,7 @@ dependencies = [ "linkerd2-io", "linkerd2-proxy-core", "pin-project", - "tokio 0.2.19", + "tokio 0.2.20", "tower 0.3.1", ] @@ -1231,7 +1231,7 @@ dependencies = [ "rand 0.7.2", "task-compat", "tokio 0.1.22", - "tokio 0.2.19", + "tokio 0.2.20", "tokio-connect", "tokio-timer", "tower 0.1.1", @@ -1434,7 +1434,7 @@ dependencies = [ "futures 0.3.4", "linkerd2-error", "pin-project", - "tokio 0.2.19", + "tokio 0.2.20", "tower 0.3.1", ] @@ -1482,7 +1482,7 @@ dependencies = [ "linkerd2-error", "linkerd2-stack", "pin-project", - "tokio 0.2.19", + "tokio 0.2.20", "tokio-connect", "tokio-test", "tower 0.3.1", @@ -2443,9 +2443,8 @@ dependencies = [ [[package]] name = "tokio" -version = "0.2.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d9c43f1bb96970e153bcbae39a65e249ccb942bd9d36dbdf086024920417c9c" +version = "0.2.20" +source = "git+https://github.com/tokio-rs/tokio?rev=45773c56413267cbcf9d5e7877e8dc4afc1e5b07#45773c56413267cbcf9d5e7877e8dc4afc1e5b07" dependencies = [ "bytes 0.5.4", "fnv", @@ -2490,7 +2489,7 @@ dependencies = [ "futures-core", "futures-util", "pin-project-lite", - "tokio 0.2.19", + "tokio 0.2.20", "tokio-current-thread", "tokio-executor", "tokio-reactor", @@ -2551,8 +2550,7 @@ dependencies = [ [[package]] name = "tokio-macros" version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389" +source = "git+https://github.com/tokio-rs/tokio?rev=45773c56413267cbcf9d5e7877e8dc4afc1e5b07#45773c56413267cbcf9d5e7877e8dc4afc1e5b07" dependencies = [ "proc-macro2 1.0.10", "quote 1.0.2", @@ -2640,7 +2638,7 @@ checksum = "ed0049c119b6d505c4447f5c64873636c7af6c75ab0d45fd9f618d82acb8016d" dependencies = [ "bytes 0.5.4", "futures-core", - "tokio 0.2.19", + "tokio 0.2.20", ] [[package]] @@ -2716,7 +2714,7 @@ dependencies = [ "futures-sink", "log", "pin-project-lite", - "tokio 0.2.19", + "tokio 0.2.20", ] [[package]] @@ -2745,7 +2743,7 @@ dependencies = [ "futures-core", "futures-util", "pin-project", - "tokio 0.2.19", + "tokio 0.2.20", "tower-layer 0.3.0 (git+https://github.com/tower-rs/tower?rev=8752a3811788e94670c62dc0acbc9613207931b1)", "tower-service 0.3.0", "tracing", @@ -2973,7 +2971,7 @@ checksum = "9ba4bbc2c1e4a8543c30d4c13a4c8314ed72d6e07581910f665aa13fde0153c8" dependencies = [ "futures-util", "pin-project", - "tokio 0.2.19", + "tokio 0.2.20", "tokio-test", "tower-layer 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "tower-service 0.3.0", diff --git a/Cargo.toml b/Cargo.toml index 6d0eed064e..d416fcc1f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,4 +65,5 @@ debug = false webpki = { git = "https://github.com/seanmonstar/webpki", branch = "cert-dns-names-0.21" } # backport danburkert/prost#268 to `prost` 0.5 temporarily. prost = { git = "https://github.com/linkerd/prost", branch = "v0.5.x" } -tower = { version = "0.3", git = "https://github.com/tower-rs/tower", rev = "8752a3811788e94670c62dc0acbc9613207931b1"} \ No newline at end of file +tower = { version = "0.3", git = "https://github.com/tower-rs/tower", rev = "8752a3811788e94670c62dc0acbc9613207931b1"} +tokio = { version = "0.2", git = "https://github.com/tokio-rs/tokio", rev = "45773c56413267cbcf9d5e7877e8dc4afc1e5b07"} \ No newline at end of file diff --git a/linkerd/cache/benches/cache.rs b/linkerd/cache/benches/cache.rs index 5cec08c16e..f46d2be435 100644 --- a/linkerd/cache/benches/cache.rs +++ b/linkerd/cache/benches/cache.rs @@ -118,7 +118,8 @@ fn run_bench(num_svc: usize, new_svc: N, b: &mut Bencher) where N: NewService<(usize, Handle)>, N::Service: Service, - N::Service: Clone, + N::Service: Clone + Send + 'static, + >::Error: std::fmt::Debug, { let mut cache = Cache::new(new_svc); b.iter(|| { diff --git a/linkerd/cache/src/lib.rs b/linkerd/cache/src/lib.rs index 3df6a81738..74db5414aa 100644 --- a/linkerd/cache/src/lib.rs +++ b/linkerd/cache/src/lib.rs @@ -63,9 +63,9 @@ where impl tower::Service for Cache where - T: Clone + Eq + Hash, + T: Clone + Eq + Hash + Send + 'static, N: NewService<(T, Handle)>, - N::Service: Clone, + N::Service: Clone + Send + 'static, { type Response = N::Service; type Error = Never; diff --git a/linkerd/lock/src/lock.rs b/linkerd/lock/src/lock.rs index e3eb2a8775..2ba7eb3683 100644 --- a/linkerd/lock/src/lock.rs +++ b/linkerd/lock/src/lock.rs @@ -1,20 +1,13 @@ use std::task::{Context, Poll}; -use std::{cell::UnsafeCell, future::Future, pin::Pin, sync::Arc}; -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use std::{future::Future, pin::Pin, sync::Arc}; +use tokio::sync::Mutex; +pub use tokio::sync::OwnedMutexGuard as Guard; /// Provides mutually exclusive to a `T`-typed value, asynchronously. pub struct Lock { /// Set when this Lock is interested in acquiring the value. - waiting: Option + 'static>>>, - sem: Arc, - value: Arc>, -} - -/// Guards access to a `T`-typed value, ensuring the value is released on Drop. -pub struct Guard { - value: Arc>, - // Hang onto this to drop it when the access ends. - _permit: OwnedSemaphorePermit, + waiting: Option> + Send + 'static>>>, + lock: Arc>, } // === impl Lock === @@ -23,11 +16,7 @@ impl Lock { pub fn new(value: S) -> Self { Self { waiting: None, - // XXX: Bummer that these have to be arced separately and we can't - // them in a single `Arc`, but `Semaphore::acquire_owned` needs an - // `Arc` receiver... - sem: Arc::new(Semaphore::new(1)), - value: Arc::new(UnsafeCell::new(value)), + lock: Arc::new(Mutex::new(value)), } } } @@ -37,69 +26,36 @@ impl Clone for Lock { Self { // Clones have an independent local lock state. waiting: None, - sem: self.sem.clone(), - value: self.value.clone(), + lock: self.lock.clone(), } } } -impl Lock { +impl Lock { pub fn poll_acquire(&mut self, cx: &mut Context<'_>) -> Poll> { loop { self.waiting = match self.waiting { // This instance has not registered interest in the lock. - None => Some(Box::pin(self.sem.clone().acquire_owned())), + None => Some(Box::pin(self.lock.clone().lock_owned())), // This instance is interested in the lock. Some(ref mut waiter) => { tokio::pin!(waiter); - let _permit = futures::ready!(waiter.poll(cx)); - self.waiting = None; - return Poll::Ready(Guard { - value: self.value.clone(), - _permit, - }); + return waiter.poll(cx); } }; } } } -impl std::ops::Deref for Guard { - type Target = T; - fn deref(&self) -> &Self::Target { - // Safety: creating a `Guard` means that the single permit in the - // semaphore has been acquired and we have exclusive access to the - // value. - unsafe { &*self.value.get() } - } -} - -impl std::ops::DerefMut for Guard { - fn deref_mut(&mut self) -> &mut Self::Target { - // Safety: creating a `Guard` means that the single permit in the - // semaphore has been acquired and we have exclusive access to the - // value. - unsafe { &mut *self.value.get() } - } -} - -impl std::fmt::Debug for Guard { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Guard({:?})", &**self) - } -} - -impl std::fmt::Display for Guard { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::fmt::Display::fmt(&**self, f) +impl Lock { + // optimization so we can elide the `Box::pin` when we just want a future + pub async fn acquire(&mut self) -> Guard { + // Are we already waiting? If so, reuse that... + if let Some(waiting) = self.waiting.take() { + waiting.await + } else { + self.lock.clone().lock_owned().await + } } } - -// Safety: As long as T: Send, it's fine to send and share Lock between threads. -// If T was not Send, sending and sharing a Lock would be bad, since you can access T through -// Lock. -unsafe impl Send for Lock {} -unsafe impl Sync for Lock {} -unsafe impl Send for Guard {} -unsafe impl Sync for Guard {} diff --git a/linkerd/lock/src/service.rs b/linkerd/lock/src/service.rs index c05de1095d..2c6eb9dc37 100644 --- a/linkerd/lock/src/service.rs +++ b/linkerd/lock/src/service.rs @@ -34,7 +34,7 @@ impl Clone for LockService { impl tower::Service for LockService where - S: tower::Service, + S: tower::Service + Send + 'static, S::Error: Into, { type Response = S::Response; diff --git a/linkerd/service-profiles/src/http/service.rs b/linkerd/service-profiles/src/http/service.rs index 8f04622934..9d0cf8ee29 100644 --- a/linkerd/service-profiles/src/http/service.rs +++ b/linkerd/service-profiles/src/http/service.rs @@ -4,7 +4,7 @@ //! As the router's Stack is built, a destination is extracted from the stack's //! target and it is used to get route profiles from ` GetRoutes` implementation. //! -//! Each route uses a shared underlying concrete dst router. The concrete dst +//! Each route uses a shared underlying concrete dst . The concrete dst //! router picks a concrete dst (NameAddr) from the profile's `dst_overrides` if //! they exist, or uses the router's target's addr if no `dst_overrides` exist. //! The concrete dst router uses the concrete dst as the target for the From ca5a6b072867e1973eb0ba6c18ca5e10fa3c7cec Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 30 Apr 2020 09:54:36 -0700 Subject: [PATCH 09/13] fixup lockfile Signed-off-by: Eliza Weisman --- Cargo.lock | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dbc1d30a9a..4dfbb5a462 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -896,7 +896,7 @@ dependencies = [ "futures 0.3.4", "linkerd2-error", "pin-project", - "tokio 0.2.19", + "tokio 0.2.20", "tokio-test", "tower 0.3.1", "tower-test", @@ -924,7 +924,7 @@ version = "0.1.0" dependencies = [ "futures 0.3.4", "pin-project", - "tokio 0.2.19", + "tokio 0.2.20", "tower 0.3.1", "tracing", ] @@ -1088,7 +1088,7 @@ dependencies = [ "tokio 0.2.20", "tokio-test", "tower 0.3.1", - "tower-test 0.3.0", + "tower-test", "tracing", "tracing-futures 0.1.0", "tracing-log", @@ -2448,7 +2448,6 @@ dependencies = [ name = "tokio" version = "0.2.20" source = "git+https://github.com/tokio-rs/tokio?rev=45773c56413267cbcf9d5e7877e8dc4afc1e5b07#45773c56413267cbcf9d5e7877e8dc4afc1e5b07" - dependencies = [ "bytes 0.5.4", "fnv", From bd5167fa5e290572b3d0922d79a73cad94c2900d Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 4 May 2020 09:44:03 -0700 Subject: [PATCH 10/13] ag Signed-off-by: Eliza Weisman --- linkerd/service-profiles/src/http/service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkerd/service-profiles/src/http/service.rs b/linkerd/service-profiles/src/http/service.rs index 9d0cf8ee29..8f04622934 100644 --- a/linkerd/service-profiles/src/http/service.rs +++ b/linkerd/service-profiles/src/http/service.rs @@ -4,7 +4,7 @@ //! As the router's Stack is built, a destination is extracted from the stack's //! target and it is used to get route profiles from ` GetRoutes` implementation. //! -//! Each route uses a shared underlying concrete dst . The concrete dst +//! Each route uses a shared underlying concrete dst router. The concrete dst //! router picks a concrete dst (NameAddr) from the profile's `dst_overrides` if //! they exist, or uses the router's target's addr if no `dst_overrides` exist. //! The concrete dst router uses the concrete dst as the target for the From be4080ac26a32e107662d4d93d67581e62f23331 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 4 May 2020 11:08:43 -0700 Subject: [PATCH 11/13] lock: fix poll_acquire polling completed futures Signed-off-by: Eliza Weisman --- linkerd/lock/src/lock.rs | 40 +++++++++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/linkerd/lock/src/lock.rs b/linkerd/lock/src/lock.rs index 2ba7eb3683..ba8f0dcf1a 100644 --- a/linkerd/lock/src/lock.rs +++ b/linkerd/lock/src/lock.rs @@ -32,19 +32,37 @@ impl Clone for Lock { } impl Lock { + /// Attempt to acquire the lock, returning `Pending` if it is held + /// elsewhere. + /// + /// If this `Lock` instance is not pub fn poll_acquire(&mut self, cx: &mut Context<'_>) -> Poll> { - loop { - self.waiting = match self.waiting { - // This instance has not registered interest in the lock. - None => Some(Box::pin(self.lock.clone().lock_owned())), - - // This instance is interested in the lock. - Some(ref mut waiter) => { - tokio::pin!(waiter); - return waiter.poll(cx); - } - }; + // If we have already registered interest in the lock and are waiting on + // a future, we'll poll that. Otherwise, we need a future. + // + // We `take` the waiting future so that if we drive it to completion on + // this poll, it won't be set on subsequent polls. + let mut waiting = self.waiting.take().unwrap_or_else(|| { + // This instance has not registered interest in the lock. + Box::pin(self.lock.clone().lock_owned()) + }); + + // Poll the future. + let res = { + let future = &mut waiting; + tokio::pin!(future); + future.poll(cx) + }; + + tracing::trace!(ready = res.is_ready()); + + // If the future hasn't completed, save it to be polled again the next + // time `poll_acquire` is called. + if res.is_pending() { + self.waiting = Some(waiting); } + + res } } From ca472ffc19bbbbe800277003a6d2b87d688ca01a Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 4 May 2020 11:10:11 -0700 Subject: [PATCH 12/13] simplify `fuzz` test a bit Signed-off-by: Eliza Weisman --- linkerd/lock/Cargo.toml | 2 +- linkerd/lock/src/test.rs | 28 ++++++++++++++++++---------- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/linkerd/lock/Cargo.toml b/linkerd/lock/Cargo.toml index 6f6b645836..307dc2d290 100644 --- a/linkerd/lock/Cargo.toml +++ b/linkerd/lock/Cargo.toml @@ -13,7 +13,7 @@ futures = "0.3" linkerd2-error = { path = "../error" } tower = { version = "0.3", default-features = false } tracing = "0.1" -tokio = { version = "0.2.19", features = ["sync", "macros"] } +tokio = { version = "0.2.19", features = ["sync", "macros", "rt-core"] } [dev-dependencies] rand = "0.7" diff --git a/linkerd/lock/src/test.rs b/linkerd/lock/src/test.rs index f50e9a9dd7..ca4be1e6dc 100644 --- a/linkerd/lock/src/test.rs +++ b/linkerd/lock/src/test.rs @@ -4,7 +4,7 @@ use futures::{future, StreamExt}; use std::sync::atomic::{AtomicBool, Ordering}; use std::task::{Context, Poll}; use std::{future::Future, pin::Pin, sync::Arc}; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::oneshot; use tokio_test::{assert_pending, assert_ready, assert_ready_ok}; use tower::Service as _Service; use tower_test::mock::Spawn; @@ -171,18 +171,19 @@ async fn fuzz() { async { tracing::info!("starting"); let svc = LockService::new(Decr::new(*iterations, Arc::new(true.into()))); - let (tx, rx) = mpsc::channel(1); + let joins = futures::stream::futures_unordered::FuturesUnordered::new(); for i in 0..*concurrency { let lock = svc.clone(); - let tx = tx.clone(); - tokio::spawn( + let join = tokio::spawn( async move { let mut lock = lock; - let _tx = tx; - future::poll_fn::, _>(|cx| { + future::poll_fn(|cx| { loop { - futures::ready!(lock.poll_ready(cx)).map_err(|_| ())?; - + let ready = lock.poll_ready(cx); + tracing::trace!(?ready); + if futures::ready!(ready).is_err() { + return Poll::Ready(()); + } // Randomly be busy while holding the lock. if rand::random::() { cx.waker().wake_by_ref(); @@ -192,13 +193,20 @@ async fn fuzz() { tokio::spawn(lock.call(1)); } }) - .await + .await; + tracing::trace!("task done"); } .instrument(tracing::trace_span!("task", number = i)), ); + joins.push(join); } - rx.fold((), |(), ()| async { () }).await; + joins + .for_each(|res| { + let _ = res.expect("task must not have panicked"); + async {} + }) + .await; tracing::info!("done"); } .instrument(tracing::info_span!("fuzz", concurrency, iterations)) From cc6cf02a56ec14f2640d1f0bcb4dcb58ef08ef9f Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 4 May 2020 11:37:42 -0700 Subject: [PATCH 13/13] fixup lockfile Signed-off-by: Eliza Weisman --- Cargo.lock | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4b947b07c5..45b106287c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -496,7 +496,7 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "377038bf3c89d18d6ca1431e7a5027194fbd724ca10592b9487ede5e8e144f42" dependencies = [ - "bytes 0.5.3", + "bytes 0.5.4", "fnv", "futures-core", "futures-sink", @@ -505,7 +505,7 @@ dependencies = [ "indexmap", "log", "slab", - "tokio 0.2.19", + "tokio 0.2.20", "tokio-util", ] @@ -551,7 +551,7 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d569972648b2c512421b5f2a405ad6ac9666547189d0c5477a3f200f3e02f9" dependencies = [ - "bytes 0.5.3", + "bytes 0.5.4", "fnv", "itoa", ] @@ -574,7 +574,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b" dependencies = [ - "bytes 0.5.3", + "bytes 0.5.4", "http 0.2.1", ] @@ -620,7 +620,7 @@ version = "0.13.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96816e1d921eca64d208a85aab4f7798455a8e34229ee5a88c935bdee1b78b14" dependencies = [ - "bytes 0.5.3", + "bytes 0.5.4", "futures-channel", "futures-core", "futures-util", @@ -633,7 +633,7 @@ dependencies = [ "net2", "pin-project", "time", - "tokio 0.2.19", + "tokio 0.2.20", "tower-service 0.3.0", "want 0.3.0", ] @@ -646,7 +646,7 @@ dependencies = [ "http 0.2.1", "hyper 0.13.5", "pin-project", - "tokio 0.2.19", + "tokio 0.2.20", "tokio-test", "tower 0.3.1", ]