Skip to content

Commit

Permalink
Reimplement the Lock middleware with tokio::sync
Browse files Browse the repository at this point in the history
As described in tokio-rs/tokio#2237, the `tokio::sync::Semaphore` can
hold unbounded memory, especially when the semaphor is being contested
and consumers drop interest. Unfortunately, this use case is common in
the proxy, especially when a destination service is unavailable and the
proxy is timing out requests.

This change reimplements the Lock middleware without using
`tokio::sync::Semaphore`. This implementation is in some ways more
naive and inefficient, but it appears to be better suited for the
proxy's needs. Specifically, waiters are stored in a LIFO stack, which
optimizes for minimizing latency. Under certain high-load scenarios,
this Lock could be forced to grow its waiters set without cleaning up
expired watchers. If this becomes a more serious concern, we could
change the implementation to use a FIFO queue of waiters.
  • Loading branch information
olix0r committed Feb 17, 2020
1 parent 4534327 commit 2b7b875
Show file tree
Hide file tree
Showing 8 changed files with 677 additions and 394 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -874,9 +874,13 @@ version = "0.1.0"
dependencies = [
"futures",
"linkerd2-error",
"rand 0.7.2",
"tokio",
"tower",
"tracing",
"tracing-futures",
"tracing-log",
"tracing-subscriber",
]

[[package]]
Expand Down
6 changes: 6 additions & 0 deletions linkerd/lock/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,9 @@ linkerd2-error = { path = "../error" }
tokio = "0.1"
tower = "0.1"
tracing = "0.1"

[dev-dependencies]
rand = "0.7"
tracing-futures = "0.1"
tracing-log = "0.1"
tracing-subscriber = "0.2"
44 changes: 44 additions & 0 deletions linkerd/lock/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
pub use linkerd2_error::Error;
use std::sync::Arc;

#[derive(Debug)]
pub struct Poisoned(());

#[derive(Debug)]
pub struct ServiceError(Arc<Error>);

// === impl POisoned ===

impl Poisoned {
pub fn new() -> Self {
Poisoned(())
}
}

impl std::fmt::Display for Poisoned {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "poisoned")
}
}

impl std::error::Error for Poisoned {}

// === impl ServiceError ===

impl ServiceError {
pub(crate) fn new(e: Arc<Error>) -> Self {
ServiceError(e)
}

pub fn inner(&self) -> &Error {
self.0.as_ref()
}
}

impl std::fmt::Display for ServiceError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}

impl std::error::Error for ServiceError {}
20 changes: 20 additions & 0 deletions linkerd/lock/src/layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use super::Lock;

#[derive(Clone, Debug, Default)]
pub struct LockLayer(());

// === impl Layer ===

impl LockLayer {
pub fn new() -> Self {
LockLayer(())
}
}

impl<S> tower::layer::Layer<S> for LockLayer {
type Service = Lock<S>;

fn layer(&self, inner: S) -> Self::Service {
Self::Service::new(inner)
}
}
Loading

0 comments on commit 2b7b875

Please sign in to comment.