-
Notifications
You must be signed in to change notification settings - Fork 271
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ensure services in failfast can become ready #858
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,55 +47,60 @@ where | |
let watchdog = cache_max_idle_age * 2; | ||
|
||
svc::stack(endpoint.clone()) | ||
.check_new_service::<Endpoint, http::Request<http::BoxBody>>() | ||
.push_on_response( | ||
svc::layers() | ||
.push(svc::layer::mk(svc::SpawnReady::new)) | ||
.push(http::BoxRequest::layer()) | ||
.push( | ||
metrics | ||
.stack | ||
.layer(stack_labels("http", "balance.endpoint")), | ||
) | ||
.push(http::BoxRequest::layer()), | ||
// Ensure individual endpoints are driven to readiness so that | ||
// the balancer need not drive them all directly. | ||
.push(svc::layer::mk(svc::SpawnReady::new)), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TIOLI: could be nice to add a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
) | ||
.check_new_service::<Endpoint, http::Request<_>>() | ||
// Resolve the service to its endponts and balance requests over them. | ||
// | ||
// If the balancer has been empty/unavailable, eagerly fail requests. | ||
// When the balancer is in failfast, spawn the service in a background | ||
// task so it becomes ready without new requests. | ||
.push(resolve::layer(resolve, watchdog)) | ||
.check_service::<Concrete>() | ||
.push_on_response( | ||
svc::layers() | ||
.push(http::balance::layer( | ||
crate::EWMA_DEFAULT_RTT, | ||
crate::EWMA_DECAY, | ||
)) | ||
.push(svc::layer::mk(svc::SpawnReady::new)) | ||
// If the balancer has been empty/unavailable for 10s, eagerly fail | ||
// requests. | ||
.push(svc::FailFast::layer("HTTP Balancer", dispatch_timeout)) | ||
.push(metrics.stack.layer(stack_labels("http", "concrete"))), | ||
) | ||
.push(svc::MapErrLayer::new(Into::into)) | ||
// Drives the initial resolution via the service's readiness. | ||
.into_new_service() | ||
.check_new_service::<Concrete, http::Request<_>>() | ||
// The concrete address is only set when the profile could be | ||
// resolved. Endpoint resolution is skipped when there is no | ||
// concrete address. | ||
.instrument(|c: &Concrete| match c.resolve.as_ref() { | ||
None => debug_span!("concrete"), | ||
Some(addr) => debug_span!("concrete", %addr), | ||
}) | ||
.check_new_service::<Concrete, http::Request<_>>() | ||
// The concrete address is only set when the profile could be | ||
// resolved. Endpoint resolution is skipped when there is no | ||
// concrete address. | ||
.push_map_target(Concrete::from) | ||
.check_new_service::<(Option<Addr>, Logical), http::Request<_>>() | ||
// Distribute requests over a distribution of balancers via a traffic | ||
// split. | ||
// | ||
// If the traffic split is empty/unavailable, eagerly fail | ||
// requests requests. When the split is in failfast, spawn | ||
// the service in a background task so it becomes ready without | ||
// new requests. | ||
.push(profiles::split::layer()) | ||
.check_new_service::<Logical, http::Request<_>>() | ||
// Drives concrete stacks to readiness and makes the split | ||
// cloneable, as required by the retry middleware. | ||
.push_on_response( | ||
svc::layers() | ||
.push(svc::layer::mk(svc::SpawnReady::new)) | ||
.push(svc::FailFast::layer("HTTP Logical", dispatch_timeout)) | ||
.push_spawn_buffer(buffer_capacity), | ||
) | ||
.check_new_service::<Logical, http::Request<_>>() | ||
// Note: routes can't exert backpressure. | ||
.push(profiles::http::route_request::layer( | ||
svc::proxies() | ||
.push( | ||
|
@@ -115,16 +120,16 @@ where | |
.push_map_target(Logical::mk_route) | ||
.into_inner(), | ||
)) | ||
.check_new_service::<Logical, http::Request<_>>() | ||
// Strips headers that may be set by this proxy and add an outbound | ||
// canonical-dst-header. The response body is boxed unify the profile | ||
// stack's response type. withthat of to endpoint stack. | ||
.push(http::NewHeaderFromTarget::layer(CANONICAL_DST_HEADER)) | ||
.push_on_response( | ||
svc::layers() | ||
// Strips headers that may be set by this proxy. | ||
.push(http::strip_header::request::layer(DST_OVERRIDE_HEADER)) | ||
.push(http::BoxResponse::layer()), | ||
) | ||
.instrument(|l: &Logical| debug_span!("logical", dst = %l.addr())) | ||
.check_new_service::<Logical, http::Request<_>>() | ||
.push_switch( | ||
Logical::should_resolve, | ||
svc::stack(endpoint) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,13 +41,13 @@ where | |
+ 'static, | ||
TSvc::Error: Into<Error>, | ||
TSvc::Future: Send, | ||
H: svc::NewService<http::Logical, Service = HSvc> + Clone + Send + Sync + 'static, | ||
H: svc::NewService<http::Logical, Service = HSvc> + Clone + Send + Sync + Unpin + 'static, | ||
HSvc: svc::Service<http::Request<http::BoxBody>, Response = http::Response<http::BoxBody>> | ||
+ Send | ||
+ 'static, | ||
HSvc::Error: Into<Error>, | ||
HSvc::Future: Send, | ||
P: profiles::GetProfile<Addr> + Clone + Send + Sync + 'static, | ||
P: profiles::GetProfile<Addr> + Clone + Send + Sync + Unpin + 'static, | ||
P::Error: Send, | ||
P::Future: Send, | ||
{ | ||
|
@@ -73,42 +73,48 @@ where | |
.into_inner(); | ||
|
||
svc::stack(http) | ||
.push_on_response(svc::MapErrLayer::new(Into::into)) | ||
.check_new_service::<http::Logical, http::Request<_>>() | ||
.push_on_response( | ||
svc::layers() | ||
.push(http::BoxRequest::layer()) | ||
.push(svc::MapErrLayer::new(Into::into)), | ||
) | ||
// Lookup the profile for the outbound HTTP target, if appropriate. | ||
// | ||
// This service is buffered because it needs to initialize the profile | ||
// resolution and a failfast is instrumented in case it becomes | ||
// unavailable | ||
// When this service is in failfast, ensure that we drive the | ||
// inner service to readiness even if new requests aren't | ||
// received. | ||
.push_map_target(http::Logical::from) | ||
.push(profiles::discover::layer( | ||
profiles, | ||
AllowHttpProfile(allow_discovery), | ||
)) | ||
.check_new_service::<Target, http::Request<_>>() | ||
.push_on_response( | ||
svc::layers() | ||
.push(svc::FailFast::layer("Logical", dispatch_timeout)) | ||
.push(svc::layer::mk(svc::SpawnReady::new)) | ||
.push(svc::FailFast::layer("HTTP Logical", dispatch_timeout)) | ||
Comment on lines
+96
to
+97
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TIOLI: might be worth having a "push spawn ready and failfast in one method" helper since we now want to pair them in most places. might also help avoid accidentally adding new failfasts without a spawn ready? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed, but we don't have a nameable spawn ready layer at the moment. |
||
.push_spawn_buffer(buffer_capacity), | ||
) | ||
.push_cache(cache_max_idle_age) | ||
.push_on_response(http::Retain::layer()) | ||
.check_new_service::<Target, http::Request<_>>() | ||
.instrument(|t: &Target| info_span!("target", dst = %t.dst)) | ||
// Obtain a new inner service for each request (fom the above cache). | ||
// | ||
// Note that the router service is always ready, so the `FailFast` layer | ||
// need not use a `SpawnReady` to drive the service to ready. | ||
.push(svc::NewRouter::layer(TargetPerRequest::accept)) | ||
.check_new_service::<http::Accept, http::Request<_>>() | ||
.push_on_response( | ||
svc::layers() | ||
.push(http::BoxRequest::layer()) | ||
// Limits the number of in-flight requests. | ||
.push(svc::ConcurrencyLimit::layer(max_in_flight_requests)) | ||
// Eagerly fail requests when the proxy is out of capacity for a | ||
// dispatch_timeout. | ||
.push(svc::FailFast::layer("Server", dispatch_timeout)) | ||
.push(svc::FailFast::layer("HTTP Server", dispatch_timeout)) | ||
.push(metrics.http_errors.clone()) | ||
// Synthesizes responses for proxy errors. | ||
.push(errors::layer()) | ||
// Initiates OpenCensus tracing. | ||
.push(TraceContext::layer(span_sink.map(|span_sink| { | ||
SpanConverter::server(span_sink, trace_labels()) | ||
}))) | ||
.push(metrics.stack.layer(stack_labels("http", "server"))) | ||
.push_spawn_buffer(buffer_capacity) | ||
.push(http::BoxResponse::layer()), | ||
) | ||
.check_new_service::<http::Accept, http::Request<_>>() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just out of curiosity, why put the box above metrics now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mostly I want the box as close to the inner service as possible, as it's responsible for satisfying the type signature of the inner service.