Skip to content

Commit

Permalink
Reimplement the Lock middleware with tokio::sync
Browse files Browse the repository at this point in the history
As described in tokio-rs/tokio#2237, the `tokio::sync::Semaphore` can
hold unbounded memory, especially when the semaphor is being contested
and consumers drop interest. Unfortunately, this use case is common in
the proxy, especially when a destination service is unavailable and the
proxy is timing out requests.

This change reimplements the Lock middleware without using
`tokio::sync::Semaphore`. This implementation is in some ways more
naive and inefficient, but it appears to be better suited for the
proxy's needs. Specifically, waiters are stored in a LIFO stack, which
optimizes for minimizing latency. Under certain high-load scenarios,
this Lock could be forced to grow its waiters set without cleaning up
expired watchers. If this becomes a more serious concern, we could
change the implementation to use a FIFO queue of waiters.
  • Loading branch information
olix0r committed Feb 17, 2020
1 parent d1f18a8 commit 427b96c
Show file tree
Hide file tree
Showing 8 changed files with 713 additions and 396 deletions.
41 changes: 39 additions & 2 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ dependencies = [
"tracing",
"tracing-futures",
"tracing-log",
"tracing-subscriber",
"tracing-subscriber 0.2.0-alpha.6",
]

[[package]]
Expand Down Expand Up @@ -872,11 +872,16 @@ dependencies = [
name = "linkerd2-lock"
version = "0.1.0"
dependencies = [
"fnv",
"futures",
"linkerd2-error",
"rand 0.7.2",
"tokio",
"tower",
"tracing",
"tracing-futures",
"tracing-log",
"tracing-subscriber 0.1.6",
]

[[package]]
Expand Down Expand Up @@ -1115,7 +1120,7 @@ dependencies = [
"tower",
"tower-util",
"tracing",
"tracing-subscriber",
"tracing-subscriber 0.2.0-alpha.6",
"untrusted",
"webpki",
]
Expand Down Expand Up @@ -1426,6 +1431,15 @@ dependencies = [
"tower-grpc-build",
]

[[package]]
name = "owning_ref"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49a4b8ea2179e6a2e27411d3bca09ca6dd630821cf6894c6c7c8467a8ee7ef13"
dependencies = [
"stable_deref_trait",
]

[[package]]
name = "percent-encoding"
version = "1.0.1"
Expand Down Expand Up @@ -1947,6 +1961,12 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"

[[package]]
name = "stable_deref_trait"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dba1a27d3efae4351c8051072d619e3ade2820635c3958d826bfea39d59b54c8"

[[package]]
name = "string"
version = "0.2.0"
Expand Down Expand Up @@ -2563,6 +2583,23 @@ dependencies = [
"tracing-core",
]

[[package]]
name = "tracing-subscriber"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "192ca16595cdd0661ce319e8eede9c975f227cdaabc4faaefdc256f43d852e45"
dependencies = [
"ansi_term",
"chrono",
"lazy_static",
"matchers",
"owning_ref",
"regex 1.0.0",
"smallvec 0.6.10",
"tracing-core",
"tracing-log",
]

[[package]]
name = "tracing-subscriber"
version = "0.2.0-alpha.6"
Expand Down
7 changes: 7 additions & 0 deletions linkerd/lock/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,15 @@ A middleware that provides mutual exclusion.
"""

[dependencies]
fnv = "1.0"
futures = "0.1"
linkerd2-error = { path = "../error" }
tokio = "0.1"
tower = "0.1"
tracing = "0.1"

[dev-dependencies]
rand = "0.7"
tracing-futures = "0.1"
tracing-log = "0.1"
tracing-subscriber = "0.1"
44 changes: 44 additions & 0 deletions linkerd/lock/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
pub use linkerd2_error::Error;
use std::sync::Arc;

#[derive(Debug)]
pub struct Poisoned(());

#[derive(Debug)]
pub struct ServiceError(Arc<Error>);

// === impl POisoned ===

impl Poisoned {
pub fn new() -> Self {
Poisoned(())
}
}

impl std::fmt::Display for Poisoned {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "poisoned")
}
}

impl std::error::Error for Poisoned {}

// === impl ServiceError ===

impl ServiceError {
pub(crate) fn new(e: Arc<Error>) -> Self {
ServiceError(e)
}

pub fn inner(&self) -> &Error {
self.0.as_ref()
}
}

impl std::fmt::Display for ServiceError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}

impl std::error::Error for ServiceError {}
20 changes: 20 additions & 0 deletions linkerd/lock/src/layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use super::Lock;

#[derive(Clone, Debug, Default)]
pub struct LockLayer(());

// === impl Layer ===

impl LockLayer {
pub fn new() -> Self {
LockLayer(())
}
}

impl<S> tower::layer::Layer<S> for LockLayer {
type Service = Lock<S>;

fn layer(&self, inner: S) -> Self::Service {
Self::Service::new(inner)
}
}
Loading

0 comments on commit 427b96c

Please sign in to comment.