Skip to content

Commit

Permalink
chore(http): Add a TimeoutBody middleware
Browse files Browse the repository at this point in the history
This commit implements a new TimeoutBody middleware that uses the new
`Body::poll_progess` method to constrain the amount of time a stream waits for
send capacity.

This change does not yet wire up the middleware into the Linkerd server.
  • Loading branch information
olix0r committed Apr 17, 2024
1 parent ad2917e commit 71b511a
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 0 deletions.
26 changes: 26 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,17 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"

[[package]]
name = "futures-macro"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

[[package]]
name = "futures-sink"
version = "0.3.30"
Expand All @@ -661,6 +672,7 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
Expand Down Expand Up @@ -1444,6 +1456,20 @@ dependencies = [
"tracing",
]

[[package]]
name = "linkerd-http-body-timeout"
version = "0.1.0"
dependencies = [
"futures",
"http",
"http-body",
"linkerd-error",
"pin-project",
"thiserror",
"tokio",
"tower-service",
]

[[package]]
name = "linkerd-http-box"
version = "0.1.0"
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ members = [
"linkerd/error-respond",
"linkerd/exp-backoff",
"linkerd/http/access-log",
"linkerd/http/body-timeout",
"linkerd/http/box",
"linkerd/http/classify",
"linkerd/http/metrics",
Expand Down
16 changes: 16 additions & 0 deletions linkerd/http/body-timeout/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "linkerd-http-body-timeout"
version = "0.1.0"
edition = "2021"
publish = false

[dependencies]
http = "0.2"
http-body = "0.4"
futures = "0.3"
pin-project = "1"
thiserror = "1"
tokio = { version = "1", features = ["time"] }
tower-service = "0.3"

linkerd-error = { path = "../../error" }
174 changes: 174 additions & 0 deletions linkerd/http/body-timeout/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
#![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)]
#![forbid(unsafe_code)]

use futures::prelude::*;
use http::{HeaderMap, HeaderValue};
use http_body::Body;
use linkerd_error::Error;
use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::time;

pub struct TimeoutRequestProgress<S> {
inner: S,
timeout: time::Duration,
}

pub struct TimeoutResponseProgress<S> {
inner: S,
timeout: time::Duration,
}

/// A [`Body`] that imposes a timeout on the amount of time the stream may be
/// stuck waiting for capacity.
#[derive(Debug)]
#[pin_project]
pub struct ProgressTimeoutBody<B> {
#[pin]
inner: B,
sleep: Pin<Box<time::Sleep>>,
timeout: time::Duration,
is_pending: bool,
}

#[derive(Debug, thiserror::Error)]
#[error("body progress timeout after {0:?}")]
pub struct BodyProgressTimeoutError(time::Duration);

// === impl TimeoutRequestProgress ===

impl<S> TimeoutRequestProgress<S> {
pub fn new(timeout: time::Duration, inner: S) -> Self {
Self { inner, timeout }
}
}

impl<B, S> tower_service::Service<http::Request<B>> for TimeoutRequestProgress<S>
where
S: tower_service::Service<http::Request<ProgressTimeoutBody<B>>>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;

#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

#[inline]
fn call(&mut self, req: http::Request<B>) -> Self::Future {
self.inner
.call(req.map(|b| ProgressTimeoutBody::new(self.timeout, b)))
}
}

// === impl TimeoutResponseProgress ===

impl<S> TimeoutResponseProgress<S> {
pub fn new(timeout: time::Duration, inner: S) -> Self {
Self { inner, timeout }
}
}

impl<Req, B, S> tower_service::Service<Req> for TimeoutResponseProgress<S>
where
S: tower_service::Service<Req, Response = http::Response<B>>,
S::Future: Send + 'static,
{
type Response = http::Response<ProgressTimeoutBody<B>>;
type Error = S::Error;
type Future =
Pin<Box<dyn std::future::Future<Output = Result<Self::Response, S::Error>> + Send>>;

#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

#[inline]
fn call(&mut self, req: Req) -> Self::Future {
let timeout = self.timeout;
self.inner
.call(req)
.map_ok(move |res| res.map(|b| ProgressTimeoutBody::new(timeout, b)))
.boxed()
}
}

// === impl ProgressTimeoutBody ===

impl<B> ProgressTimeoutBody<B> {
pub fn new(timeout: time::Duration, inner: B) -> Self {
// Avoid overflows by capping MAX to roughly 30 years.
const MAX: time::Duration = time::Duration::from_secs(86400 * 365 * 30);
Self {
inner,
timeout: timeout.min(MAX),
is_pending: false,
sleep: Box::pin(time::sleep(MAX)),
}
}
}

impl<B> Body for ProgressTimeoutBody<B>
where
B: Body + Send + 'static,
B::Data: Send + 'static,
B::Error: Into<Error>,
{
type Data = B::Data;
type Error = Error;

#[inline]
fn is_end_stream(&self) -> bool {
self.inner.is_end_stream()
}

#[inline]
fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let this = self.project();
*this.is_pending = false;
this.inner.poll_data(cx).map_err(Into::into)
}

#[inline]
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap<HeaderValue>>, Self::Error>> {
let this = self.project();
*this.is_pending = false;
this.inner.poll_trailers(cx).map_err(Into::into)
}

fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();

let _ = this.inner.poll_progress(cx).map_err(Into::into)?;

if !*this.is_pending {
this.sleep
.as_mut()
.reset(time::Instant::now() + *this.timeout);
*this.is_pending = true;
}

match this.sleep.as_mut().poll(cx) {
Poll::Ready(()) => Poll::Ready(Err(BodyProgressTimeoutError(*this.timeout).into())),
Poll::Pending => Poll::Pending,
}
}

#[inline]
fn size_hint(&self) -> http_body::SizeHint {
self.inner.size_hint()
}
}

0 comments on commit 71b511a

Please sign in to comment.