diff --git a/Cargo.lock b/Cargo.lock index dd79650115..45b106287c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -123,9 +123,9 @@ dependencies = [ [[package]] name = "bytes" -version = "0.5.3" +version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10004c15deb332055f7a4a208190aed362cf9a7c2f6ab70a305fba50e1105f38" +checksum = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1" [[package]] name = "c2-chacha" @@ -496,7 +496,7 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "377038bf3c89d18d6ca1431e7a5027194fbd724ca10592b9487ede5e8e144f42" dependencies = [ - "bytes 0.5.3", + "bytes 0.5.4", "fnv", "futures-core", "futures-sink", @@ -505,7 +505,7 @@ dependencies = [ "indexmap", "log", "slab", - "tokio 0.2.19", + "tokio 0.2.20", "tokio-util", ] @@ -551,7 +551,7 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d569972648b2c512421b5f2a405ad6ac9666547189d0c5477a3f200f3e02f9" dependencies = [ - "bytes 0.5.3", + "bytes 0.5.4", "fnv", "itoa", ] @@ -574,7 +574,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b" dependencies = [ - "bytes 0.5.3", + "bytes 0.5.4", "http 0.2.1", ] @@ -620,7 +620,7 @@ version = "0.13.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96816e1d921eca64d208a85aab4f7798455a8e34229ee5a88c935bdee1b78b14" dependencies = [ - "bytes 0.5.3", + "bytes 0.5.4", "futures-channel", "futures-core", "futures-util", @@ -633,7 +633,7 @@ dependencies = [ "net2", "pin-project", "time", - "tokio 0.2.19", + "tokio 0.2.20", "tower-service 0.3.0", "want 0.3.0", ] @@ -646,7 +646,7 @@ dependencies = [ "http 0.2.1", "hyper 0.13.5", "pin-project", - "tokio 0.2.19", + "tokio 0.2.20", "tokio-test", "tower 0.3.1", ] @@ -735,9 +735,6 @@ name = "lazy_static" version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc5729f27f159ddd61f4df6228e827e86643d4d3e7c32183cb30a1c08f604a14" -dependencies = [ - "spin", -] [[package]] name = "libc" @@ -784,7 +781,7 @@ dependencies = [ "ring", "rustls", "tokio 0.1.22", - "tokio 0.2.19", + "tokio 0.2.20", "tokio-compat", "tokio-connect", "tokio-current-thread", @@ -855,7 +852,7 @@ dependencies = [ "rand 0.7.2", "regex 1.0.0", "tokio 0.1.22", - "tokio 0.2.19", + "tokio 0.2.20", "tokio-compat", "tokio-timer", "tower 0.3.1", @@ -866,7 +863,7 @@ dependencies = [ "tower-request-modifier", "tower-spawn-ready", "tracing", - "tracing-futures 0.2.3", + "tracing-futures 0.2.4", "tracing-log", "tracing-subscriber", ] @@ -964,7 +961,7 @@ dependencies = [ "futures 0.3.4", "linkerd2-error", "pin-project", - "tokio 0.2.19", + "tokio 0.2.20", "tokio-test", "tower 0.3.1", "tower-test", @@ -976,12 +973,12 @@ dependencies = [ name = "linkerd2-cache" version = "0.1.0" dependencies = [ - "futures 0.1.26", + "futures 0.3.4", "linkerd2-error", "linkerd2-lock", "linkerd2-stack", - "tokio 0.1.22", - "tower 0.1.1", + "tokio 0.2.20", + "tower 0.3.1", "tracing", "tracing-futures 0.1.0", ] @@ -992,7 +989,7 @@ version = "0.1.0" dependencies = [ "futures 0.3.4", "pin-project", - "tokio 0.2.19", + "tokio 0.2.20", "tower 0.3.1", "tracing", ] @@ -1029,7 +1026,7 @@ dependencies = [ "futures 0.3.4", "linkerd2-error", "pin-project", - "tokio 0.2.19", + "tokio 0.2.20", "tokio-test", ] @@ -1151,11 +1148,13 @@ dependencies = [ name = "linkerd2-lock" version = "0.1.0" dependencies = [ - "futures 0.1.26", + "futures 0.3.4", "linkerd2-error", "rand 0.7.2", - "tokio 0.1.22", - "tower 0.1.1", + "tokio 0.2.20", + "tokio-test", + "tower 0.3.1", + "tower-test", "tracing", "tracing-futures 0.1.0", "tracing-log", @@ -1240,9 +1239,9 @@ dependencies = [ "futures 0.3.4", "linkerd2-error", "pin-project", - "tokio 0.2.19", + "tokio 0.2.20", "tower 0.3.1", - "tracing-futures 0.2.3", + "tracing-futures 0.2.4", ] [[package]] @@ -1254,7 +1253,7 @@ dependencies = [ "linkerd2-io", "linkerd2-proxy-core", "pin-project", - "tokio 0.2.19", + "tokio 0.2.20", "tower 0.3.1", ] @@ -1300,7 +1299,7 @@ dependencies = [ "rand 0.7.2", "task-compat", "tokio 0.1.22", - "tokio 0.2.19", + "tokio 0.2.20", "tokio-connect", "tokio-timer", "tower 0.1.1", @@ -1504,7 +1503,7 @@ dependencies = [ "futures 0.3.4", "linkerd2-error", "pin-project", - "tokio 0.2.19", + "tokio 0.2.20", "tower 0.3.1", ] @@ -1552,7 +1551,7 @@ dependencies = [ "linkerd2-error", "linkerd2-stack", "pin-project", - "tokio 0.2.19", + "tokio 0.2.20", "tokio-connect", "tokio-test", "tower 0.3.1", @@ -1832,18 +1831,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "0.4.8" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7804a463a8d9572f13453c516a5faea534a2403d7ced2f0c7e100eeff072772c" +checksum = "6f6a7f5eee6292c559c793430c55c00aea9d3b3d1905e855806ca4d7253426a2" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "0.4.8" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "385322a45f2ecf3410c68d2a549a4a2685e8051d0f278e39743ff4e451cb9b3f" +checksum = "8988430ce790d8682672117bc06dda364c0be32d3abd738234f19f3240bad99a" dependencies = [ "proc-macro2 1.0.10", "quote 1.0.2", @@ -1858,9 +1857,9 @@ checksum = "237844750cfbb86f67afe27eee600dfbbcb6188d734139b534cbfbf4f96792ae" [[package]] name = "pin-utils" -version = "0.1.0-alpha.4" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5894c618ce612a3fa23881b152b608bafb8c56cfc22f434a3ba3120b40f7b587" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "ppv-lite86" @@ -2235,9 +2234,9 @@ dependencies = [ [[package]] name = "ring" -version = "0.16.7" +version = "0.16.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "796ae8317a07b04dffb1983bdc7045ccd02f741f0b411704f07fd35dbf99f757" +checksum = "1ba5a8ec64ee89a76c98c549af81ff14813df09c3e6dc4766c3856da48597a0c" dependencies = [ "cc", "lazy_static", @@ -2332,9 +2331,9 @@ dependencies = [ [[package]] name = "sharded-slab" -version = "0.0.8" +version = "0.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae75d0445b5d3778c9da3d1f840faa16d0627c8607f78a74daf69e5b988c39a1" +checksum = "06d5a3f5166fb5b42a5439f2eee8b9de149e235961e3eb21c5808fc3ea17ff3e" dependencies = [ "lazy_static", ] @@ -2513,11 +2512,10 @@ dependencies = [ [[package]] name = "tokio" -version = "0.2.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d9c43f1bb96970e153bcbae39a65e249ccb942bd9d36dbdf086024920417c9c" +version = "0.2.20" +source = "git+https://github.com/tokio-rs/tokio?rev=45773c56413267cbcf9d5e7877e8dc4afc1e5b07#45773c56413267cbcf9d5e7877e8dc4afc1e5b07" dependencies = [ - "bytes 0.5.3", + "bytes 0.5.4", "fnv", "futures-core", "iovec", @@ -2562,7 +2560,7 @@ dependencies = [ "futures-core", "futures-util", "pin-project-lite", - "tokio 0.2.19", + "tokio 0.2.20", "tokio-current-thread", "tokio-executor", "tokio-reactor", @@ -2623,8 +2621,7 @@ dependencies = [ [[package]] name = "tokio-macros" version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389" +source = "git+https://github.com/tokio-rs/tokio?rev=45773c56413267cbcf9d5e7877e8dc4afc1e5b07#45773c56413267cbcf9d5e7877e8dc4afc1e5b07" dependencies = [ "proc-macro2 1.0.10", "quote 1.0.2", @@ -2706,13 +2703,13 @@ dependencies = [ [[package]] name = "tokio-test" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09cf9705471976fa5fc6817d3fbc9c4ff9696a6647af0e5c1870c81ca7445b05" +checksum = "ed0049c119b6d505c4447f5c64873636c7af6c75ab0d45fd9f618d82acb8016d" dependencies = [ - "bytes 0.5.3", + "bytes 0.5.4", "futures-core", - "tokio 0.2.19", + "tokio 0.2.20", ] [[package]] @@ -2782,13 +2779,13 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499" dependencies = [ - "bytes 0.5.3", + "bytes 0.5.4", "futures-core", "futures-io", "futures-sink", "log", "pin-project-lite", - "tokio 0.2.19", + "tokio 0.2.20", ] [[package]] @@ -2817,7 +2814,7 @@ dependencies = [ "futures-core", "futures-util", "pin-project", - "tokio 0.2.19", + "tokio 0.2.20", "tower-layer 0.3.0 (git+https://github.com/tower-rs/tower?rev=8752a3811788e94670c62dc0acbc9613207931b1)", "tower-service 0.3.0", "tracing", @@ -3034,7 +3031,7 @@ checksum = "9ba4bbc2c1e4a8543c30d4c13a4c8314ed72d6e07581910f665aa13fde0153c8" dependencies = [ "futures-util", "pin-project", - "tokio 0.2.19", + "tokio 0.2.20", "tokio-test", "tower-layer 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "tower-service 0.3.0", @@ -3089,12 +3086,11 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.6" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "528c8ebaaa16cdac34795180b046c031775b0d56402704d98c096788f33d646a" +checksum = "0aa83a9a47081cd522c09c81b31aec2c9273424976f922ad61c053b58350b715" dependencies = [ "lazy_static", - "spin", ] [[package]] @@ -3111,9 +3107,9 @@ dependencies = [ [[package]] name = "tracing-futures" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58b0b7fd92dc7b71f29623cc6836dd7200f32161a2313dd78be233a8405694f6" +checksum = "ab7bb6f14721aa00656086e9335d363c5c8747bae02ebe32ea2c7dece5689b4c" dependencies = [ "futures 0.1.26", "pin-project", @@ -3143,9 +3139,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.2.3" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dedebcf5813b02261d6bab3a12c6a8ae702580c0405a2e8ec16c3713caf14c20" +checksum = "1d53c40489aa69c9aed21ff483f26886ca8403df33bdc2d2f87c60c1617826d2" dependencies = [ "ansi_term", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 6d0eed064e..d416fcc1f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,4 +65,5 @@ debug = false webpki = { git = "https://github.com/seanmonstar/webpki", branch = "cert-dns-names-0.21" } # backport danburkert/prost#268 to `prost` 0.5 temporarily. prost = { git = "https://github.com/linkerd/prost", branch = "v0.5.x" } -tower = { version = "0.3", git = "https://github.com/tower-rs/tower", rev = "8752a3811788e94670c62dc0acbc9613207931b1"} \ No newline at end of file +tower = { version = "0.3", git = "https://github.com/tower-rs/tower", rev = "8752a3811788e94670c62dc0acbc9613207931b1"} +tokio = { version = "0.2", git = "https://github.com/tokio-rs/tokio", rev = "45773c56413267cbcf9d5e7877e8dc4afc1e5b07"} \ No newline at end of file diff --git a/linkerd/app/core/src/svc.rs b/linkerd/app/core/src/svc.rs index 9195dbd82b..e0e71c8c12 100644 --- a/linkerd/app/core/src/svc.rs +++ b/linkerd/app/core/src/svc.rs @@ -251,15 +251,16 @@ impl Stack { // self.push(http::insert::target::layer()) // } - // pub fn cache(self, track: L) -> Stack>> - // where - // T: Eq + std::hash::Hash, - // S: NewService + Clone, - // L: tower::layer::Layer> + Clone, - // L::Service: NewService, - // { - // self.push(cache::CacheLayer::new(track)) - // } + pub fn cache(self, track: L) -> Stack>> + where + T: Eq + std::hash::Hash + 'static, + S: NewService + Clone, + S::Service: 'static, + L: tower::layer::Layer> + Clone, + L::Service: NewService, + { + self.push(cache::CacheLayer::new(track)) + } pub fn push_fallback(self, fallback: F) -> Stack> { self.push(stack::FallbackLayer::new(fallback)) diff --git a/linkerd/app/integration/Cargo.toml b/linkerd/app/integration/Cargo.toml index 61408b38fb..a130416ace 100644 --- a/linkerd/app/integration/Cargo.toml +++ b/linkerd/app/integration/Cargo.toml @@ -44,7 +44,7 @@ tower = "0.1" tower-grpc = { version = "0.1", default-features = false, features = ["protobuf"] } tracing = "0.1.9" tracing-futures = "0.1" -webpki = "0.21" +webpki = "=0.21.0" futures-03 = { package = "futures", version = "0.3", features = ["compat"]} [dev-dependencies] diff --git a/linkerd/cache/Cargo.toml b/linkerd/cache/Cargo.toml index a2b5139d2f..6bd6a9aa78 100644 --- a/linkerd/cache/Cargo.toml +++ b/linkerd/cache/Cargo.toml @@ -6,11 +6,11 @@ edition = "2018" publish = false [dependencies] -futures = "0.1" +futures = "0.3" linkerd2-error = { path = "../error" } linkerd2-lock = { path = "../lock" } linkerd2-stack = { path = "../stack" } -tokio = "0.1" -tower = "0.1" +tokio = "0.2" +tower = { version = "0.3", default-features = false, features = ["util"] } tracing = "0.1" tracing-futures = "0.1" diff --git a/linkerd/cache/benches/cache.rs b/linkerd/cache/benches/cache.rs index 84db80da1c..f46d2be435 100644 --- a/linkerd/cache/benches/cache.rs +++ b/linkerd/cache/benches/cache.rs @@ -1,10 +1,11 @@ #![feature(test)] - -extern crate linkerd2_cache; -use futures::{future, Async, Future, Poll}; +use futures::future; use linkerd2_cache::{Cache, Handle}; use linkerd2_error::Never; use linkerd2_stack::NewService; +use std::future::Future; +use std::task::{Context, Poll}; +use tower::util::ServiceExt; use tower::Service; extern crate test; @@ -37,10 +38,10 @@ struct NewSometimesEvict; impl Service for NeverEvict { type Response = usize; type Error = Never; - type Future = futures::future::FutureResult; + type Future = future::Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, rhs: usize) -> Self::Future { @@ -62,10 +63,10 @@ impl NewService<(usize, Handle)> for NewNeverEvict { impl Service for AlwaysEvict { type Response = usize; type Error = Never; - type Future = futures::future::FutureResult; + type Future = future::Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, rhs: usize) -> Self::Future { @@ -84,10 +85,10 @@ impl NewService<(usize, Handle)> for NewAlwaysEvict { impl Service for SometimesEvict { type Response = usize; type Error = Never; - type Future = futures::future::FutureResult; + type Future = future::Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, rhs: usize) -> Self::Future { @@ -117,13 +118,16 @@ fn run_bench(num_svc: usize, new_svc: N, b: &mut Bencher) where N: NewService<(usize, Handle)>, N::Service: Service, - N::Service: Clone, + N::Service: Clone + Send + 'static, + >::Error: std::fmt::Debug, { let mut cache = Cache::new(new_svc); b.iter(|| { for n in 0..num_svc { - cache.poll_ready().unwrap(); - let _r = cache.call(n).wait().unwrap().call(n); + futures::executor::block_on(async { + let mut svc = cache.ready_and().await.unwrap(); + svc.call(n).await.unwrap().call(n).await.unwrap(); + }) } }); } diff --git a/linkerd/cache/src/layer.rs b/linkerd/cache/src/layer.rs index 34b0865a7d..d07a1211cd 100644 --- a/linkerd/cache/src/layer.rs +++ b/linkerd/cache/src/layer.rs @@ -1,7 +1,7 @@ use crate::Cache; use crate::Handle; -use futures::Poll; use linkerd2_stack::NewService; +use std::task::{Context, Poll}; pub struct CacheLayer { track_layer: L, @@ -85,8 +85,8 @@ impl> tower::Service for Track { type Error = S::Error; type Future = S::Future; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.inner.poll_ready() + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) } fn call(&mut self, req: T) -> Self::Future { diff --git a/linkerd/cache/src/lib.rs b/linkerd/cache/src/lib.rs index da0b2fb1f6..74db5414aa 100644 --- a/linkerd/cache/src/lib.rs +++ b/linkerd/cache/src/lib.rs @@ -1,12 +1,12 @@ #![deny(warnings, rust_2018_idioms)] - -use futures::{future, Async, Poll}; +use futures::future; use linkerd2_error::Never; use linkerd2_lock::{Guard, Lock}; use linkerd2_stack::NewService; use std::collections::HashMap; use std::hash::Hash; use std::sync::{Arc, Weak}; +use std::task::{Context, Poll}; use tracing::{debug, trace}; pub mod layer; @@ -63,38 +63,33 @@ where impl tower::Service for Cache where - T: Clone + Eq + Hash, + T: Clone + Eq + Hash + Send + 'static, N: NewService<(T, Handle)>, - N::Service: Clone, + N::Service: Clone + Send + 'static, { type Response = N::Service; type Error = Never; - type Future = future::FutureResult; + type Future = future::Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { if self.guard.is_none() { - match self.lock.poll_acquire() { - Async::NotReady => return Ok(Async::NotReady), - Async::Ready(mut services) => { - // Drop defunct services before interacting with the cache. - let n = services.len(); - services.retain(|_, (_, weak)| { - if weak.strong_count() > 0 { - true - } else { - debug!("Dropping defunct service"); - false - } - }); - trace!(services = services.len(), dropped = n - services.len()); - - self.guard = Some(services); + let mut services = futures::ready!(self.lock.poll_acquire(cx)); + // Drop defunct services before interacting with the cache. + let n = services.len(); + services.retain(|_, (_, weak)| { + if weak.strong_count() > 0 { + true + } else { + trace!("Dropping defunct service"); + false } - } + }); + debug!(services = services.len(), dropped = n - services.len()); + self.guard = Some(services); } debug_assert!(self.guard.is_some(), "guard must be acquired"); - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } fn call(&mut self, target: T) -> Self::Future { diff --git a/linkerd/dns/name/Cargo.toml b/linkerd/dns/name/Cargo.toml index ad331b42de..f5441ca617 100644 --- a/linkerd/dns/name/Cargo.toml +++ b/linkerd/dns/name/Cargo.toml @@ -6,5 +6,5 @@ edition = "2018" publish = false [dependencies] -webpki = "0.21" +webpki = "=0.21.0" untrusted = "0.7" diff --git a/linkerd/lock/Cargo.toml b/linkerd/lock/Cargo.toml index 4ab4ef0224..307dc2d290 100644 --- a/linkerd/lock/Cargo.toml +++ b/linkerd/lock/Cargo.toml @@ -9,14 +9,17 @@ A middleware that provides mutual exclusion. """ [dependencies] -futures = "0.1" +futures = "0.3" linkerd2-error = { path = "../error" } -tower = "0.1" +tower = { version = "0.3", default-features = false } tracing = "0.1" +tokio = { version = "0.2.19", features = ["sync", "macros", "rt-core"] } [dev-dependencies] rand = "0.7" -tokio = "0.1" -tracing-futures = "0.1" +tracing-futures = { version = "0.1", features = ["std-future"] } tracing-log = "0.1" -tracing-subscriber = "0.2.3" +tracing-subscriber = "0.2.5" +tower = { version = "0.3", default-features = false, features = ["util"] } +tokio-test = "0.2" +tower-test = "0.3" \ No newline at end of file diff --git a/linkerd/lock/src/lib.rs b/linkerd/lock/src/lib.rs index 1692281d0e..5de5d14178 100644 --- a/linkerd/lock/src/lib.rs +++ b/linkerd/lock/src/lib.rs @@ -6,7 +6,6 @@ pub mod error; mod layer; mod lock; mod service; -mod shared; #[cfg(test)] mod test; diff --git a/linkerd/lock/src/lock.rs b/linkerd/lock/src/lock.rs index d3f0f6dbcc..ba8f0dcf1a 100644 --- a/linkerd/lock/src/lock.rs +++ b/linkerd/lock/src/lock.rs @@ -1,32 +1,22 @@ -use crate::shared::{Shared, Wait}; -use futures::Async; -use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; +use std::{future::Future, pin::Pin, sync::Arc}; +use tokio::sync::Mutex; +pub use tokio::sync::OwnedMutexGuard as Guard; /// Provides mutually exclusive to a `T`-typed value, asynchronously. -/// -/// Note that, when the lock is contested, waiters are notified in a LIFO -/// fashion. This is done to minimize latency at the expense of unfairness. pub struct Lock { /// Set when this Lock is interested in acquiring the value. - waiting: Option, - shared: Arc>>, -} - -/// Guards access to a `T`-typed value, ensuring the value is released on Drop. -pub struct Guard { - /// Must always be Some; Used to reclaim the value in Drop. - value: Option, - - shared: Arc>>, + waiting: Option> + Send + 'static>>>, + lock: Arc>, } // === impl Lock === impl Lock { - pub fn new(service: S) -> Self { + pub fn new(value: S) -> Self { Self { waiting: None, - shared: Arc::new(Mutex::new(Shared::new(service))), + lock: Arc::new(Mutex::new(value)), } } } @@ -36,87 +26,54 @@ impl Clone for Lock { Self { // Clones have an independent local lock state. waiting: None, - shared: self.shared.clone(), + lock: self.lock.clone(), } } } -impl Lock { - fn guard(&self, value: T) -> Guard { - Guard { - value: Some(value), - shared: self.shared.clone(), +impl Lock { + /// Attempt to acquire the lock, returning `Pending` if it is held + /// elsewhere. + /// + /// If this `Lock` instance is not + pub fn poll_acquire(&mut self, cx: &mut Context<'_>) -> Poll> { + // If we have already registered interest in the lock and are waiting on + // a future, we'll poll that. Otherwise, we need a future. + // + // We `take` the waiting future so that if we drive it to completion on + // this poll, it won't be set on subsequent polls. + let mut waiting = self.waiting.take().unwrap_or_else(|| { + // This instance has not registered interest in the lock. + Box::pin(self.lock.clone().lock_owned()) + }); + + // Poll the future. + let res = { + let future = &mut waiting; + tokio::pin!(future); + future.poll(cx) + }; + + tracing::trace!(ready = res.is_ready()); + + // If the future hasn't completed, save it to be polled again the next + // time `poll_acquire` is called. + if res.is_pending() { + self.waiting = Some(waiting); } - } - - pub fn poll_acquire(&mut self) -> Async> { - let mut shared = self.shared.lock().expect("Lock poisoned"); - - loop { - self.waiting = match self.waiting { - // This instance has not registered interest in the lock. - None => match shared.acquire() { - // This instance has exclusive access to the inner service. - Some(value) => { - // The state remains Released. - return Async::Ready(self.guard(value)); - } - None => Some(Wait::default()), - }, - // This instance is interested in the lock. - Some(ref waiter) => match shared.poll_acquire(waiter) { - Async::NotReady => return Async::NotReady, - Async::Ready(value) => { - self.waiting = None; - return Async::Ready(self.guard(value)); - } - }, - }; - } - } -} - -impl Drop for Lock { - fn drop(&mut self) { - if let Some(wait) = self.waiting.take() { - if let Ok(mut shared) = self.shared.lock() { - shared.release_waiter(wait); - } - } + res } } -impl std::ops::Deref for Guard { - type Target = T; - fn deref(&self) -> &Self::Target { - self.value.as_ref().expect("Value dropped from guard") - } -} - -impl std::ops::DerefMut for Guard { - fn deref_mut(&mut self) -> &mut Self::Target { - self.value.as_mut().expect("Value dropped from guard") - } -} - -impl std::fmt::Debug for Guard { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Guard({:?})", &**self) - } -} - -impl std::fmt::Display for Guard { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - std::fmt::Display::fmt(&**self, f) - } -} - -impl Drop for Guard { - fn drop(&mut self) { - let value = self.value.take().expect("Guard may not be dropped twice"); - if let Ok(mut shared) = self.shared.lock() { - shared.release_and_notify(value); +impl Lock { + // optimization so we can elide the `Box::pin` when we just want a future + pub async fn acquire(&mut self) -> Guard { + // Are we already waiting? If so, reuse that... + if let Some(waiting) = self.waiting.take() { + waiting.await + } else { + self.lock.clone().lock_owned().await } } } diff --git a/linkerd/lock/src/service.rs b/linkerd/lock/src/service.rs index f30cabcca9..2c6eb9dc37 100644 --- a/linkerd/lock/src/service.rs +++ b/linkerd/lock/src/service.rs @@ -1,7 +1,8 @@ use crate::error::{Error, ServiceError}; use crate::{Guard, Lock}; -use futures::{future, Async, Future, Poll}; +use futures::{future, TryFutureExt}; use std::sync::Arc; +use std::task::{Context, Poll}; use tracing::trace; /// A middleware that safely shares an inner service among clones. @@ -33,43 +34,44 @@ impl Clone for LockService { impl tower::Service for LockService where - S: tower::Service, + S: tower::Service + Send + 'static, S::Error: Into, { type Response = S::Response; type Error = Error; type Future = future::MapErr Self::Error>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { loop { trace!(acquired = self.guard.is_some()); if let Some(guard) = self.guard.as_mut() { return match guard.as_mut() { - Err(err) => Err(err.clone().into()), - Ok(ref mut svc) => match svc.poll_ready() { - Ok(ok) => { - trace!(ready = ok.is_ready()); - Ok(ok) - } - Err(inner) => { + Err(err) => Poll::Ready(Err(err.clone().into())), + Ok(ref mut svc) => match svc.poll_ready(cx) { + Poll::Ready(Err(inner)) => { let error = ServiceError::new(Arc::new(inner.into())); **guard = Err(error.clone()); // Drop the guard. self.guard = None; - Err(error.into()) + Poll::Ready(Err(error.into())) + } + Poll::Pending => { + trace!(ready = false); + Poll::Pending + } + Poll::Ready(Ok(())) => { + trace!(ready = true); + debug_assert!(self.guard.is_some()); + Poll::Ready(Ok(())) } }, }; } debug_assert!(self.guard.is_none()); - match self.lock.poll_acquire() { - Async::NotReady => return Ok(Async::NotReady), - Async::Ready(guard) => { - self.guard = Some(guard); - } - } + let guard = futures::ready!(self.lock.poll_acquire(cx)); + self.guard = Some(guard); } } diff --git a/linkerd/lock/src/shared.rs b/linkerd/lock/src/shared.rs deleted file mode 100644 index a3cf3c699f..0000000000 --- a/linkerd/lock/src/shared.rs +++ /dev/null @@ -1,152 +0,0 @@ -use self::waiter::Notify; -pub(crate) use self::waiter::Wait; -use futures::Async; -use tracing::trace; - -/// The shared state between one or more Lock instances. -/// -/// When multiple lock instances are contending for the inner value, waiters are stored in a LIFO -/// stack. This is done to bias for latency instead of fairness -/// -/// N.B. Waiters capacity is released lazily as waiters are notified and, i.e., not when a waiter is -/// dropped. In high-load scenarios where the lock _always_ has new waiters, this could potentially -/// manifest as unbounded memory growth. This situation is not expected to arise in normal operation. -pub(crate) struct Shared { - /// Set when the value is available to be acquired; None when the value is acquired. - value: Option, - - /// A LIFO stack of waiters to be notified when the value is available. - waiters: Vec, -} - -// === impl Shared === - -impl Shared { - pub fn new(value: T) -> Self { - Self { - waiters: Vec::new(), - value: Some(value), - } - } - - /// Try to claim a value without registering a waiter. - /// - /// Once a value is acquired it **must** be returned via `release_and_notify`. - pub fn acquire(&mut self) -> Option { - trace!(acquired = %self.value.is_some()); - self.value.take() - } - - /// Try to acquire a value or register the given waiter to be notified when - /// the lock is available. - /// - /// Once a value is acquired it **must** be returned via `release_and_notify`. - /// - /// If `Async::NotReady` is returned, the polling task, once notified, **must** either call - /// `poll_acquire` again to obtain a value, or the waiter **must** be returned via - /// `release_waiter`. - pub fn poll_acquire(&mut self, wait: &Wait) -> Async { - if let Some(value) = self.acquire() { - return Async::Ready(value); - } - - // Register the current task to be notified. - wait.register(); - // Register the waiter's notify handle if one isn't already registered. - if let Some(notify) = wait.get_notify() { - self.waiters.push(notify); - } - debug_assert!(wait.has_notify()); - - trace!(waiters = self.waiters.len(), "Waiting"); - Async::NotReady - } - - pub fn release_and_notify(&mut self, value: T) { - trace!(waiters = self.waiters.len(), "Releasing"); - assert!(self.value.is_none()); - self.value = Some(value); - self.notify_next_waiter(); - } - - pub fn release_waiter(&mut self, wait: Wait) { - // If a waiter is being released and it does not have a notify, then it must be being - // released after being notified. Notify the next waiter to prevent deadlock. - if self.value.is_some() { - if !wait.has_notify() { - self.notify_next_waiter(); - } - } - } - - fn notify_next_waiter(&mut self) { - while let Some(waiter) = self.waiters.pop() { - if waiter.notify() { - trace!("Notified waiter"); - return; - } - } - } -} - -mod waiter { - use futures::task::AtomicTask; - use std::sync::{Arc, Weak}; - - /// A handle held by Lock instances when waiting to be notified. - #[derive(Default)] - pub(crate) struct Wait(Arc); - - /// A handle held by shared lock state to notify a waiting Lock. - /// - /// There may be at most one `Notify` instance per `Wait` instance at any one time. - pub(super) struct Notify(Weak); - - impl Wait { - /// If a `Notify` handle does not currently exist for this waiter, create a new one. - pub(super) fn get_notify(&self) -> Option { - if !self.has_notify() { - let n = Notify(Arc::downgrade(&self.0)); - debug_assert!(self.has_notify()); - Some(n) - } else { - None - } - } - - /// Register this waiter with the current task. - pub(super) fn register(&self) { - self.0.register(); - } - - /// Returns true iff there is currently a `Notify` handle for this waiter. - pub(super) fn has_notify(&self) -> bool { - let weaks = Arc::weak_count(&self.0); - debug_assert!( - weaks == 0 || weaks == 1, - "There must only be at most one Notify per Wait" - ); - weaks == 1 - } - } - - impl std::fmt::Debug for Wait { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Wait(notify={})", self.has_notify()) - } - } - - impl Notify { - /// Attempt to notify the waiter. - /// - /// Returns true if a waiter was notified and false otherwise. - pub fn notify(self) -> bool { - if let Some(task) = self.0.upgrade() { - task.notify(); - true - } else { - false - } - } - } -} diff --git a/linkerd/lock/src/test.rs b/linkerd/lock/src/test.rs index 398398d3d0..ca4be1e6dc 100644 --- a/linkerd/lock/src/test.rs +++ b/linkerd/lock/src/test.rs @@ -1,201 +1,219 @@ -// use crate::error::ServiceError; +use crate::error::ServiceError; use crate::LockService; -use futures::{future, try_ready, Async, Future, Poll, Stream}; +use futures::{future, StreamExt}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; -use tokio::sync::{mpsc, oneshot}; +use std::task::{Context, Poll}; +use std::{future::Future, pin::Pin, sync::Arc}; +use tokio::sync::oneshot; +use tokio_test::{assert_pending, assert_ready, assert_ready_ok}; use tower::Service as _Service; +use tower_test::mock::Spawn; use tracing::{info_span, trace}; use tracing_futures::Instrument; -#[test] -fn exclusive_access() { - run(future::lazy(|| { - let ready = Arc::new(AtomicBool::new(false)); - let mut svc0 = LockService::new(Decr::new(2, ready.clone())); +#[tokio::test] +async fn exclusive_access() { + let ready = Arc::new(AtomicBool::new(false)); + let mut svc0 = Spawn::new(LockService::new(Decr::new(2, ready.clone()))); - // svc0 grabs the lock, but the inner service isn't ready. - assert!(svc0.poll_ready().expect("must not fail").is_not_ready()); + // svc0 grabs the lock, but the inner service isn't ready. + assert_pending!(svc0.poll_ready()); - // Cloning a locked service does not preserve the lock. - let mut svc1 = svc0.clone(); + // Cloning a locked service does not preserve the lock. + let mut svc1 = svc0.clone(); - // svc1 can't grab the lock. - assert!(svc1.poll_ready().expect("must not fail").is_not_ready()); + // svc1 can't grab the lock. + assert_pending!(svc1.poll_ready()); - // svc0 holds the lock and becomes ready with the inner service. - ready.store(true, Ordering::SeqCst); - assert!(svc0.poll_ready().expect("must not fail").is_ready()); + // svc0 holds the lock and becomes ready with the inner service. + ready.store(true, Ordering::SeqCst); + assert_ready_ok!(svc0.poll_ready()); - // svc1 still can't grab the lock. - assert!(svc1.poll_ready().expect("must not fail").is_not_ready()); + // svc1 still can't grab the lock. + assert_pending!(svc1.poll_ready()); - // svc0 remains ready. - let fut0 = svc0.call(1); + // svc0 remains ready. + let fut0 = svc0.call(1); - // svc1 grabs the lock and is immediately ready. - assert!(svc1.poll_ready().expect("must not fail").is_ready()); - // svc0 cannot grab the lock. - assert!(svc0.poll_ready().expect("must not fail").is_not_ready()); + // svc1 grabs the lock and is immediately ready. + assert_ready_ok!(svc1.poll_ready()); + // svc0 cannot grab the lock. + assert_pending!(svc0.poll_ready()); - let fut1 = svc1.call(1); + let fut1 = svc1.call(1); - fut0.join(fut1) - .map(|_| ()) - .map_err(|_| panic!("must not fail")) - })); + tokio::try_join!(fut0, fut1).expect("must not fail!"); } -// #[test] -// fn propagates_errors() { -// run(future::lazy(|| { -// let mut svc0 = LockService::new(Decr::from(1)); - -// // svc0 grabs the lock and we decr the service so it will fail. -// assert!(svc0.poll_ready().expect("must not fail").is_ready()); -// // svc0 remains ready. -// svc0.call(1) -// .map_err(|_| panic!("must not fail")) -// .map(move |_| { -// // svc1 grabs the lock and fails immediately. -// let mut svc1 = svc0.clone(); -// assert!(svc1 -// .poll_ready() -// .expect_err("mut fail") -// .downcast_ref::() -// .expect("must fail with service error") -// .inner() -// .is::()); - -// // svc0 suffers the same fate. -// assert!(svc0 -// .poll_ready() -// .expect_err("mut fail") -// .downcast_ref::() -// .expect("must fail with service error") -// .inner() -// .is::()); -// }) -// })); -// } - -#[test] -fn dropping_releases_access() { +#[tokio::test] +async fn propagates_errors() { + let mut svc0 = Spawn::new(LockService::new(Decr::from(1))); + + // svc0 grabs the lock and we decr the service so it will fail. + assert_ready_ok!(svc0.poll_ready()); + + // svc0 remains ready. + let _ = svc0.call(1).await.expect("must not fail!"); + + // svc1 grabs the lock and fails immediately. + let mut svc1 = svc0.clone(); + assert_ready!(svc1.poll_ready()) + .expect_err("must fail") + .downcast_ref::() + .expect("must fail with service error") + .inner() + .is::(); + + // svc0 suffers the same fate. + assert_ready!(svc0.poll_ready()) + .expect_err("mut fail") + .downcast_ref::() + .expect("must fail with service error") + .inner() + .is::(); +} + +#[tokio::test] +async fn dropping_releases_access() { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); use tower::util::ServiceExt; - run(future::lazy(|| { - let mut svc0 = LockService::new(Decr::new(3, Arc::new(true.into()))); - - // svc0 grabs the lock, but the inner service isn't ready. - assert!(svc0.poll_ready().expect("must not fail").is_ready()); - - let svc1 = svc0.clone(); - let (tx1, rx1) = oneshot::channel(); - tokio::spawn( - svc1.oneshot(1) - .then(move |_| { - trace!("complete"); - tx1.send(()).map_err(|_| ()) - }) - .instrument(info_span!("Svc1")), - ); - - let svc2 = svc0.clone(); - let (tx2, rx2) = oneshot::channel(); - tokio::spawn( - svc2.oneshot(1) - .then(move |_| { - trace!("complete"); - tx2.send(()).map_err(|_| ()) - }) - .instrument(info_span!("Svc2")), - ); - - // svc3 will be the notified waiter when svc0 completes; but it drops - // svc3 before polling the waiter. This test ensures that svc2 is - // notified by svc3's drop. - let svc3 = svc0.clone(); - let (tx3, rx3) = oneshot::channel(); - tokio::spawn(PollAndDrop(svc3, rx3).instrument(info_span!("Svc3"))); - - tokio::spawn( - svc0.ready() - .then(move |_| tx3.send(()).map_err(|_| ())) - .instrument(info_span!("Svc0")), - ); - // svc3 notified; but it is dropped before it can be polled - - rx2.then(move |_| rx1).map_err(|_| ()) - })); - - struct PollAndDrop(LockService, oneshot::Receiver<()>); - impl Future for PollAndDrop { - type Item = (); - type Error = (); - fn poll(&mut self) -> Poll { - trace!("Polling"); - if self.1.poll().map_err(|_| ())?.is_ready() { - trace!("Dropping"); - return Ok(Async::Ready(())); - } - self.0.poll_ready().map_err(|_| ()) + let mut svc0 = LockService::new(Decr::new(3, Arc::new(true.into()))); + + // svc0 grabs the lock, but the inner service isn't ready. + future::poll_fn(|cx| { + assert_ready_ok!(svc0.poll_ready(cx)); + Poll::Ready(()) + }) + .await; + + let svc1 = svc0.clone(); + + let (tx1, rx1) = oneshot::channel(); + tokio::spawn( + async move { + let svc1 = svc1; + trace!("started"); + let _f = svc1.oneshot(1).instrument(info_span!("1shot")).await; + trace!("sending"); + tx1.send(()).unwrap(); + trace!("done"); } - } + .instrument(info_span!("Svc1")), + ); + + let (tx2, rx2) = oneshot::channel(); + let svc2 = svc0.clone(); + tokio::spawn( + async move { + trace!("started"); + let _ = svc2.oneshot(1).await; + trace!("sending"); + tx2.send(()).unwrap(); + trace!("done"); + } + .instrument(info_span!("Svc2")), + ); + + // svc3 will be the notified waiter when svc0 completes; but it drops + // svc3 before polling the waiter. This test ensures that svc2 is + // notified by svc3's drop. + let svc3 = svc0.clone(); + let (tx3, rx3) = oneshot::channel(); + tokio::spawn( + async move { + let mut svc3 = Some(svc3); + let mut rx3 = rx3; + let _ = future::poll_fn(|cx| { + let rx3 = &mut rx3; + tokio::pin!(rx3); + trace!("Polling"); + if let Poll::Ready(ready) = rx3.poll(cx) { + trace!(?ready, "Dropping"); + drop(svc3.take()); + return Poll::Ready(Ok(())); + } + svc3.as_mut() + .expect("polled after ready?") + .poll_ready(cx) + .map_err(|_| ()) + }) + .await; + } + .instrument(info_span!("Svc3")), + ); + + tokio::spawn( + async move { + trace!("started"); + let _ = svc0.ready_and().await; + trace!("ready"); + tx3.send(()).map_err(|_| ())?; + trace!("sent"); + Ok::<(), ()>(()) + } + .instrument(info_span!("Svc0")), + ); + // svc3 notified; but it is dropped before it can be polled + + rx2.await.unwrap(); + rx1.await.unwrap(); } -#[test] -fn fuzz() { +#[tokio::test] +async fn fuzz() { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init(); const ITERS: usize = 100_000; - for (concurrency, iterations) in &[(1, ITERS), (3, ITERS), (100, ITERS)] { - tokio::run(future::lazy(move || { + for (concurrency, iterations) in &[(1usize, ITERS), (3, ITERS), (100, ITERS)] { + async { + tracing::info!("starting"); let svc = LockService::new(Decr::new(*iterations, Arc::new(true.into()))); - let (tx, rx) = mpsc::channel(1); - for _ in 0..*concurrency { - tokio::spawn(Loop { - lock: svc.clone(), - _tx: tx.clone(), - }); + let joins = futures::stream::futures_unordered::FuturesUnordered::new(); + for i in 0..*concurrency { + let lock = svc.clone(); + let join = tokio::spawn( + async move { + let mut lock = lock; + future::poll_fn(|cx| { + loop { + let ready = lock.poll_ready(cx); + tracing::trace!(?ready); + if futures::ready!(ready).is_err() { + return Poll::Ready(()); + } + // Randomly be busy while holding the lock. + if rand::random::() { + cx.waker().wake_by_ref(); + return Poll::Pending; + } + + tokio::spawn(lock.call(1)); + } + }) + .await; + tracing::trace!("task done"); + } + .instrument(tracing::trace_span!("task", number = i)), + ); + joins.push(join); } - rx.fold((), |(), ()| Ok(())).map_err(|_| ()) - })); - } - - struct Loop { - lock: LockService, - _tx: mpsc::Sender<()>, - } - impl Future for Loop { - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll { - loop { - try_ready!(self.lock.poll_ready().map_err(|_| ())); - - // Randomly be busy while holding the lock. - if rand::random::() { - futures::task::current().notify(); - return Ok(Async::NotReady); - } - tokio::spawn(self.lock.call(1).then(|_| Ok(()))); - } + joins + .for_each(|res| { + let _ = res.expect("task must not have panicked"); + async {} + }) + .await; + tracing::info!("done"); } + .instrument(tracing::info_span!("fuzz", concurrency, iterations)) + .await } } -fn run(future: F) -where - F: Future + 'static, -{ - let subscriber = tracing_subscriber::fmt::Subscriber::builder() - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) - .with_writer(std::io::stdout) - .finish(); - tracing::subscriber::with_default(subscriber, move || { - tokio::runtime::current_thread::run(future) - }); -} - #[derive(Debug, Default)] struct Decr { value: usize, @@ -220,28 +238,36 @@ impl Decr { impl tower::Service for Decr { type Response = usize; type Error = Underflow; - type Future = futures::future::FutureResult; + type Future = + Pin> + Send + Sync + 'static>>; - fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> { + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + let span = tracing::trace_span!("Decr::poll_ready", self.value); + let _g = span.enter(); if self.value == 0 { - return Err(Underflow); + tracing::trace!(ready = true, "underflow"); + return Poll::Ready(Err(Underflow)); } if !self.ready.load(Ordering::SeqCst) { - return Ok(Async::NotReady); + tracing::trace!(ready = false); + return Poll::Pending; } - Ok(().into()) + tracing::trace!(ready = true); + Poll::Ready(Ok(())) } fn call(&mut self, decr: usize) -> Self::Future { if self.value < decr { self.value = 0; - return futures::future::err(Underflow); + + return Box::pin(async { Err(Underflow) }); } self.value -= decr; - futures::future::ok(self.value) + let value = self.value; + Box::pin(async move { Ok(value) }) } }