Skip to content

Commit

Permalink
feat(client): http2 builder now requires an Executor (hyperium#3135)
Browse files Browse the repository at this point in the history
This commit removes `common::Exec::Default` that just panics when used. We are
removing `tokio`, leaving `Exec::Default` with no implementation and
panics when `Exec::execute` is called.

Since `Exec::Default` has no purpose, it is being removed and user
should now provide an implementation of executor.

Closes hyperium#3128 

BREAKING CHANGE:  `hyper::client::conn::Http2::Builder::new` now requires an executor argument.
  • Loading branch information
dswij authored and 0xE282B0 committed Jan 12, 2024
1 parent b5b127d commit 42b20a1
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 57 deletions.
3 changes: 1 addition & 2 deletions benches/end_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,11 +316,10 @@ impl Opts {
let mut client = rt.block_on(async {
if self.http2 {
let io = tokio::net::TcpStream::connect(&addr).await.unwrap();
let (tx, conn) = hyper::client::conn::http2::Builder::new()
let (tx, conn) = hyper::client::conn::http2::Builder::new(support::TokioExecutor)
.initial_stream_window_size(self.http2_stream_window)
.initial_connection_window_size(self.http2_conn_window)
.adaptive_window(self.http2_adaptive_window)
.executor(support::TokioExecutor)
.handshake(io)
.await
.unwrap();
Expand Down
22 changes: 9 additions & 13 deletions src/client/conn/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,18 @@ pub struct Builder {
///
/// This is a shortcut for `Builder::new().handshake(io)`.
/// See [`client::conn`](crate::client::conn) for more.
pub async fn handshake<T, B>(
pub async fn handshake<E, T, B>(
exec: E,
io: T,
) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
where
E: Executor<BoxSendFuture> + Send + Sync + 'static,
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
B: Body + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
Builder::new().handshake(io).await
Builder::new(exec).handshake(io).await
}

// ===== impl SendRequest
Expand Down Expand Up @@ -244,23 +246,17 @@ where
impl Builder {
/// Creates a new connection builder.
#[inline]
pub fn new() -> Builder {
pub fn new<E>(exec: E) -> Builder
where
E: Executor<BoxSendFuture> + Send + Sync + 'static,
{
Builder {
exec: Exec::Default,
exec: Exec::new(exec),
timer: Time::Empty,
h2_builder: Default::default(),
}
}

/// Provide an executor to execute background HTTP2 tasks.
pub fn executor<E>(&mut self, exec: E) -> &mut Builder
where
E: Executor<BoxSendFuture> + Send + Sync + 'static,
{
self.exec = Exec::Executor(Arc::new(exec));
self
}

/// Provide a timer to execute background HTTP2 tasks.
pub fn timer<M>(&mut self, timer: M) -> &mut Builder
where
Expand Down
25 changes: 10 additions & 15 deletions src/common/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,25 @@ pub trait ConnStreamExec<F, B: Body>: Clone {

pub(crate) type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

// Either the user provides an executor for background tasks, or we panic.
// TODO: with the `runtime`feature, `Exec::Default` used `tokio::spawn`. With the
// removal of the opt-in default runtime, this should be refactored.
// Executor must be provided by the user
#[derive(Clone)]
pub(crate) enum Exec {
Default,
Executor(Arc<dyn Executor<BoxSendFuture> + Send + Sync>),
}
pub(crate) struct Exec(Arc<dyn Executor<BoxSendFuture> + Send + Sync>);

// ===== impl Exec =====

impl Exec {
pub(crate) fn new<E>(exec: E) -> Self
where
E: Executor<BoxSendFuture> + Send + Sync + 'static,
{
Self(Arc::new(exec))
}

pub(crate) fn execute<F>(&self, fut: F)
where
F: Future<Output = ()> + Send + 'static,
{
match *self {
Exec::Default => {
panic!("executor must be set");
}
Exec::Executor(ref e) => {
e.execute(Box::pin(fut));
}
}
self.0.execute(Box::pin(fut))
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/ffi/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ ffi_fn! {
#[cfg(feature = "http2")]
{
if options.http2 {
return conn::http2::Builder::new()
.executor(options.exec.clone())
return conn::http2::Builder::new(options.exec.clone())
.handshake::<_, crate::body::Incoming>(io)
.await
.map(|(tx, conn)| {
Expand Down
21 changes: 7 additions & 14 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1922,8 +1922,7 @@ mod conn {
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let (mut client, conn) = conn::http2::Builder::new()
.executor(TokioExecutor)
let (mut client, conn) = conn::http2::Builder::new(TokioExecutor)
.handshake(io)
.await
.expect("http handshake");
Expand Down Expand Up @@ -1979,8 +1978,7 @@ mod conn {
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let (_client, conn) = conn::http2::Builder::new()
.executor(TokioExecutor)
let (_client, conn) = conn::http2::Builder::new(TokioExecutor)
.timer(TokioTimer)
.keep_alive_interval(Duration::from_secs(1))
.keep_alive_timeout(Duration::from_secs(1))
Expand Down Expand Up @@ -2008,8 +2006,7 @@ mod conn {
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let (mut client, conn) = conn::http2::Builder::new()
.executor(TokioExecutor)
let (mut client, conn) = conn::http2::Builder::new(TokioExecutor)
.timer(TokioTimer)
.keep_alive_interval(Duration::from_secs(1))
.keep_alive_timeout(Duration::from_secs(1))
Expand Down Expand Up @@ -2040,8 +2037,7 @@ mod conn {
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let (mut client, conn) = conn::http2::Builder::new()
.executor(TokioExecutor)
let (mut client, conn) = conn::http2::Builder::new(TokioExecutor)
.timer(TokioTimer)
.keep_alive_interval(Duration::from_secs(1))
.keep_alive_timeout(Duration::from_secs(1))
Expand Down Expand Up @@ -2100,8 +2096,7 @@ mod conn {
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let (mut client, conn) = conn::http2::Builder::new()
.executor(TokioExecutor)
let (mut client, conn) = conn::http2::Builder::new(TokioExecutor)
.timer(TokioTimer)
.keep_alive_interval(Duration::from_secs(1))
.keep_alive_timeout(Duration::from_secs(1))
Expand Down Expand Up @@ -2156,8 +2151,7 @@ mod conn {
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let (mut client, conn) = conn::http2::Builder::new()
.executor(TokioExecutor)
let (mut client, conn) = conn::http2::Builder::new(TokioExecutor)
.handshake(io)
.await
.expect("http handshake");
Expand Down Expand Up @@ -2207,8 +2201,7 @@ mod conn {
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let (mut client, conn) = conn::http2::Builder::new()
.executor(TokioExecutor)
let (mut client, conn) = conn::http2::Builder::new(TokioExecutor)
.handshake::<_, Empty<Bytes>>(io)
.await
.expect("http handshake");
Expand Down
6 changes: 2 additions & 4 deletions tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2389,8 +2389,7 @@ async fn http2_keep_alive_with_responsive_client() {
});

let tcp = connect_async(addr).await;
let (mut client, conn) = hyper::client::conn::http2::Builder::new()
.executor(TokioExecutor)
let (mut client, conn) = hyper::client::conn::http2::Builder::new(TokioExecutor)
.handshake(tcp)
.await
.expect("http handshake");
Expand Down Expand Up @@ -3017,8 +3016,7 @@ impl TestClient {
.unwrap();

if self.http2_only {
let (mut sender, conn) = hyper::client::conn::http2::Builder::new()
.executor(TokioExecutor)
let (mut sender, conn) = hyper::client::conn::http2::Builder::new(TokioExecutor)
.handshake(stream)
.await
.unwrap();
Expand Down
13 changes: 6 additions & 7 deletions tests/support/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,8 +427,7 @@ async fn async_test(cfg: __TestConfig) {
let stream = TcpStream::connect(addr).await.unwrap();

let res = if http2_only {
let (mut sender, conn) = hyper::client::conn::http2::Builder::new()
.executor(TokioExecutor)
let (mut sender, conn) = hyper::client::conn::http2::Builder::new(TokioExecutor)
.handshake(stream)
.await
.unwrap();
Expand Down Expand Up @@ -526,11 +525,11 @@ async fn naive_proxy(cfg: ProxyConfig) -> (SocketAddr, impl Future<Output = ()>)
.unwrap();

let resp = if http2_only {
let (mut sender, conn) = hyper::client::conn::http2::Builder::new()
.executor(TokioExecutor)
.handshake(stream)
.await
.unwrap();
let (mut sender, conn) =
hyper::client::conn::http2::Builder::new(TokioExecutor)
.handshake(stream)
.await
.unwrap();

tokio::task::spawn(async move {
if let Err(err) = conn.await {
Expand Down

0 comments on commit 42b20a1

Please sign in to comment.