Skip to content

Commit

Permalink
feat(server): introduce Accept trait
Browse files Browse the repository at this point in the history
The `Accept` trait is used by the server types to asynchronously accept
incoming connections. This replaces the previous usage of `Stream`.

BREAKING CHANGE: Passing a `Stream` to `Server::builder` or
  `Http::serve_incoming` must be changed to pass an `Accept` instead. The
  `stream` optional feature can be enabled, and the a stream can be
  converted using `hyper::server::accept::from_stream`.
  • Loading branch information
seanmonstar committed Sep 5, 2019
1 parent 0867ad5 commit b3e5506
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 71 deletions.
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ runtime = [
"tokio-net",
"tokio-timer",
]

# unstable features
stream = []

# internal features used in CI
nightly = []
__internal_flaky_tests = []
__internal_happy_eyeballs_tests = []
Expand Down
99 changes: 99 additions & 0 deletions src/server/accept.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
//! The `Accept` trait and supporting types.
//!
//! This module contains:
//!
//! - The [`Accept`](Accept) trait used to asynchronously accept incoming
//! connections.
//! - Utilities like `poll_fn` to ease creating a custom `Accept`.

#[cfg(feature = "stream")]
use futures_core::Stream;

use crate::common::{Pin, task::{self, Poll}};

