Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure services in failfast can become ready #858

Merged
merged 2 commits into from
Jan 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions linkerd/app/inbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,11 @@ impl Config {
.check_new_service::<Target, http::Request<http::BoxBody>>()
.push_on_response(
svc::layers()
.push(svc::FailFast::layer("Logical", self.proxy.dispatch_timeout))
.push(svc::layer::mk(svc::SpawnReady::new))
.push(svc::FailFast::layer(
"HTTP Logical",
self.proxy.dispatch_timeout,
))
.push_spawn_buffer(self.proxy.buffer_capacity)
.push(metrics.stack.layer(stack_labels("http", "logical"))),
)
Expand Down Expand Up @@ -345,10 +349,13 @@ impl Config {
svc::layers()
// Downgrades the protocol if upgraded by an outbound proxy.
.push(orig_proto::Downgrade::layer())
// Limits the number of in-flight requests.
// Limit the number of in-flight requests. When the proxy is
// at capacity, go into failfast after a dispatch timeout.
// Note that the inner service _always_ returns ready (due
// to `NewRouter`) and the concurrency limit need not be
// driven outside of the request path, so there's no need
// for SpawnReady
.push(svc::ConcurrencyLimit::layer(max_in_flight_requests))
// Eagerly fail requests when the proxy is out of capacity for a
// dispatch_timeout.
.push(svc::FailFast::layer("HTTP Server", dispatch_timeout))
.push(metrics.http_errors.clone())
// Synthesizes responses for proxy errors.
Expand Down
45 changes: 25 additions & 20 deletions linkerd/app/outbound/src/http/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,55 +47,60 @@ where
let watchdog = cache_max_idle_age * 2;

svc::stack(endpoint.clone())
.check_new_service::<Endpoint, http::Request<http::BoxBody>>()
.push_on_response(
svc::layers()
.push(svc::layer::mk(svc::SpawnReady::new))
.push(http::BoxRequest::layer())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just out of curiosity, why put the box above metrics now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mostly I want the box as close to the inner service as possible, as it's responsible for satisfying the type signature of the inner service.

.push(
metrics
.stack
.layer(stack_labels("http", "balance.endpoint")),
)
.push(http::BoxRequest::layer()),
// Ensure individual endpoints are driven to readiness so that
// the balancer need not drive them all directly.
.push(svc::layer::mk(svc::SpawnReady::new)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIOLI: could be nice to add a SpawnReady::layer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

)
.check_new_service::<Endpoint, http::Request<_>>()
// Resolve the service to its endpoints and balance requests over them.
//
// If the balancer has been empty/unavailable, eagerly fail requests.
// When the balancer is in failfast, spawn the service in a background
// task so it becomes ready without new requests.
.push(resolve::layer(resolve, watchdog))
.check_service::<Concrete>()
.push_on_response(
svc::layers()
.push(http::balance::layer(
crate::EWMA_DEFAULT_RTT,
crate::EWMA_DECAY,
))
.push(svc::layer::mk(svc::SpawnReady::new))
// If the balancer has been empty/unavailable for 10s, eagerly fail
// requests.
.push(svc::FailFast::layer("HTTP Balancer", dispatch_timeout))
.push(metrics.stack.layer(stack_labels("http", "concrete"))),
)
.push(svc::MapErrLayer::new(Into::into))
// Drives the initial resolution via the service's readiness.
.into_new_service()
.check_new_service::<Concrete, http::Request<_>>()
// The concrete address is only set when the profile could be
// resolved. Endpoint resolution is skipped when there is no
// concrete address.
.instrument(|c: &Concrete| match c.resolve.as_ref() {
None => debug_span!("concrete"),
Some(addr) => debug_span!("concrete", %addr),
})
.check_new_service::<Concrete, http::Request<_>>()
// The concrete address is only set when the profile could be
// resolved. Endpoint resolution is skipped when there is no
// concrete address.
.push_map_target(Concrete::from)
.check_new_service::<(Option<Addr>, Logical), http::Request<_>>()
// Distribute requests over a distribution of balancers via a traffic
// split.
//
// If the traffic split is empty/unavailable, eagerly fail
// requests. When the split is in failfast, spawn
// the service in a background task so it becomes ready without
// new requests.
.push(profiles::split::layer())
.check_new_service::<Logical, http::Request<_>>()
// Drives concrete stacks to readiness and makes the split
// cloneable, as required by the retry middleware.
.push_on_response(
svc::layers()
.push(svc::layer::mk(svc::SpawnReady::new))
.push(svc::FailFast::layer("HTTP Logical", dispatch_timeout))
.push_spawn_buffer(buffer_capacity),
)
.check_new_service::<Logical, http::Request<_>>()
// Note: routes can't exert backpressure.
.push(profiles::http::route_request::layer(
svc::proxies()
.push(
Expand All @@ -115,16 +120,16 @@ where
.push_map_target(Logical::mk_route)
.into_inner(),
))
.check_new_service::<Logical, http::Request<_>>()
// Strips headers that may be set by this proxy and add an outbound
// canonical-dst-header. The response body is boxed to unify the profile
// stack's response type with that of the endpoint stack.
.push(http::NewHeaderFromTarget::layer(CANONICAL_DST_HEADER))
.push_on_response(
svc::layers()
// Strips headers that may be set by this proxy.
.push(http::strip_header::request::layer(DST_OVERRIDE_HEADER))
.push(http::BoxResponse::layer()),
)
.instrument(|l: &Logical| debug_span!("logical", dst = %l.addr()))
.check_new_service::<Logical, http::Request<_>>()
.push_switch(
Logical::should_resolve,
svc::stack(endpoint)
Expand Down
38 changes: 22 additions & 16 deletions linkerd/app/outbound/src/ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ where
+ 'static,
TSvc::Error: Into<Error>,
TSvc::Future: Send,
H: svc::NewService<http::Logical, Service = HSvc> + Clone + Send + Sync + 'static,
H: svc::NewService<http::Logical, Service = HSvc> + Clone + Send + Sync + Unpin + 'static,
HSvc: svc::Service<http::Request<http::BoxBody>, Response = http::Response<http::BoxBody>>
+ Send
+ 'static,
HSvc::Error: Into<Error>,
HSvc::Future: Send,
P: profiles::GetProfile<Addr> + Clone + Send + Sync + 'static,
P: profiles::GetProfile<Addr> + Clone + Send + Sync + Unpin + 'static,
P::Error: Send,
P::Future: Send,
{
Expand All @@ -73,42 +73,48 @@ where
.into_inner();

svc::stack(http)
.push_on_response(svc::MapErrLayer::new(Into::into))
.check_new_service::<http::Logical, http::Request<_>>()
.push_on_response(
svc::layers()
.push(http::BoxRequest::layer())
.push(svc::MapErrLayer::new(Into::into)),
)
// Lookup the profile for the outbound HTTP target, if appropriate.
//
// This service is buffered because it needs to initialize the profile
// resolution and a failfast is instrumented in case it becomes
// unavailable
// When this service is in failfast, ensure that we drive the
// inner service to readiness even if new requests aren't
// received.
.push_map_target(http::Logical::from)
.push(profiles::discover::layer(
profiles,
AllowHttpProfile(allow_discovery),
))
.check_new_service::<Target, http::Request<_>>()
.push_on_response(
svc::layers()
.push(svc::FailFast::layer("Logical", dispatch_timeout))
.push(svc::layer::mk(svc::SpawnReady::new))
.push(svc::FailFast::layer("HTTP Logical", dispatch_timeout))
Comment on lines +96 to +97
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIOLI: might be worth having a "push spawn ready and failfast in one method" helper since we now want to pair them in most places. might also help avoid accidentally adding new failfasts without a spawn ready?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, but we don't have a nameable spawn ready layer at the moment.

.push_spawn_buffer(buffer_capacity),
)
.push_cache(cache_max_idle_age)
.push_on_response(http::Retain::layer())
.check_new_service::<Target, http::Request<_>>()
.instrument(|t: &Target| info_span!("target", dst = %t.dst))
// Obtain a new inner service for each request (from the above cache).
//
// Note that the router service is always ready, so the `FailFast` layer
// need not use a `SpawnReady` to drive the service to ready.
.push(svc::NewRouter::layer(TargetPerRequest::accept))
.check_new_service::<http::Accept, http::Request<_>>()
.push_on_response(
svc::layers()
.push(http::BoxRequest::layer())
// Limits the number of in-flight requests.
.push(svc::ConcurrencyLimit::layer(max_in_flight_requests))
// Eagerly fail requests when the proxy is out of capacity for a
// dispatch_timeout.
.push(svc::FailFast::layer("Server", dispatch_timeout))
.push(svc::FailFast::layer("HTTP Server", dispatch_timeout))
.push(metrics.http_errors.clone())
// Synthesizes responses for proxy errors.
.push(errors::layer())
// Initiates OpenCensus tracing.
.push(TraceContext::layer(span_sink.map(|span_sink| {
SpanConverter::server(span_sink, trace_labels())
})))
.push(metrics.stack.layer(stack_labels("http", "server")))
.push_spawn_buffer(buffer_capacity)
.push(http::BoxResponse::layer()),
)
.check_new_service::<http::Accept, http::Request<_>>()
Expand Down
21 changes: 15 additions & 6 deletions linkerd/app/outbound/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,12 @@ where
svc::stack(stack)
.push_on_response(
svc::layers()
.push(svc::FailFast::layer("Accept", config.dispatch_timeout))
// If the traffic split is empty/unavailable, eagerly fail
// requests. When the split is in failfast, spawn
// the service in a background task so it becomes ready without
// new requests.
.push(svc::layer::mk(svc::SpawnReady::new))
.push(svc::FailFast::layer("TCP Server", config.dispatch_timeout))
.push_spawn_buffer(config.buffer_capacity),
)
.push_cache(config.cache_max_idle_age)
Expand Down Expand Up @@ -158,11 +163,15 @@ where
.push_on_response(
svc::layers()
.push(http::BoxRequest::layer())
// Limits the number of in-flight requests.
// Limit the number of in-flight requests. When the proxy is
// at capacity, go into failfast after a dispatch timeout. If
// the router is unavailable, then spawn the service on a
// background task to ensure it becomes ready without new
// requests being processed.
.push(svc::layer::mk(svc::SpawnReady::new))
.push(svc::ConcurrencyLimit::layer(max_in_flight_requests))
// Eagerly fail requests when the proxy is out of capacity for a
// dispatch_timeout.
.push(svc::FailFast::layer("Server", dispatch_timeout))
.push(svc::FailFast::layer("HTTP Server", dispatch_timeout))
.push_spawn_buffer(buffer_capacity)
.push(metrics.http_errors.clone())
// Synthesizes responses for proxy errors.
.push(errors::layer())
Expand All @@ -171,7 +180,6 @@ where
SpanConverter::server(span_sink, trace_labels())
})))
.push(metrics.stack.layer(stack_labels("http", "server")))
.push_spawn_buffer(buffer_capacity)
.push(http::BoxResponse::layer()),
)
.push(http::NewNormalizeUri::layer())
Expand All @@ -188,6 +196,7 @@ where
.push_switch(tcp::Logical::should_resolve, tcp_forward.clone())
.push_on_response(
svc::layers()
.push(svc::layer::mk(svc::SpawnReady::new))
.push(svc::FailFast::layer("TCP Logical", dispatch_timeout))
.push_spawn_buffer(buffer_capacity)
.push(metrics.stack.layer(stack_labels("tcp", "logical"))),
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ const DEFAULT_ADMIN_LISTEN_ADDR: &str = "127.0.0.1:4191";
const DEFAULT_METRICS_RETAIN_IDLE: Duration = Duration::from_secs(10 * 60);
const DEFAULT_INBOUND_DISPATCH_TIMEOUT: Duration = Duration::from_secs(1);
const DEFAULT_INBOUND_DETECT_TIMEOUT: Duration = Duration::from_secs(10);
const DEFAULT_INBOUND_CONNECT_TIMEOUT: Duration = Duration::from_millis(100);
const DEFAULT_INBOUND_CONNECT_TIMEOUT: Duration = Duration::from_millis(300);
const DEFAULT_INBOUND_CONNECT_BACKOFF: ExponentialBackoff = ExponentialBackoff {
min: Duration::from_millis(100),
max: Duration::from_millis(500),
Expand Down