From 5a3cf6ff82a1bd3ed5bcab6eeb15a3bf7f2e3cc9 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 9 Jun 2021 15:15:37 -0700 Subject: [PATCH 01/15] wip cap Signed-off-by: Eliza Weisman --- Cargo.lock | 1 + linkerd/http-retry/Cargo.toml | 1 + linkerd/http-retry/src/replay.rs | 70 ++++++++++++++++++++++++++------ 3 files changed, 60 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 17488e86a1..00e33e6091 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1026,6 +1026,7 @@ dependencies = [ "linkerd-tracing", "parking_lot", "pin-project", + "thiserror", "tokio", "tower", "tracing", diff --git a/linkerd/http-retry/Cargo.toml b/linkerd/http-retry/Cargo.toml index 655fcf2fae..6c73d48e50 100644 --- a/linkerd/http-retry/Cargo.toml +++ b/linkerd/http-retry/Cargo.toml @@ -18,6 +18,7 @@ pin-project = "1" parking_lot = "0.11" tower = { version = "0.4.7", default-features = false, features = ["retry", "util"] } tracing = "0.1.23" +thiserror = "1" [dev-dependencies] hyper = "0.14" diff --git a/linkerd/http-retry/src/replay.rs b/linkerd/http-retry/src/replay.rs index 6e3362b6a3..44d67ffc06 100644 --- a/linkerd/http-retry/src/replay.rs +++ b/linkerd/http-retry/src/replay.rs @@ -1,8 +1,10 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use http::HeaderMap; use http_body::Body; +use linkerd_error::Error; use parking_lot::Mutex; use std::{collections::VecDeque, io::IoSlice, pin::Pin, sync::Arc, task::Context, task::Poll}; +use thiserror::Error; /// Wraps an HTTP body type and lazily buffers data as it is read from the inner /// body. @@ -33,8 +35,15 @@ pub struct ReplayBody { /// Should this clone replay trailers from the shared state? replay_trailers: bool, + + /// Maxiumum number of bytes to buffer. + max_bytes: usize, } +#[derive(Debug, Error)] +#[error("cannot buffer more than {0} bytes")] +pub struct Capped(usize); + /// Data returned by `ReplayBody`'s `http_body::Body` implementation is either /// `Bytes` returned by the initial body, or a list of all `Bytes` chunks /// returned by the initial body (when replaying it). @@ -68,13 +77,14 @@ struct BodyState { trailers: Option, rest: Option, is_completed: bool, + is_capped: bool, } // === impl ReplayBody === impl ReplayBody { /// Wraps an initial `Body` in a `ReplayBody`. - pub fn new(body: B) -> Self { + pub fn new(body: B, max_bytes: usize) -> Self { let was_empty = body.is_end_stream(); Self { state: Some(BodyState { @@ -82,6 +92,7 @@ impl ReplayBody { trailers: None, rest: Some(body), is_completed: false, + is_capped: false, }), shared: Arc::new(SharedState { body: Mutex::new(None), @@ -90,6 +101,7 @@ impl ReplayBody { // The initial `ReplayBody` has nothing to replay replay_body: false, replay_trailers: false, + max_bytes, } } @@ -109,9 +121,13 @@ impl ReplayBody { } } -impl Body for ReplayBody { +impl Body for ReplayBody +where + B: Body + Unpin, + B::Error: Into, +{ type Data = Data; - type Error = B::Error; + type Error = Error; fn poll_data( self: Pin<&mut Self>, @@ -123,16 +139,24 @@ impl Body for ReplayBody { replay_body = this.replay_body, buf.has_remaining = state.buf.has_remaining(), body.is_completed = state.is_completed, + body.is_capped = state.is_capped, "Replay::poll_data" ); // If we haven't replayed the buffer yet, and its not empty, return the // buffered data first. - if this.replay_body && state.buf.has_remaining() { - tracing::trace!("replaying body"); - // Don't return the buffered data again on the next poll. - this.replay_body = false; - return Poll::Ready(Some(Ok(Data::Replay(state.buf.clone())))); + if this.replay_body { + if state.buf.has_remaining() { + tracing::trace!("replaying body"); + // Don't return the buffered data again on the next poll. + this.replay_body = false; + return Poll::Ready(Some(Ok(Data::Replay(state.buf.clone())))); + } + + if state.is_capped { + tracing::trace!("cannot replay buffered body, maximum buffer length reached"); + return Poll::Ready(Some(Err(Capped(this.max_bytes).into()))); + } } // If the inner body has previously ended, don't poll it again. @@ -157,9 +181,30 @@ impl Body for ReplayBody { tracing::trace!("Initial body completed"); state.is_completed = true; } - return Poll::Ready( - opt.map(|ok| ok.map(|data| Data::Initial(state.buf.push_chunk(data)))), - ); + return Poll::Ready(opt.map(|ok| { + ok.map(|data| { + // If we have already buffered the maximum number of bytes, + // allow *this* body to continue, but don't buffer any more. + if state.is_capped { + return Data::Initial(data.copy_to_bytes(data.remaining())); + } + + if state.buf.remaining() + data.remaining() >= this.max_bytes { + tracing::debug!( + max_bytes = this.max_bytes, + "buffered maximum number of bytes, discarding buffer" + ); + // discard the buffer + state.buf = Default::default(); + state.is_capped = true; + return Data::Initial(data.copy_to_bytes(data.remaining())); + } + + // Buffer and return the bytes + Data::Initial(state.buf.push_chunk(data)) + }) + .map_err(Into::into) + })); } // Otherwise, guess we're done! @@ -194,7 +239,7 @@ impl Body for ReplayBody { } tlrs }); - return Poll::Ready(res); + return Poll::Ready(res.map_err(Into::into)); } } @@ -260,6 +305,7 @@ impl Clone for ReplayBody { // reading any additional data from the initial body. replay_body: true, replay_trailers: true, + max_bytes: self.max_bytes, } } } From 64a19fa76867dc4e14db38072b0ebddca8521102 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 9 Jun 2021 16:09:49 -0700 Subject: [PATCH 02/15] wip2 Signed-off-by: Eliza Weisman --- linkerd/http-retry/src/replay.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/linkerd/http-retry/src/replay.rs b/linkerd/http-retry/src/replay.rs index 44d67ffc06..6365dad8a8 100644 --- a/linkerd/http-retry/src/replay.rs +++ b/linkerd/http-retry/src/replay.rs @@ -726,7 +726,7 @@ mod tests { fn empty_body_is_always_eos() { // If the initial body was empty, every clone should always return // `true` from `is_end_stream`. - let initial = ReplayBody::new(hyper::Body::empty()); + let initial = ReplayBody::new(hyper::Body::empty(), 64 * 1024); assert!(initial.is_end_stream()); let replay = initial.clone(); @@ -788,7 +788,7 @@ mod tests { impl Test { fn new() -> Self { let (tx, body) = hyper::Body::channel(); - let initial = ReplayBody::new(body); + let initial = ReplayBody::new(body, 64 * 1024); let replay = initial.clone(); Self { tx: Tx(tx), From 12c7d6dfdf97cffb65c49afa5831c86ec47a66d2 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 10 Jun 2021 09:26:33 -0700 Subject: [PATCH 03/15] add a cap on the max buffer size for replays Signed-off-by: Eliza Weisman --- linkerd/http-retry/src/lib.rs | 4 +- linkerd/http-retry/src/replay.rs | 76 +++++++++++++++++++++++++++++--- 2 files changed, 74 insertions(+), 6 deletions(-) diff --git a/linkerd/http-retry/src/lib.rs b/linkerd/http-retry/src/lib.rs index 74616ccceb..de867d4880 100644 --- a/linkerd/http-retry/src/lib.rs +++ b/linkerd/http-retry/src/lib.rs @@ -136,7 +136,9 @@ where (|req: http::Request>| req.map(BoxBody::new)) as fn(_) -> _, ); let retry = tower::retry::Retry::new(policy.clone(), inner); - return ResponseFuture::Retry(retry.oneshot(req.map(ReplayBody::new))); + return ResponseFuture::Retry( + retry.oneshot(req.map(|x| ReplayBody::new(x, 64 * 1024))), + ); } } diff --git a/linkerd/http-retry/src/replay.rs b/linkerd/http-retry/src/replay.rs index 6365dad8a8..f1dc09c12d 100644 --- a/linkerd/http-retry/src/replay.rs +++ b/linkerd/http-retry/src/replay.rs @@ -135,6 +135,10 @@ where ) -> Poll>> { let this = self.get_mut(); let state = Self::acquire_state(&mut this.state, &this.shared.body); + // Move these out to avoid mutable borrow issues in the `map` closure + // when polling the inner body. + let max_bytes = this.max_bytes; + let is_capped = state.is_capped; tracing::trace!( replay_body = this.replay_body, buf.has_remaining = state.buf.has_remaining(), @@ -182,16 +186,16 @@ where state.is_completed = true; } return Poll::Ready(opt.map(|ok| { - ok.map(|data| { + ok.map(|mut data| { // If we have already buffered the maximum number of bytes, // allow *this* body to continue, but don't buffer any more. - if state.is_capped { + if is_capped { return Data::Initial(data.copy_to_bytes(data.remaining())); } - if state.buf.remaining() + data.remaining() >= this.max_bytes { + if state.buf.remaining() + data.remaining() > max_bytes { tracing::debug!( - max_bytes = this.max_bytes, + max_bytes, "buffered maximum number of bytes, discarding buffer" ); // discard the buffer @@ -740,7 +744,7 @@ mod tests { async fn eos_only_when_fully_replayed() { // Test that each clone of a body is not EOS until the data has been // fully replayed. - let mut initial = ReplayBody::new(hyper::Body::from("hello world")); + let mut initial = ReplayBody::new(hyper::Body::from("hello world"), 64 * 1024); let mut replay = initial.clone(); body_to_string(&mut initial).await; @@ -776,6 +780,68 @@ mod tests { assert!(replay2.is_end_stream()); } + #[tokio::test(flavor = "current_thread")] + async fn caps_buffer() { + // Test that, when the initial body is longer than the preconfigured + // cap, we allow the request to continue, but stop buffering. The + // initial body will complete, but the replay will immediately fail. + + let (mut tx, body) = hyper::Body::channel(); + let mut initial = ReplayBody::new(body, 8); + let mut replay = initial.clone(); + + // Send enough data to reach the cap + tx.send_data(Bytes::from("aaaaaaaa")).await.unwrap(); + assert_eq!(chunk(&mut initial).await, Some("aaaaaaaa".to_string())); + + // Further chunks are still forwarded on the initial body + tx.send_data(Bytes::from("bbbbbbbb")).await.unwrap(); + assert_eq!(chunk(&mut initial).await, Some("bbbbbbbb".to_string())); + + drop(initial); + + // The request's replay should error, since we discarded the buffer when + // we hit the cap. + let err = replay + .data() + .await + .expect("replay must yield Some(Err(..)) when capped") + .expect_err("replay must error when cappped"); + assert!(err.is::()) + } + + #[tokio::test(flavor = "current_thread")] + async fn caps_across_replays() { + // Test that, when the initial body is longer than the preconfigured + // cap, we allow the request to continue, but stop buffering. + + let (mut tx, body) = hyper::Body::channel(); + let mut initial = ReplayBody::new(body, 8); + let mut replay = initial.clone(); + + // Send enough data to reach the cap + tx.send_data(Bytes::from("aaaaaaaa")).await.unwrap(); + assert_eq!(chunk(&mut initial).await, Some("aaaaaaaa".to_string())); + drop(initial); + + let mut replay2 = replay.clone(); + + // The replay will reach the cap, but it should still return data from + // the original body. + tx.send_data(Bytes::from("bbbbbbbb")).await.unwrap(); + assert_eq!(chunk(&mut replay).await, Some("aaaaaaaa".to_string())); + assert_eq!(chunk(&mut replay).await, Some("bbbbbbbb".to_string())); + drop(replay); + + // The second replay will fail, though, because the buffer was discarded. + let err = replay2 + .data() + .await + .expect("replay must yield Some(Err(..)) when capped") + .expect_err("replay must error when cappped"); + assert!(err.is::()) + } + struct Test { tx: Tx, initial: ReplayBody, From 6ccdb3306950accfa2d40b0280f78d908b48316a Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 10 Jun 2021 10:10:23 -0700 Subject: [PATCH 04/15] make retry policies responsible for wrapping Signed-off-by: Eliza Weisman --- linkerd/app/core/src/retry.rs | 75 ++++++++++++++--------- linkerd/app/inbound/src/http/mod.rs | 4 +- linkerd/app/outbound/src/http/detect.rs | 2 +- linkerd/app/outbound/src/http/logical.rs | 1 + linkerd/http-box/src/box_any_request.rs | 0 linkerd/http-box/src/request.rs | 37 ++++++++---- linkerd/http-retry/src/lib.rs | 77 ++++++++++-------------- 7 files changed, 108 insertions(+), 88 deletions(-) create mode 100644 linkerd/http-box/src/box_any_request.rs diff --git a/linkerd/app/core/src/retry.rs b/linkerd/app/core/src/retry.rs index dfb8ecbe5e..f5cccf7c6d 100644 --- a/linkerd/app/core/src/retry.rs +++ b/linkerd/app/core/src/retry.rs @@ -4,10 +4,11 @@ use super::http_metrics::retries::Handle; use super::metrics::HttpRouteRetry; use crate::profiles; use futures::future; +use linkerd_error::Error; use linkerd_http_classify::{Classify, ClassifyEos, ClassifyResponse}; -use linkerd_stack::Param; +use linkerd_stack::{Either, Param}; use std::sync::Arc; -use tower::retry::budget::Budget; +use tower::retry::{budget::Budget, Policy}; pub use linkerd_http_retry::*; @@ -55,6 +56,37 @@ impl NewPolicy for NewRetry { // === impl Retry === +impl Retry { + fn can_retry(&self, req: &http::Request) -> bool { + let content_length = |req: &http::Request<_>| { + req.headers() + .get(http::header::CONTENT_LENGTH) + .and_then(|value| value.to_str().ok()?.parse::().ok()) + }; + + // Requests without bodies can always be retried, as we will not need to + // buffer the body. If the request *does* have a body, retry it if and + // only if the request contains a `content-length` header and the + // content length is >= 64 kb. + let has_body = !req.body().is_end_stream(); + if has_body && content_length(&req).unwrap_or(usize::MAX) > MAX_BUFFERED_BYTES { + tracing::trace!( + req.has_body = has_body, + req.content_length = ?content_length(&req), + "not retryable", + ); + return false; + } + + tracing::trace!( + req.has_body = has_body, + req.content_length = ?content_length(&req), + "retryable", + ); + true + } +} + impl Policy, http::Response, E> for Retry where A: http_body::Body + Clone, @@ -109,33 +141,20 @@ where } } -impl CanRetry for Retry { - fn can_retry(&self, req: &http::Request) -> bool { - let content_length = |req: &http::Request<_>| { - req.headers() - .get(http::header::CONTENT_LENGTH) - .and_then(|value| value.to_str().ok()?.parse::().ok()) - }; +impl RetryPolicy, http::Response, E> for Retry +where + A: http_body::Body + Unpin, + A::Error: Into, +{ + type RetryRequest = http::Request>; - // Requests without bodies can always be retried, as we will not need to - // buffer the body. If the request *does* have a body, retry it if and - // only if the request contains a `content-length` header and the - // content length is >= 64 kb. - let has_body = !req.body().is_end_stream(); - if has_body && content_length(&req).unwrap_or(usize::MAX) > MAX_BUFFERED_BYTES { - tracing::trace!( - req.has_body = has_body, - req.content_length = ?content_length(&req), - "not retryable", - ); - return false; + fn prepare_request( + &self, + req: http::Request, + ) -> Either> { + if self.can_retry(&req) { + return Either::A(req.map(|body| ReplayBody::new(body, MAX_BUFFERED_BYTES))); } - - tracing::trace!( - req.has_body = has_body, - req.content_length = ?content_length(&req), - "retryable", - ); - true + Either::B(req) } } diff --git a/linkerd/app/inbound/src/http/mod.rs b/linkerd/app/inbound/src/http/mod.rs index f25885020f..640f4db964 100644 --- a/linkerd/app/inbound/src/http/mod.rs +++ b/linkerd/app/inbound/src/http/mod.rs @@ -86,7 +86,7 @@ impl Inbound { .push(http::BoxRequest::layer()) .push(http::BoxResponse::layer()), ) - .check_new_service::>() + // .check_new_service::>() .instrument(|t: &T| debug_span!("http", v=%Param::::param(t))) .push(http::NewServeHttp::layer(h2_settings, rt.drain.clone())) .push(svc::BoxNewService::layer()); @@ -184,7 +184,7 @@ where )) .push_map_target(Logical::from) .push_on_response(http::BoxResponse::layer()) - .check_new_service::<(profiles::Receiver, Target), _>() + // .check_new_service::<(profiles::Receiver, Target), _>() .push(svc::UnwrapOr::layer(no_profile)) .push(profiles::discover::layer( profiles, diff --git a/linkerd/app/outbound/src/http/detect.rs b/linkerd/app/outbound/src/http/detect.rs index d823f2e224..71477eff60 100644 --- a/linkerd/app/outbound/src/http/detect.rs +++ b/linkerd/app/outbound/src/http/detect.rs @@ -53,7 +53,7 @@ impl Outbound { .push(http::BoxRequest::layer()) .push(svc::MapErrLayer::new(Into::into)), ) - .check_new_service::() + // .check_new_service::() .push(http::NewServeHttp::layer(h2_settings, rt.drain.clone())) .push_map_target(U::from) .instrument(|(v, _): &(http::Version, _)| debug_span!("http", %v)) diff --git a/linkerd/app/outbound/src/http/logical.rs b/linkerd/app/outbound/src/http/logical.rs index 3381e2478f..562ad87688 100644 --- a/linkerd/app/outbound/src/http/logical.rs +++ b/linkerd/app/outbound/src/http/logical.rs @@ -120,6 +120,7 @@ impl Outbound { .http_route_actual .to_layer::(), ) + .push_on_response(http::BoxRequest::layer()) // Sets an optional retry policy. .push(retry::layer(rt.metrics.http_route_retry.clone())) // Sets an optional request timeout. diff --git a/linkerd/http-box/src/box_any_request.rs b/linkerd/http-box/src/box_any_request.rs new file mode 100644 index 0000000000..e69de29bb2 diff --git a/linkerd/http-box/src/request.rs b/linkerd/http-box/src/request.rs index 30b4340301..28579e53ac 100644 --- a/linkerd/http-box/src/request.rs +++ b/linkerd/http-box/src/request.rs @@ -2,28 +2,25 @@ use crate::BoxBody; use linkerd_error::Error; -use linkerd_stack::layer; -use std::{ - marker::PhantomData, - task::{Context, Poll}, -}; +use linkerd_stack::{layer, Proxy}; +use std::task::{Context, Poll}; #[derive(Debug)] -pub struct BoxRequest(S, PhantomData); +pub struct BoxRequest(S); -impl BoxRequest { +impl BoxRequest { pub fn layer() -> impl layer::Layer + Clone + Copy { - layer::mk(|inner| BoxRequest(inner, PhantomData)) + layer::mk(BoxRequest) } } -impl Clone for BoxRequest { +impl Clone for BoxRequest { fn clone(&self) -> Self { - BoxRequest(self.0.clone(), self.1) + BoxRequest(self.0.clone()) } } -impl tower::Service> for BoxRequest +impl tower::Service> for BoxRequest where B: http_body::Body + Send + 'static, B::Data: Send + 'static, @@ -42,3 +39,21 @@ where self.0.call(req.map(BoxBody::new)) } } + +impl Proxy, S> for BoxRequest

+where + B: http_body::Body + Send + 'static, + B::Data: Send + 'static, + B::Error: Into, + S: tower::Service, + P: Proxy, S>, +{ + type Request = P::Request; + type Response = P::Response; + type Error = P::Error; + type Future = P::Future; + + fn proxy(&self, inner: &mut S, req: http::Request) -> Self::Future { + self.0.proxy(inner, req.map(BoxBody::new)) + } +} diff --git a/linkerd/http-retry/src/lib.rs b/linkerd/http-retry/src/lib.rs index de867d4880..c824a67e46 100644 --- a/linkerd/http-retry/src/lib.rs +++ b/linkerd/http-retry/src/lib.rs @@ -4,13 +4,11 @@ #![allow(clippy::type_complexity)] use linkerd_error::Error; -use linkerd_http_box::BoxBody; -use linkerd_stack::{NewService, Proxy, ProxyService}; +use linkerd_stack::{Either, NewService, Proxy, ProxyService}; use pin_project::pin_project; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -pub use tower::retry::{budget::Budget, Policy}; use tower::util::{Oneshot, ServiceExt}; use tracing::trace; @@ -43,34 +41,22 @@ pub struct Retry { policy: Option

, inner: S, } +pub trait RetryPolicy: tower::retry::Policy { + type RetryRequest; -pub trait CanRetry { - /// Returns `true` if a request can be retried. - fn can_retry(&self, req: &http::Request) -> bool; + fn prepare_request(&self, req: Req) -> Either; } #[pin_project(project = ResponseFutureProj)] -pub enum ResponseFuture +pub enum ResponseFuture where - R: tower::retry::Policy>, P::Response, Error> + Clone, - P: Proxy, S> + Clone, + R: tower::retry::Policy + Clone, + P: Proxy + Clone, S: tower::Service + Clone, S::Error: Into, { Disabled(#[pin] P::Future), - Retry( - #[pin] - Oneshot< - tower::retry::Retry< - R, - tower::util::MapRequest< - ProxyService, - fn(http::Request>) -> http::Request, - >, - >, - http::Request>, - >, - ), + Retry(#[pin] Oneshot>, Req>), } // === impl NewRetryLayer === @@ -112,44 +98,43 @@ where // === impl Retry === -impl Proxy, S> for Retry +impl Proxy for Retry where - R: tower::retry::Policy>, P::Response, Error> + CanRetry + Clone, - P: Proxy, S> + Clone, - S: tower::Service + Clone, + R: RetryPolicy + Clone, + P: Proxy + + Proxy + + Clone, + S: tower::Service + Clone, S::Error: Into, - B: http_body::Body + Unpin + Send + 'static, - B::Data: Send, - B::Error: Into, { - type Request = P::Request; - type Response = P::Response; + type Request = PReq; + type Response = PRsp; type Error = Error; - type Future = ResponseFuture; + type Future = ResponseFuture; - fn proxy(&self, svc: &mut S, req: http::Request) -> Self::Future { + fn proxy(&self, svc: &mut S, req: Req) -> Self::Future { trace!(retryable = %self.policy.is_some()); if let Some(policy) = self.policy.as_ref() { - if policy.can_retry(&req) { - let inner = self.inner.clone().wrap_service(svc.clone()).map_request( - (|req: http::Request>| req.map(BoxBody::new)) as fn(_) -> _, - ); - let retry = tower::retry::Retry::new(policy.clone(), inner); - return ResponseFuture::Retry( - retry.oneshot(req.map(|x| ReplayBody::new(x, 64 * 1024))), - ); - } + return match policy.prepare_request(req) { + Either::A(retry_req) => { + let inner = + Proxy::::wrap_service(self.inner.clone(), svc.clone()); + let retry = tower::retry::Retry::new(policy.clone(), inner); + ResponseFuture::Retry(retry.oneshot(retry_req)) + } + Either::B(req) => ResponseFuture::Disabled(self.inner.proxy(svc, req)), + }; } - ResponseFuture::Disabled(self.inner.proxy(svc, req.map(BoxBody::new))) + ResponseFuture::Disabled(self.inner.proxy(svc, req)) } } -impl Future for ResponseFuture +impl Future for ResponseFuture where - R: tower::retry::Policy>, P::Response, Error> + Clone, - P: Proxy, S> + Clone, + R: tower::retry::Policy + Clone, + P: Proxy + Clone, S: tower::Service + Clone, S::Error: Into, { From a3d3469fa1eb3d6afc8dfed696a0a5737004ae3f Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 10 Jun 2021 10:23:02 -0700 Subject: [PATCH 05/15] move all HTTP & policy code out of `linkerd-retry` Signed-off-by: Eliza Weisman --- Cargo.lock | 14 +- Cargo.toml | 1 + linkerd/app/core/Cargo.toml | 1 + linkerd/app/core/src/retry.rs | 3 +- linkerd/http-retry/Cargo.toml | 3 - linkerd/http-retry/src/lib.rs | 992 +++++++++++++++++++++++++++---- linkerd/http-retry/src/replay.rs | 919 ---------------------------- linkerd/retry/Cargo.toml | 14 + linkerd/retry/src/lib.rs | 162 +++++ 9 files changed, 1078 insertions(+), 1031 deletions(-) delete mode 100644 linkerd/http-retry/src/replay.rs create mode 100644 linkerd/retry/Cargo.toml create mode 100644 linkerd/retry/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 00e33e6091..36fd82adb7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1016,19 +1016,16 @@ name = "linkerd-http-retry" version = "0.1.0" dependencies = [ "bytes", - "futures", "http", "http-body", "hyper", "linkerd-error", "linkerd-http-box", - "linkerd-stack", "linkerd-tracing", "parking_lot", "pin-project", "thiserror", "tokio", - "tower", "tracing", ] @@ -1300,6 +1297,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "linkerd-retry" +version = "0.1.0" +dependencies = [ + "linkerd-error", + "linkerd-stack", + "pin-project", + "tower", + "tracing", +] + [[package]] name = "linkerd-service-profiles" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 448efad877..adae4608f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,7 @@ members = [ "linkerd/proxy/tcp", "linkerd/proxy/transport", "linkerd/reconnect", + "linkerd/retry", "linkerd/service-profiles", "linkerd/signal", "linkerd/stack", diff --git a/linkerd/app/core/Cargo.toml b/linkerd/app/core/Cargo.toml index 09c680d947..b05306f649 100644 --- a/linkerd/app/core/Cargo.toml +++ b/linkerd/app/core/Cargo.toml @@ -53,6 +53,7 @@ linkerd-proxy-tap = { path = "../../proxy/tap" } linkerd-proxy-tcp = { path = "../../proxy/tcp" } linkerd-proxy-transport = { path = "../../proxy/transport" } linkerd-reconnect = { path = "../../reconnect" } +linkerd-retry = { path = "../../reconnect" } linkerd-timeout = { path = "../../timeout" } linkerd-tracing = { path = "../../tracing" } linkerd-service-profiles = { path = "../../service-profiles" } diff --git a/linkerd/app/core/src/retry.rs b/linkerd/app/core/src/retry.rs index f5cccf7c6d..9a12ebe6cd 100644 --- a/linkerd/app/core/src/retry.rs +++ b/linkerd/app/core/src/retry.rs @@ -10,7 +10,8 @@ use linkerd_stack::{Either, Param}; use std::sync::Arc; use tower::retry::{budget::Budget, Policy}; -pub use linkerd_http_retry::*; +use linkerd_http_retry::ReplayBody; +use linkerd_retry::RetryPolicy; pub fn layer(metrics: HttpRouteRetry) -> NewRetryLayer { NewRetryLayer::new(NewRetry::new(metrics)) diff --git a/linkerd/http-retry/Cargo.toml b/linkerd/http-retry/Cargo.toml index 6c73d48e50..d635ab0d67 100644 --- a/linkerd/http-retry/Cargo.toml +++ b/linkerd/http-retry/Cargo.toml @@ -12,11 +12,8 @@ futures = { version = "0.3", default-features = false } http-body = "0.4" http = "0.2" linkerd-error = { path = "../error" } -linkerd-http-box = { path = "../http-box" } -linkerd-stack = { path = "../stack" } pin-project = "1" parking_lot = "0.11" -tower = { version = "0.4.7", default-features = false, features = ["retry", "util"] } tracing = "0.1.23" thiserror = "1" diff --git a/linkerd/http-retry/src/lib.rs b/linkerd/http-retry/src/lib.rs index c824a67e46..26f4efac6a 100644 --- a/linkerd/http-retry/src/lib.rs +++ b/linkerd/http-retry/src/lib.rs @@ -3,147 +3,929 @@ #![allow(clippy::inconsistent_struct_constructor)] #![allow(clippy::type_complexity)] +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use http::HeaderMap; +use http_body::Body; use linkerd_error::Error; -use linkerd_stack::{Either, NewService, Proxy, ProxyService}; -use pin_project::pin_project; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; -use tower::util::{Oneshot, ServiceExt}; -use tracing::trace; - -pub mod replay; -pub use self::replay::ReplayBody; - -/// A strategy for obtaining per-target retry polices. -pub trait NewPolicy { - type Policy; - - fn new_policy(&self, target: &T) -> Option; -} +use parking_lot::Mutex; +use std::{collections::VecDeque, io::IoSlice, pin::Pin, sync::Arc, task::Context, task::Poll}; +use thiserror::Error; -/// A layer that applies per-target retry polcies. +/// Wraps an HTTP body type and lazily buffers data as it is read from the inner +/// body. +/// +/// When this body is dropped, if a clone exists, any buffered data is shared +/// with its cloned. The first clone to be polled will take ownership over the +/// data until it is dropped. When *that* clone is dropped, the buffered data +/// --- including any new data read from the body by the clone, if the body has +/// not yet completed --- will be shared with any remaining clones. /// -/// Composes `NewService`s that produce a `Proxy`. -#[derive(Clone, Debug)] -pub struct NewRetryLayer

{ - new_policy: P, +/// The buffered data can then be used to retry the request if the original +/// request fails. +#[derive(Debug)] +pub struct ReplayBody { + /// Buffered state owned by this body if it is actively being polled. If + /// this body has been polled and no other body owned the state, this will + /// be `Some`. + state: Option>, + + /// Copy of the state shared across all clones. When the active clone is + /// dropped, it moves its state back into the shared state to be taken by the + /// next clone to be polled. + shared: Arc>, + + /// Should this clone replay the buffered body from the shared state before + /// polling the initial body? + replay_body: bool, + + /// Should this clone replay trailers from the shared state? + replay_trailers: bool, + + /// Maxiumum number of bytes to buffer. + max_bytes: usize, } -#[derive(Clone, Debug)] -pub struct NewRetry { - new_policy: P, - inner: N, +#[derive(Debug, Error)] +#[error("cannot buffer more than {0} bytes")] +pub struct Capped(usize); + +/// Data returned by `ReplayBody`'s `http_body::Body` implementation is either +/// `Bytes` returned by the initial body, or a list of all `Bytes` chunks +/// returned by the initial body (when replaying it). +#[derive(Debug)] +pub enum Data { + Initial(Bytes), + Replay(BufList), } -#[derive(Clone, Debug)] -pub struct Retry { - policy: Option

, - inner: S, +/// Body data composed of multiple `Bytes` chunks. +#[derive(Clone, Debug, Default)] +pub struct BufList { + bufs: VecDeque, } -pub trait RetryPolicy: tower::retry::Policy { - type RetryRequest; - fn prepare_request(&self, req: Req) -> Either; +#[derive(Debug)] +struct SharedState { + body: Mutex>>, + /// Did the initial body return `true` from `is_end_stream` before it was + /// ever polled? If so, always return `true`; the body is completely empty. + /// + /// We store this separately so that clones of a totally empty body can + /// always return `true` from `is_end_stream` even when they don't own the + /// shared state. + was_empty: bool, } -#[pin_project(project = ResponseFutureProj)] -pub enum ResponseFuture +#[derive(Debug)] +struct BodyState { + buf: BufList, + trailers: Option, + rest: Option, + is_completed: bool, + is_capped: bool, +} + +// === impl ReplayBody === + +impl ReplayBody { + /// Wraps an initial `Body` in a `ReplayBody`. + /// + /// In order to prevent unbounded buffering, this takes a maximum number of + /// bytes to buffer as a second parameter. If more than than that number of + /// bytes would be buffered, the buffered data is discarded and any + /// subsequent clones of this body will fail. However, the *currently + /// active* clone of the body is allowed to continue without erroring. It + /// will simply stop buffering any additional data for retries. + pub fn new(body: B, max_bytes: usize) -> Self { + let was_empty = body.is_end_stream(); + Self { + state: Some(BodyState { + buf: Default::default(), + trailers: None, + rest: Some(body), + is_completed: false, + is_capped: false, + }), + shared: Arc::new(SharedState { + body: Mutex::new(None), + was_empty, + }), + // The initial `ReplayBody` has nothing to replay + replay_body: false, + replay_trailers: false, + max_bytes, + } + } + + /// Mutably borrows the body state if this clone currently owns it, + /// or else tries to acquire it from the shared state. + /// + /// # Panics + /// + /// This panics if another clone has currently acquired the state, based on + /// the assumption that a retry body will not be polled until the previous + /// request has been dropped. + fn acquire_state<'a>( + state: &'a mut Option>, + shared: &Mutex>>, + ) -> &'a mut BodyState { + state.get_or_insert_with(|| shared.lock().take().expect("missing body state")) + } +} + +impl Body for ReplayBody where - R: tower::retry::Policy + Clone, - P: Proxy + Clone, - S: tower::Service + Clone, - S::Error: Into, + B: Body + Unpin, + B::Error: Into, { - Disabled(#[pin] P::Future), - Retry(#[pin] Oneshot>, Req>), -} + type Data = Data; + type Error = Error; + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let this = self.get_mut(); + let state = Self::acquire_state(&mut this.state, &this.shared.body); + // Move these out to avoid mutable borrow issues in the `map` closure + // when polling the inner body. + let max_bytes = this.max_bytes; + let is_capped = state.is_capped; + tracing::trace!( + replay_body = this.replay_body, + buf.has_remaining = state.buf.has_remaining(), + body.is_completed = state.is_completed, + body.is_capped = state.is_capped, + "Replay::poll_data" + ); + + // If we haven't replayed the buffer yet, and its not empty, return the + // buffered data first. + if this.replay_body { + if state.buf.has_remaining() { + tracing::trace!("replaying body"); + // Don't return the buffered data again on the next poll. + this.replay_body = false; + return Poll::Ready(Some(Ok(Data::Replay(state.buf.clone())))); + } + + if state.is_capped { + tracing::trace!("cannot replay buffered body, maximum buffer length reached"); + return Poll::Ready(Some(Err(Capped(this.max_bytes).into()))); + } + } + + // If the inner body has previously ended, don't poll it again. + // + // NOTE(eliza): we would expect the inner body to just happily return + // `None` multiple times here, but `hyper::Body::channel` (which we use + // in the tests) will panic if it is polled after returning `None`, so + // we have to special-case this. :/ + if state.is_completed { + return Poll::Ready(None); + } + + // If there's more data in the initial body, poll that... + if let Some(rest) = state.rest.as_mut() { + tracing::trace!("Polling initial body"); + let opt = futures::ready!(Pin::new(rest).poll_data(cx)); -// === impl NewRetryLayer === + // If the body has ended, remember that so that future clones will + // not try polling it again --- some `Body` types will panic if they + // are polled after returning `None`. + if opt.is_none() { + tracing::trace!("Initial body completed"); + state.is_completed = true; + } + return Poll::Ready(opt.map(|ok| { + ok.map(|mut data| { + // If we have already buffered the maximum number of bytes, + // allow *this* body to continue, but don't buffer any more. + if is_capped { + return Data::Initial(data.copy_to_bytes(data.remaining())); + } -impl

NewRetryLayer

{ - pub fn new(new_policy: P) -> Self { - Self { new_policy } + if state.buf.remaining() + data.remaining() > max_bytes { + tracing::debug!( + max_bytes, + "buffered maximum number of bytes, discarding buffer" + ); + // discard the buffer + state.buf = Default::default(); + state.is_capped = true; + return Data::Initial(data.copy_to_bytes(data.remaining())); + } + + // Buffer and return the bytes + Data::Initial(state.buf.push_chunk(data)) + }) + .map_err(Into::into) + })); + } + + // Otherwise, guess we're done! + Poll::Ready(None) + } + + fn poll_trailers( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + let this = self.get_mut(); + let state = Self::acquire_state(&mut this.state, &this.shared.body); + tracing::trace!( + replay_trailers = this.replay_trailers, + "Replay::poll_trailers" + ); + + if this.replay_trailers { + this.replay_trailers = false; + if let Some(ref trailers) = state.trailers { + tracing::trace!("Replaying trailers"); + return Poll::Ready(Ok(Some(trailers.clone()))); + } + } + + if let Some(rest) = state.rest.as_mut() { + // If the inner body has previously ended, don't poll it again. + if !rest.is_end_stream() { + let res = futures::ready!(Pin::new(rest).poll_trailers(cx)).map(|tlrs| { + if state.trailers.is_none() { + state.trailers = tlrs.clone(); + } + tlrs + }); + return Poll::Ready(res.map_err(Into::into)); + } + } + + Poll::Ready(Ok(None)) + } + + fn is_end_stream(&self) -> bool { + // if the initial body was EOS as soon as it was wrapped, then we are + // empty. + if self.shared.was_empty { + return true; + } + + let is_inner_eos = self + .state + .as_ref() + .and_then(|state| state.rest.as_ref().map(Body::is_end_stream)) + .unwrap_or(false); + + // if this body has data or trailers remaining to play back, it + // is not EOS + !self.replay_body && !self.replay_trailers + // if we have replayed everything, the initial body may + // still have data remaining, so ask it + && is_inner_eos + } + + fn size_hint(&self) -> http_body::SizeHint { + let mut hint = http_body::SizeHint::default(); + if let Some(ref state) = self.state { + let rem = state.buf.remaining() as u64; + + // Have we read the entire body? If so, the size is exactly the size + // of the buffer. + if state.is_completed { + return http_body::SizeHint::with_exact(rem); + } + + // Otherwise, the size is the size of the current buffer plus the + // size hint returned by the inner body. + let (rest_lower, rest_upper) = state + .rest + .as_ref() + .map(|rest| { + let hint = rest.size_hint(); + (hint.lower(), hint.upper().unwrap_or(0)) + }) + .unwrap_or_default(); + hint.set_lower(rem + rest_lower); + hint.set_upper(rem + rest_upper); + } + + hint } } -impl tower::layer::Layer for NewRetryLayer

{ - type Service = NewRetry; +impl Clone for ReplayBody { + fn clone(&self) -> Self { + Self { + state: None, + shared: self.shared.clone(), + // The clone should try to replay from the shared state before + // reading any additional data from the initial body. + replay_body: true, + replay_trailers: true, + max_bytes: self.max_bytes, + } + } +} - fn layer(&self, inner: N) -> Self::Service { - Self::Service { - inner, - new_policy: self.new_policy.clone(), +impl Drop for ReplayBody { + fn drop(&mut self) { + // If this clone owned the shared state, put it back.`s + if let Some(state) = self.state.take() { + *self.shared.body.lock() = Some(state); } } } -// === impl NewRetry === +// === impl Data === -impl NewService for NewRetry -where - N: NewService, - P: NewPolicy, -{ - type Service = Retry; +impl Buf for Data { + #[inline] + fn remaining(&self) -> usize { + match self { + Data::Initial(buf) => buf.remaining(), + Data::Replay(bufs) => bufs.remaining(), + } + } - fn new_service(&mut self, target: T) -> Self::Service { - // Determine if there is a retry policy for the given target. - let policy = self.new_policy.new_policy(&target); + #[inline] + fn chunk(&self) -> &[u8] { + match self { + Data::Initial(buf) => buf.chunk(), + Data::Replay(bufs) => bufs.chunk(), + } + } - let inner = self.inner.new_service(target); - Retry { policy, inner } + #[inline] + fn chunks_vectored<'iovs>(&'iovs self, iovs: &mut [IoSlice<'iovs>]) -> usize { + match self { + Data::Initial(buf) => buf.chunks_vectored(iovs), + Data::Replay(bufs) => bufs.chunks_vectored(iovs), + } + } + + #[inline] + fn advance(&mut self, amt: usize) { + match self { + Data::Initial(buf) => buf.advance(amt), + Data::Replay(bufs) => bufs.advance(amt), + } + } + + #[inline] + fn copy_to_bytes(&mut self, len: usize) -> Bytes { + match self { + Data::Initial(buf) => buf.copy_to_bytes(len), + Data::Replay(bufs) => bufs.copy_to_bytes(len), + } } } -// === impl Retry === +// === impl BufList === -impl Proxy for Retry -where - R: RetryPolicy + Clone, - P: Proxy - + Proxy - + Clone, - S: tower::Service + Clone, - S::Error: Into, -{ - type Request = PReq; - type Response = PRsp; - type Error = Error; - type Future = ResponseFuture; - - fn proxy(&self, svc: &mut S, req: Req) -> Self::Future { - trace!(retryable = %self.policy.is_some()); - - if let Some(policy) = self.policy.as_ref() { - return match policy.prepare_request(req) { - Either::A(retry_req) => { - let inner = - Proxy::::wrap_service(self.inner.clone(), svc.clone()); - let retry = tower::retry::Retry::new(policy.clone(), inner); - ResponseFuture::Retry(retry.oneshot(retry_req)) - } - Either::B(req) => ResponseFuture::Disabled(self.inner.proxy(svc, req)), - }; +impl BufList { + fn push_chunk(&mut self, mut data: impl Buf) -> Bytes { + let len = data.remaining(); + // `data` is (almost) certainly a `Bytes`, so `copy_to_bytes` should + // internally be a cheap refcount bump almost all of the time. + // But, if it isn't, this will copy it to a `Bytes` that we can + // now clone. + let bytes = data.copy_to_bytes(len); + // Buffer a clone of the bytes read on this poll. + self.bufs.push_back(bytes.clone()); + // Return the bytes + bytes + } +} + +impl Buf for BufList { + fn remaining(&self) -> usize { + self.bufs.iter().map(Buf::remaining).sum() + } + + fn chunk(&self) -> &[u8] { + self.bufs.front().map(Buf::chunk).unwrap_or(&[]) + } + + fn chunks_vectored<'iovs>(&'iovs self, iovs: &mut [IoSlice<'iovs>]) -> usize { + // Are there more than zero iovecs to write to? + if iovs.is_empty() { + return 0; + } + + // Loop over the buffers in the replay buffer list, and try to fill as + // many iovecs as we can from each buffer. + let mut filled = 0; + for buf in &self.bufs { + filled += buf.chunks_vectored(&mut iovs[filled..]); + if filled == iovs.len() { + return filled; + } } - ResponseFuture::Disabled(self.inner.proxy(svc, req)) + filled + } + + fn advance(&mut self, mut amt: usize) { + while amt > 0 { + let rem = self.bufs[0].remaining(); + // If the amount to advance by is less than the first buffer in + // the buffer list, advance that buffer's cursor by `amt`, + // and we're done. + if rem > amt { + self.bufs[0].advance(amt); + return; + } + + // Otherwise, advance the first buffer to its end, and + // continue. + self.bufs[0].advance(rem); + amt -= rem; + + self.bufs.pop_front(); + } + } + + fn copy_to_bytes(&mut self, len: usize) -> Bytes { + // If the length of the requested `Bytes` is <= the length of the front + // buffer, we can just use its `copy_to_bytes` implementation (which is + // just a reference count bump). + match self.bufs.front_mut() { + Some(first) if len <= first.remaining() => { + let buf = first.copy_to_bytes(len); + // if we consumed the first buffer, also advance our "cursor" by + // popping it. + if first.remaining() == 0 { + self.bufs.pop_front(); + } + + buf + } + _ => { + assert!(len <= self.remaining(), "`len` greater than remaining"); + let mut buf = BytesMut::with_capacity(len); + buf.put(self.take(len)); + buf.freeze() + } + } } } -impl Future for ResponseFuture -where - R: tower::retry::Policy + Clone, - P: Proxy + Clone, - S: tower::Service + Clone, - S::Error: Into, -{ - type Output = Result; +#[cfg(test)] +mod tests { + use super::*; + use http::{HeaderMap, HeaderValue}; + + #[tokio::test] + async fn replays_one_chunk() { + let Test { + mut tx, + initial, + replay, + _trace, + } = Test::new(); + tx.send_data("hello world").await; + drop(tx); + + let initial = body_to_string(initial).await; + assert_eq!(initial, "hello world"); + + let replay = body_to_string(replay).await; + assert_eq!(replay, "hello world"); + } + + #[tokio::test] + async fn replays_several_chunks() { + let Test { + mut tx, + initial, + replay, + _trace, + } = Test::new(); + + tokio::spawn(async move { + tx.send_data("hello").await; + tx.send_data(" world").await; + tx.send_data(", have lots").await; + tx.send_data(" of fun!").await; + }); + + let initial = body_to_string(initial).await; + assert_eq!(initial, "hello world, have lots of fun!"); + + let replay = body_to_string(replay).await; + assert_eq!(replay, "hello world, have lots of fun!"); + } + + #[tokio::test] + async fn replays_trailers() { + let Test { + mut tx, + mut initial, + mut replay, + _trace, + } = Test::new(); + + let mut tlrs = HeaderMap::new(); + tlrs.insert("x-hello", HeaderValue::from_str("world").unwrap()); + tlrs.insert("x-foo", HeaderValue::from_str("bar").unwrap()); + + tx.send_data("hello world").await; + tx.send_trailers(tlrs.clone()).await; + drop(tx); - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.project() { - ResponseFutureProj::Disabled(f) => f.poll(cx).map_err(Into::into), - ResponseFutureProj::Retry(f) => f.poll(cx).map_err(Into::into), + while initial.data().await.is_some() { + // do nothing } + let initial_tlrs = initial.trailers().await.expect("trailers should not error"); + assert_eq!(initial_tlrs.as_ref(), Some(&tlrs)); + + // drop the initial body to send the data to the replay + drop(initial); + + while replay.data().await.is_some() { + // do nothing + } + let replay_tlrs = replay.trailers().await.expect("trailers should not error"); + assert_eq!(replay_tlrs.as_ref(), Some(&tlrs)); + } + + #[tokio::test] + async fn trailers_only() { + let Test { + mut tx, + mut initial, + mut replay, + _trace, + } = Test::new(); + + let mut tlrs = HeaderMap::new(); + tlrs.insert("x-hello", HeaderValue::from_str("world").unwrap()); + tlrs.insert("x-foo", HeaderValue::from_str("bar").unwrap()); + + tx.send_trailers(tlrs.clone()).await; + + drop(tx); + + assert!(dbg!(initial.data().await).is_none(), "no data in body"); + let initial_tlrs = initial.trailers().await.expect("trailers should not error"); + assert_eq!(initial_tlrs.as_ref(), Some(&tlrs)); + + // drop the initial body to send the data to the replay + drop(initial); + + assert!(dbg!(replay.data().await).is_none(), "no data in body"); + let replay_tlrs = replay.trailers().await.expect("trailers should not error"); + assert_eq!(replay_tlrs.as_ref(), Some(&tlrs)); + } + + #[tokio::test(flavor = "current_thread")] + async fn switches_with_body_remaining() { + // This simulates a case where the server returns an error _before_ the + // entire body has been read. + let Test { + mut tx, + mut initial, + mut replay, + _trace, + } = Test::new(); + + tx.send_data("hello").await; + assert_eq!(chunk(&mut initial).await.unwrap(), "hello"); + + tx.send_data(" world").await; + assert_eq!(chunk(&mut initial).await.unwrap(), " world"); + + // drop the initial body to send the data to the replay + drop(initial); + tracing::info!("dropped initial body"); + + tokio::spawn(async move { + tx.send_data(", have lots of fun").await; + tx.send_trailers(HeaderMap::new()).await; + }); + + assert_eq!( + body_to_string(&mut replay).await, + "hello world, have lots of fun" + ); + } + + #[tokio::test(flavor = "current_thread")] + async fn multiple_replays() { + let Test { + mut tx, + mut initial, + mut replay, + _trace, + } = Test::new(); + + let mut tlrs = HeaderMap::new(); + tlrs.insert("x-hello", HeaderValue::from_str("world").unwrap()); + tlrs.insert("x-foo", HeaderValue::from_str("bar").unwrap()); + + let tlrs2 = tlrs.clone(); + tokio::spawn(async move { + tx.send_data("hello").await; + tx.send_data(" world").await; + tx.send_trailers(tlrs2).await; + }); + + assert_eq!(body_to_string(&mut initial).await, "hello world"); + + let initial_tlrs = initial.trailers().await.expect("trailers should not error"); + assert_eq!(initial_tlrs.as_ref(), Some(&tlrs)); + + // drop the initial body to send the data to the replay + drop(initial); + + let mut replay2 = replay.clone(); + assert_eq!(body_to_string(&mut replay).await, "hello world"); + + let replay_tlrs = replay.trailers().await.expect("trailers should not error"); + assert_eq!(replay_tlrs.as_ref(), Some(&tlrs)); + + // drop the initial body to send the data to the replay + drop(replay); + + assert_eq!(body_to_string(&mut replay2).await, "hello world"); + + let replay2_tlrs = replay2.trailers().await.expect("trailers should not error"); + assert_eq!(replay2_tlrs.as_ref(), Some(&tlrs)); + } + + #[tokio::test(flavor = "current_thread")] + async fn multiple_incomplete_replays() { + let Test { + mut tx, + mut initial, + mut replay, + _trace, + } = Test::new(); + + let mut tlrs = HeaderMap::new(); + tlrs.insert("x-hello", HeaderValue::from_str("world").unwrap()); + tlrs.insert("x-foo", HeaderValue::from_str("bar").unwrap()); + + tx.send_data("hello").await; + assert_eq!(chunk(&mut initial).await.unwrap(), "hello"); + + // drop the initial body to send the data to the replay + drop(initial); + tracing::info!("dropped initial body"); + + let mut replay2 = replay.clone(); + + tx.send_data(" world").await; + assert_eq!(chunk(&mut replay).await.unwrap(), "hello"); + assert_eq!(chunk(&mut replay).await.unwrap(), " world"); + + // drop the replay body to send the data to the second replay + drop(replay); + tracing::info!("dropped first replay body"); + + let tlrs2 = tlrs.clone(); + tokio::spawn(async move { + tx.send_data(", have lots").await; + tx.send_data(" of fun!").await; + tx.send_trailers(tlrs2).await; + }); + + assert_eq!( + body_to_string(&mut replay2).await, + "hello world, have lots of fun!" + ); + + let replay2_tlrs = replay2.trailers().await.expect("trailers should not error"); + assert_eq!(replay2_tlrs.as_ref(), Some(&tlrs)); + } + + #[tokio::test(flavor = "current_thread")] + async fn drop_clone_early() { + let Test { + mut tx, + mut initial, + mut replay, + _trace, + } = Test::new(); + + let mut tlrs = HeaderMap::new(); + tlrs.insert("x-hello", HeaderValue::from_str("world").unwrap()); + tlrs.insert("x-foo", HeaderValue::from_str("bar").unwrap()); + + let tlrs2 = tlrs.clone(); + tokio::spawn(async move { + tx.send_data("hello").await; + tx.send_data(" world").await; + tx.send_trailers(tlrs2).await; + }); + + assert_eq!(body_to_string(&mut initial).await, "hello world"); + + let initial_tlrs = initial.trailers().await.expect("trailers should not error"); + assert_eq!(initial_tlrs.as_ref(), Some(&tlrs)); + + // drop the initial body to send the data to the replay + drop(initial); + + // clone the body again and then drop it + let replay2 = replay.clone(); + drop(replay2); + + assert_eq!(body_to_string(&mut replay).await, "hello world"); + let replay_tlrs = replay.trailers().await.expect("trailers should not error"); + assert_eq!(replay_tlrs.as_ref(), Some(&tlrs)); + } + + // This test is specifically for behavior across clones, so the clippy lint + // is wrong here. + #[allow(clippy::redundant_clone)] + #[test] + fn empty_body_is_always_eos() { + // If the initial body was empty, every clone should always return + // `true` from `is_end_stream`. + let initial = ReplayBody::new(hyper::Body::empty(), 64 * 1024); + assert!(initial.is_end_stream()); + + let replay = initial.clone(); + assert!(replay.is_end_stream()); + + let replay2 = replay.clone(); + assert!(replay2.is_end_stream()); + } + + #[tokio::test(flavor = "current_thread")] + async fn eos_only_when_fully_replayed() { + // Test that each clone of a body is not EOS until the data has been + // fully replayed. + let mut initial = ReplayBody::new(hyper::Body::from("hello world"), 64 * 1024); + let mut replay = initial.clone(); + + body_to_string(&mut initial).await; + assert!(!replay.is_end_stream()); + + initial.trailers().await.expect("trailers should not error"); + assert!(initial.is_end_stream()); + assert!(!replay.is_end_stream()); + + // drop the initial body to send the data to the replay + drop(initial); + + assert!(!replay.is_end_stream()); + + body_to_string(&mut replay).await; + assert!(!replay.is_end_stream()); + + replay.trailers().await.expect("trailers should not error"); + assert!(replay.is_end_stream()); + + // Even if we clone a body _after_ it has been driven to EOS, the clone + // must not be EOS. + let mut replay2 = replay.clone(); + assert!(!replay2.is_end_stream()); + + // drop the initial body to send the data to the replay + drop(replay); + + body_to_string(&mut replay2).await; + assert!(!replay2.is_end_stream()); + + replay2.trailers().await.expect("trailers should not error"); + assert!(replay2.is_end_stream()); + } + + #[tokio::test(flavor = "current_thread")] + async fn caps_buffer() { + // Test that, when the initial body is longer than the preconfigured + // cap, we allow the request to continue, but stop buffering. The + // initial body will complete, but the replay will immediately fail. + + let (mut tx, body) = hyper::Body::channel(); + let mut initial = ReplayBody::new(body, 8); + let mut replay = initial.clone(); + + // Send enough data to reach the cap + tx.send_data(Bytes::from("aaaaaaaa")).await.unwrap(); + assert_eq!(chunk(&mut initial).await, Some("aaaaaaaa".to_string())); + + // Further chunks are still forwarded on the initial body + tx.send_data(Bytes::from("bbbbbbbb")).await.unwrap(); + assert_eq!(chunk(&mut initial).await, Some("bbbbbbbb".to_string())); + + drop(initial); + + // The request's replay should error, since we discarded the buffer when + // we hit the cap. + let err = replay + .data() + .await + .expect("replay must yield Some(Err(..)) when capped") + .expect_err("replay must error when cappped"); + assert!(err.is::()) + } + + #[tokio::test(flavor = "current_thread")] + async fn caps_across_replays() { + // Test that, when the initial body is longer than the preconfigured + // cap, we allow the request to continue, but stop buffering. + + let (mut tx, body) = hyper::Body::channel(); + let mut initial = ReplayBody::new(body, 8); + let mut replay = initial.clone(); + + // Send enough data to reach the cap + tx.send_data(Bytes::from("aaaaaaaa")).await.unwrap(); + assert_eq!(chunk(&mut initial).await, Some("aaaaaaaa".to_string())); + drop(initial); + + let mut replay2 = replay.clone(); + + // The replay will reach the cap, but it should still return data from + // the original body. + tx.send_data(Bytes::from("bbbbbbbb")).await.unwrap(); + assert_eq!(chunk(&mut replay).await, Some("aaaaaaaa".to_string())); + assert_eq!(chunk(&mut replay).await, Some("bbbbbbbb".to_string())); + drop(replay); + + // The second replay will fail, though, because the buffer was discarded. + let err = replay2 + .data() + .await + .expect("replay must yield Some(Err(..)) when capped") + .expect_err("replay must error when cappped"); + assert!(err.is::()) + } + + struct Test { + tx: Tx, + initial: ReplayBody, + replay: ReplayBody, + _trace: tracing::subscriber::DefaultGuard, + } + + struct Tx(hyper::body::Sender); + + impl Test { + fn new() -> Self { + let (tx, body) = hyper::Body::channel(); + let initial = ReplayBody::new(body, 64 * 1024); + let replay = initial.clone(); + Self { + tx: Tx(tx), + initial, + replay, + _trace: linkerd_tracing::test::with_default_filter("linkerd_http_retry=debug"), + } + } + } + + impl Tx { + #[tracing::instrument(skip(self))] + async fn send_data(&mut self, data: impl Into + std::fmt::Debug) { + let data = data.into(); + tracing::trace!("sending data..."); + self.0.send_data(data).await.expect("rx is not dropped"); + tracing::info!("sent data"); + } + + #[tracing::instrument(skip(self))] + async fn send_trailers(&mut self, trailers: HeaderMap) { + tracing::trace!("sending trailers..."); + self.0 + .send_trailers(trailers) + .await + .expect("rx is not dropped"); + tracing::info!("sent trailers"); + } + } + + async fn chunk(body: &mut T) -> Option + where + T: http_body::Body + Unpin, + { + tracing::trace!("waiting for a body chunk..."); + let chunk = body + .data() + .await + .map(|res| res.map_err(|_| ()).unwrap()) + .map(string); + tracing::info!(?chunk); + chunk + } + + async fn body_to_string(mut body: T) -> String + where + T: http_body::Body + Unpin, + T::Error: std::fmt::Debug, + { + let mut s = String::new(); + while let Some(chunk) = chunk(&mut body).await { + s.push_str(&chunk[..]); + } + tracing::info!(body = ?s, "no more data"); + s + } + + fn string(mut data: impl Buf) -> String { + let bytes = data.copy_to_bytes(data.remaining()); + String::from_utf8(bytes.to_vec()).unwrap() } } diff --git a/linkerd/http-retry/src/replay.rs b/linkerd/http-retry/src/replay.rs deleted file mode 100644 index f1dc09c12d..0000000000 --- a/linkerd/http-retry/src/replay.rs +++ /dev/null @@ -1,919 +0,0 @@ -use bytes::{Buf, BufMut, Bytes, BytesMut}; -use http::HeaderMap; -use http_body::Body; -use linkerd_error::Error; -use parking_lot::Mutex; -use std::{collections::VecDeque, io::IoSlice, pin::Pin, sync::Arc, task::Context, task::Poll}; -use thiserror::Error; - -/// Wraps an HTTP body type and lazily buffers data as it is read from the inner -/// body. -/// -/// When this body is dropped, if a clone exists, any buffered data is shared -/// with its cloned. The first clone to be polled will take ownership over the -/// data until it is dropped. When *that* clone is dropped, the buffered data -/// --- including any new data read from the body by the clone, if the body has -/// not yet completed --- will be shared with any remaining clones. -/// -/// The buffered data can then be used to retry the request if the original -/// request fails. -#[derive(Debug)] -pub struct ReplayBody { - /// Buffered state owned by this body if it is actively being polled. If - /// this body has been polled and no other body owned the state, this will - /// be `Some`. - state: Option>, - - /// Copy of the state shared across all clones. When the active clone is - /// dropped, it moves its state back into the shared state to be taken by the - /// next clone to be polled. - shared: Arc>, - - /// Should this clone replay the buffered body from the shared state before - /// polling the initial body? - replay_body: bool, - - /// Should this clone replay trailers from the shared state? - replay_trailers: bool, - - /// Maxiumum number of bytes to buffer. - max_bytes: usize, -} - -#[derive(Debug, Error)] -#[error("cannot buffer more than {0} bytes")] -pub struct Capped(usize); - -/// Data returned by `ReplayBody`'s `http_body::Body` implementation is either -/// `Bytes` returned by the initial body, or a list of all `Bytes` chunks -/// returned by the initial body (when replaying it). -#[derive(Debug)] -pub enum Data { - Initial(Bytes), - Replay(BufList), -} - -/// Body data composed of multiple `Bytes` chunks. -#[derive(Clone, Debug, Default)] -pub struct BufList { - bufs: VecDeque, -} - -#[derive(Debug)] -struct SharedState { - body: Mutex>>, - /// Did the initial body return `true` from `is_end_stream` before it was - /// ever polled? If so, always return `true`; the body is completely empty. - /// - /// We store this separately so that clones of a totally empty body can - /// always return `true` from `is_end_stream` even when they don't own the - /// shared state. - was_empty: bool, -} - -#[derive(Debug)] -struct BodyState { - buf: BufList, - trailers: Option, - rest: Option, - is_completed: bool, - is_capped: bool, -} - -// === impl ReplayBody === - -impl ReplayBody { - /// Wraps an initial `Body` in a `ReplayBody`. - pub fn new(body: B, max_bytes: usize) -> Self { - let was_empty = body.is_end_stream(); - Self { - state: Some(BodyState { - buf: Default::default(), - trailers: None, - rest: Some(body), - is_completed: false, - is_capped: false, - }), - shared: Arc::new(SharedState { - body: Mutex::new(None), - was_empty, - }), - // The initial `ReplayBody` has nothing to replay - replay_body: false, - replay_trailers: false, - max_bytes, - } - } - - /// Mutably borrows the body state if this clone currently owns it, - /// or else tries to acquire it from the shared state. - /// - /// # Panics - /// - /// This panics if another clone has currently acquired the state, based on - /// the assumption that a retry body will not be polled until the previous - /// request has been dropped. - fn acquire_state<'a>( - state: &'a mut Option>, - shared: &Mutex>>, - ) -> &'a mut BodyState { - state.get_or_insert_with(|| shared.lock().take().expect("missing body state")) - } -} - -impl Body for ReplayBody -where - B: Body + Unpin, - B::Error: Into, -{ - type Data = Data; - type Error = Error; - - fn poll_data( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - let this = self.get_mut(); - let state = Self::acquire_state(&mut this.state, &this.shared.body); - // Move these out to avoid mutable borrow issues in the `map` closure - // when polling the inner body. - let max_bytes = this.max_bytes; - let is_capped = state.is_capped; - tracing::trace!( - replay_body = this.replay_body, - buf.has_remaining = state.buf.has_remaining(), - body.is_completed = state.is_completed, - body.is_capped = state.is_capped, - "Replay::poll_data" - ); - - // If we haven't replayed the buffer yet, and its not empty, return the - // buffered data first. - if this.replay_body { - if state.buf.has_remaining() { - tracing::trace!("replaying body"); - // Don't return the buffered data again on the next poll. - this.replay_body = false; - return Poll::Ready(Some(Ok(Data::Replay(state.buf.clone())))); - } - - if state.is_capped { - tracing::trace!("cannot replay buffered body, maximum buffer length reached"); - return Poll::Ready(Some(Err(Capped(this.max_bytes).into()))); - } - } - - // If the inner body has previously ended, don't poll it again. - // - // NOTE(eliza): we would expect the inner body to just happily return - // `None` multiple times here, but `hyper::Body::channel` (which we use - // in the tests) will panic if it is polled after returning `None`, so - // we have to special-case this. :/ - if state.is_completed { - return Poll::Ready(None); - } - - // If there's more data in the initial body, poll that... - if let Some(rest) = state.rest.as_mut() { - tracing::trace!("Polling initial body"); - let opt = futures::ready!(Pin::new(rest).poll_data(cx)); - - // If the body has ended, remember that so that future clones will - // not try polling it again --- some `Body` types will panic if they - // are polled after returning `None`. - if opt.is_none() { - tracing::trace!("Initial body completed"); - state.is_completed = true; - } - return Poll::Ready(opt.map(|ok| { - ok.map(|mut data| { - // If we have already buffered the maximum number of bytes, - // allow *this* body to continue, but don't buffer any more. - if is_capped { - return Data::Initial(data.copy_to_bytes(data.remaining())); - } - - if state.buf.remaining() + data.remaining() > max_bytes { - tracing::debug!( - max_bytes, - "buffered maximum number of bytes, discarding buffer" - ); - // discard the buffer - state.buf = Default::default(); - state.is_capped = true; - return Data::Initial(data.copy_to_bytes(data.remaining())); - } - - // Buffer and return the bytes - Data::Initial(state.buf.push_chunk(data)) - }) - .map_err(Into::into) - })); - } - - // Otherwise, guess we're done! - Poll::Ready(None) - } - - fn poll_trailers( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - let this = self.get_mut(); - let state = Self::acquire_state(&mut this.state, &this.shared.body); - tracing::trace!( - replay_trailers = this.replay_trailers, - "Replay::poll_trailers" - ); - - if this.replay_trailers { - this.replay_trailers = false; - if let Some(ref trailers) = state.trailers { - tracing::trace!("Replaying trailers"); - return Poll::Ready(Ok(Some(trailers.clone()))); - } - } - - if let Some(rest) = state.rest.as_mut() { - // If the inner body has previously ended, don't poll it again. - if !rest.is_end_stream() { - let res = futures::ready!(Pin::new(rest).poll_trailers(cx)).map(|tlrs| { - if state.trailers.is_none() { - state.trailers = tlrs.clone(); - } - tlrs - }); - return Poll::Ready(res.map_err(Into::into)); - } - } - - Poll::Ready(Ok(None)) - } - - fn is_end_stream(&self) -> bool { - // if the initial body was EOS as soon as it was wrapped, then we are - // empty. - if self.shared.was_empty { - return true; - } - - let is_inner_eos = self - .state - .as_ref() - .and_then(|state| state.rest.as_ref().map(Body::is_end_stream)) - .unwrap_or(false); - - // if this body has data or trailers remaining to play back, it - // is not EOS - !self.replay_body && !self.replay_trailers - // if we have replayed everything, the initial body may - // still have data remaining, so ask it - && is_inner_eos - } - - fn size_hint(&self) -> http_body::SizeHint { - let mut hint = http_body::SizeHint::default(); - if let Some(ref state) = self.state { - let rem = state.buf.remaining() as u64; - - // Have we read the entire body? If so, the size is exactly the size - // of the buffer. - if state.is_completed { - return http_body::SizeHint::with_exact(rem); - } - - // Otherwise, the size is the size of the current buffer plus the - // size hint returned by the inner body. - let (rest_lower, rest_upper) = state - .rest - .as_ref() - .map(|rest| { - let hint = rest.size_hint(); - (hint.lower(), hint.upper().unwrap_or(0)) - }) - .unwrap_or_default(); - hint.set_lower(rem + rest_lower); - hint.set_upper(rem + rest_upper); - } - - hint - } -} - -impl Clone for ReplayBody { - fn clone(&self) -> Self { - Self { - state: None, - shared: self.shared.clone(), - // The clone should try to replay from the shared state before - // reading any additional data from the initial body. - replay_body: true, - replay_trailers: true, - max_bytes: self.max_bytes, - } - } -} - -impl Drop for ReplayBody { - fn drop(&mut self) { - // If this clone owned the shared state, put it back.`s - if let Some(state) = self.state.take() { - *self.shared.body.lock() = Some(state); - } - } -} - -// === impl Data === - -impl Buf for Data { - #[inline] - fn remaining(&self) -> usize { - match self { - Data::Initial(buf) => buf.remaining(), - Data::Replay(bufs) => bufs.remaining(), - } - } - - #[inline] - fn chunk(&self) -> &[u8] { - match self { - Data::Initial(buf) => buf.chunk(), - Data::Replay(bufs) => bufs.chunk(), - } - } - - #[inline] - fn chunks_vectored<'iovs>(&'iovs self, iovs: &mut [IoSlice<'iovs>]) -> usize { - match self { - Data::Initial(buf) => buf.chunks_vectored(iovs), - Data::Replay(bufs) => bufs.chunks_vectored(iovs), - } - } - - #[inline] - fn advance(&mut self, amt: usize) { - match self { - Data::Initial(buf) => buf.advance(amt), - Data::Replay(bufs) => bufs.advance(amt), - } - } - - #[inline] - fn copy_to_bytes(&mut self, len: usize) -> Bytes { - match self { - Data::Initial(buf) => buf.copy_to_bytes(len), - Data::Replay(bufs) => bufs.copy_to_bytes(len), - } - } -} - -// === impl BufList === - -impl BufList { - fn push_chunk(&mut self, mut data: impl Buf) -> Bytes { - let len = data.remaining(); - // `data` is (almost) certainly a `Bytes`, so `copy_to_bytes` should - // internally be a cheap refcount bump almost all of the time. - // But, if it isn't, this will copy it to a `Bytes` that we can - // now clone. - let bytes = data.copy_to_bytes(len); - // Buffer a clone of the bytes read on this poll. - self.bufs.push_back(bytes.clone()); - // Return the bytes - bytes - } -} - -impl Buf for BufList { - fn remaining(&self) -> usize { - self.bufs.iter().map(Buf::remaining).sum() - } - - fn chunk(&self) -> &[u8] { - self.bufs.front().map(Buf::chunk).unwrap_or(&[]) - } - - fn chunks_vectored<'iovs>(&'iovs self, iovs: &mut [IoSlice<'iovs>]) -> usize { - // Are there more than zero iovecs to write to? - if iovs.is_empty() { - return 0; - } - - // Loop over the buffers in the replay buffer list, and try to fill as - // many iovecs as we can from each buffer. - let mut filled = 0; - for buf in &self.bufs { - filled += buf.chunks_vectored(&mut iovs[filled..]); - if filled == iovs.len() { - return filled; - } - } - - filled - } - - fn advance(&mut self, mut amt: usize) { - while amt > 0 { - let rem = self.bufs[0].remaining(); - // If the amount to advance by is less than the first buffer in - // the buffer list, advance that buffer's cursor by `amt`, - // and we're done. - if rem > amt { - self.bufs[0].advance(amt); - return; - } - - // Otherwise, advance the first buffer to its end, and - // continue. - self.bufs[0].advance(rem); - amt -= rem; - - self.bufs.pop_front(); - } - } - - fn copy_to_bytes(&mut self, len: usize) -> Bytes { - // If the length of the requested `Bytes` is <= the length of the front - // buffer, we can just use its `copy_to_bytes` implementation (which is - // just a reference count bump). - match self.bufs.front_mut() { - Some(first) if len <= first.remaining() => { - let buf = first.copy_to_bytes(len); - // if we consumed the first buffer, also advance our "cursor" by - // popping it. - if first.remaining() == 0 { - self.bufs.pop_front(); - } - - buf - } - _ => { - assert!(len <= self.remaining(), "`len` greater than remaining"); - let mut buf = BytesMut::with_capacity(len); - buf.put(self.take(len)); - buf.freeze() - } - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use http::{HeaderMap, HeaderValue}; - - #[tokio::test] - async fn replays_one_chunk() { - let Test { - mut tx, - initial, - replay, - _trace, - } = Test::new(); - tx.send_data("hello world").await; - drop(tx); - - let initial = body_to_string(initial).await; - assert_eq!(initial, "hello world"); - - let replay = body_to_string(replay).await; - assert_eq!(replay, "hello world"); - } - - #[tokio::test] - async fn replays_several_chunks() { - let Test { - mut tx, - initial, - replay, - _trace, - } = Test::new(); - - tokio::spawn(async move { - tx.send_data("hello").await; - tx.send_data(" world").await; - tx.send_data(", have lots").await; - tx.send_data(" of fun!").await; - }); - - let initial = body_to_string(initial).await; - assert_eq!(initial, "hello world, have lots of fun!"); - - let replay = body_to_string(replay).await; - assert_eq!(replay, "hello world, have lots of fun!"); - } - - #[tokio::test] - async fn replays_trailers() { - let Test { - mut tx, - mut initial, - mut replay, - _trace, - } = Test::new(); - - let mut tlrs = HeaderMap::new(); - tlrs.insert("x-hello", HeaderValue::from_str("world").unwrap()); - tlrs.insert("x-foo", HeaderValue::from_str("bar").unwrap()); - - tx.send_data("hello world").await; - tx.send_trailers(tlrs.clone()).await; - drop(tx); - - while initial.data().await.is_some() { - // do nothing - } - let initial_tlrs = initial.trailers().await.expect("trailers should not error"); - assert_eq!(initial_tlrs.as_ref(), Some(&tlrs)); - - // drop the initial body to send the data to the replay - drop(initial); - - while replay.data().await.is_some() { - // do nothing - } - let replay_tlrs = replay.trailers().await.expect("trailers should not error"); - assert_eq!(replay_tlrs.as_ref(), Some(&tlrs)); - } - - #[tokio::test] - async fn trailers_only() { - let Test { - mut tx, - mut initial, - mut replay, - _trace, - } = Test::new(); - - let mut tlrs = HeaderMap::new(); - tlrs.insert("x-hello", HeaderValue::from_str("world").unwrap()); - tlrs.insert("x-foo", HeaderValue::from_str("bar").unwrap()); - - tx.send_trailers(tlrs.clone()).await; - - drop(tx); - - assert!(dbg!(initial.data().await).is_none(), "no data in body"); - let initial_tlrs = initial.trailers().await.expect("trailers should not error"); - assert_eq!(initial_tlrs.as_ref(), Some(&tlrs)); - - // drop the initial body to send the data to the replay - drop(initial); - - assert!(dbg!(replay.data().await).is_none(), "no data in body"); - let replay_tlrs = replay.trailers().await.expect("trailers should not error"); - assert_eq!(replay_tlrs.as_ref(), Some(&tlrs)); - } - - #[tokio::test(flavor = "current_thread")] - async fn switches_with_body_remaining() { - // This simulates a case where the server returns an error _before_ the - // entire body has been read. - let Test { - mut tx, - mut initial, - mut replay, - _trace, - } = Test::new(); - - tx.send_data("hello").await; - assert_eq!(chunk(&mut initial).await.unwrap(), "hello"); - - tx.send_data(" world").await; - assert_eq!(chunk(&mut initial).await.unwrap(), " world"); - - // drop the initial body to send the data to the replay - drop(initial); - tracing::info!("dropped initial body"); - - tokio::spawn(async move { - tx.send_data(", have lots of fun").await; - tx.send_trailers(HeaderMap::new()).await; - }); - - assert_eq!( - body_to_string(&mut replay).await, - "hello world, have lots of fun" - ); - } - - #[tokio::test(flavor = "current_thread")] - async fn multiple_replays() { - let Test { - mut tx, - mut initial, - mut replay, - _trace, - } = Test::new(); - - let mut tlrs = HeaderMap::new(); - tlrs.insert("x-hello", HeaderValue::from_str("world").unwrap()); - tlrs.insert("x-foo", HeaderValue::from_str("bar").unwrap()); - - let tlrs2 = tlrs.clone(); - tokio::spawn(async move { - tx.send_data("hello").await; - tx.send_data(" world").await; - tx.send_trailers(tlrs2).await; - }); - - assert_eq!(body_to_string(&mut initial).await, "hello world"); - - let initial_tlrs = initial.trailers().await.expect("trailers should not error"); - assert_eq!(initial_tlrs.as_ref(), Some(&tlrs)); - - // drop the initial body to send the data to the replay - drop(initial); - - let mut replay2 = replay.clone(); - assert_eq!(body_to_string(&mut replay).await, "hello world"); - - let replay_tlrs = replay.trailers().await.expect("trailers should not error"); - assert_eq!(replay_tlrs.as_ref(), Some(&tlrs)); - - // drop the initial body to send the data to the replay - drop(replay); - - assert_eq!(body_to_string(&mut replay2).await, "hello world"); - - let replay2_tlrs = replay2.trailers().await.expect("trailers should not error"); - assert_eq!(replay2_tlrs.as_ref(), Some(&tlrs)); - } - - #[tokio::test(flavor = "current_thread")] - async fn multiple_incomplete_replays() { - let Test { - mut tx, - mut initial, - mut replay, - _trace, - } = Test::new(); - - let mut tlrs = HeaderMap::new(); - tlrs.insert("x-hello", HeaderValue::from_str("world").unwrap()); - tlrs.insert("x-foo", HeaderValue::from_str("bar").unwrap()); - - tx.send_data("hello").await; - assert_eq!(chunk(&mut initial).await.unwrap(), "hello"); - - // drop the initial body to send the data to the replay - drop(initial); - tracing::info!("dropped initial body"); - - let mut replay2 = replay.clone(); - - tx.send_data(" world").await; - assert_eq!(chunk(&mut replay).await.unwrap(), "hello"); - assert_eq!(chunk(&mut replay).await.unwrap(), " world"); - - // drop the replay body to send the data to the second replay - drop(replay); - tracing::info!("dropped first replay body"); - - let tlrs2 = tlrs.clone(); - tokio::spawn(async move { - tx.send_data(", have lots").await; - tx.send_data(" of fun!").await; - tx.send_trailers(tlrs2).await; - }); - - assert_eq!( - body_to_string(&mut replay2).await, - "hello world, have lots of fun!" - ); - - let replay2_tlrs = replay2.trailers().await.expect("trailers should not error"); - assert_eq!(replay2_tlrs.as_ref(), Some(&tlrs)); - } - - #[tokio::test(flavor = "current_thread")] - async fn drop_clone_early() { - let Test { - mut tx, - mut initial, - mut replay, - _trace, - } = Test::new(); - - let mut tlrs = HeaderMap::new(); - tlrs.insert("x-hello", HeaderValue::from_str("world").unwrap()); - tlrs.insert("x-foo", HeaderValue::from_str("bar").unwrap()); - - let tlrs2 = tlrs.clone(); - tokio::spawn(async move { - tx.send_data("hello").await; - tx.send_data(" world").await; - tx.send_trailers(tlrs2).await; - }); - - assert_eq!(body_to_string(&mut initial).await, "hello world"); - - let initial_tlrs = initial.trailers().await.expect("trailers should not error"); - assert_eq!(initial_tlrs.as_ref(), Some(&tlrs)); - - // drop the initial body to send the data to the replay - drop(initial); - - // clone the body again and then drop it - let replay2 = replay.clone(); - drop(replay2); - - assert_eq!(body_to_string(&mut replay).await, "hello world"); - let replay_tlrs = replay.trailers().await.expect("trailers should not error"); - assert_eq!(replay_tlrs.as_ref(), Some(&tlrs)); - } - - // This test is specifically for behavior across clones, so the clippy lint - // is wrong here. - #[allow(clippy::redundant_clone)] - #[test] - fn empty_body_is_always_eos() { - // If the initial body was empty, every clone should always return - // `true` from `is_end_stream`. - let initial = ReplayBody::new(hyper::Body::empty(), 64 * 1024); - assert!(initial.is_end_stream()); - - let replay = initial.clone(); - assert!(replay.is_end_stream()); - - let replay2 = replay.clone(); - assert!(replay2.is_end_stream()); - } - - #[tokio::test(flavor = "current_thread")] - async fn eos_only_when_fully_replayed() { - // Test that each clone of a body is not EOS until the data has been - // fully replayed. - let mut initial = ReplayBody::new(hyper::Body::from("hello world"), 64 * 1024); - let mut replay = initial.clone(); - - body_to_string(&mut initial).await; - assert!(!replay.is_end_stream()); - - initial.trailers().await.expect("trailers should not error"); - assert!(initial.is_end_stream()); - assert!(!replay.is_end_stream()); - - // drop the initial body to send the data to the replay - drop(initial); - - assert!(!replay.is_end_stream()); - - body_to_string(&mut replay).await; - assert!(!replay.is_end_stream()); - - replay.trailers().await.expect("trailers should not error"); - assert!(replay.is_end_stream()); - - // Even if we clone a body _after_ it has been driven to EOS, the clone - // must not be EOS. - let mut replay2 = replay.clone(); - assert!(!replay2.is_end_stream()); - - // drop the initial body to send the data to the replay - drop(replay); - - body_to_string(&mut replay2).await; - assert!(!replay2.is_end_stream()); - - replay2.trailers().await.expect("trailers should not error"); - assert!(replay2.is_end_stream()); - } - - #[tokio::test(flavor = "current_thread")] - async fn caps_buffer() { - // Test that, when the initial body is longer than the preconfigured - // cap, we allow the request to continue, but stop buffering. The - // initial body will complete, but the replay will immediately fail. - - let (mut tx, body) = hyper::Body::channel(); - let mut initial = ReplayBody::new(body, 8); - let mut replay = initial.clone(); - - // Send enough data to reach the cap - tx.send_data(Bytes::from("aaaaaaaa")).await.unwrap(); - assert_eq!(chunk(&mut initial).await, Some("aaaaaaaa".to_string())); - - // Further chunks are still forwarded on the initial body - tx.send_data(Bytes::from("bbbbbbbb")).await.unwrap(); - assert_eq!(chunk(&mut initial).await, Some("bbbbbbbb".to_string())); - - drop(initial); - - // The request's replay should error, since we discarded the buffer when - // we hit the cap. - let err = replay - .data() - .await - .expect("replay must yield Some(Err(..)) when capped") - .expect_err("replay must error when cappped"); - assert!(err.is::()) - } - - #[tokio::test(flavor = "current_thread")] - async fn caps_across_replays() { - // Test that, when the initial body is longer than the preconfigured - // cap, we allow the request to continue, but stop buffering. - - let (mut tx, body) = hyper::Body::channel(); - let mut initial = ReplayBody::new(body, 8); - let mut replay = initial.clone(); - - // Send enough data to reach the cap - tx.send_data(Bytes::from("aaaaaaaa")).await.unwrap(); - assert_eq!(chunk(&mut initial).await, Some("aaaaaaaa".to_string())); - drop(initial); - - let mut replay2 = replay.clone(); - - // The replay will reach the cap, but it should still return data from - // the original body. - tx.send_data(Bytes::from("bbbbbbbb")).await.unwrap(); - assert_eq!(chunk(&mut replay).await, Some("aaaaaaaa".to_string())); - assert_eq!(chunk(&mut replay).await, Some("bbbbbbbb".to_string())); - drop(replay); - - // The second replay will fail, though, because the buffer was discarded. - let err = replay2 - .data() - .await - .expect("replay must yield Some(Err(..)) when capped") - .expect_err("replay must error when cappped"); - assert!(err.is::()) - } - - struct Test { - tx: Tx, - initial: ReplayBody, - replay: ReplayBody, - _trace: tracing::subscriber::DefaultGuard, - } - - struct Tx(hyper::body::Sender); - - impl Test { - fn new() -> Self { - let (tx, body) = hyper::Body::channel(); - let initial = ReplayBody::new(body, 64 * 1024); - let replay = initial.clone(); - Self { - tx: Tx(tx), - initial, - replay, - _trace: linkerd_tracing::test::with_default_filter("linkerd_http_retry=debug"), - } - } - } - - impl Tx { - #[tracing::instrument(skip(self))] - async fn send_data(&mut self, data: impl Into + std::fmt::Debug) { - let data = data.into(); - tracing::trace!("sending data..."); - self.0.send_data(data).await.expect("rx is not dropped"); - tracing::info!("sent data"); - } - - #[tracing::instrument(skip(self))] - async fn send_trailers(&mut self, trailers: HeaderMap) { - tracing::trace!("sending trailers..."); - self.0 - .send_trailers(trailers) - .await - .expect("rx is not dropped"); - tracing::info!("sent trailers"); - } - } - - async fn chunk(body: &mut T) -> Option - where - T: http_body::Body + Unpin, - { - tracing::trace!("waiting for a body chunk..."); - let chunk = body - .data() - .await - .map(|res| res.map_err(|_| ()).unwrap()) - .map(string); - tracing::info!(?chunk); - chunk - } - - async fn body_to_string(mut body: T) -> String - where - T: http_body::Body + Unpin, - T::Error: std::fmt::Debug, - { - let mut s = String::new(); - while let Some(chunk) = chunk(&mut body).await { - s.push_str(&chunk[..]); - } - tracing::info!(body = ?s, "no more data"); - s - } - - fn string(mut data: impl Buf) -> String { - let bytes = data.copy_to_bytes(data.remaining()); - String::from_utf8(bytes.to_vec()).unwrap() - } -} diff --git a/linkerd/retry/Cargo.toml b/linkerd/retry/Cargo.toml new file mode 100644 index 0000000000..dd9c962ac7 --- /dev/null +++ b/linkerd/retry/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "linkerd-retry" +version = "0.1.0" +authors = ["Linkerd Developers "] +license = "Apache-2.0" +edition = "2018" +publish = false + +[dependencies] +linkerd-error = { path = "../error" } +linkerd-stack = { path = "../stack" } +pin-project = "1" +tower = { version = "0.4.7", default-features = false, features = ["retry", "util"] } +tracing = "0.1.23" diff --git a/linkerd/retry/src/lib.rs b/linkerd/retry/src/lib.rs new file mode 100644 index 0000000000..a470b98e7b --- /dev/null +++ b/linkerd/retry/src/lib.rs @@ -0,0 +1,162 @@ +#![deny(warnings, rust_2018_idioms)] +#![forbid(unsafe_code)] +#![allow(clippy::inconsistent_struct_constructor)] +#![allow(clippy::type_complexity)] + +use linkerd_error::Error; +use linkerd_stack::{Either, NewService, Proxy, ProxyService}; +use pin_project::pin_project; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tower::util::{Oneshot, ServiceExt}; +use tracing::trace; + +/// A strategy for obtaining per-target retry polices. +pub trait NewPolicy { + type Policy; + + fn new_policy(&self, target: &T) -> Option; +} + +/// A layer that applies per-target retry polcies. +/// +/// Composes `NewService`s that produce a `Proxy`. +#[derive(Clone, Debug)] +pub struct NewRetryLayer

{ + new_policy: P, +} + +#[derive(Clone, Debug)] +pub struct NewRetry { + new_policy: P, + inner: N, +} + +#[derive(Clone, Debug)] +pub struct Retry { + policy: Option

, + inner: S, +} + +/// An extension to [`tower::retry::Policy`] that adds a method to prepare a +/// request to be retried, possibly changing its type. +pub trait RetryPolicy: tower::retry::Policy { + /// A request type that can be retried. + /// + /// This *may* be the same as the `Req` type parameter, but it can also be a + /// different type, if retries can only be attempted for a specific request type. + type RetryRequest; + + /// Prepare an initial request for a potential retry. + /// + /// If the request is retryable, this should return `Either::A`. Otherwise, + /// if this returns `Either::B`, the request will not be retried if it + /// fails. + /// + /// If retrying requires a specific request type other than the input type + /// to this policy, this function may transform the request into a request + /// of that type. + fn prepare_request(&self, req: Req) -> Either; +} + +#[pin_project(project = ResponseFutureProj)] +pub enum ResponseFuture +where + R: tower::retry::Policy + Clone, + P: Proxy + Clone, + S: tower::Service + Clone, + S::Error: Into, +{ + Disabled(#[pin] P::Future), + Retry(#[pin] Oneshot>, Req>), +} + +// === impl NewRetryLayer === + +impl

NewRetryLayer

{ + pub fn new(new_policy: P) -> Self { + Self { new_policy } + } +} + +impl tower::layer::Layer for NewRetryLayer

{ + type Service = NewRetry; + + fn layer(&self, inner: N) -> Self::Service { + Self::Service { + inner, + new_policy: self.new_policy.clone(), + } + } +} + +// === impl NewRetry === + +impl NewService for NewRetry +where + N: NewService, + P: NewPolicy, +{ + type Service = Retry; + + fn new_service(&mut self, target: T) -> Self::Service { + // Determine if there is a retry policy for the given target. + let policy = self.new_policy.new_policy(&target); + + let inner = self.inner.new_service(target); + Retry { policy, inner } + } +} + +// === impl Retry === + +impl Proxy for Retry +where + R: RetryPolicy + Clone, + P: Proxy + + Proxy + + Clone, + S: tower::Service + Clone, + S::Error: Into, +{ + type Request = PReq; + type Response = PRsp; + type Error = Error; + type Future = ResponseFuture; + + fn proxy(&self, svc: &mut S, req: Req) -> Self::Future { + trace!(retryable = %self.policy.is_some()); + + if let Some(policy) = self.policy.as_ref() { + return match policy.prepare_request(req) { + Either::A(retry_req) => { + let inner = + Proxy::::wrap_service(self.inner.clone(), svc.clone()); + let retry = tower::retry::Retry::new(policy.clone(), inner); + ResponseFuture::Retry(retry.oneshot(retry_req)) + } + Either::B(req) => ResponseFuture::Disabled(self.inner.proxy(svc, req)), + }; + } + + ResponseFuture::Disabled(self.inner.proxy(svc, req)) + } +} + +impl Future for ResponseFuture +where + R: tower::retry::Policy + Clone, + P: Proxy + Clone, + S: tower::Service + Clone, + S::Error: Into, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.project() { + ResponseFutureProj::Disabled(f) => f.poll(cx).map_err(Into::into), + ResponseFutureProj::Retry(f) => f.poll(cx).map_err(Into::into), + } + } +} From 9bcf6b37b51451f35ec68688b0233dfb602ed463 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 10 Jun 2021 10:36:32 -0700 Subject: [PATCH 06/15] put back commented out checks Signed-off-by: Eliza Weisman --- linkerd/app/inbound/src/http/mod.rs | 4 +- linkerd/app/outbound/src/http/detect.rs | 2 +- linkerd/app/outbound/src/http/logical.rs | 7 ++- linkerd/http-box/src/box_any_request.rs | 0 linkerd/http-box/src/erase_request.rs | 77 ++++++++++++++++++++++++ linkerd/http-box/src/lib.rs | 2 + linkerd/http-retry/src/lib.rs | 7 +++ linkerd/proxy/http/src/lib.rs | 2 +- 8 files changed, 96 insertions(+), 5 deletions(-) delete mode 100644 linkerd/http-box/src/box_any_request.rs create mode 100644 linkerd/http-box/src/erase_request.rs diff --git a/linkerd/app/inbound/src/http/mod.rs b/linkerd/app/inbound/src/http/mod.rs index 640f4db964..f25885020f 100644 --- a/linkerd/app/inbound/src/http/mod.rs +++ b/linkerd/app/inbound/src/http/mod.rs @@ -86,7 +86,7 @@ impl Inbound { .push(http::BoxRequest::layer()) .push(http::BoxResponse::layer()), ) - // .check_new_service::>() + .check_new_service::>() .instrument(|t: &T| debug_span!("http", v=%Param::::param(t))) .push(http::NewServeHttp::layer(h2_settings, rt.drain.clone())) .push(svc::BoxNewService::layer()); @@ -184,7 +184,7 @@ where )) .push_map_target(Logical::from) .push_on_response(http::BoxResponse::layer()) - // .check_new_service::<(profiles::Receiver, Target), _>() + .check_new_service::<(profiles::Receiver, Target), _>() .push(svc::UnwrapOr::layer(no_profile)) .push(profiles::discover::layer( profiles, diff --git a/linkerd/app/outbound/src/http/detect.rs b/linkerd/app/outbound/src/http/detect.rs index 71477eff60..d823f2e224 100644 --- a/linkerd/app/outbound/src/http/detect.rs +++ b/linkerd/app/outbound/src/http/detect.rs @@ -53,7 +53,7 @@ impl Outbound { .push(http::BoxRequest::layer()) .push(svc::MapErrLayer::new(Into::into)), ) - // .check_new_service::() + .check_new_service::() .push(http::NewServeHttp::layer(h2_settings, rt.drain.clone())) .push_map_target(U::from) .instrument(|(v, _): &(http::Version, _)| debug_span!("http", %v)) diff --git a/linkerd/app/outbound/src/http/logical.rs b/linkerd/app/outbound/src/http/logical.rs index 562ad87688..d233e06b37 100644 --- a/linkerd/app/outbound/src/http/logical.rs +++ b/linkerd/app/outbound/src/http/logical.rs @@ -120,7 +120,12 @@ impl Outbound { .http_route_actual .to_layer::(), ) - .push_on_response(http::BoxRequest::layer()) + // Depending on whether or not the request can be retried, + // it may have one of two `Body` types. This layer unifies + // any `Body` type into `BoxBody` so that the rest of the + // stack doesn't have to implement `Service` for requests + // with both body types. + .push_on_response(http::EraseRequest::layer()) // Sets an optional retry policy. .push(retry::layer(rt.metrics.http_route_retry.clone())) // Sets an optional request timeout. diff --git a/linkerd/http-box/src/box_any_request.rs b/linkerd/http-box/src/box_any_request.rs deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/linkerd/http-box/src/erase_request.rs b/linkerd/http-box/src/erase_request.rs new file mode 100644 index 0000000000..6288167668 --- /dev/null +++ b/linkerd/http-box/src/erase_request.rs @@ -0,0 +1,77 @@ +//! A middleware that boxes HTTP request bodies. +use linkerd_error::Error; +use linkerd_http_box::BoxBody; +use linkerd_stack::{layer, Proxy}; +use std::task::{Context, Poll}; + +/// A middleware that boxes HTTP request bodies. +/// +/// This is *very* similar to the [`BoxRequest`] middleware. However, that +/// middleware is generic over a specific body type that is erased. A given +/// instance of `EraseRequest` can only erase the type of one particular `Body` +/// type, while this middleware will erase +/// bodies of *any* type. +/// +/// An astute reader may ask, why not simply replace `BoxRequest` with this +/// middleware, if it is a more flexible superset of the same behavior? The +/// answer is that in many cases, the use of this more flexible middleware +/// renders request body types uninferrable. If all `BoxRequest`s in the stack +/// are replaced with `EraseRequest`, suddenly a great deal of +/// `check_new_service` and `check_service` checks will require explicit +/// annotations for the pre-erasure body type. This is not great. +/// +/// Instead, this type is implemented separately and should be used only when a +/// stack must be able to implement `Service>` for *multiple +/// distinct values of `B`*. +#[derive(Debug)] +pub struct EraseRequest(S); + +impl EraseRequest { + pub fn layer() -> impl layer::Layer + Clone + Copy { + layer::mk(EraseRequest) + } +} + +impl Clone for EraseRequest { + fn clone(&self) -> Self { + EraseRequest(self.0.clone()) + } +} + +impl tower::Service> for EraseRequest +where + B: http_body::Body + Send + 'static, + B::Data: Send + 'static, + B::Error: Into, + S: tower::Service>, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.0.poll_ready(cx) + } + + fn call(&mut self, req: http::Request) -> Self::Future { + self.0.call(req.map(BoxBody::new)) + } +} + +impl Proxy, S> for EraseRequest

+where + B: http_body::Body + Send + 'static, + B::Data: Send + 'static, + B::Error: Into, + S: tower::Service, + P: Proxy, S>, +{ + type Request = P::Request; + type Response = P::Response; + type Error = P::Error; + type Future = P::Future; + + fn proxy(&self, inner: &mut S, req: http::Request) -> Self::Future { + self.0.proxy(inner, req.map(BoxBody::new)) + } +} diff --git a/linkerd/http-box/src/lib.rs b/linkerd/http-box/src/lib.rs index 49a8ca7078..062e37bf3f 100644 --- a/linkerd/http-box/src/lib.rs +++ b/linkerd/http-box/src/lib.rs @@ -3,11 +3,13 @@ #![allow(clippy::inconsistent_struct_constructor)] mod body; +mod erase_request; mod request; mod response; pub use self::{ body::{BoxBody, Data}, + erase_request::EraseRequest, request::BoxRequest, response::BoxResponse, }; diff --git a/linkerd/http-retry/src/lib.rs b/linkerd/http-retry/src/lib.rs index 26f4efac6a..cf6d41188c 100644 --- a/linkerd/http-retry/src/lib.rs +++ b/linkerd/http-retry/src/lib.rs @@ -4,12 +4,19 @@ #![allow(clippy::type_complexity)] use bytes::{Buf, BufMut, Bytes, BytesMut}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use http::HeaderMap; use http::HeaderMap; use http_body::Body; +use http_body::Body; +use linkerd_error::Error; use linkerd_error::Error; use parking_lot::Mutex; +use parking_lot::Mutex; +use std::{collections::VecDeque, io::IoSlice, pin::Pin, sync::Arc, task::Context, task::Poll}; use std::{collections::VecDeque, io::IoSlice, pin::Pin, sync::Arc, task::Context, task::Poll}; use thiserror::Error; +use thiserror::Error; /// Wraps an HTTP body type and lazily buffers data as it is read from the inner /// body. diff --git a/linkerd/proxy/http/src/lib.rs b/linkerd/proxy/http/src/lib.rs index 58eb07276f..7ee909ed19 100644 --- a/linkerd/proxy/http/src/lib.rs +++ b/linkerd/proxy/http/src/lib.rs @@ -42,7 +42,7 @@ pub use http::{ uri, Request, Response, StatusCode, }; pub use hyper::body::HttpBody; -pub use linkerd_http_box::{BoxBody, BoxRequest, BoxResponse}; +pub use linkerd_http_box::{BoxBody, BoxRequest, BoxResponse, EraseRequest}; #[derive(Clone, Debug)] pub struct HeaderPair(pub HeaderName, pub HeaderValue); From a961aeae671704c30fdf70179f80a4edb53331cf Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 10 Jun 2021 11:11:01 -0700 Subject: [PATCH 07/15] whoops Signed-off-by: Eliza Weisman --- Cargo.lock | 3 ++- linkerd/app/core/Cargo.toml | 2 +- linkerd/app/core/src/retry.rs | 2 +- linkerd/http-box/src/erase_request.rs | 2 +- linkerd/http-box/src/request.rs | 37 ++++++++------------------- 5 files changed, 16 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 36fd82adb7..4d1f67b177 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -703,6 +703,7 @@ dependencies = [ "linkerd-proxy-tcp", "linkerd-proxy-transport", "linkerd-reconnect", + "linkerd-retry", "linkerd-service-profiles", "linkerd-stack", "linkerd-stack-metrics", @@ -1016,11 +1017,11 @@ name = "linkerd-http-retry" version = "0.1.0" dependencies = [ "bytes", + "futures", "http", "http-body", "hyper", "linkerd-error", - "linkerd-http-box", "linkerd-tracing", "parking_lot", "pin-project", diff --git a/linkerd/app/core/Cargo.toml b/linkerd/app/core/Cargo.toml index b05306f649..28b4a6e8a6 100644 --- a/linkerd/app/core/Cargo.toml +++ b/linkerd/app/core/Cargo.toml @@ -53,7 +53,7 @@ linkerd-proxy-tap = { path = "../../proxy/tap" } linkerd-proxy-tcp = { path = "../../proxy/tcp" } linkerd-proxy-transport = { path = "../../proxy/transport" } linkerd-reconnect = { path = "../../reconnect" } -linkerd-retry = { path = "../../reconnect" } +linkerd-retry = { path = "../../retry" } linkerd-timeout = { path = "../../timeout" } linkerd-tracing = { path = "../../tracing" } linkerd-service-profiles = { path = "../../service-profiles" } diff --git a/linkerd/app/core/src/retry.rs b/linkerd/app/core/src/retry.rs index 9a12ebe6cd..3a7e40814a 100644 --- a/linkerd/app/core/src/retry.rs +++ b/linkerd/app/core/src/retry.rs @@ -11,7 +11,7 @@ use std::sync::Arc; use tower::retry::{budget::Budget, Policy}; use linkerd_http_retry::ReplayBody; -use linkerd_retry::RetryPolicy; +use linkerd_retry::*; pub fn layer(metrics: HttpRouteRetry) -> NewRetryLayer { NewRetryLayer::new(NewRetry::new(metrics)) diff --git a/linkerd/http-box/src/erase_request.rs b/linkerd/http-box/src/erase_request.rs index 6288167668..5b455ce0e6 100644 --- a/linkerd/http-box/src/erase_request.rs +++ b/linkerd/http-box/src/erase_request.rs @@ -1,6 +1,6 @@ //! A middleware that boxes HTTP request bodies. +use crate::BoxBody; use linkerd_error::Error; -use linkerd_http_box::BoxBody; use linkerd_stack::{layer, Proxy}; use std::task::{Context, Poll}; diff --git a/linkerd/http-box/src/request.rs b/linkerd/http-box/src/request.rs index 28579e53ac..30b4340301 100644 --- a/linkerd/http-box/src/request.rs +++ b/linkerd/http-box/src/request.rs @@ -2,25 +2,28 @@ use crate::BoxBody; use linkerd_error::Error; -use linkerd_stack::{layer, Proxy}; -use std::task::{Context, Poll}; +use linkerd_stack::layer; +use std::{ + marker::PhantomData, + task::{Context, Poll}, +}; #[derive(Debug)] -pub struct BoxRequest(S); +pub struct BoxRequest(S, PhantomData); -impl BoxRequest { +impl BoxRequest { pub fn layer() -> impl layer::Layer + Clone + Copy { - layer::mk(BoxRequest) + layer::mk(|inner| BoxRequest(inner, PhantomData)) } } -impl Clone for BoxRequest { +impl Clone for BoxRequest { fn clone(&self) -> Self { - BoxRequest(self.0.clone()) + BoxRequest(self.0.clone(), self.1) } } -impl tower::Service> for BoxRequest +impl tower::Service> for BoxRequest where B: http_body::Body + Send + 'static, B::Data: Send + 'static, @@ -39,21 +42,3 @@ where self.0.call(req.map(BoxBody::new)) } } - -impl Proxy, S> for BoxRequest

-where - B: http_body::Body + Send + 'static, - B::Data: Send + 'static, - B::Error: Into, - S: tower::Service, - P: Proxy, S>, -{ - type Request = P::Request; - type Response = P::Response; - type Error = P::Error; - type Future = P::Future; - - fn proxy(&self, inner: &mut S, req: http::Request) -> Self::Future { - self.0.proxy(inner, req.map(BoxBody::new)) - } -} From 1fa98883131d5d1289f8fd90f1a9d5d6d3f04518 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 10 Jun 2021 11:11:57 -0700 Subject: [PATCH 08/15] nicer way of checking if at capacity this shouldn't require summing all the buffers in the buflist :) Signed-off-by: Eliza Weisman --- linkerd/http-retry/src/lib.rs | 61 +++++++++++++++++------------------ 1 file changed, 29 insertions(+), 32 deletions(-) diff --git a/linkerd/http-retry/src/lib.rs b/linkerd/http-retry/src/lib.rs index cf6d41188c..5cf0ef8e21 100644 --- a/linkerd/http-retry/src/lib.rs +++ b/linkerd/http-retry/src/lib.rs @@ -4,18 +4,11 @@ #![allow(clippy::type_complexity)] use bytes::{Buf, BufMut, Bytes, BytesMut}; -use bytes::{Buf, BufMut, Bytes, BytesMut}; -use http::HeaderMap; use http::HeaderMap; use http_body::Body; -use http_body::Body; use linkerd_error::Error; -use linkerd_error::Error; -use parking_lot::Mutex; use parking_lot::Mutex; use std::{collections::VecDeque, io::IoSlice, pin::Pin, sync::Arc, task::Context, task::Poll}; -use std::{collections::VecDeque, io::IoSlice, pin::Pin, sync::Arc, task::Context, task::Poll}; -use thiserror::Error; use thiserror::Error; /// Wraps an HTTP body type and lazily buffers data as it is read from the inner @@ -47,14 +40,11 @@ pub struct ReplayBody { /// Should this clone replay trailers from the shared state? replay_trailers: bool, - - /// Maxiumum number of bytes to buffer. - max_bytes: usize, } #[derive(Debug, Error)] -#[error("cannot buffer more than {0} bytes")] -pub struct Capped(usize); +#[error("replay body discarded after reaching maximum buffered bytes limit")] +pub struct Capped; /// Data returned by `ReplayBody`'s `http_body::Body` implementation is either /// `Bytes` returned by the initial body, or a list of all `Bytes` chunks @@ -89,7 +79,9 @@ struct BodyState { trailers: Option, rest: Option, is_completed: bool, - is_capped: bool, + + /// Maxiumum number of bytes to buffer. + max_bytes: usize, } // === impl ReplayBody === @@ -111,7 +103,7 @@ impl ReplayBody { trailers: None, rest: Some(body), is_completed: false, - is_capped: false, + max_bytes: max_bytes + 1, }), shared: Arc::new(SharedState { body: Mutex::new(None), @@ -120,7 +112,6 @@ impl ReplayBody { // The initial `ReplayBody` has nothing to replay replay_body: false, replay_trailers: false, - max_bytes, } } @@ -156,13 +147,11 @@ where let state = Self::acquire_state(&mut this.state, &this.shared.body); // Move these out to avoid mutable borrow issues in the `map` closure // when polling the inner body. - let max_bytes = this.max_bytes; - let is_capped = state.is_capped; tracing::trace!( replay_body = this.replay_body, buf.has_remaining = state.buf.has_remaining(), body.is_completed = state.is_completed, - body.is_capped = state.is_capped, + body.max_bytes_remaining = state.max_bytes, "Replay::poll_data" ); @@ -176,9 +165,9 @@ where return Poll::Ready(Some(Ok(Data::Replay(state.buf.clone())))); } - if state.is_capped { + if state.is_capped() { tracing::trace!("cannot replay buffered body, maximum buffer length reached"); - return Poll::Ready(Some(Err(Capped(this.max_bytes).into()))); + return Poll::Ready(Some(Err(Capped.into()))); } } @@ -206,20 +195,20 @@ where } return Poll::Ready(opt.map(|ok| { ok.map(|mut data| { - // If we have already buffered the maximum number of bytes, - // allow *this* body to continue, but don't buffer any more. - if is_capped { + // If we have buffered the maximum number of bytes, allow + // *this* body to continue, but don't buffer any more. + if state.is_capped() { + // If there's data in the buffer, discard it now, since + // we won't allow any clones to have a complete body. + if state.buf.has_remaining() { + tracing::debug!("buffered maximum capacity, discarding buffer"); + state.buf = Default::default(); + } return Data::Initial(data.copy_to_bytes(data.remaining())); } - if state.buf.remaining() + data.remaining() > max_bytes { - tracing::debug!( - max_bytes, - "buffered maximum number of bytes, discarding buffer" - ); - // discard the buffer - state.buf = Default::default(); - state.is_capped = true; + state.max_bytes = state.max_bytes.saturating_sub(data.remaining()); + if state.is_capped() { return Data::Initial(data.copy_to_bytes(data.remaining())); } @@ -328,7 +317,6 @@ impl Clone for ReplayBody { // reading any additional data from the initial body. replay_body: true, replay_trailers: true, - max_bytes: self.max_bytes, } } } @@ -476,6 +464,15 @@ impl Buf for BufList { } } +// === impl BodyState === + +impl BodyState { + #[inline] + fn is_capped(&self) -> bool { + self.max_bytes == 0 + } +} + #[cfg(test)] mod tests { use super::*; From b4f4ed4cb20e126eb81d6429406005aaf8618bb4 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 10 Jun 2021 11:46:02 -0700 Subject: [PATCH 09/15] fixy Signed-off-by: Eliza Weisman --- linkerd/http-retry/src/lib.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/linkerd/http-retry/src/lib.rs b/linkerd/http-retry/src/lib.rs index 5cf0ef8e21..abab681017 100644 --- a/linkerd/http-retry/src/lib.rs +++ b/linkerd/http-retry/src/lib.rs @@ -197,19 +197,23 @@ where ok.map(|mut data| { // If we have buffered the maximum number of bytes, allow // *this* body to continue, but don't buffer any more. + let length = data.remaining(); + state.max_bytes = state.max_bytes.saturating_sub(length); if state.is_capped() { // If there's data in the buffer, discard it now, since // we won't allow any clones to have a complete body. if state.buf.has_remaining() { - tracing::debug!("buffered maximum capacity, discarding buffer"); + tracing::debug!( + buf.size = state.buf.remaining(), + "buffered maximum capacity, discarding buffer" + ); state.buf = Default::default(); } - return Data::Initial(data.copy_to_bytes(data.remaining())); + return Data::Initial(data.copy_to_bytes(length)); } - state.max_bytes = state.max_bytes.saturating_sub(data.remaining()); if state.is_capped() { - return Data::Initial(data.copy_to_bytes(data.remaining())); + return Data::Initial(data.copy_to_bytes(length)); } // Buffer and return the bytes @@ -801,6 +805,7 @@ mod tests { // Test that, when the initial body is longer than the preconfigured // cap, we allow the request to continue, but stop buffering. The // initial body will complete, but the replay will immediately fail. + let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=trace"); let (mut tx, body) = hyper::Body::channel(); let mut initial = ReplayBody::new(body, 8); @@ -830,6 +835,7 @@ mod tests { async fn caps_across_replays() { // Test that, when the initial body is longer than the preconfigured // cap, we allow the request to continue, but stop buffering. + let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=debug"); let (mut tx, body) = hyper::Body::channel(); let mut initial = ReplayBody::new(body, 8); From 28cbca5bf107247efc2f372d7b809a3699ec749a Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 10 Jun 2021 20:21:47 +0000 Subject: [PATCH 10/15] fuzzlock --- linkerd/app/inbound/fuzz/Cargo.lock | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/linkerd/app/inbound/fuzz/Cargo.lock b/linkerd/app/inbound/fuzz/Cargo.lock index 21f0f9cf3a..baec5ef58c 100644 --- a/linkerd/app/inbound/fuzz/Cargo.lock +++ b/linkerd/app/inbound/fuzz/Cargo.lock @@ -610,6 +610,7 @@ dependencies = [ "linkerd-proxy-tcp", "linkerd-proxy-transport", "linkerd-reconnect", + "linkerd-retry", "linkerd-service-profiles", "linkerd-stack", "linkerd-stack-metrics", @@ -864,11 +865,9 @@ dependencies = [ "http", "http-body", "linkerd-error", - "linkerd-http-box", - "linkerd-stack", "parking_lot", "pin-project", - "tower", + "thiserror", "tracing", ] @@ -1128,6 +1127,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "linkerd-retry" +version = "0.1.0" +dependencies = [ + "linkerd-error", + "linkerd-stack", + "pin-project", + "tower", + "tracing", +] + [[package]] name = "linkerd-service-profiles" version = "0.1.0" From a6e91f47dd72612c9b12306192e6d1914b5878d0 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 11 Jun 2021 11:28:47 -0700 Subject: [PATCH 11/15] Update linkerd/http-retry/src/lib.rs --- linkerd/http-retry/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkerd/http-retry/src/lib.rs b/linkerd/http-retry/src/lib.rs index abab681017..df16deead1 100644 --- a/linkerd/http-retry/src/lib.rs +++ b/linkerd/http-retry/src/lib.rs @@ -450,7 +450,7 @@ impl Buf for BufList { match self.bufs.front_mut() { Some(first) if len <= first.remaining() => { let buf = first.copy_to_bytes(len); - // if we consumed the first buffer, also advance our "cursor" by + // If we consumed the first buffer, also advance our "cursor" by // popping it. if first.remaining() == 0 { self.bufs.pop_front(); From 9da2a449d8f37ea8e44660040b8260c43d832cb0 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 11 Jun 2021 12:00:25 -0700 Subject: [PATCH 12/15] Update linkerd/http-box/src/erase_request.rs --- linkerd/http-box/src/erase_request.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/linkerd/http-box/src/erase_request.rs b/linkerd/http-box/src/erase_request.rs index 5b455ce0e6..feff365034 100644 --- a/linkerd/http-box/src/erase_request.rs +++ b/linkerd/http-box/src/erase_request.rs @@ -9,8 +9,7 @@ use std::task::{Context, Poll}; /// This is *very* similar to the [`BoxRequest`] middleware. However, that /// middleware is generic over a specific body type that is erased. A given /// instance of `EraseRequest` can only erase the type of one particular `Body` -/// type, while this middleware will erase -/// bodies of *any* type. +/// type, while this middleware will erase bodies of *any* type. /// /// An astute reader may ask, why not simply replace `BoxRequest` with this /// middleware, if it is a more flexible superset of the same behavior? The From bc61327d8d4225c942273d9dacdc12bc8b7364f7 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 11 Jun 2021 12:05:52 -0700 Subject: [PATCH 13/15] Apply suggestions from code review --- linkerd/http-box/src/erase_request.rs | 1 + linkerd/retry/src/lib.rs | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/linkerd/http-box/src/erase_request.rs b/linkerd/http-box/src/erase_request.rs index feff365034..216fb2ed84 100644 --- a/linkerd/http-box/src/erase_request.rs +++ b/linkerd/http-box/src/erase_request.rs @@ -1,4 +1,5 @@ //! A middleware that boxes HTTP request bodies. + use crate::BoxBody; use linkerd_error::Error; use linkerd_stack::{layer, Proxy}; diff --git a/linkerd/retry/src/lib.rs b/linkerd/retry/src/lib.rs index a470b98e7b..d595c5d86f 100644 --- a/linkerd/retry/src/lib.rs +++ b/linkerd/retry/src/lib.rs @@ -1,7 +1,6 @@ #![deny(warnings, rust_2018_idioms)] #![forbid(unsafe_code)] #![allow(clippy::inconsistent_struct_constructor)] -#![allow(clippy::type_complexity)] use linkerd_error::Error; use linkerd_stack::{Either, NewService, Proxy, ProxyService}; From 026651817e92c45f0f3e2efc9c708eca384403ff Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 11 Jun 2021 14:08:14 -0700 Subject: [PATCH 14/15] Update linkerd/retry/src/lib.rs --- linkerd/retry/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/linkerd/retry/src/lib.rs b/linkerd/retry/src/lib.rs index d595c5d86f..20272c5085 100644 --- a/linkerd/retry/src/lib.rs +++ b/linkerd/retry/src/lib.rs @@ -18,7 +18,7 @@ pub trait NewPolicy { fn new_policy(&self, target: &T) -> Option; } -/// A layer that applies per-target retry polcies. +/// A layer that applies per-target retry policies. /// /// Composes `NewService`s that produce a `Proxy`. #[derive(Clone, Debug)] From 712c2a4956f4a4ea81e9afbe0a9f9d44da4c7ce4 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Mon, 14 Jun 2021 12:01:26 -0700 Subject: [PATCH 15/15] RetryPolicy refactor suggestions (#1045) * wip * inline * Hang the EraseRequest layer off of BoxRequest --- linkerd/app/core/src/retry.rs | 36 ++++++------ linkerd/app/outbound/src/http/logical.rs | 4 +- linkerd/http-box/src/erase_request.rs | 11 +++- linkerd/http-box/src/request.rs | 11 +++- linkerd/http-box/src/response.rs | 2 + linkerd/proxy/http/src/lib.rs | 2 +- linkerd/retry/Cargo.toml | 2 +- linkerd/retry/src/lib.rs | 71 ++++++++++-------------- linkerd/stack/src/lib.rs | 2 +- 9 files changed, 72 insertions(+), 69 deletions(-) diff --git a/linkerd/app/core/src/retry.rs b/linkerd/app/core/src/retry.rs index 3a7e40814a..2db752234b 100644 --- a/linkerd/app/core/src/retry.rs +++ b/linkerd/app/core/src/retry.rs @@ -6,48 +6,48 @@ use crate::profiles; use futures::future; use linkerd_error::Error; use linkerd_http_classify::{Classify, ClassifyEos, ClassifyResponse}; -use linkerd_stack::{Either, Param}; -use std::sync::Arc; -use tower::retry::{budget::Budget, Policy}; - use linkerd_http_retry::ReplayBody; -use linkerd_retry::*; +use linkerd_retry as retry; +use linkerd_stack::{layer, Either, Param}; +use std::sync::Arc; -pub fn layer(metrics: HttpRouteRetry) -> NewRetryLayer { - NewRetryLayer::new(NewRetry::new(metrics)) +pub fn layer( + metrics: HttpRouteRetry, +) -> impl layer::Layer> + Clone { + retry::NewRetry::<_, N>::layer(NewRetryPolicy::new(metrics)) } #[derive(Clone, Debug)] -pub struct NewRetry { +pub struct NewRetryPolicy { metrics: HttpRouteRetry, } #[derive(Clone, Debug)] -pub struct Retry { +pub struct RetryPolicy { metrics: Handle, - budget: Arc, + budget: Arc, response_classes: profiles::http::ResponseClasses, } /// Allow buffering requests up to 64 kb const MAX_BUFFERED_BYTES: usize = 64 * 1024; -// === impl NewRetry === +// === impl NewRetryPolicy === -impl NewRetry { +impl NewRetryPolicy { pub fn new(metrics: HttpRouteRetry) -> Self { Self { metrics } } } -impl NewPolicy for NewRetry { - type Policy = Retry; +impl retry::NewPolicy for NewRetryPolicy { + type Policy = RetryPolicy; fn new_policy(&self, route: &Route) -> Option { let retries = route.route.retries().cloned()?; let metrics = self.metrics.get_handle(route.param()); - Some(Retry { + Some(RetryPolicy { metrics, budget: retries.budget().clone(), response_classes: route.route.response_classes().clone(), @@ -57,7 +57,7 @@ impl NewPolicy for NewRetry { // === impl Retry === -impl Retry { +impl RetryPolicy { fn can_retry(&self, req: &http::Request) -> bool { let content_length = |req: &http::Request<_>| { req.headers() @@ -88,7 +88,7 @@ impl Retry { } } -impl Policy, http::Response, E> for Retry +impl retry::Policy, http::Response, E> for RetryPolicy where A: http_body::Body + Clone, { @@ -142,7 +142,7 @@ where } } -impl RetryPolicy, http::Response, E> for Retry +impl retry::PrepareRequest, http::Response, E> for RetryPolicy where A: http_body::Body + Unpin, A::Error: Into, diff --git a/linkerd/app/outbound/src/http/logical.rs b/linkerd/app/outbound/src/http/logical.rs index d233e06b37..2ec32225ad 100644 --- a/linkerd/app/outbound/src/http/logical.rs +++ b/linkerd/app/outbound/src/http/logical.rs @@ -125,7 +125,7 @@ impl Outbound { // any `Body` type into `BoxBody` so that the rest of the // stack doesn't have to implement `Service` for requests // with both body types. - .push_on_response(http::EraseRequest::layer()) + .push_on_response(http::BoxRequest::erased()) // Sets an optional retry policy. .push(retry::layer(rt.metrics.http_route_retry.clone())) // Sets an optional request timeout. @@ -140,7 +140,7 @@ impl Outbound { )) // Strips headers that may be set by this proxy and add an outbound // canonical-dst-header. The response body is boxed unify the profile - // stack's response type. withthat of to endpoint stack. + // stack's response type with that of to endpoint stack. .push(http::NewHeaderFromTarget::::layer()) .push_on_response(svc::layers().push(http::BoxResponse::layer())) .instrument(|l: &Logical| debug_span!("logical", dst = %l.logical_addr)) diff --git a/linkerd/http-box/src/erase_request.rs b/linkerd/http-box/src/erase_request.rs index 216fb2ed84..1c4181ee44 100644 --- a/linkerd/http-box/src/erase_request.rs +++ b/linkerd/http-box/src/erase_request.rs @@ -5,7 +5,7 @@ use linkerd_error::Error; use linkerd_stack::{layer, Proxy}; use std::task::{Context, Poll}; -/// A middleware that boxes HTTP request bodies. +/// Boxes request bodies, erasing the original type. /// /// This is *very* similar to the [`BoxRequest`] middleware. However, that /// middleware is generic over a specific body type that is erased. A given @@ -27,8 +27,12 @@ use std::task::{Context, Poll}; pub struct EraseRequest(S); impl EraseRequest { + pub fn new(inner: S) -> Self { + Self(inner) + } + pub fn layer() -> impl layer::Layer + Clone + Copy { - layer::mk(EraseRequest) + layer::mk(Self::new) } } @@ -49,10 +53,12 @@ where type Error = S::Error; type Future = S::Future; + #[inline] fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.0.poll_ready(cx) } + #[inline] fn call(&mut self, req: http::Request) -> Self::Future { self.0.call(req.map(BoxBody::new)) } @@ -71,6 +77,7 @@ where type Error = P::Error; type Future = P::Future; + #[inline] fn proxy(&self, inner: &mut S, req: http::Request) -> Self::Future { self.0.proxy(inner, req.map(BoxBody::new)) } diff --git a/linkerd/http-box/src/request.rs b/linkerd/http-box/src/request.rs index 30b4340301..714f2a206f 100644 --- a/linkerd/http-box/src/request.rs +++ b/linkerd/http-box/src/request.rs @@ -1,6 +1,6 @@ //! A middleware that boxes HTTP request bodies. -use crate::BoxBody; +use crate::{erase_request::EraseRequest, BoxBody}; use linkerd_error::Error; use linkerd_stack::layer; use std::{ @@ -17,6 +17,13 @@ impl BoxRequest { } } +impl BoxRequest { + /// Constructs a boxing layer that erases the inner request type with [`EraseRequest`]. + pub fn erased() -> impl layer::Layer> + Clone + Copy { + EraseRequest::layer() + } +} + impl Clone for BoxRequest { fn clone(&self) -> Self { BoxRequest(self.0.clone(), self.1) @@ -34,10 +41,12 @@ where type Error = S::Error; type Future = S::Future; + #[inline] fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.0.poll_ready(cx) } + #[inline] fn call(&mut self, req: http::Request) -> Self::Future { self.0.call(req.map(BoxBody::new)) } diff --git a/linkerd/http-box/src/response.rs b/linkerd/http-box/src/response.rs index a3ad21f840..73e88e608a 100644 --- a/linkerd/http-box/src/response.rs +++ b/linkerd/http-box/src/response.rs @@ -26,10 +26,12 @@ where type Error = S::Error; type Future = future::MapOk Self::Response>; + #[inline] fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.0.poll_ready(cx) } + #[inline] fn call(&mut self, req: Req) -> Self::Future { self.0.call(req).map_ok(|rsp| rsp.map(BoxBody::new)) } diff --git a/linkerd/proxy/http/src/lib.rs b/linkerd/proxy/http/src/lib.rs index 7ee909ed19..58eb07276f 100644 --- a/linkerd/proxy/http/src/lib.rs +++ b/linkerd/proxy/http/src/lib.rs @@ -42,7 +42,7 @@ pub use http::{ uri, Request, Response, StatusCode, }; pub use hyper::body::HttpBody; -pub use linkerd_http_box::{BoxBody, BoxRequest, BoxResponse, EraseRequest}; +pub use linkerd_http_box::{BoxBody, BoxRequest, BoxResponse}; #[derive(Clone, Debug)] pub struct HeaderPair(pub HeaderName, pub HeaderValue); diff --git a/linkerd/retry/Cargo.toml b/linkerd/retry/Cargo.toml index dd9c962ac7..faefaa840f 100644 --- a/linkerd/retry/Cargo.toml +++ b/linkerd/retry/Cargo.toml @@ -10,5 +10,5 @@ publish = false linkerd-error = { path = "../error" } linkerd-stack = { path = "../stack" } pin-project = "1" -tower = { version = "0.4.7", default-features = false, features = ["retry", "util"] } +tower = { version = "0.4.7", default-features = false, features = ["retry"] } tracing = "0.1.23" diff --git a/linkerd/retry/src/lib.rs b/linkerd/retry/src/lib.rs index 20272c5085..54adb17098 100644 --- a/linkerd/retry/src/lib.rs +++ b/linkerd/retry/src/lib.rs @@ -3,12 +3,14 @@ #![allow(clippy::inconsistent_struct_constructor)] use linkerd_error::Error; -use linkerd_stack::{Either, NewService, Proxy, ProxyService}; +use linkerd_stack::{layer, Either, NewService, Oneshot, Proxy, ProxyService, ServiceExt}; use pin_project::pin_project; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; -use tower::util::{Oneshot, ServiceExt}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +pub use tower::retry::{budget::Budget, Policy}; use tracing::trace; /// A strategy for obtaining per-target retry polices. @@ -18,29 +20,9 @@ pub trait NewPolicy { fn new_policy(&self, target: &T) -> Option; } -/// A layer that applies per-target retry policies. -/// -/// Composes `NewService`s that produce a `Proxy`. -#[derive(Clone, Debug)] -pub struct NewRetryLayer

{ - new_policy: P, -} - -#[derive(Clone, Debug)] -pub struct NewRetry { - new_policy: P, - inner: N, -} - -#[derive(Clone, Debug)] -pub struct Retry { - policy: Option

, - inner: S, -} - /// An extension to [`tower::retry::Policy`] that adds a method to prepare a /// request to be retried, possibly changing its type. -pub trait RetryPolicy: tower::retry::Policy { +pub trait PrepareRequest: tower::retry::Policy { /// A request type that can be retried. /// /// This *may* be the same as the `Req` type parameter, but it can also be a @@ -59,6 +41,19 @@ pub trait RetryPolicy: tower::retry::Policy Either; } +/// Applies per-target retry policies. +#[derive(Clone, Debug)] +pub struct NewRetry { + new_policy: P, + inner: N, +} + +#[derive(Clone, Debug)] +pub struct Retry { + policy: Option

, + inner: S, +} + #[pin_project(project = ResponseFutureProj)] pub enum ResponseFuture where @@ -71,27 +66,17 @@ where Retry(#[pin] Oneshot>, Req>), } -// === impl NewRetryLayer === - -impl

NewRetryLayer

{ - pub fn new(new_policy: P) -> Self { - Self { new_policy } - } -} - -impl tower::layer::Layer for NewRetryLayer

{ - type Service = NewRetry; +// === impl NewRetry === - fn layer(&self, inner: N) -> Self::Service { - Self::Service { +impl NewRetry { + pub fn layer(new_policy: P) -> impl layer::Layer + Clone { + layer::mk(move |inner| Self { inner, - new_policy: self.new_policy.clone(), - } + new_policy: new_policy.clone(), + }) } } -// === impl NewRetry === - impl NewService for NewRetry where N: NewService, @@ -112,7 +97,7 @@ where impl Proxy for Retry where - R: RetryPolicy + Clone, + R: PrepareRequest + Clone, P: Proxy + Proxy + Clone, diff --git a/linkerd/stack/src/lib.rs b/linkerd/stack/src/lib.rs index 04c234d98c..3c3aa30322 100644 --- a/linkerd/stack/src/lib.rs +++ b/linkerd/stack/src/lib.rs @@ -41,7 +41,7 @@ pub use self::{ unwrap_or::UnwrapOr, }; pub use tower::{ - util::{future_service, FutureService, MapErr, MapErrLayer, ServiceExt}, + util::{future_service, FutureService, MapErr, MapErrLayer, Oneshot, ServiceExt}, Service, };