Skip to content

Commit

Permalink
feat(lib): change custom executors to be Fn
Browse files Browse the repository at this point in the history
Instead of custom executors needing to be `tokio::Executor` or
`tokio::TypedExecutor`, they are now just `Fn`s.

BREAKING CHANGE: Configuring an `executor` for a client or server should
  now pass a closure or function to spawn the future.

Closes #1944
  • Loading branch information
seanmonstar committed Nov 15, 2019
1 parent 71d088d commit 376472f
Show file tree
Hide file tree
Showing 10 changed files with 36 additions and 118 deletions.
4 changes: 3 additions & 1 deletion examples/single_threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ async fn main() {
let exec = current_thread::TaskExecutor::current();

let server = Server::bind(&addr)
.executor(exec)
.executor(move |fut| {
exec.clone().spawn_local(fut).unwrap();
})
.serve(make_service);

println!("Listening on http://{}", addr);
Expand Down
3 changes: 1 addition & 2 deletions src/client/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,7 @@ impl Builder {
/// Provide an executor to execute background HTTP2 tasks.
pub fn executor<E>(&mut self, exec: E) -> &mut Builder
where
for<'a> &'a E: tokio_executor::Executor,
E: Send + Sync + 'static,
E: Fn(crate::common::exec::BoxFuture) + Send + Sync + 'static,
{
self.exec = Exec::Executor(Arc::new(exec));
self
Expand Down
24 changes: 5 additions & 19 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,7 @@ where C: Connect + Clone + Send + Sync + 'static,
drop(delayed_tx);
});

if let Err(err) = executor.execute(on_idle) {
// This task isn't critical, so just log and ignore.
warn!("error spawning task to insert idle connection: {}", err);
}
executor.execute(on_idle);
} else {
// There's no body to delay, but the connection isn't
// ready yet. Only re-insert when it's ready
Expand All @@ -371,10 +368,7 @@ where C: Connect + Clone + Send + Sync + 'static,
})
.map(|_| ());

