Skip to content

Commit

Permalink
feat(service): add poll_ready to Service and MakeService (#1767)
Browse files Browse the repository at this point in the history
  • Loading branch information
ubnt-intrepid authored and seanmonstar committed Feb 27, 2019
1 parent ce2b540 commit 0bf30cc
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 18 deletions.
8 changes: 6 additions & 2 deletions src/proto/h1/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ where
// can dispatch receive, or does it still care about, an incoming message?
match self.dispatch.poll_ready() {
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => unreachable!("dispatch not ready when conn is"),
Ok(Async::NotReady) => return Ok(Async::NotReady), // service might not be ready
Err(()) => {
trace!("dispatch no longer receiving messages");
self.close();
Expand Down Expand Up @@ -410,7 +410,11 @@ where
if self.in_flight.is_some() {
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
self.service.poll_ready()
.map_err(|_e| {
// FIXME: return error value.
trace!("service closed");
})
}
}

Expand Down
42 changes: 30 additions & 12 deletions src/proto/h2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,19 +128,37 @@ where
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
E: H2Exec<S::Future, B>,
{
while let Some((req, respond)) = try_ready!(self.conn.poll().map_err(::Error::new_h2)) {
trace!("incoming request");
let content_length = content_length_parse_all(req.headers());
let req = req.map(|stream| {
::Body::h2(stream, content_length)
});
let fut = H2Stream::new(service.call(req), respond);
exec.execute_h2stream(fut)?;
}
loop {
// At first, polls the readiness of supplied service.
match service.poll_ready() {
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => {
// use `poll_close` instead of `poll`, in order to avoid accepting a request.
try_ready!(self.conn.poll_close().map_err(::Error::new_h2));
trace!("incoming connection complete");
return Ok(Async::Ready(()));
}
Err(err) => {
trace!("service closed");
return Err(::Error::new_user_service(err));
}
}

// no more incoming streams...
trace!("incoming connection complete");
Ok(Async::Ready(()))
// When the service is ready, accepts an incoming request.
if let Some((req, respond)) = try_ready!(self.conn.poll().map_err(::Error::new_h2)) {
trace!("incoming request");
let content_length = content_length_parse_all(req.headers());
let req = req.map(|stream| {
::Body::h2(stream, content_length)
});
let fut = H2Stream::new(service.call(req), respond);
exec.execute_h2stream(fut)?;
} else {
// no more incoming streams...
trace!("incoming connection complete");
return Ok(Async::Ready(()))
}
}
}
}

Expand Down
9 changes: 9 additions & 0 deletions src/server/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,15 @@ where
type Error = ::Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.make_service.poll_ready_ref() {
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(e) => {
trace!("make_service closed");
return Err(::Error::new_user_new_service(e));
}
}

if let Some(io) = try_ready!(self.incoming.poll().map_err(::Error::new_accept)) {
let new_fut = self.make_service.make_service_ref(&io);
Ok(Async::Ready(Some(Connecting {
Expand Down
21 changes: 19 additions & 2 deletions src/service/make_service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::error::Error as StdError;
use std::fmt;

use futures::{Future, IntoFuture};
use futures::{Async, Future, IntoFuture, Poll};

use body::Payload;
use super::Service;
Expand Down Expand Up @@ -30,6 +30,15 @@ pub trait MakeService<Ctx> {
/// The error type that can be returned when creating a new `Service`.
type MakeError: Into<Box<StdError + Send + Sync>>;

/// Returns `Ready` when the constructor is ready to create a new `Service`.
///
/// The implementation of this method is allowed to return a `Ready` even if
/// the factory is not ready to create a new service. In this case, the future
/// returned from `make_service` will resolve to an error.
fn poll_ready(&mut self) -> Poll<(), Self::MakeError> {
Ok(Async::Ready(()))
}

/// Create a new `Service`.
fn make_service(&mut self, ctx: Ctx) -> Self::Future;
}
Expand All @@ -46,7 +55,8 @@ pub trait MakeServiceRef<Ctx>: self::sealed::Sealed<Ctx> {
ResBody=Self::ResBody,
Error=Self::Error,
>;
type Future: Future<Item=Self::Service>;
type MakeError: Into<Box<StdError + Send + Sync>>;
type Future: Future<Item=Self::Service, Error=Self::MakeError>;

// Acting like a #[non_exhaustive] for associated types of this trait.
//
Expand All @@ -59,6 +69,8 @@ pub trait MakeServiceRef<Ctx>: self::sealed::Sealed<Ctx> {
// if necessary.
type __DontNameMe: self::sealed::CantImpl;

fn poll_ready_ref(&mut self) -> Poll<(), Self::MakeError>;

fn make_service_ref(&mut self, ctx: &Ctx) -> Self::Future;
}

Expand All @@ -76,10 +88,15 @@ where
type Service = S;
type ReqBody = IB;
type ResBody = OB;
type MakeError = ME;
type Future = F;

type __DontNameMe = self::sealed::CantName;

fn poll_ready_ref(&mut self) -> Poll<(), Self::MakeError> {
self.poll_ready()
}

fn make_service_ref(&mut self, ctx: &Ctx) -> Self::Future {
self.make_service(ctx)
}
Expand Down
11 changes: 10 additions & 1 deletion src/service/new_service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::error::Error as StdError;

use futures::{Future, IntoFuture};
use futures::{Async, Future, IntoFuture, Poll};

use body::Payload;
use super::{MakeService, Service};
Expand Down Expand Up @@ -29,6 +29,11 @@ pub trait NewService {
/// The error type that can be returned when creating a new `Service`.
type InitError: Into<Box<StdError + Send + Sync>>;

#[doc(hidden)]
fn poll_ready(&mut self) -> Poll<(), Self::InitError> {
Ok(Async::Ready(()))
}

/// Create a new `Service`.
fn new_service(&self) -> Self::Future;
}
Expand Down Expand Up @@ -63,6 +68,10 @@ where
type Future = N::Future;
type MakeError = N::InitError;

fn poll_ready(&mut self) -> Poll<(), Self::MakeError> {
NewService::poll_ready(self)
}

fn make_service(&mut self, _: Ctx) -> Self::Future {
self.new_service()
}
Expand Down
11 changes: 10 additions & 1 deletion src/service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::error::Error as StdError;
use std::fmt;
use std::marker::PhantomData;

use futures::{future, Future, IntoFuture};
use futures::{future, Async, Future, IntoFuture, Poll};

use body::Payload;
use common::Never;
Expand All @@ -26,6 +26,15 @@ pub trait Service {
/// The `Future` returned by this `Service`.
type Future: Future<Item=Response<Self::ResBody>, Error=Self::Error>;

/// Returns `Ready` when the service is able to process requests.
///
/// The implementation of this method is allowed to return a `Ready` even if
/// the service is not ready to process. In this case, the future returned
/// from `call` will resolve to an error.
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}

/// Calls this `Service` with a request, returning a `Future` of the response.
fn call(&mut self, req: Request<Self::ReqBody>) -> Self::Future;
}
Expand Down

0 comments on commit 0bf30cc

Please sign in to comment.