@@ -66,6 +66,7 @@ use tower::{
6666 layer:: util:: { Identity , Stack } ,
6767 layer:: Layer ,
6868 limit:: concurrency:: ConcurrencyLimitLayer ,
69+ load_shed:: LoadShedLayer ,
6970 util:: BoxCloneService ,
7071 Service , ServiceBuilder , ServiceExt ,
7172} ;
@@ -87,6 +88,7 @@ const DEFAULT_HTTP2_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(20);
8788pub struct Server < L = Identity > {
8889 trace_interceptor : Option < TraceInterceptor > ,
8990 concurrency_limit : Option < usize > ,
91+ load_shed : bool ,
9092 timeout : Option < Duration > ,
9193 #[ cfg( feature = "_tls-any" ) ]
9294 tls : Option < TlsAcceptor > ,
@@ -111,6 +113,7 @@ impl Default for Server<Identity> {
111113 Self {
112114 trace_interceptor : None ,
113115 concurrency_limit : None ,
116+ load_shed : false ,
114117 timeout : None ,
115118 #[ cfg( feature = "_tls-any" ) ]
116119 tls : None ,
@@ -179,6 +182,27 @@ impl<L> Server<L> {
179182 }
180183 }
181184
185+ /// Enable or disable load shedding. The default is disabled.
186+ ///
187+ /// When load shedding is enabled, if the service responds with not ready
188+ /// the request will immediately be rejected with a
189+ /// [`resource_exhausted`](https://docs.rs/tonic/latest/tonic/struct.Status.html#method.resource_exhausted) error.
190+ /// The default is to buffer requests. This is especially useful in combination with
191+ /// setting a concurrency limit per connection.
192+ ///
193+ /// # Example
194+ ///
195+ /// ```
196+ /// # use tonic::transport::Server;
197+ /// # use tower_service::Service;
198+ /// # let builder = Server::builder();
199+ /// builder.load_shed(true);
200+ /// ```
201+ #[ must_use]
202+ pub fn load_shed ( self , load_shed : bool ) -> Self {
203+ Server { load_shed, ..self }
204+ }
205+
182206 /// Set a timeout on for all request handlers.
183207 ///
184208 /// # Example
@@ -514,6 +538,7 @@ impl<L> Server<L> {
514538 service_builder : self . service_builder . layer ( new_layer) ,
515539 trace_interceptor : self . trace_interceptor ,
516540 concurrency_limit : self . concurrency_limit ,
541+ load_shed : self . load_shed ,
517542 timeout : self . timeout ,
518543 #[ cfg( feature = "_tls-any" ) ]
519544 tls : self . tls ,
@@ -643,6 +668,7 @@ impl<L> Server<L> {
643668 {
644669 let trace_interceptor = self . trace_interceptor . clone ( ) ;
645670 let concurrency_limit = self . concurrency_limit ;
671+ let load_shed = self . load_shed ;
646672 let init_connection_window_size = self . init_connection_window_size ;
647673 let init_stream_window_size = self . init_stream_window_size ;
648674 let max_concurrent_streams = self . max_concurrent_streams ;
@@ -667,6 +693,7 @@ impl<L> Server<L> {
667693 let mut svc = MakeSvc {
668694 inner : svc,
669695 concurrency_limit,
696+ load_shed,
670697 timeout,
671698 trace_interceptor,
672699 _io : PhantomData ,
@@ -1051,6 +1078,7 @@ impl<S> fmt::Debug for Svc<S> {
10511078#[ derive( Clone ) ]
10521079struct MakeSvc < S , IO > {
10531080 concurrency_limit : Option < usize > ,
1081+ load_shed : bool ,
10541082 timeout : Option < Duration > ,
10551083 inner : S ,
10561084 trace_interceptor : Option < TraceInterceptor > ,
@@ -1084,6 +1112,7 @@ where
10841112
10851113 let svc = ServiceBuilder :: new ( )
10861114 . layer ( RecoverErrorLayer :: new ( ) )
1115+ . option_layer ( self . load_shed . then_some ( LoadShedLayer :: new ( ) ) )
10871116 . option_layer ( concurrency_limit. map ( ConcurrencyLimitLayer :: new) )
10881117 . layer_fn ( |s| GrpcTimeout :: new ( s, timeout) )
10891118 . service ( svc) ;
0 commit comments