if let Err(err) = executor.execute(on_idle) {
// This task isn't critical, so just log and ignore.
warn!("error spawning task to insert idle connection: {}", err);
}
executor.execute(on_idle);
}
res
})))
Expand Down Expand Up @@ -513,20 +507,13 @@ where C: Connect + Clone + Send + Sync + 'static,
.handshake(io)
.and_then(move |(tx, conn)| {
trace!("handshake complete, spawning background dispatcher task");
let bg = executor.execute(conn.map_err(|e| {
executor.execute(conn.map_err(|e| {
debug!("client connection error: {}", e)
}).map(|_| ()));

// This task is critical, so an execute error
// should be returned.
if let Err(err) = bg {
warn!("error spawning critical client task: {}", err);
return Either::Left(future::err(err));
}

// Wait for 'conn' to ready up before we
// declare this tx as usable
Either::Right(tx.when_ready())
tx.when_ready()
})
.map_ok(move |tx| {
pool.pooled(connecting, PoolClient {
Expand Down Expand Up @@ -1013,8 +1000,7 @@ impl Builder {
/// Provide an executor to execute background `Connection` tasks.
pub fn executor<E>(&mut self, exec: E) -> &mut Self
where
for<'a> &'a E: tokio_executor::Executor,
E: Send + Sync + 'static,
E: Fn(crate::common::exec::BoxFuture) + Send + Sync + 'static,
{
self.conn_builder.executor(exec);
self
Expand Down
9 changes: 2 additions & 7 deletions src/client/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,16 +416,11 @@ impl<T: Poolable> PoolInner<T> {

let start = Instant::now() + dur;

let interval = IdleTask {
self.exec.execute(IdleTask {
interval: Interval::new(start, dur),
pool: WeakOpt::downgrade(pool_ref),
pool_drop_notifier: rx,
};

if let Err(err) = self.exec.execute(interval) {
// This task isn't critical, so simply log and ignore.
warn!("error spawning connection pool idle interval: {}", err);
}
});
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/client/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ where
if let Err(e) = conn.await {
debug!("connection error: {:?}", e);
}
})?;
});
Ok(sr)
},
Err(e) => Err(e)
Expand Down
93 changes: 19 additions & 74 deletions src/common/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,83 +3,41 @@ use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

use tokio_executor::{SpawnError, TypedExecutor};

use crate::body::{Payload, Body};
use crate::proto::h2::server::H2Stream;
use crate::server::conn::spawn_all::{NewSvcTask, Watcher};
use crate::service::HttpService;

pub trait H2Exec<F, B: Payload>: Clone {
fn execute_h2stream(&mut self, fut: H2Stream<F, B>) -> crate::Result<()>;
fn execute_h2stream(&self, fut: H2Stream<F, B>);
}

pub trait NewSvcExec<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>>: Clone {
fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) -> crate::Result<()>;
}

type BoxFuture = Pin<Box<dyn Future<Output=()> + Send>>;

pub trait SharedExecutor {
fn shared_spawn(&self, future: BoxFuture) -> Result<(), SpawnError>;
fn execute_new_svc(&self, fut: NewSvcTask<I, N, S, E, W>);
}

impl<E> SharedExecutor for E
where
for<'a> &'a E: tokio_executor::Executor,
{
fn shared_spawn(mut self: &Self, future: BoxFuture) -> Result<(), SpawnError> {
tokio_executor::Executor::spawn(&mut self, future)
}
}
pub type BoxFuture = Pin<Box<dyn Future<Output=()> + Send>>;

// Either the user provides an executor for background tasks, or we use
// `tokio::spawn`.
#[derive(Clone)]
pub enum Exec {
Default,
Executor(Arc<dyn SharedExecutor + Send + Sync>),
Executor(Arc<dyn Fn(BoxFuture) + Send + Sync>),
}

// ===== impl Exec =====

impl Exec {
pub(crate) fn execute<F>(&self, fut: F) -> crate::Result<()>
pub(crate) fn execute<F>(&self, fut: F)
where
F: Future<Output=()> + Send + 'static,
{
match *self {
Exec::Default => {
#[cfg(feature = "tcp")]
{
use std::error::Error as StdError;

struct TokioSpawnError;

impl fmt::Debug for TokioSpawnError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt("tokio::spawn failed (is a tokio runtime running this future?)", f)
}
}

impl fmt::Display for TokioSpawnError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt("tokio::spawn failed (is a tokio runtime running this future?)", f)
}
}

impl StdError for TokioSpawnError {
fn description(&self) -> &str {
"tokio::spawn failed"
}
}

::tokio_executor::DefaultExecutor::current()
.spawn(Box::pin(fut))
.map_err(|err| {
warn!("executor error: {:?}", err);
crate::Error::new_execute(TokioSpawnError)
})
tokio::spawn(fut);
}
#[cfg(not(feature = "tcp"))]
{
Expand All @@ -88,20 +46,18 @@ impl Exec {
}
},
Exec::Executor(ref e) => {
e.shared_spawn(Box::pin(fut))
.map_err(|err| {
warn!("executor error: {:?}", err);
crate::Error::new_execute("custom executor failed")
})
e(Box::pin(fut));
},
}
}
}

impl fmt::Debug for Exec {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Exec")
.finish()
match self {
Exec::Default => f.write_str("Exec::Default"),
Exec::Executor(..) => f.write_str("Exec::Custom"),
}
}
}

