Skip to content

Commit

Permalink
Add support for retries and timeouts (#12888)
Browse files Browse the repository at this point in the history
Adds support for configuring retries and timeouts as outbound policy.  Http retries can be configured as annotations on HttpRoute or Service resources like

```
retry.linkerd.io/http: 5xx,gateway-error
retry.linkerd.io/limit: "2"
retry.linkerd.io/timeout: 400ms
```

If any of these retry annotations are specified on an HttpRoute resource, they will override ALL retry annotations on the parent Service resource.

Similarly, Grpc retries can be configured as annotations on GrpcRoute or Service resources like

```
retry.linkerd.io/grpc: cancelled,deadline-exceeded,internal,resource-exhausted,unavailable
retry.linkerd.io/limit: "2"
retry.linkerd.io/timeout: 400ms
```

Outbound timeouts can be configured on HttpRoute, GrpcRoute, or Service resources like

```
timeout.linkerd.io/request: 500ms
timeout.linkerd.io/response: 100ms
timeout.linkerd.io/idle: 50ms
```

If any of these timeout annotations are specified on a HttpRoute or GrpcRoute resource, they will override ALL timeout annotations on the parent Service resource.

Signed-off-by: Alex Leong <alex@buoyant.io>
Co-authored-by: Oliver Gould <ver@buoyant.io>
  • Loading branch information
adleong and olix0r authored Jul 26, 2024
1 parent 2ead03f commit aed4850
Show file tree
Hide file tree
Showing 17 changed files with 1,385 additions and 641 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1321,9 +1321,9 @@ dependencies = [

[[package]]
name = "linkerd2-proxy-api"
version = "0.13.1"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65678e4c506a7e5fdf1a664c629a9b658afa70e254dffcd24df72e937b2c0159"
checksum = "26c72fb98d969e1e94e95d52a6fcdf4693764702c369e577934256e72fb5bc61"
dependencies = [
"http",
"ipnet",
Expand Down
50 changes: 42 additions & 8 deletions policy-controller/core/src/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ pub trait DiscoverOutboundPolicy<T> {

pub type OutboundPolicyStream = Pin<Box<dyn Stream<Item = OutboundPolicy> + Send + Sync + 'static>>;

pub type RouteSet<M> = HashMap<GroupKindNamespaceName, OutboundRoute<M>>;
pub type HttpRoute = OutboundRoute<HttpRouteMatch, HttpRetryCondition>;
pub type GrpcRoute = OutboundRoute<GrpcRouteMatch, GrpcRetryCondition>;
pub type RouteSet<T> = HashMap<GroupKindNamespaceName, T>;

pub struct OutboundDiscoverTarget {
pub service_name: String,
Expand All @@ -31,32 +33,35 @@ pub struct OutboundDiscoverTarget {

#[derive(Clone, Debug, PartialEq)]
pub struct OutboundPolicy {
pub http_routes: RouteSet<HttpRouteMatch>,
pub grpc_routes: RouteSet<GrpcRouteMatch>,
pub http_routes: RouteSet<HttpRoute>,
pub grpc_routes: RouteSet<GrpcRoute>,
pub authority: String,
pub name: String,
pub namespace: String,
pub port: NonZeroU16,
pub opaque: bool,
pub accrual: Option<FailureAccrual>,
pub http_retry: Option<RouteRetry<HttpRetryCondition>>,
pub grpc_retry: Option<RouteRetry<GrpcRetryCondition>>,
pub timeouts: RouteTimeouts,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OutboundRoute<M> {
pub struct OutboundRoute<M, R> {
pub hostnames: Vec<HostMatch>,
pub rules: Vec<OutboundRouteRule<M>>,
pub rules: Vec<OutboundRouteRule<M, R>>,

/// This is required for ordering returned routes
/// by their creation timestamp.
pub creation_timestamp: Option<DateTime<Utc>>,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OutboundRouteRule<M> {
pub struct OutboundRouteRule<M, R> {
pub matches: Vec<M>,
pub backends: Vec<Backend>,
pub request_timeout: Option<time::Duration>,
pub backend_request_timeout: Option<time::Duration>,
pub retry: Option<RouteRetry<R>>,
pub timeouts: RouteTimeouts,
pub filters: Vec<Filter>,
}

Expand Down Expand Up @@ -104,3 +109,32 @@ pub enum Filter {
RequestRedirect(RequestRedirectFilter),
FailureInjector(FailureInjectorFilter),
}

#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct RouteTimeouts {
pub response: Option<time::Duration>,
pub request: Option<time::Duration>,
pub idle: Option<time::Duration>,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RouteRetry<R> {
pub limit: u16,
pub timeout: Option<time::Duration>,
pub conditions: Option<Vec<R>>,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HttpRetryCondition {
pub status_min: u32,
pub status_max: u32,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum GrpcRetryCondition {
Cancelled,
DeadlineExceeded,
ResourceExhausted,
Internal,
Unavailable,
}
2 changes: 1 addition & 1 deletion policy-controller/grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"

[dependencies.linkerd2-proxy-api]
version = "0.13"
version = "0.14"
features = ["inbound", "outbound"]
57 changes: 46 additions & 11 deletions policy-controller/grpc/src/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,24 @@ pub struct OutboundPolicyServer<T> {
index: T,
// Used to parse named addresses into <svc>.<ns>.svc.<cluster-domain>.
cluster_domain: Arc<str>,
allow_l5d_request_headers: bool,
drain: drain::Watch,
}

impl<T> OutboundPolicyServer<T>
where
T: DiscoverOutboundPolicy<OutboundDiscoverTarget> + Send + Sync + 'static,
{
pub fn new(discover: T, cluster_domain: impl Into<Arc<str>>, drain: drain::Watch) -> Self {
pub fn new(
discover: T,
cluster_domain: impl Into<Arc<str>>,
allow_l5d_request_headers: bool,
drain: drain::Watch,
) -> Self {
Self {
index: discover,
cluster_domain: cluster_domain.into(),
allow_l5d_request_headers,
drain,
}
}
Expand Down Expand Up @@ -149,7 +156,10 @@ where
})?;

if let Some(policy) = policy {
Ok(tonic::Response::new(to_service(policy)))
Ok(tonic::Response::new(to_service(
policy,
self.allow_l5d_request_headers,
)))
} else {
Err(tonic::Status::not_found("No such policy"))
}
Expand All @@ -170,15 +180,23 @@ where
.await
.map_err(|e| tonic::Status::internal(format!("lookup failed: {e}")))?
.ok_or_else(|| tonic::Status::not_found("unknown server"))?;
Ok(tonic::Response::new(response_stream(drain, rx)))
Ok(tonic::Response::new(response_stream(
drain,
rx,
self.allow_l5d_request_headers,
)))
}
}

type BoxWatchStream = std::pin::Pin<
Box<dyn Stream<Item = Result<outbound::OutboundPolicy, tonic::Status>> + Send + Sync>,
>;

fn response_stream(drain: drain::Watch, mut rx: OutboundPolicyStream) -> BoxWatchStream {
fn response_stream(
drain: drain::Watch,
mut rx: OutboundPolicyStream,
allow_l5d_request_headers: bool,
) -> BoxWatchStream {
Box::pin(async_stream::try_stream! {
tokio::pin! {
let shutdown = drain.signaled();
Expand All @@ -189,7 +207,7 @@ fn response_stream(drain: drain::Watch, mut rx: OutboundPolicyStream) -> BoxWatc
// When the port is updated with a new server, update the server watch.
res = rx.next() => match res {
Some(policy) => {
yield to_service(policy);
yield to_service(policy, allow_l5d_request_headers);
}
None => return,
},
Expand All @@ -204,7 +222,10 @@ fn response_stream(drain: drain::Watch, mut rx: OutboundPolicyStream) -> BoxWatc
})
}

fn to_service(outbound: OutboundPolicy) -> outbound::OutboundPolicy {
fn to_service(
outbound: OutboundPolicy,
allow_l5d_request_headers: bool,
) -> outbound::OutboundPolicy {
let backend: outbound::Backend = default_backend(&outbound);

let kind = if outbound.opaque {
Expand Down Expand Up @@ -235,10 +256,24 @@ fn to_service(outbound: OutboundPolicy) -> outbound::OutboundPolicy {

if !grpc_routes.is_empty() {
grpc_routes.sort_by(timestamp_then_name);
grpc::protocol(backend, grpc_routes.into_iter(), accrual)
grpc::protocol(
backend,
grpc_routes.into_iter(),
accrual,
outbound.grpc_retry,
outbound.timeouts,
allow_l5d_request_headers,
)
} else {
http_routes.sort_by(timestamp_then_name);
http::protocol(backend, http_routes.into_iter(), accrual)
http::protocol(
backend,
http_routes.into_iter(),
accrual,
outbound.http_retry,
outbound.timeouts,
allow_l5d_request_headers,
)
}
};

Expand All @@ -259,9 +294,9 @@ fn to_service(outbound: OutboundPolicy) -> outbound::OutboundPolicy {
}
}

fn timestamp_then_name<LeftMatchType, RightMatchType>(
(left_id, left_route): &(GroupKindNamespaceName, OutboundRoute<LeftMatchType>),
(right_id, right_route): &(GroupKindNamespaceName, OutboundRoute<RightMatchType>),
fn timestamp_then_name<LM, LR, RM, RR>(
(left_id, left_route): &(GroupKindNamespaceName, OutboundRoute<LM, LR>),
(right_id, right_route): &(GroupKindNamespaceName, OutboundRoute<RM, RR>),
) -> std::cmp::Ordering {
let by_ts = match (
&left_route.creation_timestamp,
Expand Down
Loading

0 comments on commit aed4850

Please sign in to comment.