/// Asynchronously accept incoming connections.
pub trait Accept {
/// The connection type that can be accepted.
type Conn;
/// The error type that can occur when accepting a connection.
type Error;

/// Poll to accept the next connection.
fn poll_accept(self: Pin<&mut Self>, cx: &mut task::Context<'_>)
-> Poll<Option<Result<Self::Conn, Self::Error>>>;
}

/// Create an `Accept` with a polling function.
///
/// # Example
///
/// ```
/// use std::task::Poll;
/// use hyper::server::{accept, Server};
///
/// # let mock_conn = ();
/// // If we created some mocked connection...
/// let mut conn = Some(mock_conn);
///
/// // And accept just the mocked conn once...
/// let once = accept::poll_fn(move |cx| {
/// Poll::Ready(conn.take().map(Ok::<_, ()>))
/// });
///
/// let builder = Server::builder(once);
/// ```
pub fn poll_fn<F, IO, E>(func: F) -> impl Accept<Conn = IO, Error = E>
where
F: FnMut(&mut task::Context<'_>) -> Poll<Option<Result<IO, E>>>,
{
struct PollFn<F>(F);

impl<F, IO, E> Accept for PollFn<F>
where
F: FnMut(&mut task::Context<'_>) -> Poll<Option<Result<IO, E>>>,
{
type Conn = IO;
type Error = E;
fn poll_accept(self: Pin<&mut Self>, cx: &mut task::Context<'_>)
-> Poll<Option<Result<Self::Conn, Self::Error>>>
{
unsafe {
(self.get_unchecked_mut().0)(cx)
}
}
}

PollFn(func)
}

/// Adapt a `Stream` of incoming connections into an `Accept`.
///
/// # Unstable
///
/// This function requires enabling the unstable `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
pub fn from_stream<S, IO, E>(stream: S) -> impl Accept<Conn = IO, Error = E>
where
S: Stream<Item = Result<IO, E>>,
{
struct FromStream<S>(S);

impl<S, IO, E> Accept for FromStream<S>
where
S: Stream<Item = Result<IO, E>>,
{
type Conn = IO;
type Error = E;
fn poll_accept(self: Pin<&mut Self>, cx: &mut task::Context<'_>)
-> Poll<Option<Result<Self::Conn, Self::Error>>>
{
unsafe {
Pin::new_unchecked(&mut self.get_unchecked_mut().0)
.poll_next(cx)
}
}
}

FromStream(stream)
}
74 changes: 43 additions & 31 deletions src/server/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::error::{Kind, Parse};
use crate::proto;
use crate::service::{MakeServiceRef, Service};
use crate::upgrade::Upgraded;
use super::Accept;

pub(super) use self::spawn_all::NoopWatcher;
use self::spawn_all::NewSvcTask;
Expand Down Expand Up @@ -403,13 +404,10 @@ impl<E> Http<E> {
}
}

/// Bind the provided `addr` with the default `Handle` and return [`Serve`](Serve).
///
/// This method will bind the `addr` provided with a new TCP listener ready
/// to accept connections. Each connection will be processed with the
/// `make_service` object provided, creating a new service per
/// connection.
#[cfg(feature = "runtime")]
#[doc(hidden)]
#[deprecated]
#[allow(deprecated)]
pub fn serve_addr<S, Bd>(&self, addr: &SocketAddr, make_service: S) -> crate::Result<Serve<AddrIncoming, S, E>>
where
S: MakeServiceRef<
Expand All @@ -428,13 +426,10 @@ impl<E> Http<E> {
Ok(self.serve_incoming(incoming, make_service))
}

/// Bind the provided `addr` with the `Handle` and return a [`Serve`](Serve)
///
/// This method will bind the `addr` provided with a new TCP listener ready
/// to accept connections. Each connection will be processed with the
/// `make_service` object provided, creating a new service per
/// connection.
#[cfg(feature = "runtime")]
#[doc(hidden)]
#[deprecated]
#[allow(deprecated)]
pub fn serve_addr_handle<S, Bd>(&self, addr: &SocketAddr, handle: &Handle, make_service: S) -> crate::Result<Serve<AddrIncoming, S, E>>
where
S: MakeServiceRef<
Expand All @@ -453,10 +448,11 @@ impl<E> Http<E> {
Ok(self.serve_incoming(incoming, make_service))
}

/// Bind the provided stream of incoming IO objects with a `MakeService`.
#[doc(hidden)]
#[deprecated]
pub fn serve_incoming<I, IO, IE, S, Bd>(&self, incoming: I, make_service: S) -> Serve<I, S, E>
where
I: Stream<Item = Result<IO, IE>>,
I: Accept<Conn=IO, Error=IE>,
IE: Into<Box<dyn StdError + Send + Sync>>,
IO: AsyncRead + AsyncWrite + Unpin,
S: MakeServiceRef<
Expand Down Expand Up @@ -678,13 +674,6 @@ where
// ===== impl Serve =====

impl<I, S, E> Serve<I, S, E> {
/// Spawn all incoming connections onto the executor in `Http`.
pub(super) fn spawn_all(self) -> SpawnAll<I, S, E> {
SpawnAll {
serve: self,
}
}

/// Get a reference to the incoming stream.
#[inline]
pub fn incoming_ref(&self) -> &I {
Expand All @@ -696,22 +685,28 @@ impl<I, S, E> Serve<I, S, E> {
pub fn incoming_mut(&mut self) -> &mut I {
&mut self.incoming
}

/// Spawn all incoming connections onto the executor in `Http`.
pub(super) fn spawn_all(self) -> SpawnAll<I, S, E> {
SpawnAll {
serve: self,
}
}
}

impl<I, IO, IE, S, B, E> Stream for Serve<I, S, E>



impl<I, IO, IE, S, B, E> Serve<I, S, E>
where
I: Stream<Item = Result<IO, IE>>,
I: Accept<Conn=IO, Error=IE>,
IO: AsyncRead + AsyncWrite + Unpin,
IE: Into<Box<dyn StdError + Send + Sync>>,
S: MakeServiceRef<IO, Body, ResBody=B>,
//S::Error2: Into<Box<StdError + Send + Sync>>,
//SME: Into<Box<StdError + Send + Sync>>,
B: Payload,
E: H2Exec<<S::Service as Service<Body>>::Future, B>,
{
type Item = crate::Result<Connecting<IO, S::Future, E>>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
fn poll_next_(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Connecting<IO, S::Future, E>>>> {
match ready!(self.project().make_service.poll_ready_ref(cx)) {
Ok(()) => (),
Err(e) => {
Expand All @@ -720,7 +715,7 @@ where
}
}

if let Some(item) = ready!(self.project().incoming.poll_next(cx)) {
if let Some(item) = ready!(self.project().incoming.poll_accept(cx)) {
let io = item.map_err(crate::Error::new_accept)?;
let new_fut = self.project().make_service.make_service_ref(&io);
Poll::Ready(Some(Ok(Connecting {
Expand All @@ -734,6 +729,23 @@ where
}
}

// deprecated
impl<I, IO, IE, S, B, E> Stream for Serve<I, S, E>
where
I: Accept<Conn=IO, Error=IE>,
IO: AsyncRead + AsyncWrite + Unpin,
IE: Into<Box<dyn StdError + Send + Sync>>,
S: MakeServiceRef<IO, Body, ResBody=B>,
B: Payload,
E: H2Exec<<S::Service as Service<Body>>::Future, B>,
{
type Item = crate::Result<Connecting<IO, S::Future, E>>;

fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
self.poll_next_(cx)
}
}

// ===== impl Connecting =====


Expand Down Expand Up @@ -772,7 +784,7 @@ impl<I, S, E> SpawnAll<I, S, E> {

impl<I, IO, IE, S, B, E> SpawnAll<I, S, E>
where
I: Stream<Item=Result<IO, IE>>,
I: Accept<Conn=IO, Error=IE>,
IE: Into<Box<dyn StdError + Send + Sync>>,
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: MakeServiceRef<
Expand All @@ -789,7 +801,7 @@ where
W: Watcher<IO, S::Service, E>,
{
loop {
if let Some(connecting) = ready!(self.project().serve.poll_next(cx)?) {
if let Some(connecting) = ready!(self.project().serve.poll_next_(cx)?) {
let fut = NewSvcTask::new(connecting, watcher.clone());
self.project().serve.project().protocol.exec.execute_new_svc(fut)?;
} else {
Expand Down
19 changes: 10 additions & 9 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
//! # fn main() {}
//! ```

pub mod accept;
pub mod conn;
mod shutdown;
#[cfg(feature = "runtime")] mod tcp;
Expand All @@ -58,14 +59,14 @@ use std::fmt;

#[cfg(feature = "runtime")] use std::time::Duration;

use futures_core::Stream;
use tokio_io::{AsyncRead, AsyncWrite};
use pin_project::pin_project;

use crate::body::{Body, Payload};
use crate::common::exec::{Exec, H2Exec, NewSvcExec};
use crate::common::{Future, Pin, Poll, Unpin, task};
use crate::service::{MakeServiceRef, Service};
use self::accept::Accept;
// Renamed `Http` as `Http_` for now so that people upgrading don't see an
// error that `hyper::server::Http` is private...
use self::conn::{Http as Http_, NoopWatcher, SpawnAll};
Expand Down Expand Up @@ -143,7 +144,7 @@ impl<S> Server<AddrIncoming, S> {

impl<I, IO, IE, S, E, B> Server<I, S, E>
where
I: Stream<Item=Result<IO, IE>>,
I: Accept<Conn=IO, Error=IE>,
IE: Into<Box<dyn StdError + Send + Sync>>,
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: MakeServiceRef<IO, Body, ResBody=B>,
Expand Down Expand Up @@ -200,7 +201,7 @@ where

impl<I, IO, IE, S, B, E> Future for Server<I, S, E>
where
I: Stream<Item=Result<IO, IE>>,
I: Accept<Conn=IO, Error=IE>,
IE: Into<Box<dyn StdError + Send + Sync>>,
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: MakeServiceRef<IO, Body, ResBody=B>,
Expand Down Expand Up @@ -380,17 +381,17 @@ impl<I, E> Builder<I, E> {
/// // Finally, spawn `server` onto an Executor...
/// # }
/// ```
pub fn serve<S, B, IO, IE>(self, new_service: S) -> Server<I, S, E>
pub fn serve<S, B>(self, new_service: S) -> Server<I, S, E>
where
I: Stream<Item=Result<IO, IE>>,
IE: Into<Box<dyn StdError + Send + Sync>>,
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: MakeServiceRef<IO, Body, ResBody=B>,
I: Accept,
I::Error: Into<Box<dyn StdError + Send + Sync>>,
I::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: MakeServiceRef<I::Conn, Body, ResBody=B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
S::Service: 'static,
B: Payload,
B::Data: Unpin,
E: NewSvcExec<IO, S::Future, S::Service, E, NoopWatcher>,
E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>,
E: H2Exec<<S::Service as Service<Body>>::Future, B>,
{
let serve = self.protocol.serve_incoming(self.incoming, new_service);
Expand Down
4 changes: 2 additions & 2 deletions src/server/shutdown.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::error::Error as StdError;

use futures_core::Stream;
use tokio_io::{AsyncRead, AsyncWrite};
use pin_project::{pin_project, project};

Expand All @@ -9,6 +8,7 @@ use crate::common::drain::{self, Draining, Signal, Watch, Watching};
use crate::common::exec::{H2Exec, NewSvcExec};
use crate::common::{Future, Pin, Poll, Unpin, task};
use crate::service::{MakeServiceRef, Service};
use super::Accept;
use super::conn::{SpawnAll, UpgradeableConnection, Watcher};

#[allow(missing_debug_implementations)]
Expand Down Expand Up @@ -46,7 +46,7 @@ impl<I, S, F, E> Graceful<I, S, F, E> {

impl<I, IO, IE, S, B, F, E> Future for Graceful<I, S, F, E>
where
I: Stream<Item=Result<IO, IE>>,
I: Accept<Conn=IO, Error=IE>,
IE: Into<Box<dyn StdError + Send + Sync>>,
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: MakeServiceRef<IO, Body, ResBody=B>,
Expand Down
Loading

0 comments on commit b3e5506

Please sign in to comment.