From 0bf30ccc68feefb0196d2db9536232e5913598da Mon Sep 17 00:00:00 2001 From: Yusuke Sasaki Date: Thu, 28 Feb 2019 02:30:52 +0900 Subject: [PATCH] feat(service): add `poll_ready` to `Service` and `MakeService` (#1767) --- src/proto/h1/dispatch.rs | 8 +++++-- src/proto/h2/server.rs | 42 ++++++++++++++++++++++++++----------- src/server/conn.rs | 9 ++++++++ src/service/make_service.rs | 21 +++++++++++++++++-- src/service/new_service.rs | 11 +++++++++- src/service/service.rs | 11 +++++++++- 6 files changed, 84 insertions(+), 18 deletions(-) diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 622ce96100..2857b90548 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -189,7 +189,7 @@ where // can dispatch receive, or does it still care about, an incoming message? match self.dispatch.poll_ready() { Ok(Async::Ready(())) => (), - Ok(Async::NotReady) => unreachable!("dispatch not ready when conn is"), + Ok(Async::NotReady) => return Ok(Async::NotReady), // service might not be ready Err(()) => { trace!("dispatch no longer receiving messages"); self.close(); @@ -410,7 +410,11 @@ where if self.in_flight.is_some() { Ok(Async::NotReady) } else { - Ok(Async::Ready(())) + self.service.poll_ready() + .map_err(|_e| { + // FIXME: return error value. + trace!("service closed"); + }) } } diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index 8bc36bc33e..d88104cfd8 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -128,19 +128,37 @@ where S::Error: Into>, E: H2Exec, { - while let Some((req, respond)) = try_ready!(self.conn.poll().map_err(::Error::new_h2)) { - trace!("incoming request"); - let content_length = content_length_parse_all(req.headers()); - let req = req.map(|stream| { - ::Body::h2(stream, content_length) - }); - let fut = H2Stream::new(service.call(req), respond); - exec.execute_h2stream(fut)?; - } + loop { + // At first, polls the readiness of supplied service. + match service.poll_ready() { + Ok(Async::Ready(())) => (), + Ok(Async::NotReady) => { + // use `poll_close` instead of `poll`, in order to avoid accepting a request. + try_ready!(self.conn.poll_close().map_err(::Error::new_h2)); + trace!("incoming connection complete"); + return Ok(Async::Ready(())); + } + Err(err) => { + trace!("service closed"); + return Err(::Error::new_user_service(err)); + } + } - // no more incoming streams... - trace!("incoming connection complete"); - Ok(Async::Ready(())) + // When the service is ready, accepts an incoming request. + if let Some((req, respond)) = try_ready!(self.conn.poll().map_err(::Error::new_h2)) { + trace!("incoming request"); + let content_length = content_length_parse_all(req.headers()); + let req = req.map(|stream| { + ::Body::h2(stream, content_length) + }); + let fut = H2Stream::new(service.call(req), respond); + exec.execute_h2stream(fut)?; + } else { + // no more incoming streams... + trace!("incoming connection complete"); + return Ok(Async::Ready(())) + } + } } } diff --git a/src/server/conn.rs b/src/server/conn.rs index 296221d17f..99772b36c2 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -646,6 +646,15 @@ where type Error = ::Error; fn poll(&mut self) -> Poll, Self::Error> { + match self.make_service.poll_ready_ref() { + Ok(Async::Ready(())) => (), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(e) => { + trace!("make_service closed"); + return Err(::Error::new_user_new_service(e)); + } + } + if let Some(io) = try_ready!(self.incoming.poll().map_err(::Error::new_accept)) { let new_fut = self.make_service.make_service_ref(&io); Ok(Async::Ready(Some(Connecting { diff --git a/src/service/make_service.rs b/src/service/make_service.rs index 0472d2f983..e26bdd01bd 100644 --- a/src/service/make_service.rs +++ b/src/service/make_service.rs @@ -1,7 +1,7 @@ use std::error::Error as StdError; use std::fmt; -use futures::{Future, IntoFuture}; +use futures::{Async, Future, IntoFuture, Poll}; use body::Payload; use super::Service; @@ -30,6 +30,15 @@ pub trait MakeService { /// The error type that can be returned when creating a new `Service`. type MakeError: Into>; + /// Returns `Ready` when the constructor is ready to create a new `Service`. + /// + /// The implementation of this method is allowed to return a `Ready` even if + /// the factory is not ready to create a new service. In this case, the future + /// returned from `make_service` will resolve to an error. + fn poll_ready(&mut self) -> Poll<(), Self::MakeError> { + Ok(Async::Ready(())) + } + /// Create a new `Service`. fn make_service(&mut self, ctx: Ctx) -> Self::Future; } @@ -46,7 +55,8 @@ pub trait MakeServiceRef: self::sealed::Sealed { ResBody=Self::ResBody, Error=Self::Error, >; - type Future: Future; + type MakeError: Into>; + type Future: Future; // Acting like a #[non_exhaustive] for associated types of this trait. // @@ -59,6 +69,8 @@ pub trait MakeServiceRef: self::sealed::Sealed { // if necessary. type __DontNameMe: self::sealed::CantImpl; + fn poll_ready_ref(&mut self) -> Poll<(), Self::MakeError>; + fn make_service_ref(&mut self, ctx: &Ctx) -> Self::Future; } @@ -76,10 +88,15 @@ where type Service = S; type ReqBody = IB; type ResBody = OB; + type MakeError = ME; type Future = F; type __DontNameMe = self::sealed::CantName; + fn poll_ready_ref(&mut self) -> Poll<(), Self::MakeError> { + self.poll_ready() + } + fn make_service_ref(&mut self, ctx: &Ctx) -> Self::Future { self.make_service(ctx) } diff --git a/src/service/new_service.rs b/src/service/new_service.rs index 80f0e6bb9e..e8c121b011 100644 --- a/src/service/new_service.rs +++ b/src/service/new_service.rs @@ -1,6 +1,6 @@ use std::error::Error as StdError; -use futures::{Future, IntoFuture}; +use futures::{Async, Future, IntoFuture, Poll}; use body::Payload; use super::{MakeService, Service}; @@ -29,6 +29,11 @@ pub trait NewService { /// The error type that can be returned when creating a new `Service`. type InitError: Into>; + #[doc(hidden)] + fn poll_ready(&mut self) -> Poll<(), Self::InitError> { + Ok(Async::Ready(())) + } + /// Create a new `Service`. fn new_service(&self) -> Self::Future; } @@ -63,6 +68,10 @@ where type Future = N::Future; type MakeError = N::InitError; + fn poll_ready(&mut self) -> Poll<(), Self::MakeError> { + NewService::poll_ready(self) + } + fn make_service(&mut self, _: Ctx) -> Self::Future { self.new_service() } diff --git a/src/service/service.rs b/src/service/service.rs index 53d3e166c9..93cf469ae2 100644 --- a/src/service/service.rs +++ b/src/service/service.rs @@ -2,7 +2,7 @@ use std::error::Error as StdError; use std::fmt; use std::marker::PhantomData; -use futures::{future, Future, IntoFuture}; +use futures::{future, Async, Future, IntoFuture, Poll}; use body::Payload; use common::Never; @@ -26,6 +26,15 @@ pub trait Service { /// The `Future` returned by this `Service`. type Future: Future, Error=Self::Error>; + /// Returns `Ready` when the service is able to process requests. + /// + /// The implementation of this method is allowed to return a `Ready` even if + /// the service is not ready to process. In this case, the future returned + /// from `call` will resolve to an error. + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + /// Calls this `Service` with a request, returning a `Future` of the response. fn call(&mut self, req: Request) -> Self::Future; }