Expand All @@ -111,7 +67,7 @@ where
H2Stream<F, B>: Future<Output = ()> + Send + 'static,
B: Payload,
{
fn execute_h2stream(&mut self, fut: H2Stream<F, B>) -> crate::Result<()> {
fn execute_h2stream(&self, fut: H2Stream<F, B>) {
self.execute(fut)
}
}
Expand All @@ -122,7 +78,7 @@ where
S: HttpService<Body>,
W: Watcher<I, S, E>,
{
fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) -> crate::Result<()> {
fn execute_new_svc(&self, fut: NewSvcTask<I, N, S, E, W>) {
self.execute(fut)
}
}
Expand All @@ -131,34 +87,23 @@ where

impl<E, F, B> H2Exec<F, B> for E
where
E: TypedExecutor<H2Stream<F, B>> + Clone,
E: Fn(H2Stream<F, B>) + Clone,
H2Stream<F, B>: Future<Output=()>,
B: Payload,
{
fn execute_h2stream(&mut self, fut: H2Stream<F, B>) -> crate::Result<()> {
self.spawn(fut)
.map_err(|err| {
warn!("executor error: {:?}", err);
crate::Error::new_execute("custom executor failed")
})
fn execute_h2stream(&self, fut: H2Stream<F, B>) {
self(fut);
}
}

impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for E
where
E: TypedExecutor<NewSvcTask<I, N, S, E, W>> + Clone,
E: Fn(NewSvcTask<I, N, S, E, W>) + Clone,
NewSvcTask<I, N, S, E, W>: Future<Output=()>,
S: HttpService<Body>,
W: Watcher<I, S, E>,
{
fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) -> crate::Result<()> {
self.spawn(fut)
.map_err(|err| {
warn!("executor error: {:?}", err);
crate::Error::new_execute("custom executor failed")
})
fn execute_new_svc(&self, fut: NewSvcTask<I, N, S, E, W>) {
self(fut);
}
}

// ===== StdError impls =====

9 changes: 0 additions & 9 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,8 @@ pub(crate) enum User {

/// User tried polling for an upgrade that doesn't exist.
NoUpgrade,

/// User polled for an upgrade, but low-level API is not using upgrades.
ManualUpgrade,

/// Error trying to call `Executor::execute`.
Execute,
}

impl Error {
Expand Down Expand Up @@ -277,10 +273,6 @@ impl Error {
Error::new(Kind::Shutdown).with(cause)
}

pub(crate) fn new_execute<E: Into<Cause>>(cause: E) -> Error {
Error::new_user(User::Execute).with(cause)
}

pub(crate) fn new_h2(cause: ::h2::Error) -> Error {
if cause.is_io() {
Error::new_io(cause.into_io().expect("h2::Error::is_io"))
Expand Down Expand Up @@ -346,7 +338,6 @@ impl StdError for Error {
Kind::User(User::AbsoluteUriRequired) => "client requires absolute-form URIs",
Kind::User(User::NoUpgrade) => "no upgrade available",
Kind::User(User::ManualUpgrade) => "upgrade expected but low level API in use",
Kind::User(User::Execute) => "executor failed to spawn task",
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/proto/h2/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ where
}
};

exec.execute(conn_task)?;
exec.execute(conn_task);

Ok(ClientTask {
conn_drop_ref,
Expand Down Expand Up @@ -155,7 +155,7 @@ where
drop(conn_drop_ref);
x
});
self.executor.execute(pipe)?;
self.executor.execute(pipe);
}
}
}
Expand All @@ -175,7 +175,7 @@ where
}
}
});
self.executor.execute(cb.send_when(fut))?;
self.executor.execute(cb.send_when(fut));
continue;
},

Expand Down
2 changes: 1 addition & 1 deletion src/proto/h2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ where
crate::Body::h2(stream, content_length)
});
let fut = H2Stream::new(service.call(req), respond);
exec.execute_h2stream(fut)?;
exec.execute_h2stream(fut);
},
Some(Err(e)) => {
return Poll::Ready(Err(crate::Error::new_h2(e)));
Expand Down
2 changes: 1 addition & 1 deletion src/server/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ where
loop {
if let Some(connecting) = ready!(me.serve.as_mut().poll_next_(cx)?) {
let fut = NewSvcTask::new(connecting, watcher.clone());
me.serve.as_mut().project().protocol.exec.execute_new_svc(fut)?;
me.serve.as_mut().project().protocol.exec.execute_new_svc(fut);
} else {
return Poll::Ready(Ok(()));
}
Expand Down

0 comments on commit 376472f

Please sign in to comment.